成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

Giraph源碼分析(五)——加載數(shù)據(jù)+同步總結(jié)-創(chuàng)新互聯(lián)

作者|白松

創(chuàng)新互聯(lián)公司聯(lián)系電話:18980820575,為您提供成都網(wǎng)站建設(shè)網(wǎng)頁(yè)設(shè)計(jì)及定制高端網(wǎng)站建設(shè)服務(wù),創(chuàng)新互聯(lián)公司網(wǎng)頁(yè)制作領(lǐng)域10余年,包括成都地磅秤等多個(gè)行業(yè)擁有豐富的營(yíng)銷推廣經(jīng)驗(yàn),選擇創(chuàng)新互聯(lián)公司,為企業(yè)保駕護(hù)航。

關(guān)于Giraph 共有九個(gè)章節(jié),本文第五個(gè)章節(jié)。

環(huán)境:在單機(jī)上(機(jī)器名:giraphx)啟動(dòng)了2個(gè)workers。

輸入:SSSP文件夾,里面有1.txt和2.txt兩個(gè)文件。

1、在Worker向Master匯報(bào)健康狀況后,就開(kāi)始等待Master創(chuàng)建InputSplit。

方法:每個(gè)Worker通過(guò)檢某個(gè)Znode節(jié)點(diǎn)是否存在,同時(shí)在此Znode上設(shè)置Watcher。若不存在,就通過(guò)BSPEvent的waitForever()方法釋放當(dāng)前線程的鎖,陷入等待狀態(tài)。一直等到master創(chuàng)建該znode。此步驟位于BSPServiceWorker類中的startSuperStep方法中,等待代碼如下:


2、Master調(diào)用createInputSplits()方法創(chuàng)建InputSplit。

Giraph 源碼分析(五)—— 加載數(shù)據(jù)+同步總結(jié)

在generateInputSplits()方法中,根據(jù)用戶設(shè)定的VertexInputFormat獲得InputSplits。代碼如下:

Giraph 源碼分析(五)—— 加載數(shù)據(jù)+同步總結(jié)

其中minSplitCountHint為創(chuàng)建split的最小數(shù)目,其值如下:

minSplitCountHint = Workers數(shù)目 * NUM_INPUT_THREADS

NUM_INPUT_THREADS表示 每個(gè)Input split loading的線程數(shù)目,默認(rèn)值為1 。 經(jīng)查證,在TextVertexValueInputFormat抽象類中的getSplits()方法中的minSplitCountHint參數(shù)被忽略。用戶輸入的VertexInputFormat繼承TextVertexValueInputFormat抽象類。

如果得到的splits.size小于minSplitCountHint,那么有些worker就沒(méi)被用上。

得到split信息后,要把這些信息寫到Zookeeper上,以便其他workers訪問(wèn)。上面得到的split信息如下:

[hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66, hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46]

遍歷splits List,為每個(gè)split創(chuàng)建一個(gè)Znode,值為split的信息。如為split-0創(chuàng)建Znode,值為:hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0

為split-1創(chuàng)建znode(如下),值為:hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1

最后創(chuàng)建znode: /_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllReady 表示所有splits都創(chuàng)建好了。

3、Master根據(jù)splits創(chuàng)建Partitions。首先確定partition的數(shù)目。

Giraph 源碼分析(五)—— 加載數(shù)據(jù)+同步總結(jié)

BSPServiceMaster中的MasterGraphPartitioner<I.V,E,M>對(duì)象默認(rèn)為HashMasterPartitioner。它的createInitialPartitionOwners()方法如下:

Giraph 源碼分析(五)—— 加載數(shù)據(jù)+同步總結(jié)

上面代碼中是在工具類PartitionUtils計(jì)算Partition的數(shù)目,計(jì)算公式如下:

partitionCount=PARTITION_COUNT_MULTIPLIER availableWorkerInfos.size() availableWorkerInfos.size() ,其中PARTITION_COUNT_MULTIPLIER表示Multiplier for the current workers squared,默認(rèn)值為1 。

可見(jiàn),partitionCount值為4(122)。創(chuàng)建的partitionOwnerList信息如下:

[(id=0,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null),

(id=1,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null),

(id=2,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null),

(id=3,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null)]

