成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

Flinkwatermark-創(chuàng)新互聯(lián)

Flink中watermark主要解決保序問題. 而保序問題的根本原因是多個(gè)任務(wù)同時(shí)從流中并行處理數(shù)據(jù),順序無法保證.

創(chuàng)新互聯(lián)公司自2013年起,先為恒山等服務(wù)建站,恒山等地企業(yè),進(jìn)行企業(yè)商務(wù)咨詢服務(wù)。為恒山企業(yè)網(wǎng)站制作PC+手機(jī)+微官網(wǎng)三網(wǎng)同步一站式服務(wù)解決您的所有建站問題。

上游: 生成watermark
一般在WINDOW 操作之前生成WATERMARK, WATERMARK 有兩種:
AssignWithPeriodicWatermarks:
每隔N秒自動(dòng)向流里注入一個(gè)WATERMARK 時(shí)間間隔由ExecutionConfig.setAutoWatermarkInterval 決定. 每次調(diào)用getCurrentWatermark 方法, 如果得到的WATERMARK 不為空并且比之前的大就注入流中 (emitWatermark)
參考 TimestampsAndPeriodicWatermarksOperator.processElement

AssignWithPunctuatedWatermarks:
基于事件向流里注入一個(gè)WATERMARK,每一個(gè)元素都有機(jī)會(huì)判斷是否生成一個(gè)WATERMARK. 如果得到的WATERMARK 不為空并且比之前的大就注入流中 (emitWatermark)
參考 TimestampsAndPunctuatedWatermarksOperator.processElement

每次生成WATERMARK將覆蓋流中已有的WATERMARK

下游: 處理watermark
StatusWatermarkValve 負(fù)責(zé)將不同Channel 的Watermark 對(duì)齊,再傳給pipeline 下游,對(duì)齊的概念是當(dāng)前Channel的Watermark時(shí)間大于所有Channel最小的Watermark時(shí)間
Flink watermark

WindowOperator 的處理:
WindowOperator.processElement

  1. WindowAssigner.assignWindows 為當(dāng)前的消息分配滑動(dòng)窗口
    常用的有: TumblingEventTimeWindows: 按照消息的 EventTime 分配窗口 (每次生成單個(gè)窗口)
    TumblingProcessingTimeWindows 按照當(dāng)前的時(shí)間分配窗口 (每次生成單個(gè)窗口)
    需要配合StreamExecutionEnvironment.setStreamTimeCharacteristic 使用 (默認(rèn)是TimeCharacteristic.ProcessingTime), 這個(gè)必須匹配
    否則無法正常觸發(fā)滑動(dòng)窗口

實(shí)際觀察結(jié)果:

  • 如果使用ProcessingTimeWindows 即使Event 本身的時(shí)間落后于窗口時(shí)間很多也會(huì)被觸發(fā)
  • 無論是否使用WATERMARK,窗口中的數(shù)據(jù)會(huì)有亂序,即后到窗口中的數(shù)據(jù)早于先到窗口中的數(shù)據(jù)
  • 如果使用EventTimeWindow, 數(shù)據(jù)和窗口時(shí)間對(duì)齊不會(huì)亂序,同一窗口中的數(shù)據(jù)不能嚴(yán)格保證順序,需要SORT.
  • 最后一批數(shù)據(jù)有缺失,缺失的數(shù)據(jù)取決于WATERMARK的MAXOUTOFORDERNESS
  • 默認(rèn)的WATERMARK算法是根據(jù)元素的大時(shí)間決定的,當(dāng)沒有新的元素進(jìn)入流中的時(shí)候,水位不再上漲,再減去MAXOUTOFORDERNESS, 則最后一批數(shù)據(jù)無法落在水位之下,導(dǎo)致WINDOW無法觸發(fā)
  1. 將當(dāng)前的滑動(dòng)窗口和對(duì)象加入WindowState, 根據(jù)不同的應(yīng)用場景會(huì)使用不同的WindowState. WindowState 的類型由WindowedStream的具體操作決定, 生成對(duì)應(yīng)的StateDescriptor, 不同的WindowState 的 add/get 行為會(huì)不同. 比如HeapListWindowState 會(huì)把當(dāng)前的對(duì)象追加到currentNamespace (即Timewindow) 對(duì)應(yīng)的List 下. 比如HeapAggregateState 會(huì)對(duì)當(dāng)前的對(duì)象應(yīng)用Aggregate function 并更新結(jié)果

