Kafka數(shù)據(jù)如何同步至MaxCompute,針對(duì)這個(gè)問題,這篇文章詳細(xì)介紹了相對(duì)應(yīng)的分析和解答,希望可以幫助更多想解決這個(gè)問題的小伙伴找到更簡(jiǎn)單易行的方法。
創(chuàng)新互聯(lián)主營津市網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營網(wǎng)站建設(shè)方案,app軟件開發(fā),津市h5小程序定制開發(fā)搭建,津市網(wǎng)站營銷推廣歡迎津市等地區(qū)企業(yè)咨詢
一、背景介紹
1. 實(shí)驗(yàn)?zāi)康?/strong>
在日常工作中,很多企業(yè)將APP或網(wǎng)站產(chǎn)生的行為日志和業(yè)務(wù)數(shù)據(jù)通過Kafka收集之后做兩方面的處理。一方面是離線處理,一方面是實(shí)時(shí)處理。并且一般會(huì)投遞到MaxCompute中作為模型的構(gòu)建,進(jìn)行相關(guān)的業(yè)務(wù)處理,如用戶的特征、銷售排名、訂單地區(qū)分布等。這些數(shù)據(jù)形成之后會(huì)在數(shù)據(jù)報(bào)表中作為展示。
2. 方案說明
Kafka數(shù)據(jù)同步到DataWorks有兩條鏈路。一條鏈路是業(yè)務(wù)數(shù)據(jù)和行為日志通過Kafka,再通過Flume 上傳到Datahub,以及Max Compute,最終在QuickBI進(jìn)行展示。另一條鏈路是業(yè)務(wù)數(shù)據(jù)和行為日志通過Kafka以及DataWorks,MaxCompute,最終在QuickBI當(dāng)中展示。
本次展示Kafka通過DataWorks上傳到MaxCompute的流程。從DataWorks上傳到MaxCompute是通過兩種方案進(jìn)行上傳數(shù)據(jù)同步的。方案一是自定義資源組,方案二是獨(dú)享資源組。自定義資源組一般適用于復(fù)雜網(wǎng)絡(luò)的數(shù)據(jù)上云場(chǎng)景。獨(dú)享資源組操作方式主要針對(duì)集成資源不足的情況。
cdn.com/4b5c9287751761510b5df37aabb73b67974bb430.png">
二、具體操作流程
1.Kafka消息隊(duì)列使用及其原理
Kafka產(chǎn)品概述:消息隊(duì)列 for Apache Kafka 是阿里云提供的分布式、高吞吐、可擴(kuò)展的消息隊(duì)列服務(wù)。消息隊(duì)列for Apache Kafka一般用于日志收集、監(jiān)控?cái)?shù)據(jù)聚合、流式數(shù)據(jù)處理、在線離線分析等大數(shù)據(jù)領(lǐng)域。消息隊(duì)列 for Apache Kafka 針對(duì)開源的 Apache Kafka 提供全托管服務(wù),徹底解決開源產(chǎn)品長(zhǎng)期以來的痛點(diǎn)。云上Kafka具有低成本、更彈性、更可靠的優(yōu)勢(shì),用戶只需專注于業(yè)務(wù)開發(fā),無需部署運(yùn)維。
Kafka架構(gòu)介紹:一個(gè)典型的Kafka集群主要分為四部分。Producer生產(chǎn)數(shù)據(jù)并通過 push 模式向消息隊(duì)列 for Apache Kafka 的 Kafka Broker 發(fā)送消息。發(fā)送的消息可以是網(wǎng)站的頁面訪問、服務(wù)器日志,也可以是 CPU 和內(nèi)存相關(guān)的系統(tǒng)資源信息。Kafka Broker用于存儲(chǔ)消息的服務(wù)器。Kafka Broker 支持水平擴(kuò)展。 Kafka Broker 節(jié)點(diǎn)的數(shù)量越多,Kafka 集群的吞吐率越高。Kafka Broker針對(duì)topic會(huì)partition一個(gè)概念,partition有l(wèi)eader、follower的角色分配。Consumer通過 pull 模式從消息隊(duì)列 for Apache Kafka Broker 訂閱并消費(fèi)leader的信息數(shù)據(jù)。其中partition內(nèi)部有offset作為消息的消費(fèi)點(diǎn)位。通過ZooKeeper管理集群的配置、選舉 leader 分區(qū),并且在Consumer Group 發(fā)生變化時(shí),管理partition_leader的負(fù)載均衡。
Kafka消息隊(duì)列購買以及部署:用戶首先可以到Kafka消息隊(duì)列產(chǎn)品頁面點(diǎn)擊購買,根據(jù)個(gè)人情況選擇對(duì)應(yīng)包年、包月等消費(fèi)方式、地區(qū)、實(shí)例類型、磁盤、流量以及消息存放時(shí)間。其中較為重要的一點(diǎn)是要選擇對(duì)應(yīng)地區(qū),如果用戶的MaxCompute在華北,那么盡量選擇華北地區(qū)。選擇開通完成后需要進(jìn)行部署。點(diǎn)擊部署,選擇合適的VPC及其交換機(jī)進(jìn)行部署。
部署完成后進(jìn)入Kafka Topic管理頁面,點(diǎn)擊創(chuàng)建Topic輸入自己的Topic。Topic命名下面有三條注意信息,命名盡量跟自己的業(yè)務(wù)一致,比如是財(cái)經(jīng)業(yè)務(wù)或者是商務(wù)業(yè)務(wù),盡量進(jìn)行區(qū)分。第四步進(jìn)入Consumer Group管理,點(diǎn)擊創(chuàng)建Consumer Group創(chuàng)建自己所需要的Consumer Group。Consumer Group的命名也需要規(guī)范,如果是財(cái)經(jīng)或商務(wù)業(yè)務(wù),盡量和自己的Topic相對(duì)應(yīng)。
Kafka白名單配置:Kafka安裝部署完成之后確認(rèn)需要訪問Kafka的服務(wù)器或產(chǎn)品的白名單。下圖中的默認(rèn)接入點(diǎn)即為訪問接口。
2.資源組介紹及其配置
自定義資源組的使用背景:自定義資源組一般針對(duì)IDC之間的網(wǎng)絡(luò)問題。本地網(wǎng)絡(luò)和云上網(wǎng)絡(luò)存在差異,如DataWorks可以通過免費(fèi)傳輸能力(默認(rèn)任務(wù)資源組)進(jìn)行海量數(shù)據(jù)上云,但默認(rèn)資源組無法實(shí)現(xiàn)傳輸速度存在較高要求或復(fù)雜環(huán)境中的數(shù)據(jù)源同步上云的需求。此時(shí)用戶可以使用自定義資源組可實(shí)現(xiàn)復(fù)雜環(huán)境同步上云的需求,解決DataWorks默 認(rèn)資源組與您的數(shù)據(jù)源不通的問題,或?qū)崿F(xiàn)更高速度的傳輸能力。然而,自定義資源組主要解決的還是復(fù)雜網(wǎng)絡(luò)環(huán)境上云同步問題,打通任意網(wǎng)絡(luò)環(huán)境之間的數(shù)據(jù)傳輸同步。
自定義資源組的配置:自定義資源組的配置需要六步操作,首先點(diǎn)擊進(jìn)入DataWorks控制臺(tái),點(diǎn)開工作空間的列表,選擇用戶需要的項(xiàng)目空間,點(diǎn)擊進(jìn)入數(shù)據(jù)集成,即確認(rèn)自己的數(shù)據(jù)集成是要在哪個(gè)空間項(xiàng)目下進(jìn)行添加。之后,點(diǎn)擊進(jìn)入數(shù)據(jù)源界面,點(diǎn)擊新增自定義資源組。要注意頁面右上角的新增自定義資源組是只有項(xiàng)目管理員有權(quán)限添加。
第三步是確認(rèn)Kafka與需要添加的自定義資源組屬于同一個(gè)VPC下。本次實(shí)驗(yàn)是ECS向Kafka發(fā)送消息,二者的VPC應(yīng)該一致。第四步登錄ECS,即個(gè)人的自定義資源組。執(zhí)行命令dmidecode|grep UUID得到ECS的UUID。
第五步是將添加服務(wù)器UUID以及自定義資源組的IP或機(jī)器CPU和內(nèi)存填寫進(jìn)來。最后是在ECS上執(zhí)行相關(guān)命令,Agent安裝共5步,做一一確認(rèn),在第4小步完成后點(diǎn)擊刷新查看服務(wù)是否為可用狀態(tài)。添加完成后進(jìn)行檢查連通測(cè)試,檢查是否添加成功。
獨(dú)享資源組的使用背景:一些客戶反映在Kafka同步到MaxCompute時(shí)會(huì)報(bào)資源不足的問題,可以通過新增獨(dú)享資源組的方式進(jìn)行數(shù)據(jù)同步。獨(dú)享資源模式下,機(jī)器的物理資源(網(wǎng)絡(luò)、磁盤、CPU和內(nèi)存等)完全獨(dú)享。不僅可以隔離用戶間的資源使用,也可以隔離不同工作空間任務(wù)的資源使用。此外,獨(dú)享資源也支持靈活的擴(kuò)容、縮容功能,可以滿足資源獨(dú)
享、靈活配置等需求。獨(dú)享資源組可以訪問在同一地域下的VPC數(shù)據(jù)源,同時(shí)也可以訪問跨地域的公網(wǎng)RDS地址。
獨(dú)享資源組的配置:獨(dú)享資源組的配置主要需要兩步操作,首先進(jìn)入DataWorks控制臺(tái)的資源列表,點(diǎn)擊新增獨(dú)享資源組,包括獨(dú)享集成資源組和獨(dú)享調(diào)度資源組。此處選擇新增獨(dú)享集成資源組,點(diǎn)擊購買時(shí)仍要注意選擇對(duì)應(yīng)的購買方式、區(qū)域、資源、內(nèi)存、時(shí)間期限、數(shù)量等。
購買完成后需要把獨(dú)享集成資源組綁定到與Kafka對(duì)應(yīng)的VPC,點(diǎn)擊專有網(wǎng)絡(luò)綁定,選擇與Kafka對(duì)應(yīng)的交換機(jī)(最明顯的是可用區(qū)的區(qū)別)、安全組。
3.同步過程及其注意事項(xiàng)
Kafka同步到MaxCompute的需要進(jìn)行相關(guān)參數(shù)配置同時(shí)需要注意以下幾個(gè)事項(xiàng)。
DataWorks數(shù)據(jù)集成操作:進(jìn)入DataWorks操作界面,點(diǎn)擊創(chuàng)建業(yè)務(wù)流程,在新建的業(yè)務(wù)流程添加數(shù)據(jù)同步節(jié)點(diǎn),再進(jìn)行命名。
進(jìn)入數(shù)據(jù)同步節(jié)點(diǎn),包括Reader端和Writer端,點(diǎn)擊Reader端數(shù)據(jù)源為Kafka,Writer端數(shù)據(jù)源為ODPS。點(diǎn)擊轉(zhuǎn)化為腳本模式。Reader或Writer端的一些同步參數(shù)可以在此處就近點(diǎn)擊,方便閱讀、操作和理解。
Kafka Reader的主要參數(shù):Kafka Reader的主要參數(shù)首先server,上文所述Kafka的默認(rèn)接入點(diǎn)就是其中一個(gè)server,ip:port。注意此處server是必填參數(shù)。topic,表示在Kafka部署完成之后,Kafka處理數(shù)據(jù)源的topic,此處也是必填參數(shù)。下一個(gè)參數(shù)是針對(duì)列column,column支持常量列、數(shù)據(jù)列、屬性列。常量列和數(shù)據(jù)列不太重要。同步的完整消息一般存放在屬性列 value 中,如果需要其它信息,如partition、offset、timestamp,也可以在屬性列中篩選。column是必填參數(shù)。
keyType、valueType各有6種類型,根據(jù)用戶同步的數(shù)據(jù),選擇相應(yīng)的信息,同步一個(gè)類型。需要注意同步方式是按消息時(shí)間同步,還是按消費(fèi)點(diǎn)位置同步的。按數(shù)據(jù)消費(fèi)點(diǎn)位置同步有四個(gè)場(chǎng)景,beginDateTime,endDateTime,beginOffset,endOffset。 beginDateTime 和beginOffset 二選其一,作為數(shù)據(jù)消費(fèi)起點(diǎn)。endDateTime 和endOffset 二選其一。需要注意beginDateTime、endDateTime 中需要Kafka0.10.2版本以上才支持按數(shù)據(jù)消費(fèi)點(diǎn)位置同步功能。另外需要注意beginOffset有三個(gè)比較特殊的形式:seekToBeginning,表示從開始點(diǎn)位消費(fèi)數(shù)據(jù);seekToLast,表示從上次消費(fèi)的偏移位置消費(fèi)數(shù)據(jù),按照beginOffset從上次偏移位置只能一次消費(fèi),如果使用beginDateTime則可以多次消費(fèi),這取決于消息存放時(shí)間;seekToEnd,表示從最后點(diǎn)位消費(fèi)數(shù)據(jù),會(huì)讀取到空數(shù)據(jù)。
skipExceeedRecord沒有太大作用,是不必填項(xiàng)。partition對(duì)topic所有分區(qū)共同讀消費(fèi)的,所以無需自定義一個(gè)分區(qū),是非必填項(xiàng)。kafkaConfig,如果有其它相關(guān)配置參數(shù)可以擴(kuò)展配置在kafkaConfig,kafkaConfig也是非必填項(xiàng)。
MaxCompute Writer的主要參數(shù):dataSource是數(shù)據(jù)源名稱,添加ODPS數(shù)據(jù)源。tables,表示所創(chuàng)建的數(shù)據(jù)表的表名稱,Kafka的數(shù)據(jù)要同步到哪張表中,相應(yīng)的字段也可以建立。
partition,如果表為分區(qū)表,則必須配置到最后一級(jí)分區(qū),確定同步位置。若為非分區(qū)表,則不必填。column,盡量與Kafka column中的相關(guān)字段做一一對(duì)應(yīng)的操作。同步的字段對(duì)應(yīng),信息同步才能確認(rèn)成功。truncate,寫入時(shí)同步的數(shù)據(jù)是選擇以追加模式寫還是以覆蓋模式寫,盡量避免多個(gè)DDL同時(shí)操作一個(gè)分區(qū),或者在多個(gè)并發(fā)作業(yè)啟動(dòng)前提前創(chuàng)建分區(qū)。
Kafka同步數(shù)據(jù)到MaxCompute:Kafka的Reader端,MaxCompute的Writer端以及限制參數(shù)。Reader包含server、endOffset、kafkaConfig、group.id、valueType、ByteArray、column字段、topic、beginOffset、seekToLast等。MaxCompute的Writer端包含覆蓋、追加、壓縮、查看源碼、同步到的表、字段要和Kafka的Reader端做一一對(duì)應(yīng),最重要的是value數(shù)據(jù)同步。限制參數(shù),主要有errorlimit,數(shù)據(jù)超過幾個(gè)錯(cuò)誤后會(huì)進(jìn)行報(bào)錯(cuò);speed,可以限制流速、并發(fā)度等。
參考Kafka生產(chǎn)者SDK編寫代碼:最終生產(chǎn)出的數(shù)據(jù)要發(fā)送到Kafka中,通過相關(guān)代碼可以查看用戶的生產(chǎn)數(shù)據(jù)。一段代碼表示配置信息的讀取,協(xié)議、序列化方式以及請(qǐng)求的等待時(shí)間,需要發(fā)送哪一個(gè)topic,發(fā)送什么樣的消息。發(fā)送完成后回傳一個(gè)信息。
代碼打包運(yùn)行在ECS上(與Kafka同一個(gè)可用區(qū)):如下圖所示,執(zhí)行crontab-e命令,每到17:00執(zhí)行一次。下圖為發(fā)送日志完成后的消息記錄。
在MaxCompute上創(chuàng)建表:進(jìn)入DataWorks業(yè)務(wù)流程頁面,創(chuàng)建目標(biāo)表,使用一個(gè)DDL語句創(chuàng)建同步的表,或根據(jù)用戶個(gè)人業(yè)務(wù)相應(yīng)創(chuàng)建不同的表的字段。
4.開發(fā)測(cè)試以及生產(chǎn)部署
選擇自定義資源組(或獨(dú)享集成資源組)進(jìn)行同步操作:下圖所示,選擇右上角“配置任務(wù)資源組”,根據(jù)用戶個(gè)人需求選擇資源組,點(diǎn)擊執(zhí)行。執(zhí)行完成后,會(huì)出現(xiàn)標(biāo)識(shí)顯示成功,同步數(shù)據(jù)記錄以及結(jié)果是否成功。同步過程基本結(jié)束。
查詢同步的數(shù)據(jù)結(jié)果:在DataWorks臨界面查看同步結(jié)果,在臨時(shí)節(jié)點(diǎn)點(diǎn)擊查詢命令,select * from testkafka3(表),查看數(shù)據(jù)同步結(jié)果。數(shù)據(jù)已經(jīng)同步過來,證明測(cè)試成功。
設(shè)置調(diào)度參數(shù):業(yè)務(wù)流程開發(fā)數(shù)據(jù)同步之后,會(huì)對(duì)相關(guān)模型進(jìn)行一些業(yè)務(wù)處理,最后設(shè)計(jì)一些SQL節(jié)點(diǎn)、同步節(jié)點(diǎn),進(jìn)行部署。如下圖所示,在右側(cè)點(diǎn)擊調(diào)度配置,輸入調(diào)度時(shí)間。
提交業(yè)務(wù)流程節(jié)點(diǎn),并打包發(fā)布:點(diǎn)擊業(yè)務(wù)流程,選擇所需要提交的節(jié)點(diǎn)并提交。一些業(yè)務(wù)流程提交之后不需要放到生產(chǎn)環(huán)境當(dāng)中。然后進(jìn)入任務(wù)發(fā)布界面,將節(jié)點(diǎn)添加到待發(fā)布進(jìn)行任務(wù)部署。
確認(rèn)業(yè)務(wù)流程發(fā)布成功:最后在運(yùn)維中心頁面,確認(rèn)發(fā)布是否在生產(chǎn)環(huán)境中存在。至此Kafka同步數(shù)據(jù)到MaxCompute過程結(jié)束。到了對(duì)應(yīng)的調(diào)度時(shí)間,在各個(gè)節(jié)點(diǎn)或者右上角會(huì)有節(jié)點(diǎn)的日志展示,可以查看日志運(yùn)行情況是否正常,或是否需要進(jìn)行后續(xù)操作,部署數(shù)據(jù)或是相關(guān)命令。
關(guān)于Kafka數(shù)據(jù)如何同步至MaxCompute問題的解答就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關(guān)知識(shí)。
文章標(biāo)題:Kafka數(shù)據(jù)如何同步至MaxCompute
分享網(wǎng)址:http://jinyejixie.com/article10/gpshgo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供手機(jī)網(wǎng)站建設(shè)、面包屑導(dǎo)航、微信公眾號(hào)、建站公司、定制開發(fā)、品牌網(wǎng)站設(shè)計(jì)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)