程序员社区

Spark的数据本地化和延迟调度策略

一.概述

Spark数据本地化即计算向数据移动,但数据块所在的Executor不一定有足够的的计算资源提供,为了让task能尽可能的以最优本地化级别(Locality Levels)来启动,Spark的延迟调度应运而生,资源不够可在该Locality Levels对应的限制时间内重试,超过限制时间后还无法启动则降低Locality Levels再尝试启动。

二.本地化级别(Locality Levels)

Spark目前支持以下几种本地化级别:

  • 1.PROCESS_LOCAL:进程本地化,表示 task 要计算的数据在同一个 Executor 中。
  • 2.NODE_LOCAL: 节点本地化,速度稍慢,因为数据需要在不同的进程之间传递或从文件中读取。分为两种情况,第一种:task 要计算的数据是在同一个 worker 的不同 Executor 进程中。第二种:task 要计算的数据是在同一个 worker 的磁盘上,或在 HDFS 上恰好有 block 在同一个节点上。如果 Spark 要计算的数据来源于 HDFS 上,那么最好的本地化级别就是 NODE_LOCAL。
  • 3.NO_PREF: 没有最佳位置,数据从哪访问都一样快,不需要位置优先。比如 Spark SQL 从 Mysql 中读取数据。
  • 4.RACK_LOCAL:机架本地化,数据在同一机架的不同节点上。需要通过网络传输数据以及文件 IO,比 NODE_LOCAL 慢。
  • 5.ANY:跨机架,数据在非同一机架的网络上,速度最慢。

三.Spark 的数据本地化由谁来负责

DAGScheduler 切割Job,划分Stage, 通过调用 submitStage 来提交一个Stage 对应的 Tasks,submitStage 会调用 submitMissingTasks, submitMissingTasks 确定每个需要计算的 task 的preferredLocations,通过调用 getPreferrdeLocations方法得到 partition 的优先位置,就是这个 partition 对应的 task 的优先位置,对于要提交到 TaskScheduler 的 TaskSet 中的每一个Task ,该 Task 优先位置与其对应的 partition 对应的优先位置一致。

TaskScheduler 接收到了 TaskSet 后,TaskScheduler会为每个TaskSet创建一个TaskSetMagager来对其Task进行管理,TaskSetMagager中包含TaskSet 所有 Task,并管理这些 Task 的执行,在初始化TaskSetMagager的时候就会通过computeValidLocalityLevels计算该TaskSet包含的Locality Levels,以便在调度和延迟调度 tasks 时发挥作用。

总的来说,Spark 中的数据本地化是由 DAGScheduler 和 TaskScheduler 共同负责的。

四.Spark是如何进行调度

Locality Levels表示了计算节点与输入数据位置的关系,下面以一个图来展开 Spark 是如何进行调度的。这一个过程会涉及 RDD, DAGScheduler , TaskScheduler。

Spark的数据本地化和延迟调度策略插图

1.PROCESS_LOCAL
TaskScheduler 根据数据的位置向数据节点发送 Task 任务。如果这个任务在 worker1 的 Executor 中等待了 3 秒。(默认的,可以通过spark.locality.wait 来设置),可以通过 SparkConf() 来修改,重试了 5 次之后,还是无法执行,TaskScheduler 就会降低数据本地化的级别,从 PROCESS_LOCAL 降到 NODE_LOCAL。

2.NODE_LOCAL
TaskScheduler 重新发送 task 到 worker1 中的 Executor2 中执行,如果 Task 在worker1 的 Executor2 中等待了 3 秒,重试了 5 次,还是无法执行,TaskScheduler 就会降低数据本地化的级别,从 NODE_LOCAL 降到 RACK_LOCAL。

3.RACK_LOCAL
TaskScheduler重新发送 Task 到 worker2 中的 Executor1 中执行。

4.获取数据执行
当 Task 分配完成之后,Task 会通过所在的 worker 的 Executor 中的 BlockManager 来获取数据。如果 BlockManager 发现自己没有数据,那么它会调用 getRemoteValues 方法,通过 BlockManagerSlaveEndpoint与Driver所在节点的BlockManagerMaster中的BlockManagerMasterEndpoint先建立连接,获取数据所在的BlockManager的地址,然后通过BlockTransferService(网络传输组件)获取数据,通过网络传输回Task所在节点(这时候性能大幅下降,大量的网络IO占用资源),之后就开始计算流程。

四.优化建议

TaskScheduler在发送 Task 的时候,会根据数据所在的节点发送 Task ,这时候的数据本地化的级别是最高的,如果这个 Task 在这个Executor中等待了3秒,重试发射了5次还是依然无法执行,那么TaskScheduler就会认为这个Executor的计算资源满了,TaskScheduler会降低 1 级数据本地化的级别,重新发送 Task 到其他的Executor中执行,如果还是依然无法执行,那么继续降低数据本地化的级别...

如果想让每一个 Task 都能拿到最好的数据本地化级别,那么调优点就是等待时间加长。注意!如果过度调大等待时间,虽然为每一个 Task 都拿到了最好的数据本地化级别,但是我们 Job 执行的时间也会随之延长。

属性名称 默认值 含义
spark.locality.wait 3000 以下几个参数是关于Spark数据本地性的。本参数是以毫秒为单位启动本地数据task的等待时间,如果超出就启动下一本地优先级别的task。该设置同样可以应用到各优先级别的本地性之间(本地进程 -> 本地节点 -> 本地机架 -> 任意节点 ),当然,也可以通过spark.locality.wait.node等参数设置不同优先级别的本地性
spark.locality.wait.process spark.locality.wait 本地进程级别的本地等待时间
spark.locality.wait.node spark.locality.wait 本地节点级别的本地等待时间
spark.locality.wait.rack spark.locality.wait 本地机架级别的本地等待时间

可以在代码里面这样设置:

new SparkConf.set("spark.locality.wait","1000")

五.参考资料

1.Spark 的 数据本地化,提供最佳的计算节点,终于入门了
2.Spark性能调优篇六之调节数据本地化等待时长

赞(0) 打赏
未经允许不得转载:IDEA激活码 » Spark的数据本地化和延迟调度策略

相关推荐

  • 暂无文章

一个分享Java & Python知识的社区