成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

RDD血緣關(guān)系源碼詳解!

一、RDD的依賴關(guān)系

RDD的依賴關(guān)系分為兩類:寬依賴和窄依賴。我們可以這樣認(rèn)為:

創(chuàng)新互聯(lián)公司專注為客戶提供全方位的互聯(lián)網(wǎng)綜合服務(wù),包含不限于網(wǎng)站制作、成都網(wǎng)站建設(shè)、武侯網(wǎng)絡(luò)推廣、小程序開發(fā)、武侯網(wǎng)絡(luò)營(yíng)銷、武侯企業(yè)策劃、武侯品牌公關(guān)、搜索引擎seo、人物專訪、企業(yè)宣傳片、企業(yè)代運(yùn)營(yíng)等,從售前售中售后,我們都將竭誠(chéng)為您服務(wù),您的肯定,是我們最大的嘉獎(jiǎng);創(chuàng)新互聯(lián)公司為所有大學(xué)生創(chuàng)業(yè)者提供武侯建站搭建服務(wù),24小時(shí)服務(wù)熱線:18980820575,官方網(wǎng)址:jinyejixie.com

  • (1)窄依賴:每個(gè)parent RDD 的 partition 最多被 child RDD 的一個(gè)partition 使用。
  • (2)寬依賴:每個(gè)parent RDD partition 被多個(gè) child RDD 的partition 使用。

窄依賴每個(gè) child RDD 的 partition 的生成操作都是可以并行的,而寬依賴則需要所有的 parent RDD partition shuffle 結(jié)果得到后再進(jìn)行。

二、org.apache.spark.Dependency.scala 源碼解析

Dependency是一個(gè)抽象類:

// Denpendency.scala
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

它有兩個(gè)子類:NarrowDependency 和 ShuffleDenpendency,分別對(duì)應(yīng)窄依賴和寬依賴。

(1)NarrowDependency也是一個(gè)抽象類

定義了抽象方法getParents,輸入partitionId,用于獲得child RDD 的某個(gè)partition依賴的parent RDD的所有 partitions。

// Denpendency.scala
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {  
/**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}

窄依賴又有兩個(gè)具體的實(shí)現(xiàn):OneToOneDependency和RangeDependency。
(a)OneToOneDependency指child RDD的partition只依賴于parent RDD 的一個(gè)partition,產(chǎn)生OneToOneDependency的算子有map,filter,flatMap等??梢钥吹絞etParents實(shí)現(xiàn)很簡(jiǎn)單,就是傳進(jìn)去一個(gè)partitionId,再把partitionId放在List里面?zhèn)鞒鋈ァ?/p>

// Denpendency.scala
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
        (b)RangeDependency指child RDD partition在一定的范圍內(nèi)一對(duì)一的依賴于parent RDD partition,主要用于union。

// Denpendency.scala
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)  
  extends NarrowDependency[T](rdd) {//inStart表示parent RDD的開始索引,outStart表示child RDD 的開始索引
  override def getParents(partitionId: Int): List[Int] = {    
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)//表示于當(dāng)前索引的相對(duì)位置
    } else {
      Nil
    }
  }
}
(2)ShuffleDependency指寬依賴

表示一個(gè)parent RDD的partition會(huì)被child RDD的partition使用多次。需要經(jīng)過shuffle才能形成。

// Denpendency.scala
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],    
    val partitioner: Partitioner,    
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {  //shuffle都是基于PairRDD進(jìn)行的,所以傳入的RDD要是key-value類型的
  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)  //獲取shuffleId
  val shuffleId: Int = _rdd.context.newShuffleId()  //向shuffleManager注冊(cè)shuffle信息
  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

由于shuffle涉及到網(wǎng)絡(luò)傳輸,所以要有序列化serializer,為了減少網(wǎng)絡(luò)傳輸,可以map端聚合,通過mapSideCombine和aggregator控制,還有key排序相關(guān)的keyOrdering,以及重輸出的數(shù)據(jù)如何分區(qū)的partitioner,還有一些class信息。Partition之間的關(guān)系在shuffle處戛然而止,因此shuffle是劃分stage的依據(jù)。

三、兩種依賴的區(qū)分

首先,窄依賴允許在一個(gè)集群節(jié)點(diǎn)上以流水線的方式(pipeline)計(jì)算所有父分區(qū)。例如,逐個(gè)元素地執(zhí)行map、然后filter操作;而寬依賴則需要首先計(jì)算好所有父分區(qū)數(shù)據(jù),然后在節(jié)點(diǎn)之間進(jìn)行Shuffle,這與MapReduce類似。第二,窄依賴能夠更有效地進(jìn)行失效節(jié)點(diǎn)的恢復(fù),即只需重新計(jì)算丟失RDD分區(qū)的父分區(qū),而且不同節(jié)點(diǎn)之間可以并行計(jì)算;而對(duì)于一個(gè)寬依賴關(guān)系的Lineage圖,單個(gè)節(jié)點(diǎn)失效可能導(dǎo)致這個(gè)RDD的所有祖先丟失部分分區(qū),因而需要整體重新計(jì)算。

分享名稱:RDD血緣關(guān)系源碼詳解!
路徑分享:http://jinyejixie.com/article20/gpepjo.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站設(shè)計(jì)、域名注冊(cè)網(wǎng)站收錄、網(wǎng)站內(nèi)鏈、虛擬主機(jī)、建站公司

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

成都網(wǎng)站建設(shè)公司

網(wǎng)站設(shè)計(jì)公司知識(shí)

洱源县| 阿图什市| 沁阳市| 门源| 南雄市| 罗定市| 吴旗县| 南汇区| 江西省| 山阴县| 嘉义县| 镇远县| 来安县| 温州市| 大足县| 铜山县| 田林县| 静海县| 张家界市| 池州市| 丹巴县| 海门市| 从化市| 平安县| 泰兴市| 驻马店市| 女性| 沈阳市| 金乡县| 祁连县| 子长县| 嵩明县| 凤冈县| 修武县| 星子县| 安仁县| 湘阴县| 泉州市| 正蓝旗| 恭城| 丘北县|