事情是從公司前段時(shí)間的需求說起,大家知道宜信是一家金融科技公司,我們的很多數(shù)據(jù)與標(biāo)準(zhǔn)互聯(lián)網(wǎng)企業(yè)不同,大致來說就是:
目前成都創(chuàng)新互聯(lián)公司已為上1000家的企業(yè)提供了網(wǎng)站建設(shè)、域名、網(wǎng)絡(luò)空間、網(wǎng)站改版維護(hù)、企業(yè)網(wǎng)站設(shè)計(jì)、獨(dú)山網(wǎng)站維護(hù)等服務(wù),公司將堅(jiān)持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長(zhǎng),共同發(fā)展。玩數(shù)據(jù)的人都知道數(shù)據(jù)是非常有價(jià)值的,然后這些數(shù)據(jù)是保存在各個(gè)系統(tǒng)的數(shù)據(jù)庫中,如何讓需要數(shù)據(jù)的使用方得到一致性、實(shí)時(shí)的數(shù)據(jù)呢?
過去的通用做法有幾種,分別是:
這些方案都不算完美。我們?cè)诹私夂涂紤]了不同實(shí)現(xiàn)方式后,最后借鑒了 linkedin的思想,認(rèn)為要想同時(shí)解決數(shù)據(jù)一致性和實(shí)時(shí)性,比較合理的方法應(yīng)該是來自于log。
(此圖來自:https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad-idea/)
把增量的Log作為一切系統(tǒng)的基礎(chǔ)。后續(xù)的數(shù)據(jù)使用方,通過訂閱kafka來消費(fèi)log。
比如:
為什么使用log和kafka作為基礎(chǔ),而不使用Sqoop進(jìn)行抽取呢? 因?yàn)椋?/p>
為什么不使用dual write(雙寫)呢?,請(qǐng)參考https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad-idea/
這里就不多做解釋了。
于是我們提出了構(gòu)建一個(gè)基于log的公司級(jí)的平臺(tái)的想法。
下面解釋一下DWS平臺(tái),DWS平臺(tái)是有3個(gè)子項(xiàng)目組成:
圖中:
由于時(shí)間關(guān)系,我今天主要介紹DWS中的Dbus和Wormhole,在需要的時(shí)候附帶介紹一下Swifts。
如前面所說,Dbus主要解決的是將日志從源端實(shí)時(shí)的抽出。 這里我們以MySQL為例子,簡(jiǎn)單說明如何實(shí)現(xiàn)。
我們知道,雖然MySQL InnoDB有自己的log,MySQL主備同步是通過binlog來實(shí)現(xiàn)的。如下圖:
圖片來自:https://github.com/alibaba/canal
而binlog有三種模式:
他們各自的優(yōu)缺點(diǎn)如下:
此處來自:http://www.jquerycn.cn/a\_13625
由于statement 模式的缺點(diǎn),在與我們的DBA溝通過程中了解到,實(shí)際生產(chǎn)過程中都使用row 模式進(jìn)行復(fù)制。這使得讀取全量日志成為可能。
通常我們的MySQL布局是采用 2個(gè)master主庫(vip)+ 1個(gè)slave從庫 + 1個(gè)backup容災(zāi)庫 的解決方案,由于容災(zāi)庫通常是用于異地容災(zāi),實(shí)時(shí)性不高也不便于部署。
為了最小化對(duì)源端產(chǎn)生影響,顯然我們讀取binlog日志應(yīng)該從slave從庫讀取。
讀取binlog的方案比較多,github上不少,參考https://github.com/search?utf8=%E2%9C%93&q=binlog。最終我們選用了阿里的canal做位日志抽取方。
Canal最早被用于阿里中美機(jī)房同步, canal原理相對(duì)比較簡(jiǎn)單:
圖片來自:https://github.com/alibaba/canal
Dbus 的MySQL版主要解決方案如下:
對(duì)于增量的log,通過訂閱Canal Server的方式,我們得到了MySQL的增量日志:
在考慮使用Storm作為解決方案的時(shí)候,我們主要是認(rèn)為Storm有以下優(yōu)點(diǎn):
對(duì)于流水表,有增量部分就夠了,但是許多表需要知道最初(已存在)的信息。這時(shí)候我們需要initial load(第一次加載)。
對(duì)于initial load(第一次加載),同樣開發(fā)了全量抽取Storm程序通過jdbc連接的方式,從源端數(shù)據(jù)庫的備庫進(jìn)行拉取。initial load是拉全部數(shù)據(jù),所以我們推薦在業(yè)務(wù)低峰期進(jìn)行。好在只做一次,不需要每天都做。
全量抽取,我們借鑒了Sqoop的思想。將全量抽取Storm分為了2 個(gè)部分:
數(shù)據(jù)分片需要考慮分片列,按照配置和自動(dòng)選擇列將數(shù)據(jù)按照范圍來分片,并將分片信息保存到kafka中。
下面是具體的分片策略:
全量抽取的Storm程序是讀取kafka的分片信息,采用多個(gè)并發(fā)度并行連接數(shù)據(jù)庫備庫進(jìn)行拉取。因?yàn)槌槿〉臅r(shí)間可能很長(zhǎng)。抽取過程中將實(shí)時(shí)狀態(tài)寫到Zookeeper中,便于心跳程序監(jiān)控。
無論是增量還是全量,最終輸出到kafka中的消息都是我們約定的一個(gè)統(tǒng)一消息格式,稱為UMS(unified message schema)格式。
如下圖所示:
消息中schema部分,定義了namespace 是由 類型+數(shù)據(jù)源名+schema名+表名+版本號(hào)+分庫號(hào)+分表號(hào) 能夠描述整個(gè)公司的所有表,通過一個(gè)namespace就能唯一定位。
payload是指具體的數(shù)據(jù),一個(gè)json包里面可以包含1條至多條數(shù)據(jù),提高數(shù)據(jù)的有效載荷。
UMS中支持的數(shù)據(jù)類型,參考了Hive類型并進(jìn)行簡(jiǎn)化,基本上包含了所有數(shù)據(jù)類型。
在整個(gè)數(shù)據(jù)傳輸中,為了盡量的保證日志消息的順序性,kafka我們使用的是1個(gè)partition的方式。在一般情況下,基本上是順序的和唯一的。
但是我們知道寫kafka會(huì)失敗,有可能重寫,Storm也用重做機(jī)制,因此,我們并不嚴(yán)格保證exactly once和完全的順序性,但保證的是at least once。
因此_ums_id_變得尤為重要。
對(duì)于全量抽取,_ums_id_是唯一的,從zk中每個(gè)并發(fā)度分別取不同的id片區(qū),保證了唯一性和性能,填寫負(fù)數(shù),不會(huì)與增量數(shù)據(jù)沖突,也保證他們是早于增量消息的。
對(duì)于增量抽取,我們使用的是MySQL的日志文件號(hào) + 日志偏移量作為唯一id。Id作為64位的long整數(shù),高7位用于日志文件號(hào),低12位作為日志偏移量。
例如:000103000012345678。 103 是日志文件號(hào),12345678 是日志偏移量。
這樣,從日志層面保證了物理唯一性(即便重做也這個(gè)id號(hào)也不變),同時(shí)也保證了順序性(還能定位日志)。通過比較_ums_id_ 消費(fèi)日志就能通過比較_ums_id_知道哪條消息更新。
其實(shí)_ums_ts_與_ums_id_意圖是類似的,只不過有時(shí)候_ums_ts_可能會(huì)重復(fù),即在1毫秒中發(fā)生了多個(gè)操作,這樣就得靠比較_ums_id_了。
整個(gè)系統(tǒng)涉及到數(shù)據(jù)庫的主備同步,Canal Server,多個(gè)并發(fā)度Storm進(jìn)程等各個(gè)環(huán)節(jié)。
因此對(duì)流程的監(jiān)控和預(yù)警就尤為重要。
通過心跳模塊,例如每分鐘(可配置)對(duì)每個(gè)被抽取的表插入一條心態(tài)數(shù)據(jù)并保存發(fā)送時(shí)間,這個(gè)心跳表也被抽取,跟隨著整個(gè)流程下來,與被同步表在實(shí)際上走相同的邏輯(因?yàn)槎鄠€(gè)并發(fā)的的Storm可能有不同的分支),當(dāng)收到心跳包的時(shí)候,即便沒有任何增刪改的數(shù)據(jù),也能證明整條鏈路是通的。
Storm程序和心跳程序?qū)?shù)據(jù)發(fā)送公共的統(tǒng)計(jì)topic,再由統(tǒng)計(jì)程序保存到influxdb中,使用grafana進(jìn)行展示,就可以看到如下效果:
圖中是某業(yè)務(wù)系統(tǒng)的實(shí)時(shí)監(jiān)控信息。上面是實(shí)時(shí)流量情況,下面是實(shí)時(shí)延時(shí)情況??梢钥吹?,實(shí)時(shí)性還是很不錯(cuò)的,基本上1~2秒數(shù)據(jù)就已經(jīng)到末端kafka中。
Granfana提供的是一種實(shí)時(shí)監(jiān)控能力。
如果出現(xiàn)延時(shí),則是通過dbus的心跳模塊發(fā)送郵件報(bào)警或短信報(bào)警。
考慮到數(shù)據(jù)安全性,對(duì)于有脫敏需求的場(chǎng)景,Dbus的全量storm和增量storm程序也完成了實(shí)時(shí)脫敏的功能。脫敏方式有3種:
總結(jié)一下:簡(jiǎn)單的說,Dbus就是將各種源的數(shù)據(jù),實(shí)時(shí)的導(dǎo)出,并以UMS的方式提供訂閱, 支持實(shí)時(shí)脫敏,實(shí)際監(jiān)控和報(bào)警。
說完Dbus,該說一下Wormhole,為什么兩個(gè)項(xiàng)目不是一個(gè),而要通過kafka來對(duì)接呢?
其中很大一個(gè)原因就是解耦,kafka具有天然的解耦能力,程序直接可以通過kafka做異步的消息傳遞。Dbus和Wornhole內(nèi)部也使用了kafka做消息傳遞和解耦。
另外一個(gè)原因就是,UMS是自描述的,通過訂閱kafka,任何有能力的使用方來直接消費(fèi)UMS來使用。
雖然UMS的結(jié)果可以直接訂閱,但還需要開發(fā)的工作。Wormhole解決的是:提供一鍵式的配置,將kafka中的數(shù)據(jù)落地到各種系統(tǒng)中,讓沒有開發(fā)能力的數(shù)據(jù)使用方通過wormhole來實(shí)現(xiàn)使用數(shù)據(jù)。
如圖所示,Wormhole 可以將kafka中的UMS 落地到各種系統(tǒng),目前用的最多的HDFS,JDBC的數(shù)據(jù)庫和HBase。
在技術(shù)棧上, wormhole選擇使用spark streaming來進(jìn)行。
在Wormhole中,一條flow是指從一個(gè)namaspace從源端到目標(biāo)端。一個(gè)spark streaming服務(wù)于多條flow。
選用Spark的理由是很充分的:
這里補(bǔ)充說一下Swifts的作用:
Wormhole和Swifts對(duì)比如下:
通過Wormhole Wpark Streaming程序消費(fèi)kafka的UMS,首先UMS log可以被保存到HDFS上。
kafka一般只保存若干天的信息,不會(huì)保存全部信息,而HDFS中可以保存所有的歷史增刪改的信息。這就使得很多事情變?yōu)榭赡埽?/p>
可以說HDFS中的日志是很多的事情基礎(chǔ)。
介于Spark原生對(duì)parquet支持的很好,Spark SQL能夠?qū)arquet提供很好的查詢。UMS落地到HDFS上是保存到Parquet文件中的。Parquet的內(nèi)容是所有l(wèi)og的增刪改信息以及_ums_id_,_ums_ts_都存下來。
Wormhole spark streaming根據(jù)namespace 將數(shù)據(jù)分布存儲(chǔ)到不同的目錄中,即不同的表和版本放在不同目錄中。
由于每次寫的Parquet都是小文件,大家知道HDFS對(duì)于小文件性能并不好,因此另外還有一個(gè)job,每天定時(shí)將這些的Parquet文件進(jìn)行合并成大文件。
每個(gè)Parquet文件目錄都帶有文件數(shù)據(jù)的起始時(shí)間和結(jié)束時(shí)間。這樣在回灌數(shù)據(jù)時(shí),可以根據(jù)選取的時(shí)間范圍來決定需要讀取哪些Parquet文件,不必讀取全部數(shù)據(jù)。
常常我們遇到的需求是,將數(shù)據(jù)經(jīng)過加工落地到數(shù)據(jù)庫或HBase中。那么這里涉及到的一個(gè)問題就是,什么樣的數(shù)據(jù)可以被更新到數(shù)據(jù)?
這里最重要的一個(gè)原則就是數(shù)據(jù)的冪等性。
無論是遇到增刪改任何的數(shù)據(jù),我們面臨的問題都是:
對(duì)于第一個(gè)問題,其實(shí)就需要定位數(shù)據(jù)要找一個(gè)唯一的鍵,常見的有:
對(duì)于第二個(gè)問題,就涉及到_ums_id_了,因?yàn)槲覀円呀?jīng)保證了_ums_id_大的值更新,因此在找到對(duì)應(yīng)數(shù)據(jù)行后,根據(jù)這個(gè)原則來進(jìn)行替換更新。
之所以要軟刪除和加入_is_active_列,是為了這樣一種情況:
如果已經(jīng)插入的_ums_id_比較大,是刪除的數(shù)據(jù)(表明這個(gè)數(shù)據(jù)已經(jīng)刪除了), 如果不是軟刪除,此時(shí)插入一個(gè)_ums_id_小的數(shù)據(jù)(舊數(shù)據(jù)),就會(huì)真的插入進(jìn)去。
這就導(dǎo)致舊數(shù)據(jù)被插入了。不冪等了。所以被刪除的數(shù)據(jù)依然保留(軟刪除)是有價(jià)值的,它能被用于保證數(shù)據(jù)的冪等性。
插入數(shù)據(jù)到Hbase中,相當(dāng)要簡(jiǎn)單一些。不同的是HBase可以保留多個(gè)版本的數(shù)據(jù)(當(dāng)然也可以只保留一個(gè)版本)默認(rèn)是保留3個(gè)版本;
因此插入數(shù)據(jù)到HBase,需要解決的問題是:
Version的選擇很有意思,利用_ums_id_的唯一性和自增性,與version自身的比較關(guān)系一致:即version較大等價(jià)于_ums_id_較大,對(duì)應(yīng)的版本較新。
從提高性能的角度,我們可以將整個(gè)Spark Streaming的Dataset集合直接插入到HBase,不需要比較。讓HBase基于version自動(dòng)替我們判斷哪些數(shù)據(jù)可以保留,哪些數(shù)據(jù)不需要保留。
Jdbc的插入數(shù)據(jù):插入數(shù)據(jù)到數(shù)據(jù)庫中,保證冪等的原理雖然簡(jiǎn)單,要想提高性能在實(shí)現(xiàn)上就變得復(fù)雜很多,總不能一條一條的比較然后在插入或更新。
我們知道Spark的RDD/dataset都是以集合的方式來操作以提高性能,同樣的我們需要以集合操作的方式實(shí)現(xiàn)冪等性。
具體思路是:
A:不存在的數(shù)據(jù),即這部分?jǐn)?shù)據(jù)insert就可以;
B:存在的數(shù)據(jù),比較_ums_id_, 最終只將哪些_ums_id_更新較大row到目標(biāo)數(shù)據(jù)庫,小的直接拋棄。
使用Spark的同學(xué)都知道,RDD/dataset都是可以partition的,可以使用多個(gè)worker并進(jìn)行操作以提高效率。
在考慮并發(fā)情況下,插入和更新都可能出現(xiàn)失敗,那么還有考慮失敗后的策略。
比如:因?yàn)閯e的worker已經(jīng)插入,那么因?yàn)槲ㄒ恍约s束插入失敗,那么需要改為更新,還要比較_ums_id_看是否能夠更新。
對(duì)于無法插入其他情況(比如目標(biāo)系統(tǒng)有問題),Wormhole還有重試機(jī)制。插入到其他存儲(chǔ)中的就不多介紹了,總的原則是:根據(jù)各自存儲(chǔ)自身特性,設(shè)計(jì)基于集合的,并發(fā)的插入數(shù)據(jù)實(shí)現(xiàn)。這些都是Wormhole為了性能而做的努力,使用Wo(hù)rmhole的用戶不必關(guān)心 。
說了那么多,DWS有什么實(shí)際運(yùn)用呢?下面我來介紹某系統(tǒng)使用DWS實(shí)現(xiàn)了的實(shí)時(shí)營(yíng)銷。
如上圖所示:
系統(tǒng)A的數(shù)據(jù)都保存到自己的數(shù)據(jù)庫中,我們知道,宜信提供很多金融服務(wù),其中包括借款,而借款過程中很重要的就是信用審核。
借款人需要提供證明具有信用價(jià)值的信息,比如央行征信報(bào)告,是具有最強(qiáng)信用數(shù)據(jù)的數(shù)據(jù)。 而銀行流水,網(wǎng)購流水也是具有較強(qiáng)的信用屬性的數(shù)據(jù)。
借款人通過Web或手機(jī)APP在系統(tǒng)A中填寫信用信息時(shí),可能會(huì)某些原因無法繼續(xù),雖然可能這個(gè)借款人是一個(gè)優(yōu)質(zhì)潛在客戶,但以前由于無法或很久才能知道這個(gè)信息,所以實(shí)際上這樣的客戶是流失了。
應(yīng)用了DWS以后,借款人已經(jīng)填寫的信息已經(jīng)記錄到數(shù)據(jù)庫中,并通過DWS實(shí)時(shí)的進(jìn)行抽取、計(jì)算和落地到目標(biāo)庫中。根據(jù)對(duì)客戶的打分,評(píng)價(jià)出優(yōu)質(zhì)客戶。然后立刻將這個(gè)客戶的信息輸出到客服系統(tǒng)中。
客服人員在很短的時(shí)間(幾分鐘以內(nèi))就通過打電話的方式聯(lián)系上這個(gè)借款人(潛客),進(jìn)行客戶關(guān)懷,將這個(gè)潛客轉(zhuǎn)換為真正的客戶。我們知道借款是有時(shí)效性的,如果時(shí)間太久就沒有價(jià)值了。
如果沒有實(shí)時(shí)抽取/計(jì)算/落庫的能力,那么這一切都無法實(shí)現(xiàn)。
另外一個(gè)實(shí)時(shí)報(bào)表的應(yīng)用如下:
我們數(shù)據(jù)使用方的數(shù)據(jù)來自多個(gè)系統(tǒng),以前是通過T+1的方式獲得報(bào)表信息,然后指導(dǎo)第二天的運(yùn)營(yíng),這樣時(shí)效性很差。
通過DWS,將數(shù)據(jù)從多個(gè)系統(tǒng)中實(shí)時(shí)抽取,計(jì)算和落地,并提供報(bào)表展示,使得運(yùn)營(yíng)可以及時(shí)作出部署和調(diào)整,快速應(yīng)對(duì)。
作者:王東
來源:宜信技術(shù)學(xué)院
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。
網(wǎng)頁名稱:如何基于日志,同步實(shí)現(xiàn)數(shù)據(jù)的一致性和實(shí)時(shí)抽取?-創(chuàng)新互聯(lián)
地址分享:http://jinyejixie.com/article22/csohcc.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站改版、用戶體驗(yàn)、定制網(wǎng)站、標(biāo)簽優(yōu)化、App開發(fā)、商城網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容