程序员社区

Spark中CartesianRDD依赖关系的特殊之处

前言

RDD之间的依赖关系一般分为两种,宽依赖和窄依赖。

Spark中CartesianRDD依赖关系的特殊之处插图
窄依赖和宽依赖

在网上好多博客中是这样描述宽依赖和窄依赖的特点

  • 窄依赖每个父RDD的一个Partition最多被子RDD的一个Partition所使用。如map,filter,union操作都会产生窄依赖。
  • 宽依赖一个父RDD的Partition会被多个子RDD的Partition所使用。如groupByKey,reduceByKey,sortByKey等操作都会产生宽依赖。宽依赖会产生Shuffle操作。

上面的描述在大多数情况下确实没错,但是对于Spark中的cartesian笛卡尔集的操作来说,好像不太符合上面的特点描述信息。

分析

首先,我们知道Cartesian 对两个 RDD 做笛卡尔集,笛卡尔集的特点是一个数据集中的每一条元素都和另一个数据集中的所有元素做匹配生成一个大的数据集。

在Spark中两个RDD经过Cartesian 转换操作生成的 CartesianRDD 中分区的数量为
partitionNum(RDD 3) = partitionNum(RDD 1) * partitionNum(RDD 2)

Spark中CartesianRDD依赖关系的特殊之处插图1

如上图所示,数据集RDD1和RDD2做笛卡尔集操作,生成一个RDD3,这样的过程根据一开始的宽窄依赖划分,你肯定会认为这个是一个宽依赖,存在Shuffle的过程。

其实不是上面的过程在并没有Shuffle的过程,而且这个过程完完全全是一个窄依赖,我们可以看看源码来分析。

/**
   * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
   * elements (a, b) where a is in `this` and b is in `other`.
   */
  def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
    new CartesianRDD(sc, this, other)
  }

首先cartesian算子会创建一个CartesianRDD实例返回

接下来我们看看CartesianRDD是如何处理依赖关系的,首先看看构造

class CartesianRDD[T: ClassTag, U: ClassTag](
    sc: SparkContext,
    var rdd1 : RDD[T],
    var rdd2 : RDD[U])
  extends RDD[(T, U)](sc, Nil)

可以看到这里传入了一个Nil,也就是说父类RDD中的deps为Nil,而CartesianRDD类中重写了父类RDD的getDependencies函数。

  //重写的getPartitions函数
  override def getPartitions: Array[Partition] = {
    // create the cross product split
    val array = new Array[Partition](rdd1.partitions.length * rdd2.partitions.length)
    for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
      val idx = s1.index * numPartitionsInRdd2 + s2.index
      array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
    }
    array
  }
  //重写的getDependencies函数
  override def getDependencies: Seq[Dependency[_]] = List(
    new NarrowDependency(rdd1) {
      def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
    },
    new NarrowDependency(rdd2) {
      def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
    }
  )

首先,我们可以看到重写的getPartitions函数则是一个笛卡尔集的过程,可以看到这里其实就是个双重for循环遍历了父RDD的Partition生成新的CartesianPartition。

重写的getDependencies函数返回的Dependency列表中是自己匿名实现NarrowDependency的两个实例对象,这个匿名实例中重写了NarrowDependency中定义的抽象函数getParents。

在getParents中我们看到他的具体实现其实就是根据当前CartesianRDD中Partition的Id来确定依赖于上一级RDD哪两个Partition。一个取除数,一个取余数,很巧妙的计算。

而且这里并不是ShuffleDependency,是不是很特别。

因此,原本我们认为的会是个Shuffle的过程,在源码中却并没有引入昂贵的Shuffle,而是一个窄依赖。

Spark中CartesianRDD依赖关系的特殊之处插图1

那么回到刚才的图中,可以看到一个父RDD的Partition被子RDD的多个Partition使用,但是子RDD的一个Partition至多依赖父RDD的两个Partition,因此这个现象已经推翻了我们大多数情况下对窄依赖和宽依赖的描述。

所以我们现在总结一下,宽依赖和窄依赖的描述

  • 窄依赖指的是子RDD依赖于父RDD中固定的Partitions
  • 宽依赖指的是子RDD对父RDD中的所有partition都有依赖,或者说依赖于父RDD的数量不能明确

当然,本篇文章只是我对这一点疑惑的探究,如果有另外的解答,还请多多指教。

赞(0) 打赏
未经允许不得转载:IDEA激活码 » Spark中CartesianRDD依赖关系的特殊之处

相关推荐

  • 暂无文章

一个分享Java & Python知识的社区