How to use Watermarks in Flink

This article explains in detail how to use Watermarks in Flink.

Watermarks: assign an event time (timestamp) to every record of the input stream, so that windows can correctly handle records that arrive out of order or late.
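To make the definition concrete, here is a minimal plain-Java sketch of a bounded-out-of-orderness watermark. It has no Flink dependency and the class name `BoundedOutOfOrdernessSketch` is hypothetical. The watermark trails the largest timestamp seen so far by a fixed bound, and a record whose timestamp is at or below the current watermark counts as late. Flink's built-in `WatermarkStrategy.forBoundedOutOfOrderness` (used in the commented-out variant of the example) applies the same `max - bound - 1` rule. Unlike Flink, which emits the watermark periodically, this sketch computes it on demand.

```java
/**
 * Plain-Java sketch of a bounded-out-of-orderness watermark (no Flink dependency).
 * The watermark trails the largest event timestamp seen so far by a fixed bound.
 */
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;

    BoundedOutOfOrdernessSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start low enough that currentWatermark() cannot underflow before the first event.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;
    }

    /** Called for every record: the maximum only ever moves forward. */
    void onEvent(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    /** "No record with a timestamp <= this value is expected any more." */
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness - 1;
    }

    /** A record is late if the watermark has already reached its timestamp. */
    boolean isLate(long eventTimestamp) {
        return eventTimestamp <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch(2_000);
        long[] arrivals = {1_000, 5_000, 3_000, 8_000, 2_500}; // arrival order, not timestamp order
        for (long ts : arrivals) {
            boolean late = gen.isLate(ts); // judged against the watermark before this record
            gen.onEvent(ts);
            System.out.println("ts=" + ts + (late ? " (late)" : "") + " -> watermark " + gen.currentWatermark());
        }
    }
}
```

With a 2-second bound, the record stamped 3000 arrives after 5000 but is still on time (watermark 2999), whereas 2500 arriving after 8000 is late (watermark 5999).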

Example environment

java.version: 1.8.x
flink.version: 1.11.1

TimestampsAndWatermarks.java

import com.flink.examples.DataSource;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

/**
 * @Description Watermarks: assign an event time (timestamp) to every record of the input stream, so that windows can handle out-of-order and late data
 */
public class TimestampsAndWatermarks {

    /**
     * Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html
     */

    /**
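     * Keys the simulated stream by gender and prints the average per gender for each window; once the watermark passes a window's end, the window's event-time timer fires automatically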
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        /*
        TimeCharacteristic has three time types:
            ProcessingTime: based on the time the operator processes the record, i.e. the machine's system time is used as the stream's time;
            IngestionTime: based on the time the record enters the Flink streaming dataflow;
            EventTime: based on a timestamp field carried by the record itself; the application must specify how to extract it from each record by calling assignTimestampsAndWatermarks with a watermark strategy;
         */
        //Use event time: every record then needs an event timestamp
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        //Interval (ms) for periodic watermark generation; a longer interval avoids emitting watermarks too often under heavy traffic, which would hurt performance.
        env.getConfig().setAutoWatermarkInterval(1000L);

        DataStream<Tuple3<String, String, Integer>> inStream = env.addSource(new MyRichSourceFunction());
        DataStream<Tuple2<String, Integer>> dataStream = inStream
                //Assign timestamps and watermarks. The watermark keeps advancing; once it reaches a window's end time, that window's computation is triggered: watermarks are what trigger window computation.
                //Duration.ofSeconds(2) is the out-of-orderness bound: even after the largest event time seen reaches the window's end time, the window waits this much longer before it fires.
//                .assignTimestampsAndWatermarks(
//                        WatermarkStrategy.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(2))
//                                .withTimestampAssigner((element, timestamp) -> {
//                                    long times = System.currentTimeMillis() ;
//                                    System.out.println(element.f1 + "," + element.f0 + " timestamp: " + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));
//                                    return times;
//                                })
//                )
                .assignTimestampsAndWatermarks(new MyWatermarkStrategy()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Integer>>() {
                            @Override
                            public long extractTimestamp(Tuple3<String, String, Integer> element, long timestamp) {
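                                // Demo only: the current wall-clock time stands in for the event time, so records are always in order;
                                // a real job would extract the timestamp from a field of the record.
                                long times = System.currentTimeMillis();
                                System.out.println(element.f1 + "," + element.f0 + " timestamp: " + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));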
                                return times;
                            }
                        }))
                //Key the stream by gender (f1)
                .keyBy((KeySelector<Tuple3<String, String, Integer>, String>) k -> k.f1)
                //3-second tumbling event-time windows
                .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                //Process all records of each keyed window at once
                .apply(new WindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>(){
                    @Override
                    public void apply(String s, TimeWindow window, Iterable<Tuple3<String, String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
                        long times = System.currentTimeMillis() ;
                        System.out.println();
                        System.out.println("Window processing time: " + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));
                        Iterator<Tuple3<String, String, Integer>> iterator = input.iterator();
                        int total = 0;
                        int size = 0;
                        String sex = "";
                        while (iterator.hasNext()){
                            Tuple3<String, String, Integer> tuple3 = iterator.next();
                            total += tuple3.f2;
                            size ++;
                            sex = tuple3.f1;
                        }
                        out.collect(new Tuple2<>(sex, total / size));
                    }
                });

        dataStream.print();
        env.execute("flink Filter job");
    }

    /**
     * Periodic watermark generator
     */
    public static class MyWatermarkStrategy implements WatermarkStrategy<Tuple3<String, String, Integer>>{
        @Override
        public WatermarkGenerator<Tuple3<String, String, Integer>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Tuple3<String, String, Integer>>() {
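                //Fixed out-of-orderness bound: 3.5 seconds
                private final long maxOutOfOrderness = 3500;
                //Largest event timestamp seen so far; starts low enough that the watermark below cannot underflow
                private long currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;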

                /**
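                 * Called for every event
                 * @param event             the stream record
                 * @param eventTimestamp    the event timestamp returned by the timestamp assigner
                 * @param output            watermark output (unused here; watermarks are emitted periodically)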
                 */
                @Override
                public void onEvent(Tuple3<String, String, Integer> event, long eventTimestamp, WatermarkOutput output) {
                    //Track the largest event timestamp seen so far
                    currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
                }
                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    // Watermark = largest timestamp seen - out-of-orderness bound - 1. A window [start, end) fires once the
                    // watermark reaches end - 1, so records up to 3.5 s out of order still land in their window; records that
                    // arrive after their window has fired are late and, with the default allowed lateness of 0, are dropped.
                    output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
                }
            };
        }
    }

    /**
     * Simulates a continuous data stream
     */
    public static class MyRichSourceFunction extends RichSourceFunction<Tuple3<String, String, Integer>> {
        //Set to false by cancel() so that run() stops emitting
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
            List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList();
            int j = 0;
            for (int i = 0; i < 100 && isRunning; i++){
                //Cycle through the first 6 sample records
                if (i % 6 == 0){
                    j = 0;
                }
                ctx.collect(tuple3List.get(j));
                //Emit one record per second
                Thread.sleep(1 * 1000);
                j ++;
            }
        }
        @Override
        public void cancel() {
            isRunning = false;
        }
    }

}
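The program relies on two rules that stay implicit in the code: which 3-second window a timestamp falls into, and when the watermark closes that window. The plain-Java sketch below spells both out. The class name is hypothetical; the start formula is the one used by Flink's `TimeWindow.getWindowStartWithOffset`, and the firing test mirrors `EventTimeTrigger`, which fires once the watermark reaches `window.maxTimestamp()`, i.e. end - 1.

```java
/**
 * Plain-Java sketch (no Flink dependency) of how an event-time tumbling
 * window is chosen for a timestamp and when a watermark fires it.
 */
final class TumblingWindowSketch {

    /** Start of the tumbling window of length size (shifted by offset) that contains ts. */
    static long windowStart(long ts, long offset, long size) {
        return ts - (ts - offset + size) % size;
    }

    /** Last timestamp that still belongs to the window [start, start + size). */
    static long maxTimestamp(long start, long size) {
        return start + size - 1;
    }

    /** An event-time window fires once the watermark reaches its max timestamp. */
    static boolean fires(long watermark, long start, long size) {
        return watermark >= maxTimestamp(start, size);
    }

    public static void main(String[] args) {
        long size = 3_000;              // Time.seconds(3)
        long maxOutOfOrderness = 3_500; // same bound as MyWatermarkStrategy
        long ts = 7_500;
        long start = windowStart(ts, 0, size);
        System.out.println("window = [" + start + ", " + (start + size) + ")");
        // watermark = maxSeen - maxOutOfOrderness - 1, so the window fires once
        // maxSeen - maxOutOfOrderness - 1 >= start + size - 1
        long firesAt = start + size + maxOutOfOrderness;
        System.out.println("fires once max timestamp seen >= " + firesAt);
    }
}
```

Running `main` prints `window = [6000, 9000)` and `fires once max timestamp seen >= 12500`. With `maxOutOfOrderness = 3500`, a window ending at `end` therefore fires only after a record stamped at least `end + 3500` has been seen and the next periodic watermark has been emitted.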

Output

man,張三 timestamp: 2020-12-27 10:28:20
girl,李四 timestamp: 2020-12-27 10:28:21
man,王五 timestamp: 2020-12-27 10:28:22
girl,劉六 timestamp: 2020-12-27 10:28:23
girl,伍七 timestamp: 2020-12-27 10:28:24

Window processing time: 2020-12-27 10:28:25
(man,20)
man,吳八 timestamp: 2020-12-27 10:28:25
man,張三 timestamp: 2020-12-27 10:28:26
girl,李四 timestamp: 2020-12-27 10:28:27

Window processing time: 2020-12-27 10:28:28
(girl,28)

Window processing time: 2020-12-27 10:28:28
(man,29)
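To see why each window result appears a few seconds after the window's end, the plain-Java sketch below (hypothetical class name, no Flink dependency) replays the example's settings: one record per second, 3-second tumbling windows and a 3.5 s out-of-orderness bound. Two assumptions: the first record lands 2.6 s after a window boundary, and the watermark is recomputed after every record instead of on Flink's 1-second periodic schedule, so a real job fires at the next periodic emission after the reported record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class WindowTimelineSketch {

    /** Returns {recordTimestamp, windowStart} pairs: after which record each window can fire. */
    static List<long[]> simulate(long firstTs, int records, long size, long bound) {
        List<long[]> fired = new ArrayList<>();
        TreeSet<Long> pending = new TreeSet<>();    // starts of windows that hold data
        long maxSeen = Long.MIN_VALUE + bound + 1;  // avoids underflow before the first record
        for (int i = 0; i < records; i++) {
            long ts = firstTs + i * 1_000L;         // one record per second
            pending.add(ts - ts % size);            // tumbling window start (offset 0, ts >= 0)
            maxSeen = Math.max(maxSeen, ts);
            long watermark = maxSeen - bound - 1;   // bounded out-of-orderness
            while (!pending.isEmpty() && watermark >= pending.first() + size - 1) {
                fired.add(new long[] {ts, pending.pollFirst()});
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Hypothetical: first record 2.6 s after a window boundary, 3 s windows, 3.5 s bound.
        for (long[] f : simulate(2_600, 8, 3_000, 3_500)) {
            System.out.println("after record at " + f[0] + " ms: window [" + f[1] + ", " + (f[1] + 3_000) + ") can fire");
        }
    }
}
```

Under these assumptions the window [0, 3000) can fire only after the fifth record (6600 ms) and [3000, 6000) only after the eighth (9600 ms), roughly 3.5 s after each window's end. That is consistent with the output above, where the first result appears at 10:28:25 right after the fifth record (10:28:24), and the next two at 10:28:28 right after the eighth record (10:28:27).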
