請(qǐng)使用0.9以后的版本:
成都創(chuàng)新互聯(lián)專注于焉耆企業(yè)網(wǎng)站建設(shè),響應(yīng)式網(wǎng)站開發(fā),商城開發(fā)。焉耆網(wǎng)站建設(shè)公司,為焉耆等地區(qū)提供建站服務(wù)。全流程按需制作網(wǎng)站,專業(yè)設(shè)計(jì),全程項(xiàng)目跟蹤,成都創(chuàng)新互聯(lián)專業(yè)和態(tài)度為您提供的服務(wù)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset","earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
try{
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}finally{
consumer.close();
}
1、只需要配置kafka的server groupid autocommit 序列化 autooffsetreset(其中 bootstrap.server group.id key.deserializer value.deserializer 必須指定);
2、用這些Properties構(gòu)建consumer對(duì)象(KafkaConsumer還有其他構(gòu)造,可以把序列化傳進(jìn)去);
3、subscribe訂閱topic列表(可以用正則訂閱Pattern.compile("kafka.*")
使用正則必須指定一個(gè)listener subscribe(Pattern pattern, ConsumerRebalanceListener listener)); 可以重寫這個(gè)接口來實(shí)現(xiàn) 分區(qū)變更時(shí)的邏輯。如果設(shè)置了enable.auto.commit = true 就不用理會(huì)這個(gè)邏輯。
4、然后循環(huán)poll消息(這里的1000是超時(shí)設(shè)定,如果沒有很多數(shù)據(jù),也就等一秒);
5、處理消息(打印了offset key value 這里寫處理邏輯)。
6、關(guān)閉KafkaConsumer(可以傳一個(gè)timeout值 等待秒數(shù) 默認(rèn)是30)。
bootstrap.server(最好用主機(jī)名不用ip kafka內(nèi)部用的主機(jī)名 除非自己配置了ip)
deserializer 反序列化consumer從broker端獲取的是字節(jié)數(shù)組,還原回對(duì)象類型。
默認(rèn)有十幾種:StringDeserializer LongDeserializer DoubleDeserializer。。
也可以自定義:定義serializer格式 創(chuàng)建自定義deserializer類實(shí)現(xiàn)Deserializer 接口 重寫邏輯
?
除了四個(gè)必傳的 bootstrap.server group.id key.deserializer value.deserializer
還有session.timeout.ms "coordinator檢測(cè)失敗的時(shí)間"
是檢測(cè)consumer掛掉的時(shí)間 為了可以及時(shí)的rebalance 默認(rèn)是10秒 可以設(shè)置更小的值避免消息延遲。
max.poll.interval.ms "consumer處理邏輯最大時(shí)間"
處理邏輯比較復(fù)雜的時(shí)候 可以設(shè)置這個(gè)值 避免造成不必要的 rebalance ,因?yàn)閮纱蝡oll時(shí)間超過了這個(gè)參數(shù),kafka認(rèn)為這個(gè)consumer已經(jīng)跟不上了,會(huì)踢出組,而且不能提交offset,就會(huì)重復(fù)消費(fèi)。默認(rèn)是5分鐘。
auto.offset.reset "無位移或者位移越界時(shí)kafka的應(yīng)對(duì)策略"
所以如果啟動(dòng)了一個(gè)group從頭消費(fèi) 成功提交位移后 重啟后還是接著消費(fèi) 這個(gè)參數(shù)無效
所以3個(gè)值的解釋是:
earliset 當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開始消費(fèi);無提交的offset時(shí),從最早的位移消費(fèi)
latest 當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開始消費(fèi);無提交的offset時(shí),消費(fèi)新產(chǎn)生的該分區(qū)下的數(shù)據(jù) none topic各分區(qū)都存在已提交的offset時(shí),從offset后開始消費(fèi);只要有一個(gè)分區(qū)不存在已提交的offset,則拋出異常
(注意kafka-0.10.1.X版本之前:?auto.offset.reset 的值為smallest,和,largest.(offest保存在zk中) 、
我們這是說的是新版本:kafka-0.10.1.X版本之后:?auto.offset.reset 的值更改為:earliest,latest,和none (offest保存在kafka的一個(gè)特殊的topic名為:__consumer_offsets里面))
enable.auto.commit 是否自動(dòng)提交位移
true 自動(dòng)提交 false需要用戶手動(dòng)提交 有只處理一次需要的 最近設(shè)置為false自己控制。
fetch.max.bytes consumer單次獲取最大字節(jié)數(shù)
max.poll.records 單次poll返回的最大消息數(shù)
默認(rèn)500條 如果消費(fèi)很輕量 可以適當(dāng)提高這個(gè)值 增加消費(fèi)速度。
hearbeat.interval.ms consumer其他組員感知rabalance的時(shí)間
該值必須小于 session.timeout.ms 如果檢測(cè)到 consumer掛掉 也就根本無法感知rabalance了
connections.max.idle.ms 定期關(guān)閉連接的時(shí)間
默認(rèn)是9分鐘 可以設(shè)置為-1 永不關(guān)閉
更多實(shí)時(shí)計(jì)算,Kafka等相關(guān)技術(shù)博文,歡迎關(guān)注實(shí)時(shí)流式計(jì)算
本文標(biāo)題:Kafka單線程Consumer及參數(shù)詳解
網(wǎng)址分享:http://jinyejixie.com/article28/ggeocp.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供靜態(tài)網(wǎng)站、Google、軟件開發(fā)、網(wǎng)站排名、網(wǎng)站導(dǎo)航、網(wǎng)站內(nèi)鏈
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)