這篇文章主要講解了“如何快速掌握Fink SQL”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“如何快速掌握Fink SQL”吧!
創(chuàng)新互聯(lián)作為成都網(wǎng)站建設(shè)公司,專注網(wǎng)站建設(shè)公司、網(wǎng)站設(shè)計(jì),有關(guān)成都企業(yè)網(wǎng)站建設(shè)方案、改版、費(fèi)用等問題,行業(yè)涉及自上料攪拌車等多個(gè)領(lǐng)域,已為上千家企業(yè)服務(wù),得到了客戶的尊重與認(rèn)可。
1、導(dǎo)入所需要的的依賴包
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>1.10.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.12</artifactId> <version>1.10.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>1.10.1</version> </dependency>
flink-table-planner:planner 計(jì)劃器,是 table API 最主要的部分,提供了運(yùn)行時(shí)環(huán)境和生成程序執(zhí)行計(jì)劃的 planner; flink-table-api-scala-bridge:bridge 橋接器,主要負(fù)責(zé) table API 和 DataStream/DataSet API的連接支持,按照語言分 java 和 scala。
這里的兩個(gè)依賴,是 IDE 環(huán)境下運(yùn)行需要添加的;如果是生產(chǎn)環(huán)境,lib 目錄下默認(rèn)已經(jīng)有了 planner,就只需要有 bridge 就可以了。
當(dāng)然,如果想使用用戶自定義函數(shù),或是跟 kafka 做連接,需要有一個(gè) SQL client,這個(gè)包含在 flink-table-common 里。
2、兩種 planner(old& blink)的區(qū)別
鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)
批流統(tǒng)一:Blink 將批處理作業(yè),視為流式處理的特殊情況。所以,blink 不支持表和DataSet 之間的轉(zhuǎn)換,批處理作業(yè)將不轉(zhuǎn)換為 DataSet 應(yīng)用程序,而是跟流處理一樣,轉(zhuǎn)換為 DataStream 程序來處理。
因 為 批 流 統(tǒng) 一 , Blink planner 也 不 支 持 BatchTableSource , 而 使 用 有 界 的
Blink planner 只支持全新的目錄,不支持已棄用的 ExternalCatalog。
舊 planner 和 Blink planner 的 FilterableTableSource 實(shí)現(xiàn)不兼容。舊的 planner 會把PlannerExpressions 下推到 filterableTableSource 中,而 blink planner 則會把 Expressions 下推。
基于字符串的鍵值配置選項(xiàng)僅適用于 Blink planner。
PlannerConfig 在兩個(gè) planner 中的實(shí)現(xiàn)不同。
Blink planner 會將多個(gè) sink 優(yōu)化在一個(gè) DAG 中(僅在 TableEnvironment 上受支持,而在 StreamTableEnvironment 上不受支持)。而舊 planner 的優(yōu)化總是將每一個(gè) sink 放在一個(gè)新的 DAG 中,其中所有 DAG 彼此獨(dú)立。
舊的 planner 不支持目錄統(tǒng)計(jì),而 Blink planner 支持。
3、表(Table)的概念
TableEnvironment 可以注冊目錄 Catalog,并可以基于 Catalog 注冊表。它會維護(hù)一個(gè)Catalog-Table 表之間的 map。 表(Table)是由一個(gè)標(biāo)識符來指定的,由 3 部分組成:Catalog 名、數(shù)據(jù)庫(database)名和對象名(表名)。如果沒有指定目錄或數(shù)據(jù)庫,就使用當(dāng)前的默認(rèn)值。
4、連接到文件系統(tǒng)(Csv 格式)
連接外部系統(tǒng)在 Catalog 中注冊表,直接調(diào)用 tableEnv.connect()就可以,里面參數(shù)要傳入一個(gè) ConnectorDescriptor,也就是 connector 描述器。對于文件系統(tǒng)的 connector 而言,flink內(nèi)部已經(jīng)提供了,就叫做 FileSystem()。
5、測試案例 (新)
需求: 將一個(gè)txt文本文件作為輸入流讀取數(shù)據(jù)過濾id不等于sensor_1的數(shù)據(jù)實(shí)現(xiàn)思路: 首先我們先構(gòu)建一個(gè)table的env環(huán)境通過connect提供的方法來讀取數(shù)據(jù)然后設(shè)置表結(jié)構(gòu)將數(shù)據(jù)注冊為一張表就可進(jìn)行我們的數(shù)據(jù)過濾了(使用sql或者流處理方式進(jìn)行解析)
準(zhǔn)備數(shù)據(jù)
sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718206,32 sensor_1,1547718208,36.2 sensor_1,1547718210,29.7 sensor_1,1547718213,30.9
代碼實(shí)現(xiàn)
import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.{DataTypes} import org.apache.flink.table.api.scala._ import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema} /** * @Package * @author 大數(shù)據(jù)老哥 * @date 2020/12/12 21:22 * @version V1.0 * 第一個(gè)Flinksql測試案例 */ object FlinkSqlTable { def main(args: Array[String]): Unit = { // 構(gòu)建運(yùn)行流處理的運(yùn)行環(huán)境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 構(gòu)建table環(huán)境 val tableEnv = StreamTableEnvironment.create(env) //通過 connect 讀取數(shù)據(jù) tableEnv.connect(new FileSystem().path("D:\\d12\\Flink\\FlinkSql\\src\\main\\resources\\sensor.txt")) .withFormat(new Csv()) //設(shè)置類型 .withSchema(new Schema() // 給數(shù)據(jù)添加元數(shù)信息 .field("id", DataTypes.STRING()) .field("time", DataTypes.BIGINT()) .field("temperature", DataTypes.DOUBLE()) ).createTemporaryTable("inputTable") // 創(chuàng)建一個(gè)臨時(shí)表 val resTable = tableEnv.from("inputTable") .select("*").filter('id === "sensor_1") // 使用sql的方式查詢數(shù)據(jù) var resSql = tableEnv.sqlQuery("select * from inputTable where id='sensor_1'") // 將數(shù)據(jù)轉(zhuǎn)為流進(jìn)行輸出 resTable.toAppendStream[(String, Long, Double)].print("resTable") resSql.toAppendStream[(String, Long, Double)].print("resSql") env.execute("FlinkSqlWrodCount") } }
6、TableEnvironment 的作用
注冊 catalog
在內(nèi)部 catalog 中注冊表
執(zhí)行 SQL 查詢
注冊用戶自定義函數(shù)
注冊用戶自定義函數(shù)
保存對 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用
在創(chuàng)建 TableEnv 的時(shí)候,可以多傳入一個(gè) EnvironmentSettings 或者 TableConfig 參數(shù),可以用來配置 TableEnvironment 的一些特性。
7、 老版本創(chuàng)建流處理批處理
7.1老版本流處理
val settings = EnvironmentSettings.newInstance() .useOldPlanner() // 使用老版本 planner .inStreamingMode() // 流處理模式 .build() val tableEnv = StreamTableEnvironment.create(env, settings)
7.2 老版本批處理
val batchEnv = ExecutionEnvironment.getExecutionEnvironment val batchTableEnv = BatchTableEnvironment.create(batchEnv)
7.3 blink 版本的流處理環(huán)境
val bsSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode().build() val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
7.4 blink 版本的批處理環(huán)境
val bbSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inBatchMode().build() val bbTableEnv = TableEnvironment.create(bbSettings)
感謝各位的閱讀,以上就是“如何快速掌握Fink SQL”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對如何快速掌握Fink SQL這一問題有了更深刻的體會,具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識點(diǎn)的文章,歡迎關(guān)注!
網(wǎng)站標(biāo)題:如何快速掌握FinkSQL
轉(zhuǎn)載源于:http://jinyejixie.com/article32/iicjpc.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站排名、動態(tài)網(wǎng)站、外貿(mào)網(wǎng)站建設(shè)、網(wǎng)站設(shè)計(jì)公司、網(wǎng)站內(nèi)鏈、ChatGPT
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)