成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

Spark閉包中driver及executor程序代碼是怎樣執(zhí)行的

Spark閉包中driver及executor程序代碼是怎樣執(zhí)行的,針對這個(gè)問題,這篇文章詳細(xì)介紹了相對應(yīng)的分析和解答,希望可以幫助更多想解決這個(gè)問題的小伙伴找到更簡單易行的方法。

侯馬ssl適用于網(wǎng)站、小程序/APP、API接口等需要進(jìn)行數(shù)據(jù)傳輸應(yīng)用場景,ssl證書未來市場廣闊!成為成都創(chuàng)新互聯(lián)公司的ssl證書銷售渠道,可以享受市場價(jià)格4-6折優(yōu)惠!如果有意向歡迎電話聯(lián)系或者加微信:13518219792(備注:SSL證書合作)期待與您的合作!

Spark中的閉包

閉包的作用可以理解為:函數(shù)可以訪問函數(shù)外部定義的變量,但是函數(shù)內(nèi)部對該變量進(jìn)行的修改,在函數(shù)外是不可見的,即對函數(shù)外源變量不會產(chǎn)生影響。

Spark閉包中driver及executor程序代碼是怎樣執(zhí)行的

其實(shí),在學(xué)習(xí)Spark時(shí),一個(gè)比較難理解的點(diǎn)就是,在集群模式下,定義的變量和方法作用域的范圍和生命周期。這在你操作RDD時(shí),比如調(diào)用一些函數(shù)map、foreach時(shí),訪問其外部變量進(jìn)行操作時(shí),很容易產(chǎn)生疑惑。為什么我本地程序運(yùn)行良好且結(jié)果正確,放到集群上卻得不到想要的結(jié)果呢?

首先通過下邊對RDD中的元素進(jìn)行求和的示例,來看相同的代碼本地模式和集群模式運(yùn)行結(jié)果的區(qū)別:

Spark閉包中driver及executor程序代碼是怎樣執(zhí)行的

Spark為了執(zhí)行  任務(wù),會將RDD的操作分解為多個(gè)task,并且這些task是由executor執(zhí)行的。  在執(zhí)行之前,Spark會計(jì)算task的閉包即定義的一些變量和方法,比如例子中的counter變量和foreach方法,并且閉包必須對executor而言是可見的,這些閉包會被序列化發(fā)送到每個(gè)executor。
在集群  模式下,driver和executor運(yùn)行在不同的JVM進(jìn)程中,發(fā)送給每個(gè)executor的閉包中的變量是driver端變量的副本。  因此,當(dāng)foreach函數(shù)內(nèi)引用counter時(shí),其實(shí)處理的只是driver端變量的副本,與driver端本身的counter無關(guān)。  driver節(jié)點(diǎn)的內(nèi)存中仍有一個(gè)計(jì)數(shù)器,但該變量對executor是不可見的!  executor只能看到序列化閉包的副本。  因此,上述例子輸出的counter最終值仍然為零,因?yàn)閏ounter上的所有操作都只是引用了序列化閉包內(nèi)的值。
在本地模式下,往往driver和executor運(yùn)行在同一JVM進(jìn)程中。那么這些閉包將會被共享,executor操作的counter和driver持有的counter是同一個(gè),那么counter在處理后最終值為6。

但是在生產(chǎn)中,我們的任務(wù)都是在集群模式下運(yùn)行,如何能滿足這種業(yè)務(wù)場景呢?

這就必須引出一個(gè)后續(xù)要重點(diǎn)講解的概念:Accumulator即累加器。Spark中的累加器專門用于提供一種機(jī)制,用于在集群中的各個(gè)worker節(jié)點(diǎn)之間執(zhí)行時(shí)安全地更新變量。

Spark閉包中driver及executor程序代碼是怎樣執(zhí)行的

一般來  說,closures - constructs比如循環(huán)或本地定義的方法,就不應(yīng)該被用來改變一些全局狀態(tài),Spark并沒有定義或保證對從閉包外引用的對象進(jìn)行更新的行為。  如果你這樣操作只會導(dǎo)致一些代碼在本地模式下能夠達(dá)到預(yù)期的效果,但是在分布式環(huán)境下卻事與愿違。  如果需要某些全局聚合,請改用累加器。  對于其他的業(yè)務(wù)場景,我們適時(shí)考慮引入外部存儲系統(tǒng)、廣播變量等。  
 
閉包函數(shù)從產(chǎn)生到在executor執(zhí)行經(jīng)歷了什么?

首先,對RDD相關(guān)的操作需要傳入閉包函數(shù),如果這個(gè)函數(shù)需要訪問外部定義的變量,就需要滿足一定條件(比如必須可被序列化),否則會拋出運(yùn)行時(shí)異常。閉包函數(shù)在最終傳入到executor執(zhí)行,需要經(jīng)歷以下步驟:

