RDD的依賴關(guān)系分為兩類:寬依賴和窄依賴。我們可以這樣認(rèn)為:
創(chuàng)新互聯(lián)公司專注為客戶提供全方位的互聯(lián)網(wǎng)綜合服務(wù),包含不限于網(wǎng)站制作、成都網(wǎng)站建設(shè)、武侯網(wǎng)絡(luò)推廣、小程序開發(fā)、武侯網(wǎng)絡(luò)營(yíng)銷、武侯企業(yè)策劃、武侯品牌公關(guān)、搜索引擎seo、人物專訪、企業(yè)宣傳片、企業(yè)代運(yùn)營(yíng)等,從售前售中售后,我們都將竭誠(chéng)為您服務(wù),您的肯定,是我們最大的嘉獎(jiǎng);創(chuàng)新互聯(lián)公司為所有大學(xué)生創(chuàng)業(yè)者提供武侯建站搭建服務(wù),24小時(shí)服務(wù)熱線:18980820575,官方網(wǎng)址:jinyejixie.com
窄依賴每個(gè) child RDD 的 partition 的生成操作都是可以并行的,而寬依賴則需要所有的 parent RDD partition shuffle 結(jié)果得到后再進(jìn)行。
Dependency是一個(gè)抽象類:
// Denpendency.scala
abstract class Dependency[T] extends Serializable {
def rdd: RDD[T]
}
它有兩個(gè)子類:NarrowDependency 和 ShuffleDenpendency,分別對(duì)應(yīng)窄依賴和寬依賴。
定義了抽象方法getParents,輸入partitionId,用于獲得child RDD 的某個(gè)partition依賴的parent RDD的所有 partitions。
// Denpendency.scala
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
/**
* Get the parent partitions for a child partition.
* @param partitionId a partition of the child RDD
* @return the partitions of the parent RDD that the child partition depends upon
*/
def getParents(partitionId: Int): Seq[Int]
override def rdd: RDD[T] = _rdd
}
窄依賴又有兩個(gè)具體的實(shí)現(xiàn):OneToOneDependency和RangeDependency。
(a)OneToOneDependency指child RDD的partition只依賴于parent RDD 的一個(gè)partition,產(chǎn)生OneToOneDependency的算子有map,filter,flatMap等??梢钥吹絞etParents實(shí)現(xiàn)很簡(jiǎn)單,就是傳進(jìn)去一個(gè)partitionId,再把partitionId放在List里面?zhèn)鞒鋈ァ?/p>
// Denpendency.scala
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
(b)RangeDependency指child RDD partition在一定的范圍內(nèi)一對(duì)一的依賴于parent RDD partition,主要用于union。
// Denpendency.scala
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) {//inStart表示parent RDD的開始索引,outStart表示child RDD 的開始索引
override def getParents(partitionId: Int): List[Int] = {
if (partitionId >= outStart && partitionId < outStart + length) {
List(partitionId - outStart + inStart)//表示于當(dāng)前索引的相對(duì)位置
} else {
Nil
}
}
}
表示一個(gè)parent RDD的partition會(huì)被child RDD的partition使用多次。需要經(jīng)過shuffle才能形成。
// Denpendency.scala
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Serializer = SparkEnv.get.serializer,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]] { //shuffle都是基于PairRDD進(jìn)行的,所以傳入的RDD要是key-value類型的
override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
private[spark] val combinerClassName: Option[String] =
Option(reflect.classTag[C]).map(_.runtimeClass.getName) //獲取shuffleId
val shuffleId: Int = _rdd.context.newShuffleId() //向shuffleManager注冊(cè)shuffle信息
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.length, this)
_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
由于shuffle涉及到網(wǎng)絡(luò)傳輸,所以要有序列化serializer,為了減少網(wǎng)絡(luò)傳輸,可以map端聚合,通過mapSideCombine和aggregator控制,還有key排序相關(guān)的keyOrdering,以及重輸出的數(shù)據(jù)如何分區(qū)的partitioner,還有一些class信息。Partition之間的關(guān)系在shuffle處戛然而止,因此shuffle是劃分stage的依據(jù)。
首先,窄依賴允許在一個(gè)集群節(jié)點(diǎn)上以流水線的方式(pipeline)計(jì)算所有父分區(qū)。例如,逐個(gè)元素地執(zhí)行map、然后filter操作;而寬依賴則需要首先計(jì)算好所有父分區(qū)數(shù)據(jù),然后在節(jié)點(diǎn)之間進(jìn)行Shuffle,這與MapReduce類似。第二,窄依賴能夠更有效地進(jìn)行失效節(jié)點(diǎn)的恢復(fù),即只需重新計(jì)算丟失RDD分區(qū)的父分區(qū),而且不同節(jié)點(diǎn)之間可以并行計(jì)算;而對(duì)于一個(gè)寬依賴關(guān)系的Lineage圖,單個(gè)節(jié)點(diǎn)失效可能導(dǎo)致這個(gè)RDD的所有祖先丟失部分分區(qū),因而需要整體重新計(jì)算。
分享名稱:RDD血緣關(guān)系源碼詳解!
路徑分享:http://jinyejixie.com/article20/gpepjo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站設(shè)計(jì)、域名注冊(cè)、網(wǎng)站收錄、網(wǎng)站內(nèi)鏈、虛擬主機(jī)、建站公司
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)