4、Master創(chuàng)建Znode:/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_partitionExchangeDir,用于后面的exchange partition。

5、Master最后在assignPartitionOwners()方法中

把masterinfo,chosenWorkerInfoList,partitionOwners等信息寫入Znode中(作為Znode的data),該Znode的路徑為: /_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_addressesAndPartitions 。

Master調(diào)用barrierOnWorkerList()方法開(kāi)始等待各個(gè)Worker完成數(shù)據(jù)加載。調(diào)用關(guān)系如下:

Giraph 源碼分析(五)—— 加載數(shù)據(jù)+同步總結(jié)

barrierOnWorkerList中創(chuàng)建znode,path=/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir 。然后檢查該znode的子節(jié)點(diǎn)數(shù)目是否等于workers的數(shù)目,若不等于,則線程陷入等待狀態(tài)。后面某個(gè)worker完成數(shù)據(jù)加載后,會(huì)創(chuàng)建子node(如 /_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_1)來(lái)激活該線程繼續(xù)判斷。

6、當(dāng)Master創(chuàng)建第5步的znode后,會(huì)激活worker。

每個(gè)worker從znode上讀出data,data包含masterInfo,WorkerInfoList和partitionOwnerList,然后各個(gè)worker開(kāi)始加載數(shù)據(jù)。

把partitionOwnerList復(fù)制給BSPServiceWorker類中的workerGraphPartitioner(默認(rèn)為HashWorkerPartitioner類型)對(duì)象的partitionOwnerList變量,后續(xù)每個(gè)頂點(diǎn)把根據(jù)vertexID通過(guò)workerGraphPartitioner對(duì)象獲取其對(duì)應(yīng)的partitionOwner。

Giraph 源碼分析(五)—— 加載數(shù)據(jù)+同步總結(jié)

每個(gè)Worker從znode: /_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir獲取子節(jié)點(diǎn),得到inputSplitPathList,內(nèi)容如下:

[/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1,

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0]