Window 觸發(fā)的條件
在 WindowOperator 中有兩個(gè)點(diǎn)會(huì)檢查窗口是否觸發(fā),兩者的檢查條件有所不同

  1. processElement 這是在新的流數(shù)據(jù)進(jìn)入時(shí)觸發(fā)
    檢查條件: watermark時(shí)間 >= 窗口大時(shí)間 參見 EventTimeTrigger.onElement
    如果窗口不能被觸發(fā)則調(diào)用InteralTimeService.registerEventTimeTimer 注冊(cè)一個(gè)定時(shí)器,以KEY+窗口大時(shí)間為條件觸發(fā), 到一定時(shí)間后定時(shí)器會(huì)被自動(dòng)銷毀. 時(shí)間為窗口大時(shí)間+WindowOperator.allowedLateness WindowOperator.allowedLateness 可以通過 Stream.window(...).allowedLateness(...) 設(shè)置. 一般應(yīng)該略大于WatermarkGenerator 的 maxOutOfOrderness

  2. onEventTime 或者 onProcessingTime 取決于Watermark的類型, 這是在Watermark更新的時(shí)候觸發(fā) (InteralTimeService.advanceWatermark). 這時(shí)會(huì)把當(dāng)前Watermark 的時(shí)間和之前注冊(cè)的定時(shí)器的時(shí)間做比較, 如果定時(shí)器還存在并且Watermark的時(shí)間大于定時(shí)器時(shí)間則可以觸發(fā)窗口. 參見 EventTimeTrigger.onEventTime
    Flink watermark
    參考 http://blog.csdn.net/lmalds/article/details/52704170

WATERMARK和普通數(shù)據(jù)分開處理
如果一個(gè)元素來的過晚 element.getTimestamp + allowedLateness < currentWatermark
會(huì)有一個(gè)特殊的OutputTag 和正常的流數(shù)據(jù)區(qū)分開
參考 https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/side_output.html

如果窗口來的過晚, window.maxTimestamp + allowedLateness < currentWatermark, 則窗口會(huì)被直接丟棄

Watermark 的問題:
默認(rèn)的Watermark機(jī)制是數(shù)據(jù)驅(qū)動(dòng)的,新的數(shù)據(jù)進(jìn)入才會(huì)觸發(fā)水位上升, 而由于maxOutOfOrderness 的存在, watermark < 大流數(shù)據(jù)時(shí)間 < 當(dāng)前窗口結(jié)束時(shí)間
根據(jù)之前的分析,最新的時(shí)間窗口總是不會(huì)被觸發(fā),除非更新的數(shù)據(jù)進(jìn)入再次提高水位到當(dāng)前窗口結(jié)束時(shí)間以后, 如果數(shù)據(jù)進(jìn)入的頻率低或者沒有新的數(shù)據(jù)進(jìn)入流,那最新的時(shí)間窗口被處理的延時(shí)會(huì)非常高甚至永遠(yuǎn)不會(huì)被觸發(fā),這在實(shí)時(shí)性要求高的流式系統(tǒng)是很致命的. 比如一個(gè)銀行系統(tǒng),要做客戶賬號(hào)層面的保序,每個(gè)賬號(hào)的交易可能一天只有幾筆甚至一筆,如果我們?cè)赪indow 處理的時(shí)候KEY BY 賬號(hào)就會(huì)引起上述問題. 我們可以考慮KEY BY的條件改為 HASH(賬號(hào)) 再取模,然后在窗口處理中再次根據(jù)賬號(hào)分組,這樣雖然處理復(fù)雜一些,但是保證了窗口中數(shù)據(jù)的頻次

