本篇內(nèi)容介紹了“Exactly once事務(wù)的處理方法是什么”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
成都創(chuàng)新互聯(lián)公司是專業(yè)的臨淄網(wǎng)站建設(shè)公司,臨淄接單;提供網(wǎng)站建設(shè)、網(wǎng)站設(shè)計(jì),網(wǎng)頁設(shè)計(jì),網(wǎng)站設(shè)計(jì),建網(wǎng)站,PHP網(wǎng)站建設(shè)等專業(yè)做網(wǎng)站服務(wù);采用PHP框架,可快速的進(jìn)行臨淄網(wǎng)站開發(fā)網(wǎng)頁制作和功能擴(kuò)展;專業(yè)做搜索引擎喜愛的網(wǎng)站,專業(yè)的做網(wǎng)站團(tuán)隊(duì),希望更多企業(yè)前來合作!
1,Exactly once 事務(wù)
什么事Exactly once 事務(wù)?
數(shù)據(jù)僅處理一次并且僅輸出一次,這樣才是完整的事務(wù)處理。
Spark在運(yùn)行出錯時不能保證輸出也是事務(wù)級別的。在Task執(zhí)行一半的時候出錯了,雖然在語義上做了事務(wù)處理,數(shù)據(jù)僅被處理一次,但是如果是輸出到數(shù)據(jù)庫中,那有空能將結(jié)果多次保存到數(shù)據(jù)庫中。Spark在任務(wù)失敗時會進(jìn)行重試,這樣會導(dǎo)致結(jié)果多次保存到數(shù)據(jù)庫中。
如下圖,當(dāng)運(yùn)行在Executor上的Receiver接收到數(shù)據(jù)通過BlockManager寫入內(nèi)存和磁盤,或者通過WAL機(jī)制寫記錄日志,然后把metedata信息匯報(bào)給Driver。在Driver端定期進(jìn)行checkpoint操作。Job的執(zhí)行還是基于Spark Core的調(diào)度模式在Executor上執(zhí)行Task。
Exactly once 事務(wù)的處理:
1,數(shù)據(jù)零丟失:必須有可靠的數(shù)據(jù)來源和可靠的Receiver,且整個應(yīng)用程序的metadata必須進(jìn)行checkpoint,且通過WAL來保證數(shù)據(jù)安全。
我們以數(shù)據(jù)來自Kafka為例,運(yùn)行在Executor上的Receiver在接收到來自Kafka的數(shù)據(jù)時會向Kafka發(fā)送ACK確認(rèn)收到信息并讀取下一條信息,kafka會updateOffset來記錄Receiver接收到的偏移,這種方式保證了在Executor數(shù)據(jù)零丟失。
在Driver端,定期進(jìn)行checkpoint操作,出錯時從Checkpoint的文件系統(tǒng)中把數(shù)據(jù)讀取進(jìn)來進(jìn)行恢復(fù),內(nèi)部會重新構(gòu)建StreamingContext(也就是構(gòu)建SparkContext)并啟動,恢復(fù)出元數(shù)據(jù)metedata,再次產(chǎn)生RDD,恢復(fù)的是上次的Job,然后再次提交到集群執(zhí)行。
那么數(shù)據(jù)可能丟失的地方有哪些呢和相應(yīng)的解決方式?
在Receiver收到數(shù)據(jù)且通過Driver的調(diào)度Executor開始計(jì)算數(shù)據(jù)的時候,如果Driver突然奔潰,則此時Executor會被殺死,那么Executor中的數(shù)據(jù)就會丟失(如果沒有進(jìn)行WAL的操作)。
解決方式:此時就必須通過例如WAL的方式,讓所有的數(shù)據(jù)都通過例如HDFS的方式首先進(jìn)行安全性容錯處理。此時如果Executor中的數(shù)據(jù)丟失的話,就可以通過WAL恢復(fù)回來。
這種方式的弊端是通過WAL的方式會極大額損傷SparkStreaming中Receivers接收數(shù)據(jù)的性能。
數(shù)據(jù)重復(fù)讀取的情況:
在Receiver收到數(shù)據(jù)保存到HDFS等持久化引擎但是沒有來得及進(jìn)行updateOffsets(以Kafka為例),此時Receiver崩潰后重新啟動就會通過管理Kafka的Zookeeper中元數(shù)據(jù)再次重復(fù)讀取數(shù)據(jù),但是此時SparkStreaming認(rèn)為是成功的,但是kafka認(rèn)為是失敗的(因?yàn)闆]有更新offset到ZooKeeper中),此時就會導(dǎo)致數(shù)據(jù)重新消費(fèi)的情況。
解決方式:以Receiver基于ZooKeeper的方式,當(dāng)讀取數(shù)據(jù)時去訪問Kafka的元數(shù)據(jù)信息,在處理代碼中例如foreachRDD或transform時,將信息寫入到內(nèi)存數(shù)據(jù)庫中(memorySet),在計(jì)算時讀取內(nèi)存數(shù)據(jù)庫信息,判斷是否已處理過,如果以處理過則跳過計(jì)算。這些元數(shù)據(jù)信息可以保存到內(nèi)存數(shù)據(jù)結(jié)構(gòu)或者memsql,sqllite中。
如果通過Kafka作為數(shù)據(jù)來源的話,Kafka中有數(shù)據(jù),然后Receiver接收的時候又會有數(shù)據(jù)副本,這個時候其實(shí)是存儲資源的浪費(fèi)。
Spark在1.3的時候?yàn)榱吮苊釽AL的性能損失和實(shí)現(xiàn)Exactly Once而提供了Kafka Direct API,把Kafka作為文件存儲系統(tǒng)。此時兼具有流的優(yōu)勢和文件系統(tǒng)的優(yōu)勢,至此Spark Streaming+Kafka就構(gòu)建了完美的流處理世界(1,數(shù)據(jù)不需要拷貝副本;2,不需要WAL對性能的損耗;3,Kafka使用ZeroCopy比HDFS更高效)。所有的Executors通過Kafka API直接消息數(shù)據(jù),直接管理Offset,所以也不會重復(fù)消費(fèi)數(shù)據(jù)。
2,輸出不重復(fù)
關(guān)于Spark Streaming數(shù)據(jù)輸出多次重寫及其解決方案:
1,為什么會有這個問題,因?yàn)镾park Streaming在計(jì)算的時候基于Spark Core天生會做以下事情導(dǎo)致Spark Streaming的結(jié)果(部分)重復(fù)輸出。Task重試,慢任務(wù)推測,Stage重試,Job重試。
2,具體解決方案:
設(shè)置spark.task.maxFailures次數(shù)為1,這樣就不會有Task重試了。設(shè)置spark.speculation為關(guān)閉狀態(tài),就不會有慢任務(wù)推測了,因?yàn)槁蝿?wù)推測非常消耗性能,所以關(guān)閉后可以顯著提高Spark Streaming處理性能。
Spark Streaming On Kafka的話,Job失敗后可以設(shè)置Kafka的參數(shù)auto.offset.reset為largest方式。
最后再次強(qiáng)調(diào)可以通過transform和foreachRDD基于業(yè)務(wù)邏輯代碼進(jìn)行邏輯控制來實(shí)現(xiàn)數(shù)據(jù)不重復(fù)消費(fèi)和輸出不重復(fù)。這兩個方法類似于Spark Streaming的后門,可以做任意想象的控制操作。
“Exactly once事務(wù)的處理方法是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
文章標(biāo)題:Exactlyonce事務(wù)的處理方法是什么
本文URL:http://jinyejixie.com/article8/ghphip.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供軟件開發(fā)、網(wǎng)站內(nèi)鏈、云服務(wù)器、虛擬主機(jī)、小程序開發(fā)、標(biāo)簽優(yōu)化
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)