一個(gè)StateBackEnd 包括以下幾個(gè)部分:
1.CheckPointStreamFactory 構(gòu)造流用于寫出Checkpoint 數(shù)據(jù)
不同的StateBackEnd會(huì)有不同的實(shí)現(xiàn),返回不同的CheckpointStateOutputStream實(shí)現(xiàn),比如 FsStateBackEnd 就會(huì)構(gòu)造文件流, 而MemoryStateBackEnd就會(huì)構(gòu)造ByteArraOutputStream
CheckpointStateOutputStream 會(huì)作為IO代理包含在KeyedStateCheckpointOutputStream 和 OperatorStateCheckpointOutputStream內(nèi).
KeyedStateCheckpointOutputStream 和 OperatorStateCheckpointOutputStream 分別需要記錄額外的狀態(tài). KeyedStateCheckpointOutputStream 需要記錄每個(gè)keyGroup起始在流中的位置, OperatorStateCheckpointOutputStream 需要記錄每個(gè)partition起始在流中的位置, 這些信息都會(huì)體現(xiàn)在對(duì)應(yīng)的StreamStateHandle中.
CheckpointStateOutputStream 定義了 closeAndGetHandle 方法返回了一個(gè) StreamStateHandle 的實(shí)現(xiàn),這個(gè)句柄會(huì)被序列化傳遞給JobManager, JobManager 會(huì)將句柄作為快照的一部分集中保存,那么在恢復(fù)數(shù)據(jù)的時(shí)候就能夠通過(guò)句柄反向獲得InputStream讀取數(shù)據(jù)
具體參考 AbstractStreamOperator.snapshotState
InternalTimerServiceSerializationProxy.write -> HeapInternalTimerService.snapshotTimersForKeyGroup
KeyedStateBackEnd.snapshot OperatorStateBackEnd.snapshot
2.KeyedStateBackEnd
KeyedStateBackEnd 在創(chuàng)建StreamTask 的時(shí)候創(chuàng)建,所以一個(gè)Task 對(duì)應(yīng)一個(gè)KeyedStateBackEnd.
KeyedStateBackEnd 定義了如何注冊(cè)和生成各種State 包括: ListState, MapState, ValueState, AggregatingState, FoldingState, ReducingState
KeyedStateBackEnd 目前有兩種實(shí)現(xiàn): HeapKeyedStateBackend 和 RocksDBKeyedStateBackend. 其中HeapKeyedStateBackend 把狀態(tài)存儲(chǔ)在內(nèi)部的一個(gè)StateTable中,每個(gè)State name 對(duì)應(yīng)StateTable 中的一個(gè)Entry StateTable 包含三元信息:Key, Namespace, Value. Key和Value 很容易理解, Namespace 目前好像僅僅用于Window 算子,記錄了當(dāng)前的Window 信息, 如果沒(méi)有Window 會(huì)給一個(gè)默認(rèn)的namespace (VoidNamespace.INSTANCE). RocksDBKeyedStateBackend 會(huì)根據(jù)StateDescription 生成一個(gè)RocksDB column family, 然后在每種State get/set 的時(shí)候直接對(duì)Rocks DB 進(jìn)行讀寫操作 *
異步State Snapshot: HeapKeyedStateBackend 和 RocksDBKeyedStateBackend 都支持異步Snapshot, 所謂異步Snapshot 就是起一根獨(dú)立線程向 CheckpointStateOutputStream 寫State 數(shù)據(jù). 但是對(duì)數(shù)據(jù)結(jié)構(gòu)有要求,因?yàn)樵谧鰏napshot 的過(guò)程中 state table 本身可能會(huì)繼續(xù)變化. 所以需要在snapshot 開始的時(shí)候?qū)?shù)據(jù)做一個(gè)快照. HeapKeyedStateBackend內(nèi)部用了CopyOnWriteStateTable保證線程安全性,使數(shù)據(jù)快照的數(shù)據(jù)不會(huì)corrupt. RocksDBKeyedStateBackend 思路是類似的. snapshot 開始的時(shí)候調(diào)用RocksDB.snapshot, 然后再通過(guò)線程異步向 CheckpointStateOutputStream 寫State 數(shù)據(jù).
增量 State Snapshot: RocksDBKeyedStateBackend 特有的特性. 具體的實(shí)現(xiàn)參考RocksDBIncrementalSnapshotOperation. 這里簡(jiǎn)單比較一下RocksDBFullSnapshotOperation和RocksDBIncrementalSnapshotOperation. RocksDBFullSnapshotOperation 會(huì)完整地讀取Snapshot中所有的KV數(shù)據(jù),然后向流中寫出所有的kvMetadata和kvData. 返回的StateHandle是KeyGroupsStateHandle, 和HeapKedStateBackend一致. 而RocksDBIncrementalSnapshotOperation則會(huì)遍歷RocksDB checkpoint目錄下的所有文件. 每次做Checkpoint的時(shí)候,RocksDBKeyedStateBackend會(huì)記錄當(dāng)前checkPointId對(duì)應(yīng)的RocksDB ssd文件.這樣在做一次新的Checkpoint的時(shí)候就可以比對(duì)文件獲取是否有新的數(shù)據(jù)文件.原有的數(shù)據(jù)文件不用再寫而是直接返回一個(gè)PlaceholderStreamStateHandle. Checkpoint不是逐條遍歷KV寫出,而是直接向流中寫出RocksDB數(shù)據(jù)文件的數(shù)據(jù). 返回的StateHandle是IncrementalKeyedStateHandle其中包含了一組RocksDB數(shù)據(jù)文件的句柄.
數(shù)據(jù)恢復(fù)的過(guò)程也同樣需要區(qū)分full/incremental. 分別對(duì)應(yīng)RocksDBFullRestoreOperation和RocksDBIncrementalRestoreOperation
3.OperatorStateBackEnd
主要管理OperatorState. 目前只有一種實(shí)現(xiàn): DefaultOperatorStateBackend. 構(gòu)造出一個(gè) PartitionableListState (屬于ListState). 這是一個(gè)In Memory的實(shí)現(xiàn). Add 操作追加到
內(nèi)存的一個(gè)List中. Snapshot 的過(guò)程和KeyedStateBackEnd大同小異,這里就不再贅述.
StateBackend 的類結(jié)構(gòu):
State 恢復(fù)的過(guò)程:
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無(wú)理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。
分享標(biāo)題:FlinkStateBackend初探-創(chuàng)新互聯(lián)
新聞來(lái)源:http://jinyejixie.com/article22/dedhcc.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站內(nèi)鏈、靜態(tài)網(wǎng)站、定制開發(fā)、網(wǎng)站導(dǎo)航、移動(dòng)網(wǎng)站建設(shè)、虛擬主機(jī)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容