While handling a production issue, I found that on October 12 a Kafka cluster had lost a small amount of data, at a rate of roughly 3 in 10 million. The data was written to Kafka and then vanished completely; consumers never received it. After pulling that day's records and checking the application logs around the moment the affected messages were written to Kafka, I found a small number of the following errors:
[2019-10-12 11:03:43,xxx] This is not the correct coordinator.
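Under normal conditions Kafka should almost never lose data, so when it does, something introduced by the developers or by the hardware must be behind it. The application had logged the writes, so I looked at its configuration: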
acks=1
I immediately realized this was where to start digging.
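acks=0: the producer considers a message successfully written to Kafka as soon as it has been sent over the network, without waiting for any broker response; some data will inevitably be lost.
acks=1: the master (the partition leader) returns an acknowledgment or an error once it has received the message and written it to the partition's data file; data can still be lost.
acks=all: the master waits until all in-sync replicas have received the message before returning an acknowledgment or an error.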
acks=1 had probably been chosen earlier as a compromise, for the sake of performance.
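A compact way to read the three levels is "how many replicas must hold the message before the producer is told it succeeded". The sketch below is a toy model, not Kafka client code; `replicas_before_ack`, `survives_leader_loss`, and the `isr_size` parameter are hypothetical names, and it assumes a new leader is always chosen from the in-sync replicas (ISR).

```python
from typing import Union


def replicas_before_ack(acks: Union[int, str], isr_size: int) -> int:
    """Toy model: replicas that must have the message before the producer sees success.

    acks=0        -> the producer does not wait for the broker at all
    acks=1        -> only the partition leader (the "master") must have written it
    acks=all / -1 -> every replica currently in the in-sync replica set must have it
    """
    if isr_size < 1:
        raise ValueError("a partition with a leader has at least one ISR member")
    if acks in (0, "0"):
        return 0
    if acks in (1, "1"):
        return 1
    if acks in ("all", -1, "-1"):
        return isr_size
    raise ValueError(f"unsupported acks value: {acks!r}")


def survives_leader_loss(acks: Union[int, str], isr_size: int) -> bool:
    """An acknowledged message survives losing the leader only if at least one
    other in-sync replica already had it when the ack was sent."""
    return replicas_before_ack(acks, isr_size) >= 2


for acks in (0, 1, "all"):
    print(acks, replicas_before_ack(acks, 3), survives_leader_loss(acks, 3))
```

The last function also shows a subtlety: if the ISR has shrunk to the leader alone (`isr_size=1`), acks=all waits for just one replica and behaves like acks=1.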
I went straight to the Kafka logs, guessing that the master must have become unavailable during that window, since that is what would cause data loss.
On the machine hosting node 57 of the Kafka cluster, I found the following log:
[2019-10-12 11:03:39,427] WARN Client session timed out, have not heard from server in 4034ms for sessionid 0x396aaaadbbxx00 (org.apache.zookeeper.ClientCnxn)
[2019-10-12 11:03:40,908] INFO Client session timed out, have not heard from server in 4034ms for sessionid 0x396aaaabbxx00, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2019-10-12 11:03:41,253] INFO zookeeper state changed (Disconnected) (org.I0Itec.zkclient.ZkClient)
[2019-10-12 11:03:41,962] INFO Opening socket connection to server xx.xx.xx.59/xx.xx.xx.59:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2019-10-12 11:03:41,962] INFO Socket connection established to xx.xx.xx.59/10.xx.xx.xx:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2019-10-12 11:03:42,293] WARN Unable to reconnect to ZooKeeper service, session 0x396cf664cdbb0000 has expired (org.apache.zookeeper.ClientCnxn)
[2019-10-12 11:03:42,293] INFO zookeeper state changed (Expired) (org.I0Itec.zkclient.ZkClient)
[2019-10-12 11:03:42,294] INFO Initiating client connection, connectString=xx.xx.xx.55:2181,xx.xx.xx.56:2181,xx.xx.xx.57:2181,xx.xx.xx.58:2181,xx.xx.xx.59:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@342xxx2d (org.apache.zookeeper.ZooKeeper)
[2019-10-12 11:03:42,313] INFO Unable to reconnect to ZooKeeper service, session 0x396cxxxxxb0000 has expired, closing socket connection (org.apache.zookeeper.ClientCnxn)
[2019-10-12 11:03:42,323] INFO EventThread shut down for session: 0x396cxxxx000 (org.apache.zookeeper.ClientCnxn)
[2019-10-12 11:03:42,342] INFO Opening socket connection to server xx.xx.xx.58/xx.xx.xx.58:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2019-10-12 11:03:42,343] INFO Socket connection established to xx.xx.xx.58/xx.xx.xx.58:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2019-10-12 11:03:43,516] INFO Session establishment complete on server xx.xx.xx.58/xx.xx.xx.58:2181, sessionid = 0x3ax4xxxxx01, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2019-10-12 11:03:43,517] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
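The ZooKeeper state changes can be lined up programmatically to measure how long broker 57 was cut off. The snippet below is a small sketch that parses only the `zookeeper state changed` lines; the regex and the function names are my own. Two details fit this timeline: as far as I know, the ZooKeeper Java client declares the connection dead after roughly two thirds of the negotiated session timeout (6000 ms × 2/3 = 4000 ms, consistent with the 4034 ms in the first WARN line), and once the session expires ZooKeeper deletes the broker's ephemeral registration under `/brokers/ids`, which the controller then reports as a deleted broker.

```python
import re
from datetime import datetime
from typing import List, Tuple

# The state-change lines copied from the broker-57 log above.
LOG = """\
[2019-10-12 11:03:39,427] WARN Client session timed out, have not heard from server in 4034ms for sessionid 0x396aaaadbbxx00 (org.apache.zookeeper.ClientCnxn)
[2019-10-12 11:03:41,253] INFO zookeeper state changed (Disconnected) (org.I0Itec.zkclient.ZkClient)
[2019-10-12 11:03:42,293] INFO zookeeper state changed (Expired) (org.I0Itec.zkclient.ZkClient)
[2019-10-12 11:03:43,517] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
"""

# Matches "[<timestamp>] <LEVEL> zookeeper state changed (<State>)".
STATE_LINE = re.compile(
    r"^\[(?P<ts>[\d-]+ [\d:]+,\d{3})\] \w+ zookeeper state changed \((?P<state>\w+)\)"
)


def zk_state_timeline(text: str) -> List[Tuple[datetime, str]]:
    """Return (timestamp, state) pairs for every ZooKeeper state-change line."""
    events = []
    for line in text.splitlines():
        m = STATE_LINE.match(line)
        if m:
            ts = datetime.strptime(m.group("ts"), "%Y-%m-%d %H:%M:%S,%f")
            events.append((ts, m.group("state")))
    return events


def seconds_between(events: List[Tuple[datetime, str]], start: str, end: str) -> float:
    """Seconds from the first `start` state to the next `end` state after it."""
    t0 = next(ts for ts, s in events if s == start)
    t1 = next(ts for ts, s in events if s == end and ts >= t0)
    return (t1 - t0).total_seconds()


events = zk_state_timeline(LOG)
print([s for _, s in events])                                  # ['Disconnected', 'Expired', 'SyncConnected']
print(seconds_between(events, "Disconnected", "SyncConnected"))  # 2.264
```

A gap of only about two seconds was enough: the session had already expired at 11:03:42,293, and the controller acted on that less than half a second later.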
Just as suspected: this Kafka node lost its connection to ZooKeeper, so the machine was in a "lost" state. Next I checked the Kafka controller log:
[2019-10-12 11:03:42,671] INFO [Controller id=56] Newly added brokers: , deleted brokers: 57, all live brokers: ...,55,56,58,59,xx.... (kafka.controller.KafkaController)
[2019-10-12 11:03:42,678] INFO [Controller-56-to-broker-57-send-thread]: Shutting down (kafka.controller.RequestSendThread)
[2019-10-12 11:03:42,749] INFO [Controller-56-to-broker-57-send-thread]: Stopped (kafka.controller.RequestSendThread)
[2019-10-12 11:03:42,749] INFO [Controller-56-to-broker-57-send-thread]: Shutdown completed (kafka.controller.RequestSendThread)
[2019-10-12 11:03:42,828] INFO [Controller id=56] Broker failure callback for 57 (kafka.controller.KafkaController)
[2019-10-12 11:03:42,836] INFO [Controller id=56] Removed ArrayBuffer() from list of shutting down brokers. (kafka.controller.KafkaController)
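The controller log shows broker 57 being removed and the broker failure callback firing; for every partition that 57 led, the controller then has to pick a new leader. The following is a simplified model of that choice, not Kafka's code: to my understanding, the default for an offline partition is the first replica in assignment order that is both alive and in the ISR, falling back to any live replica only when unclean leader election is enabled. The partition layout `[57, 58, 59]` is hypothetical.

```python
from typing import Optional, Sequence, Set


def elect_leader(assigned: Sequence[int], isr: Set[int], live: Set[int],
                 unclean_allowed: bool = False) -> Optional[int]:
    """Simplified offline-partition leader election (toy model).

    Picks the first replica, in assignment order, that is alive and in the ISR.
    Only if none exists and unclean election is allowed does it fall back to any
    live replica, which may be missing messages that were already acknowledged.
    """
    for broker in assigned:
        if broker in live and broker in isr:
            return broker
    if unclean_allowed:
        for broker in assigned:
            if broker in live:
                return broker
    return None  # partition stays offline


# Hypothetical partition with replicas on brokers 57, 58, 59; broker 57 just failed.
live_after_failure = {55, 56, 58, 59}
print(elect_leader([57, 58, 59], isr={57, 58, 59}, live=live_after_failure))  # 58
```

The catch is that ISR membership does not guarantee a follower holds every message: a follower stays in the ISR as long as it does not lag beyond `replica.lag.time.max.ms`, so broker 58 can be elected cleanly while still missing the last few messages the leader accepted under acks=1.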
Then I checked the network-level monitoring for this node:
The truth came out, and it matched my guess. This issue resembles one I ran into before; the difference is that this time only one node dropped out, whereas last time every node briefly lost network connectivity. For details, see my earlier post: https://my.oschina.net/110NotFound/blog/3105190
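After node 57 dropped out, a leader election ran immediately. Some messages produced at that moment had already reached the master (node 57) just as the election moved leadership to another broker. With acks=1, those messages had been written to the master and acknowledged, but had not yet been replicated to the slaves (followers), so they were lost; when node 57 rejoined as a follower, its copy of them would have been truncated to match the new leader's log.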
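The sequence of events can be replayed in a toy model. This is not Kafka code: `ToyPartition`, `scenario`, and their methods are hypothetical, followers replicate only when `replicate()` is called, and the returning broker's truncation is modeled as simply copying the new leader's log.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Replica:
    broker: int
    log: List[str] = field(default_factory=list)


class ToyPartition:
    """One partition: a leader plus followers that copy the leader's log only
    when replicate() is called. Models the ordering of events, not Kafka itself."""

    def __init__(self, brokers: List[int]):
        self.replicas: Dict[int, Replica] = {b: Replica(b) for b in brokers}
        self.leader = brokers[0]

    def produce(self, msg: str, acks: str) -> bool:
        """Append msg to the leader; return True when the producer gets a success ack."""
        if acks not in ("1", "all"):
            raise ValueError("toy model supports acks='1' or acks='all'")
        self.replicas[self.leader].log.append(msg)
        if acks == "all":
            # Ack only after the in-sync followers have the message.
            self.replicate()
        # With acks=1 the leader alone has written it and acknowledges right away.
        return True

    def replicate(self) -> None:
        """Followers fetch and catch up to the leader's log."""
        leader_log = self.replicas[self.leader].log
        for replica in self.replicas.values():
            if replica.broker != self.leader:
                replica.log = list(leader_log)

    def fail_leader_and_elect(self) -> int:
        """Leader drops out; the next broker in assignment order takes over
        (all followers are treated as in-sync, even if they lag)."""
        old = self.leader
        self.leader = next(b for b in self.replicas if b != old)
        return old

    def rejoin_as_follower(self, broker: int) -> None:
        """The returning broker discards whatever the new leader does not have."""
        self.replicas[broker].log = list(self.replicas[self.leader].log)


def scenario(acks: str) -> List[str]:
    p = ToyPartition([57, 58, 59])
    p.produce("m1", acks)
    p.replicate()                    # followers catch up on m1
    p.produce("m2", acks)            # under acks=1 the followers have not fetched m2 yet
    old = p.fail_leader_and_elect()  # broker 57 loses its ZooKeeper session
    p.rejoin_as_follower(old)        # 57 comes back and truncates
    return p.replicas[p.leader].log


print(scenario("1"))    # ['m1']  (m2 was acknowledged, then lost)
print(scenario("all"))  # ['m1', 'm2']
```

With acks=all the toy acknowledges only after replication, so the new leader already has `m2`. In real Kafka, if the leader failed before the followers caught up, the producer would receive an error or a timeout instead of a success and could retry, rather than believing the write had succeeded.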
If your data has strong consistency requirements, use acks=all. Otherwise, some day a slight hardware or system jitter can trigger a leader re-election in the Kafka cluster and lose data.
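In practice a durable setup usually needs more than acks=all alone: acks=all only waits for the replicas currently in the ISR, so it is normally paired with `min.insync.replicas` of at least 2 and with unclean leader election left disabled. The checker below is a hypothetical helper; the setting names are standard Kafka producer and topic settings, while the values and the dict layout (which mixes the topic-creation parameter `replication.factor` with topic configs) are illustrative assumptions.

```python
from typing import Dict, List

# Illustrative values; the keys are standard Kafka producer / topic settings.
producer_cfg = {"acks": "all", "enable.idempotence": "true"}
topic_cfg = {"replication.factor": 3, "min.insync.replicas": 2,
             "unclean.leader.election.enable": "false"}


def durability_warnings(producer: Dict[str, object], topic: Dict[str, object]) -> List[str]:
    """Hypothetical helper: list settings that can let an acknowledged write disappear."""
    warnings = []
    # A missing 'acks' is treated as unsafe: before Kafka 3.0 the producer default was acks=1.
    if str(producer.get("acks")) not in ("all", "-1"):
        warnings.append("acks is not 'all': an acknowledged write can be lost on leader failover")
    rf = int(topic.get("replication.factor", 1))
    min_isr = int(topic.get("min.insync.replicas", 1))
    if min_isr < 2:
        warnings.append("min.insync.replicas < 2: acks=all can be satisfied by the leader alone")
    if min_isr >= rf:
        warnings.append("min.insync.replicas >= replication.factor: one broker down blocks acks=all writes")
    if str(topic.get("unclean.leader.election.enable", "false")).lower() == "true":
        warnings.append("unclean leader election enabled: an out-of-sync replica may become leader")
    return warnings


print(durability_warnings(producer_cfg, topic_cfg))                 # []
print(durability_warnings({"acks": 1}, {"replication.factor": 3}))  # two warnings
```

Expect higher produce latency with acks=all, since each write waits for the in-sync followers; that is the trade-off the original acks=1 setting was presumably made to avoid.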
Title: What to do when Kafka loses messages
Source: http://jinyejixie.com/article46/joheeg.html