然后每個(gè)Worker創(chuàng)建N個(gè)InputsCallable線程讀取數(shù)據(jù)。N=Min(NUM_INPUT_THREADS,maxInputSplitThread),其中NUM_INPUT_THREADS默認(rèn)值為1,maxInputSplitThread=(InputSplitSize-1/maxWorkers +1

那么,默認(rèn)每個(gè)worker就是創(chuàng)建一個(gè)線程來(lái)加載數(shù)據(jù)。

在InputSplitsHandler類中的reserveInputSplit()方法中,每個(gè)worker都是遍歷inputSplitPathList,通過(guò)創(chuàng)建znode來(lái)保留(標(biāo)識(shí)要處理)的split。代碼及注釋如下:

Giraph 源碼分析(五)—— 加載數(shù)據(jù)+同步總結(jié)

當(dāng)用reserveInputSplit()方法獲取某個(gè)znode后,loadSplitsCallable類的loadInputSplit方法就開(kāi)始通過(guò)該znode獲取其HDFS的路徑信息,然后讀入數(shù)據(jù)、重分布數(shù)據(jù)。

Giraph 源碼分析(五)—— 加載數(shù)據(jù)+同步總結(jié)

Giraph 源碼分析(五)—— 加載數(shù)據(jù)+同步總結(jié)

VertexInputSplitsCallable類的readInputSplit()方法如下:

Giraph 源碼分析(五)—— 加載數(shù)據(jù)+同步總結(jié)

7、每個(gè)worker加載完數(shù)據(jù)后,調(diào)用waitForOtherWorkers()方法等待其他workers都處理完split。

Giraph 源碼分析(五)—— 加載數(shù)據(jù)+同步總結(jié)

策略如下,每個(gè)worker在/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir目錄下創(chuàng)建子節(jié)點(diǎn),后面追加自己的worker信息,如worker1、worker2創(chuàng)建的子節(jié)點(diǎn)分別如下:

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_1

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_2

創(chuàng)建完后,然后等待master創(chuàng)建/_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllDone。

8、從第5步驟可知,若master發(fā)現(xiàn)/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir下的子節(jié)點(diǎn)數(shù)目等于workers的總數(shù)目,就會(huì)在coordinateInputSplits()方法中創(chuàng)建

_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllDone,告訴每個(gè)worker,所有的worker都處理完了split。

9、最后就是就行全局同步。

master創(chuàng)建znode,path=/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir ,然后再調(diào)用barrierOnWorkerList方法檢查該znode的子節(jié)點(diǎn)數(shù)目是否等于workers的數(shù)目,若不等于,則線程陷入等待狀態(tài)。等待worker創(chuàng)建子節(jié)點(diǎn)來(lái)激活該線程繼續(xù)判斷。

每個(gè)worker獲取自身的Partition Stats,進(jìn)入finishSuperStep方法中,等待所有的Request都被處理完;把自身的Aggregator信息發(fā)送給master;創(chuàng)建子節(jié)點(diǎn),如/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir/giraphx_1,data為該worker的partitionStatsList和workerSentMessages統(tǒng)計(jì)量;

最后調(diào)用waitForOtherWorkers()方法等待master創(chuàng)建/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 節(jié)點(diǎn)。

master發(fā)現(xiàn)/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir的子節(jié)點(diǎn)數(shù)目等于workers數(shù)目后,根據(jù)/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir子節(jié)點(diǎn)上的data收集每個(gè)worker發(fā)送的aggregator信息,匯總為globalStats。

Master若發(fā)現(xiàn)全局信息中(1)所有頂點(diǎn)都voteHalt且沒(méi)有消息傳遞,或(2)達(dá)到大迭代次數(shù) 時(shí),設(shè)置 globalStats.setHaltComputation(true)。告訴works結(jié)束迭代。

master創(chuàng)建/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 節(jié)點(diǎn),data為globalStats。告訴所有workers當(dāng)前超級(jí)步結(jié)束。

每個(gè)Worker檢測(cè)到master創(chuàng)建/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 節(jié)點(diǎn)后,讀出該znode的數(shù)據(jù),即全局的統(tǒng)計(jì)信息。然后決定是否繼續(xù)下一次迭代。

10、同步之后開(kāi)始下一個(gè)超級(jí)步。

11、master和workers同步過(guò)程總結(jié)。

(1)master創(chuàng)建znode A,然后檢測(cè)A的子節(jié)點(diǎn)數(shù)目是否等于workers數(shù)目,不等于就陷入等待。某個(gè)worker創(chuàng)建一個(gè)子節(jié)點(diǎn)后,就會(huì)喚醒master進(jìn)行檢測(cè)一次。

(2)每個(gè)worker進(jìn)行自己的工作,完成后,創(chuàng)建A的子節(jié)點(diǎn)A1。然后等待master創(chuàng)建znode B。

(3)若master檢測(cè)到A的子節(jié)點(diǎn)數(shù)目等于workers的數(shù)目時(shí),創(chuàng)建Znode B

(4)master創(chuàng)建B 節(jié)點(diǎn)后,會(huì)激活各個(gè)worker。同步結(jié)束,各個(gè)worker就可以開(kāi)始下一個(gè)超步。

本質(zhì)是通過(guò)znode B來(lái)進(jìn)行全局同步的。

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無(wú)理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。

新聞標(biāo)題:Giraph源碼分析(五)——加載數(shù)據(jù)+同步總結(jié)-創(chuàng)新互聯(lián)
當(dāng)前網(wǎng)址:http://jinyejixie.com/article2/egdoc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供企業(yè)網(wǎng)站制作、響應(yīng)式網(wǎng)站、面包屑導(dǎo)航、網(wǎng)站建設(shè)、軟件開(kāi)發(fā)、靜態(tài)網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都網(wǎng)站建設(shè)公司
金沙县| 壶关县| 英吉沙县| 博湖县| 夏河县| 夏河县| 甘孜县| 会昌县| 金塔县| 广河县| 安仁县| 澎湖县| 梓潼县| 新绛县| 沂水县| 长子县| 漳平市| 中西区| 化德县| 陆丰市| 德格县| 韶关市| 广元市| 汕头市| 南江县| 增城市| 邵阳市| 莱芜市| 崇义县| 乐清市| 卢龙县| 治多县| 浦北县| 来宾市| 嘉定区| 泗阳县| 罗城| 巴塘县| 筠连县| 霍山县| 彰化市|