程序员社区

Spark中BlockManager的存储原理

一.整体架构

Spark的存储介质包括磁盘和内存。

Spark的存储采用了主从模型,存储模块使用了基于Netty的RPC消息通信方式。BlockManagerMaster负责整个应用程序运行期间的数据块的元数据管理和维护。BlockManager(Slave)负责将本地数据块的状态信息上报给BlockManagerMaster,同时接受从BlockManagerMaster传过来的执行命令,如获取数据块状态,删除数据块等命令。每个BlockManager中都存在数据传输通道,根据需要进行远程数据的读取和写入。

在应用程序启动期间,SparkContext会创建Driver端的SparkEnv,在该SparkEnv中实例化BlockManagerMaster,在BlockManagerMaster内部创建消息通信的端点BlockManagerMasterEndpoint。

同理,在Executor启动时,也会创建SparkEnv,在该SparkEnv中实例化BlockManager和负责网络数据传输服务的BlockTransferService。BlockManager初始化过程中,会加入BlockManagerMasterEndpoint端点的引用,同时也会创建BlockManagerSlaveEndpoint端点,并把该端点的引用注册到Driver中,这样Driver和Executor互相持有通信端点的引用,可以在应用程序执行过程中进行消息通信。BlockTransferService中使用了基于Netty的数据传输方式,该传输方式隐藏了集群间不同节点的消息传输操作,可以类似于本地数据操作方式进行数据读写,大大简化了网络间数据传输的复杂程度。

Spark存储整体架构图如下:

Spark中BlockManager的存储原理插图
BlockManager存储系统架构

1.数据块位置信息的获取
应用程序在完成数据存储后,后续的task在获取远程节点数据,获取rdd执行的首选位置等操作需要根据数据块的编号查询出数据块所处的位置,此时发送getLocation或者getLocationMultipleBlockIds等消息给BlockManagerMasterEndpoint端点,通过对元数据的查询获取数据块的位置信息。

2.数据块的删除
Spark提供删除RDD,数据块和广播变量的方式,当数据需要删除时,提交删除。当数据需要删除时,提交删除消息给BlockManagerSlaveEndpoint端点,在该端点对应发起删除操作,删除操作一方面需要先通知Driver端删除数据块的元数据信息,另一方面需要发送消息通知Executor,删除对应的物理数据。这里需要注意这个先后顺序,以免物理数据删除完毕之后,还存在元数据未删除。

Spark中BlockManager的存储原理插图1
Block数据的删除

3.数据块的读取
Executor的BlockManager接受到读取数据的请求时候,根据数据块所在的节点是否在本地,调用BlockManager的不同方法去处理,如果在本地则调用MemoryStore或者DiskStore中的取方法,进行读取,如果在远程,则调用BlockTransferService的服务去远程节点上获取数据。

Spark中BlockManager的存储原理插图2
Block数据的读取

4.数据块的写入
当Executor的BlockManager接受到写数据的请求时,如果不需要创建副本,则调用BlockStore的接口方法去进行处理,根据数据写入的存储类型,决定调用对应的写入方法。

Spark中BlockManager的存储原理插图3
Block数据的写入

二.存储级别

Spark的内部维护的存储,还支持以内存或者磁盘存储的方式存储RDD的数据集。通过调用RDD的persist或者cache来完成对应的操作。

在RDD第一次被计算时,persist方法会根据StorageLevel的参数值来采用不同的缓存(持久化)策略。当RDD的原本存储级别为None或者新传递进来的存储级别值与原来的存储级别相等时才进行操作。

persist操作是控制操作的一种,它只是改变了RDD的元数据信息,并没由真正的进行数据的存储操作,真正进行的是RDD的iterator方法,对应cache来说,他是persist的一个特例,即persist中的StorageLevel的值为MEMERY-ONLY的情形。

在StorageLevel类中,根据useDisk,useMemory,useOffHeap,deserialized,replication这5个参数的组合,Spark提供了12种存储级别的持久化策略,可以将RDD持久化到内存,磁盘和外部存储系统,或者以序列化的方式持久化到内存,还能够在集群的不同节点之间存储多个副本。

Spark存储级别如下所示

Storage Level 描述
MEMORY_ONLY 默认选项,RDD的(分区)数据直接以Java对象的形式存储于JVM的内存中,如果内存空间不足,某些分区的数据将不会被缓存,需要在使用的时候重新计算。
MEMORY_AND_DISK RDD的数据直接以Java对象的形式存储于JVM的内存中,如果内存空间不中,某些分区的数据会被存储至磁盘,使用的时候从磁盘读取。
MEMORY_ONLY_SER RDD的数据(Java对象)序列化之后存储于JVM的内存中(一个分区的数据为内存中的一个字节数组),相比于MEMORY_ONLY能够有效节约内存空间(特别是使用一个快速序列化工具的情况下),但读取数据时需要更多的CPU开销;如果内存空间不足,处理方式与MEMORY_ONLY相同。
MEMORY_AND_DISK_SER 相比于MEMORY_ONLY_SER,在内存空间不足的情况下,将序列化之后的数据存储于磁盘。
DISK_ONLY 仅仅使用磁盘存储RDD的数据(未经序列化)。
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 以MEMORY_ONLY_2为例,MEMORY_ONLY_2相比于MEMORY_ONLY存储数据的方式是相同的,不同的是会将数据备份到集群中两个不同的节点,其余情况类似。
OFF_HEAP (experimental) 与MEMORY_ONLY_SER类似,但是存储在非堆的内存中,需要开启非堆内存。

这里需要注意,即使没有使用persisit,Spark在进行shuffle的操作的时候也会自动的持久化某些中间数据。这样可以避免在有节点丢失的情况下,重复计算input。对于 resulting RDD如果需要重复使用的话, 强烈建议使用persist进行持久化。

如何选择存储级别?

默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大,可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化级别,会导致JVM的OOM内存溢出异常。

如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每个partition仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。但是后续算子可以基于纯内存进行操作,因此性能总体还是比较高的。此外,可能发生的问题同上,如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。

如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因为既然到了这一步,就说明RDD的数据量很大,内存无法完全放下。序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。

通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性能开销,除非是要求作业的高可用性,否则不建议使用。

赞(0) 打赏
未经允许不得转载:IDEA激活码 » Spark中BlockManager的存储原理

相关推荐

  • 暂无文章

一个分享Java & Python知识的社区