這篇文章將為大家詳細講解有關Flink 1.10中SQL、HiveCatalog與事件時間整合的示例分析,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
渦陽網(wǎng)站建設公司創(chuàng)新互聯(lián),渦陽網(wǎng)站設計制作,有大型網(wǎng)站制作公司豐富經(jīng)驗。已為渦陽超過千家提供企業(yè)網(wǎng)站建設服務。企業(yè)網(wǎng)站搭建\外貿(mào)網(wǎng)站建設要多少錢,請找那個售后服務好的渦陽做網(wǎng)站的公司定做!
添加依賴項
Maven 下載:
https://maven.aliyun.com/mvn/search
<properties>
<scala.bin.version>2.11</scala.bin.version>
<flink.version>1.10.0</flink.version>
<hive.version>1.1.0</hive.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala_${scala.bin.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.bin.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.bin.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka-0.11_${scala.bin.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_${scala.bin.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
</dependency>
</dependencies>
最后,找到 Hive 的配置文件 hive-site.xml,準備工作就完成了。
注冊 HiveCatalog、創(chuàng)建數(shù)據(jù)庫
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(5)
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tableEnvSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
val catalog = new HiveCatalog(
"rtdw", // catalog name
"default", // default database
"/Users/lmagic/develop", // Hive config (hive-site.xml) directory
"1.1.0" // Hive version
)
tableEnv.registerCatalog("rtdw", catalog)
tableEnv.useCatalog("rtdw")
val createDbSql = "CREATE DATABASE IF NOT EXISTS rtdw.ods"
tableEnv.sqlUpdate(createDbSql)
創(chuàng)建 Kafka 流表并指定事件時間
"eventType": "clickBuyNow", "userId": "97470180", "shareUserId": "", "platform": "xyz", "columnType": "merchDetail", "merchandiseId": "12727495", "fromType": "wxapp", "siteId": "20392", "categoryId": "", "ts": 1585136092541
CREATE TABLE rtdw.ods.streaming_user_active_log ( eventType STRING COMMENT '...', userId STRING, shareUserId STRING, platform STRING, columnType STRING, merchandiseId STRING, fromType STRING, siteId STRING, categoryId STRING, ts BIGINT, procTime AS PROCTIME(), -- 處理時間 eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')), -- 事件時間 WATERMARK FOR eventTime AS eventTime - INTERVAL '10' SECOND -- 水印) WITH ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'ng_log_par_extracted', 'connector.startup-mode' = 'latest-offset', -- 指定起始offset位置 'connector.properties.zookeeper.connect' = 'zk109:2181,zk110:2181,zk111:2181', 'connector.properties.bootstrap.servers' = 'kafka112:9092,kafka113:9092,kafka114:9092', 'connector.properties.group.id' = 'rtdw_group_test_1', 'format.type' = 'json', 'format.derive-schema' = 'true', -- 由表schema自動推導解析JSON 'update-mode' = 'append')
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'n' TIME_UNIT
https://www.jianshu.com/p/c612e95a5028
val createTableSql = """ |上文的SQL語句 |...... """.stripMargin tableEnv.sqlUpdate(createTableSql)
開窗計算 PV、UV
SELECT eventType,TUMBLE_START(eventTime, INTERVAL '30' SECOND) AS windowStart,TUMBLE_END(eventTime, INTERVAL '30' SECOND) AS windowEnd,COUNT(userId) AS pv,COUNT(DISTINCT userId) AS uvFROM rtdw.ods.streaming_user_active_logWHERE platform = 'xyz'GROUP BY eventType, TUMBLE(eventTime, INTERVAL '30' SECOND)
SQL 文檔:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#group-windows
val queryActiveSql =
"""
|......
|......
""".stripMargin
val result = tableEnv.sqlQuery(queryActiveSql)
result
.toAppendStream[Row]
.print()
.setParallelism(1)
關于“Flink 1.10中SQL、HiveCatalog與事件時間整合的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。
網(wǎng)頁題目:Flink1.10中SQL、HiveCatalog與事件時間整合的示例分析
分享URL:http://jinyejixie.com/article6/igosog.html
成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站建設、建站公司、云服務器、手機網(wǎng)站建設、營銷型網(wǎng)站建設、品牌網(wǎng)站設計
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)