Spark Streaming的事務(wù)處理和關(guān)系型數(shù)據(jù)庫的事務(wù)的概念有所不同,關(guān)系型數(shù)據(jù)庫事務(wù)關(guān)注的是語句級別的一致性,例如銀行轉(zhuǎn)賬。而Spark Streaming的事務(wù)關(guān)注的是某次job執(zhí)行的一致性。也就是如何保證Job在處理數(shù)據(jù)的過程中做到如下兩點(diǎn):
創(chuàng)新互聯(lián)專注于企業(yè)營銷型網(wǎng)站建設(shè)、網(wǎng)站重做改版、壽光網(wǎng)站定制設(shè)計、自適應(yīng)品牌網(wǎng)站建設(shè)、H5響應(yīng)式網(wǎng)站、電子商務(wù)商城網(wǎng)站建設(shè)、集團(tuán)公司官網(wǎng)建設(shè)、外貿(mào)網(wǎng)站建設(shè)、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁設(shè)計等建站業(yè)務(wù),價格優(yōu)惠性價比高,為壽光等各大城市提供網(wǎng)站開發(fā)制作服務(wù)。不丟失數(shù)據(jù)
不重復(fù)處理數(shù)據(jù)
SparkStreaming程序執(zhí)行架構(gòu)大致如下:
一、我們先來說說丟失數(shù)據(jù)的情況:
Receiver接收到數(shù)據(jù)后,首先會在Executor級別上保存數(shù)據(jù)(根據(jù)StorageLevel的設(shè)置),例如socketTextStream的Receiver。在內(nèi)存和磁盤上保留2份副本數(shù)據(jù)
def socketTextStream( hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): ReceiverInputDStream[String] = withNamedScope("socket text stream") { socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel) }
如果StorageLevel設(shè)置的是只進(jìn)行內(nèi)存級別的存儲,那么當(dāng)程序崩潰后,即便對Driver進(jìn)行了Checkpoint,然后重新啟動程序。該部分?jǐn)?shù)據(jù)也會丟失。因?yàn)镈river的Checkpoint并不對計算數(shù)據(jù)進(jìn)行保存。
我們假設(shè)StorageLevel設(shè)置了磁盤級別的存儲,也不能完全保證數(shù)據(jù)不被丟失,因?yàn)镽eceiver并不是接收一條數(shù)據(jù)寫一次磁盤,而是按照數(shù)據(jù)塊為單位寫數(shù)據(jù)。然后將數(shù)據(jù)塊的元數(shù)據(jù)信息發(fā)送給Driver,Driver的Checkpoint記錄的數(shù)Block的元數(shù)據(jù)信息。當(dāng)數(shù)據(jù)塊寫到一半的時候,或者是元數(shù)據(jù)還沒有發(fā)送給Driver的時候,Executor崩潰了,數(shù)據(jù)也就丟失啦。
解決方案:為了減少這種情況的發(fā)送,可以在Receiver端引入WAL寫機(jī)制,因?yàn)閃AL寫的頻率要比數(shù)據(jù)塊的頻率高的多。這樣,當(dāng)Executor恢復(fù)的時候,可以讀取WAL日志恢復(fù)數(shù)據(jù)塊。
但是通過WAL方式會極大的損傷Spark Streaming中Receivers接受數(shù)據(jù)的性能;
WAL也不能完全的解決數(shù)據(jù)丟失的問題,就像Oracle一樣,日志文件的寫,也是先寫到內(nèi)存中,然后根據(jù)一定的觸發(fā)條件再將數(shù)據(jù)寫到磁盤。如果還沒有來的及寫WAL日志,此時數(shù)據(jù)也會有不一致的情況(數(shù)據(jù)已經(jīng)接收,但是還沒有寫到WAL的這部分?jǐn)?shù)據(jù)是恢復(fù)不出來的。)。
Spark Streaming 1.3的時候?yàn)榱吮苊釽AL的性能損失和實(shí)現(xiàn)Exactly Once而提供了Kafka Direct API,把Kafka作為文件存儲系統(tǒng)?。?!此時兼具有流的優(yōu)勢和文件系統(tǒng)的優(yōu)勢,至此,Spark Streaming+Kafka就構(gòu)建了完美的流處理世界?。?!所有的Executors通過Kafka API直接消費(fèi)數(shù)據(jù),直接管理Offset,所以也不會重復(fù)消費(fèi)數(shù)據(jù);事務(wù)實(shí)現(xiàn)啦!?。?/p>
2. Driver崩潰,此時Job正在處理的數(shù)據(jù),包括Receiver已經(jīng)接收到還未被處理的數(shù)據(jù)將全部丟失。
解決方案:對Driver進(jìn)行Checkpoint,此處的Checkpoint和RDD的Checkpoint并不一樣。
我們看看Checkpoint都包含哪些屬性:
private[streaming] class Checkpoint(ssc: StreamingContext, val checkpointTime: Time) extends Logging with Serializable { val master = ssc.sc.master val framework = ssc.sc.appName val jars = ssc.sc.jars val graph = ssc.graph val checkpointDir = ssc.checkpointDir val checkpointDuration = ssc.checkpointDuration val pendingTimes = ssc.scheduler.getPendingTimes().toArray val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf) val sparkConfPairs = ssc.conf.getAll
其中g(shù)raph是DStreamGraph的實(shí)例化,它里面包含了InputDStream
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
我們以DirectKafkaInputDStream為例,其中包含了checkpointData
protected[streaming] override val checkpointData = new DirectKafkaInputDStreamCheckpointData
其中只是包含:
class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) { def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = { data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]] }
就是每個batch 的唯一標(biāo)識 time 對象,以及每個KafkaRDD對應(yīng)的的Kafka偏移信息。
所以:
checkpoint 是非常高效的。沒有涉及到實(shí)際數(shù)據(jù)的存儲。一般大小只有幾十K,因?yàn)橹淮媪薑afka的偏移量等信息。
checkpoint 采用的是序列化機(jī)制,尤其是DStreamGraph的引入,里面包含了可能如ForeachRDD等,而ForeachRDD里面的函數(shù)應(yīng)該也會被序列化。如果采用了CheckPoint機(jī)制,而你的程序包做了做了變更,恢復(fù)后可能會有一定的問題。
二、關(guān)于數(shù)據(jù)重復(fù)處理涉及兩個方面:
數(shù)據(jù)被重復(fù)讀?。涸谑褂肒afka的情況下,Receiver收到數(shù)據(jù)且保存到了HDFS等持久化引擎但是沒有來得及進(jìn)行updateOffsets,此時Receiver崩潰后重新啟動就會通過管理Kafka的ZooKeeper中元數(shù)據(jù)再次重復(fù)讀取數(shù)據(jù),但是此時SparkStreaming認(rèn)為是成功的,但是Kafka認(rèn)為是失敗的(因?yàn)闆]有更新offset到ZooKeeper中),此時就會導(dǎo)致數(shù)據(jù)重新消費(fèi)的情況。
數(shù)據(jù)輸出多次重寫
為什么會有這個問題,因?yàn)镾park Streaming在計算的時候基于Spark Core,Spark Core天生會做以下事情導(dǎo)致Spark Streaming的部分結(jié)果重復(fù)輸出(例如數(shù)據(jù)輸出后,該Task的后續(xù)程序發(fā)生錯誤,而任務(wù)發(fā)生錯誤,Spark Core會進(jìn)入如下程序):
Task重試;慢任務(wù)推測(兩個相同任務(wù)可能會同時執(zhí)行),Stage重復(fù);Job重試;
具體解決方案:
設(shè)置spark.task.maxFailures次數(shù)為1;
設(shè)置spark.speculation為關(guān)閉狀態(tài)(因?yàn)槁蝿?wù)推測其實(shí)非常消耗性能,所以關(guān)閉后可以顯著提高Spark Streaming處理性能)
Spark Streaming on Kafka的話,Job失敗后可以設(shè)置auto.offset.reset為“l(fā)argest”的方式;
Exactly Once的事務(wù)處理必須滿足:
Receiver數(shù)據(jù)零丟失:必須有可靠的數(shù)據(jù)來源和可靠的Receiver,且通過WAL來保證數(shù)據(jù)安全。
整個應(yīng)用程序的metadata必須進(jìn)行checkpoint;
最后再次強(qiáng)調(diào)可以通過transform和foreachRDD基于業(yè)務(wù)邏輯代碼進(jìn)行邏輯控制來實(shí)現(xiàn)數(shù)據(jù)不重復(fù)消費(fèi)和輸出不重復(fù)!這兩個方式類似于Spark Streaming的后門,可以做任意想象的控制操作!
備注:
1、DT大數(shù)據(jù)夢工廠微信公眾號DT_Spark
2、IMF晚8點(diǎn)大數(shù)據(jù)實(shí)戰(zhàn)YY直播頻道號:68917580
3、新浪微博: http://www.weibo.com/ilovepains
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點(diǎn)與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。
網(wǎng)站欄目:第4課:SparkStreaming的Exactly-One的事務(wù)處理-創(chuàng)新互聯(lián)
文章源于:http://jinyejixie.com/article14/ccsode.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供營銷型網(wǎng)站建設(shè)、網(wǎng)站維護(hù)、網(wǎng)站制作、微信小程序、用戶體驗(yàn)、服務(wù)器托管
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容