1.driver通過反射,運(yùn)行時(shí)找到閉包訪問的變量,并封裝成一個(gè)對象,然后序列化該對象

2.將序列化后的對象通過網(wǎng)絡(luò)傳輸?shù)絯orker節(jié)點(diǎn)

3.worker節(jié)點(diǎn)反序列化閉包對象

4.worker節(jié)點(diǎn)的executor執(zhí)行閉包函數(shù)

簡而言之,就是要通過網(wǎng)絡(luò)傳遞函數(shù)、然后執(zhí)行,期間會經(jīng)歷序列化和反序列化,所以要求被傳遞的變量必須可以被序列化和反序列化,否則會拋類似Error:Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects這樣的異常。即使是本地執(zhí)行時(shí),也會按照上述的步驟執(zhí)行,這也是為什么不允許在RDD內(nèi)部直接操作RDD的原因(SparkContext不支持序列化)。同時(shí),在這些算子閉包內(nèi)修改外部定義的變量不會被反饋到driver端。
driver & executor

driver是運(yùn)行用戶編寫Application 的main()函數(shù)的地方,具體負(fù)責(zé)DAG的構(gòu)建、任務(wù)的劃分、task的生成與調(diào)度等。job,stage,task生成都離不開rdd自身,rdd的相關(guān)的操作不能缺少driver端的sparksession/sparkcontext。

executor是真正執(zhí)行task地方,而task執(zhí)行離不開具體的數(shù)據(jù),這些task運(yùn)行的結(jié)果可以是shuffle中間結(jié)果,也可以持久化到外部存儲系統(tǒng)。一般都是將結(jié)果、狀態(tài)等匯集到driver。但是,目前executor之間不能互相通信,只能借助第三方來實(shí)現(xiàn)數(shù)據(jù)的共享或者通信。
編寫的Spark程序代碼,運(yùn)行在driver端還是executor端呢?
先看個(gè)簡單例子:通常我們在本地測試程序的時(shí)候,要打印RDD中的數(shù)據(jù)。
在本地模式下,直接使用rdd.foreach(println)或rdd.map(println)在單臺機(jī)器上,能夠按照預(yù)期打印并輸出所有RDD的元素。
但是,在集群模式下,由executor執(zhí)行輸出寫入的是executor的stdout,而不是driver上的stdout,所以driver的stdout不會顯示這些!
要想在driver端打印所有元素,可以使用collect()方法先將RDD數(shù)據(jù)帶到driver節(jié)點(diǎn),然后在調(diào)用foreach(println)(但需要注意一點(diǎn),由于會把RDD中所有元素都加載到driver端,可能引起driver端內(nèi)存不足導(dǎo)致OOM。如果你只是想獲取RDD中的部分元素,可以考慮使用take或者top方法)
總之,在這里RDD中的元素即為具體的數(shù)據(jù),對這些數(shù)據(jù)的操作都是由負(fù)責(zé)task執(zhí)行的executor處理的,所以想在driver端輸出這些數(shù)據(jù)就必須先將數(shù)據(jù)加載到driver端進(jìn)行處理。

最后做個(gè)總結(jié):所有對RDD具體數(shù)據(jù)的操作都是在executor上執(zhí)行的,所有對rdd自身的操作都是在driver上執(zhí)行的。比如foreach、foreachPartition都是針對rdd內(nèi)部數(shù)據(jù)進(jìn)行處理的,所以我們傳遞給這些算子的函數(shù)都是執(zhí)行于executor端的。但是像foreachRDD、transform則是對RDD本身進(jìn)行一列操作,所以它的參數(shù)函數(shù)是執(zhí)行在driver端的,那么它內(nèi)部是可以使用外部變量,比如在SparkStreaming程序中操作offset、動態(tài)更新廣播變量等。

關(guān)于Spark閉包中driver及executor程序代碼是怎樣執(zhí)行的問題的解答就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關(guān)知識。

網(wǎng)站名稱:Spark閉包中driver及executor程序代碼是怎樣執(zhí)行的
當(dāng)前URL:http://jinyejixie.com/article46/ggchhg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供軟件開發(fā)、移動網(wǎng)站建設(shè)、建站公司、網(wǎng)站策劃、虛擬主機(jī)動態(tài)網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

綿陽服務(wù)器托管
陕西省| 宜州市| 铜川市| 苏尼特右旗| 交口县| 抚顺县| 依安县| 托里县| 诸暨市| 大兴区| 蕉岭县| 集贤县| 奎屯市| 宁阳县| 基隆市| 寿宁县| 县级市| 勃利县| 宜阳县| 扎囊县| 乌鲁木齐县| 兴山县| 平江县| 双流县| 丰都县| 自治县| 岗巴县| 东兴市| 铜梁县| 博爱县| 太康县| 东宁县| 文水县| 共和县| 宕昌县| 崇左市| 获嘉县| 集贤县| 嘉黎县| 德州市| 清原|