Golang被證明非常適合并發(fā)編程,goroutine比異步編程更易讀、優(yōu)雅、高效。本文提出一個適合由Golang實現(xiàn)的Pipeline執(zhí)行模型,適合批量處理大量數(shù)據(jù)(ETL)的情景。
站在用戶的角度思考問題,與客戶深入溝通,找到蒲縣網(wǎng)站設計與蒲縣網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗,讓設計與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個性化、用戶體驗好的作品,建站類型包括:做網(wǎng)站、網(wǎng)站設計、企業(yè)官網(wǎng)、英文網(wǎng)站、手機端網(wǎng)站、網(wǎng)站推廣、域名注冊、虛擬主機、企業(yè)郵箱。業(yè)務覆蓋蒲縣地區(qū)。想象這樣的應用情景:
從數(shù)據(jù)庫A(Cassandra)加載用戶評論(量巨大,例如10億條);根據(jù)每條評論的用戶ID、從數(shù)據(jù)庫B(MySQL)關(guān)聯(lián)用戶資料;調(diào)用NLP服務(自然語言處理),處理每條評論;將處理結(jié)果寫入數(shù)據(jù)庫C(ElasticSearch)。
由于應用中遇到的各種問題,歸納出這些需求:
需求一:應分批處理數(shù)據(jù),例如規(guī)定每批100條。出現(xiàn)問題時(例如任意一個數(shù)據(jù)庫故障)則中斷,下次程序啟動時使用checkpoint從中斷處恢復。
需求二:每個流程設置合理的并發(fā)數(shù)、讓數(shù)據(jù)庫和NLP服務有合理的負載(不影響其它業(yè)務的基礎上,盡可能占用更多資源以提高ETL性能)。例如,步驟(1)-(4)分別設置并發(fā)數(shù)1、4、8、2。
這就是一個典型的Pipeline(流水線)執(zhí)行模型。把每一批數(shù)據(jù)(例如100條)看作流水線上的產(chǎn)品,4個步驟對應流水線上4個處理工序,每個工序處理完畢后就把半成品交給下一個工序。每個工序可以同時處理的產(chǎn)品數(shù)各不相同。
你可能首先想到啟用1+4+8+2個goroutine,使用channel來傳遞數(shù)據(jù)。我也曾經(jīng)這么干,結(jié)論就是這么干會讓程序員瘋掉:流程并發(fā)控制代碼非常復雜,特別是你得處理異常、執(zhí)行時間超出預期、可控中斷等問題,你不得不加入一堆channel,直到你自己都不記得有什么用。
可重用的Pipeline模塊
為了更高效完成ETL工作,我將Pipeline抽象成模塊。我先把代碼粘貼出來,再解析含義。模塊可以直接使用,主要使用的接口是:NewPipeline、Async、Wait。
使用這個Pipeline組件,我們的ETL程序?qū)唵?、高效、可靠,讓程序員從繁瑣的并發(fā)流程控制中解放出來:
package main import "log" func main() { //恢復上次執(zhí)行的checkpoint,如果是第一次執(zhí)行就獲取一個初始值。 checkpoint := loadCheckpoint() //工序(1)在pipeline外執(zhí)行,最后一個工序是保存checkpoint pipeline := NewPipeline(4, 8, 2, 1) for { //(1) //加載100條數(shù)據(jù),并修改變量checkpoint //data是數(shù)組,每個元素是一條評論,之后的聯(lián)表、NLP都直接修改data里的每條記錄。 data, err := extractReviewsFromA(&checkpoint, 100) if err != nil { log.Print(err) break } //這里有個Golang著名的坑。 //“checkpoint”是循環(huán)體外的變量,它在內(nèi)存中只有一個實例并在循環(huán)中不斷被修改,所以不能在異步中使用它。 //這里創(chuàng)建一個副本curCheckpoint,儲存本次循環(huán)的checkpoint。 curCheckpoint := checkpoint ok := pipeline.Async(func() error { //(2) return joinUserFromB(data) }, func() error { //(3) return nlp(data) }, func() error { //(4) return loadDataToC(data) }, func() error { //(5)保存checkpoint log.Print("done:", curCheckpoint) return saveCheckpoint(curCheckpoint) }) if !ok { break } if len(data) < 100 { break } //處理完畢 } err := pipeline.Wait() if err != nil { log.Print(err) } }
以上就是golang 如何處理大數(shù)據(jù)的詳細內(nèi)容,更多請關(guān)注創(chuàng)新互聯(lián)成都網(wǎng)站設計公司其它相關(guān)文章!
另外有需要云服務器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務器、裸金屬服務器、高防服務器、香港服務器、美國服務器、虛擬主機、免備案服務器”等云主機租用服務以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應用場景需求。
分享題目:go語言批量處理大量數(shù)據(jù)的方法-創(chuàng)新互聯(lián)
地址分享:http://jinyejixie.com/article30/ghoso.html
成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供用戶體驗、網(wǎng)站設計、網(wǎng)站導航、響應式網(wǎng)站、網(wǎng)頁設計公司、靜態(tài)網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容