另外一種方案是優(yōu)化WATERMARK生成的機(jī)制,如果一段時(shí)間后WATERMARK仍然沒有變化,那就將WATERMARK自動(dòng)上漲一次到當(dāng)前窗口的結(jié)束時(shí)間,這樣保證窗口處理的延時(shí)有個(gè)上限

public abstract class AbstractWatermarkGenerator<T> implements AssignerWithPeriodicWatermarks<T> {
    private static final long serialVersionUID = -2006930231735705083L;
    private static final Logger logger = LoggerFactory.getLogger(AbstractWatermarkGenerator.class);
    private final long maxOutOfOrderness; // 10 seconds
    private long windowSize;
    private long currentMaxTimestamp;
    private long lastTimestamp = 0;
    private long lastWatermarkChangeTime = 0;
    private long windowPurgeTime = 0;

    public AbstractWatermarkGenerator(long maxOutOfOrderness, long windowSize) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.windowSize = windowSize;
    }

    public AbstractWatermarkGenerator() {
        this(10000, 10000);
    }

    protected abstract long extractCurTimestamp(T element) throws Exception;

    public long extractTimestamp(T element,
            long previousElementTimestamp) {
        try {
            long curTimestamp = extractCurTimestamp(element);
            lastWatermarkChangeTime = new Date().getTime();
            currentMaxTimestamp = Math.max(curTimestamp, currentMaxTimestamp);
            windowPurgeTime = Math.max(windowPurgeTime, getWindowExpireTime(currentMaxTimestamp));
            if (logger.isDebugEnabled()) {
                logger.debug("Extracting timestamp: {}", currentMaxTimestamp);
            }
            return curTimestamp;
        } catch (Exception e) {
            logger.error("Error extracting timestamp", e);      
        }

        return 0;
    }

    protected long getWindowExpireTime(long currentMaxTimestamp) {
        long windowStart = TimeWindow.getWindowStartWithOffset(currentMaxTimestamp, 0, windowSize);
        long windowEnd = windowStart + windowSize;
        return windowEnd + maxOutOfOrderness;
    }

    public Watermark getCurrentWatermark() {
        long curTime = new Date().getTime();
        if (currentMaxTimestamp > lastTimestamp) {
            if (logger.isDebugEnabled()) {
                logger.debug("Current max timestamp has been increased since last");
            }
            lastTimestamp = currentMaxTimestamp;
            lastWatermarkChangeTime = curTime;
        }
        else {
            long diff = windowPurgeTime - currentMaxTimestamp;
            if (diff > 0 && curTime - lastWatermarkChangeTime > diff) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Increase current MaxTimestamp once");
                }
                currentMaxTimestamp = windowPurgeTime;
                lastTimestamp = currentMaxTimestamp;
                lastWatermarkChangeTime = curTime;
            }
        }

        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

實(shí)際測試中發(fā)現(xiàn) WATERMARK是否觸發(fā)和算子的并發(fā)度和WATERMARK生成的位置有關(guān)
測試結(jié)果如下:

  • Env default parallism 10: Source parallism 20, window parallism 6, watermark 生成定義在keyby 之前
    Source 為單獨(dú)的SUBTASK 并發(fā)度為20, 之后到WINDOW算子之前合成一個(gè)SUBTASK,并發(fā)度為10, WINDOW SUBTASK 并發(fā)度為6, 窗口可以正常觸發(fā)
  • Env default parallism 20, Source parallism 20, window parallism 6, watermark 生成定義在keyby 之前
    Source 到 WINDOW 算子之前 合成一個(gè)SUBTASK,并發(fā)度為20, WINDOW SUBTASK 并發(fā)度為6, 窗口可以正常觸發(fā)
  • Env default parallism 60, Source parallism 20, window parallism 10, watermark 生成定義在keyby 之前
    Source 為單獨(dú)的SUBTASK 并發(fā)度為20, 之后到WINDOW算子之前合成一個(gè)SUBTASK,并發(fā)度為60,WINDOW SUBTASK 并發(fā)度為10, 窗口不能正常觸發(fā) (個(gè)人理解原因是算子并發(fā)度擴(kuò)大,導(dǎo)致一些CHANNEL處理線程沒有數(shù)據(jù),根據(jù)上文的解釋,WATERMARK對(duì)齊會(huì)取所有CHANNEL最小的WATERMARK,導(dǎo)致水位無法上漲
    可以從FLINK CONSOLE的WATERMARKS看出)
  • Env default parallism 60, Source parallism 20, window parallism 10, watermark 生成定義在Source之后
    Source 為單獨(dú)的SUBTASK 并發(fā)度為20, 之后到WINDOW算子之前合成一個(gè)SUBTASK,并發(fā)度為60,WINDOW SUBTASK 并發(fā)度為10, 窗口可以正常觸發(fā)
  • Env default parallism 10, Source parallism 20, window parallism 20, watermark 生成定義在keyby 之前
    Source 為單獨(dú)的SUBTASK 并發(fā)度為20, 之后到WINDOW算子之前合成一個(gè)SUBTASK,并發(fā)度為10, WINDOW SUBTASK 并發(fā)度為20, 窗口可以正常觸發(fā)
  • Env default parallism 30, Source parallism 20, window parallism 20, watermark 生成定義在keyby 之前
    Source 為單獨(dú)的SUBTASK 并發(fā)度為20, 之后到WINDOW算子之前合成一個(gè)SUBTASK,并發(fā)度為30, WINDOW SUBTASK 并發(fā)度為20, 窗口不能正常觸發(fā)
  • Env default parallism 30, Source parallism 20, window parallism 20, watermark 生成定義在keyby 之前
    Source 為單獨(dú)的SUBTASK 并發(fā)度為20, 之后到WINDOW算子之前合成一個(gè)SUBTASK,并發(fā)度為30, WINDOW SUBTASK 并發(fā)度為20,窗口不能正常觸發(fā)
  • Env default parallism 30, Source parallism 20, window parallism 20, watermark 生成定義在Source 之后
    Source 為單獨(dú)的SUBTASK 并發(fā)度為20, 之后到WINDOW算子之前合成一個(gè)SUBTASK,并發(fā)度為30, WINDOW SUBTASK 并發(fā)度為20, 窗口可以正常觸發(fā)

所以注意WINDOW算子之前最好避免讓下游算子的并發(fā)度超過上游算子,否則就把WATERMARK的生成盡量放到DAG的前端,這樣WATERMARK可以被傳遞到下游算子

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)cdcxhl.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。

本文標(biāo)題:Flinkwatermark-創(chuàng)新互聯(lián)
網(wǎng)站鏈接:http://jinyejixie.com/article38/hgppp.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供關(guān)鍵詞優(yōu)化、虛擬主機(jī)、做網(wǎng)站網(wǎng)站營銷、靜態(tài)網(wǎng)站商城網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

小程序開發(fā)
通渭县| 丹棱县| 咸阳市| 临汾市| 巴彦县| 来凤县| 三门峡市| 金湖县| 双城市| 阳高县| 汝南县| 金沙县| 武定县| 乐至县| 高邮市| 佛学| 淮北市| 临江市| 吕梁市| 衡山县| 西盟| 石城县| 玛多县| 东至县| 山丹县| 蛟河市| 仁寿县| 仁寿县| 宣汉县| 盱眙县| 阿克陶县| 姜堰市| 宣化县| 饶阳县| 中西区| 冕宁县| 寻乌县| 霍林郭勒市| 朝阳县| 明水县| 清流县|