成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

Kafka單線程Consumer及參數(shù)詳解

請(qǐng)使用0.9以后的版本:

成都創(chuàng)新互聯(lián)專注于焉耆企業(yè)網(wǎng)站建設(shè),響應(yīng)式網(wǎng)站開發(fā),商城開發(fā)。焉耆網(wǎng)站建設(shè)公司,為焉耆等地區(qū)提供建站服務(wù)。全流程按需制作網(wǎng)站,專業(yè)設(shè)計(jì),全程項(xiàng)目跟蹤,成都創(chuàng)新互聯(lián)專業(yè)和態(tài)度為您提供的服務(wù)

示例代碼
 Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("auto.offset.reset","earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
      try{  
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
         }
        }finally{
          consumer.close();
        }

1、只需要配置kafka的server groupid autocommit 序列化 autooffsetreset(其中 bootstrap.server group.id key.deserializer value.deserializer 必須指定);

2、用這些Properties構(gòu)建consumer對(duì)象(KafkaConsumer還有其他構(gòu)造,可以把序列化傳進(jìn)去);

3、subscribe訂閱topic列表(可以用正則訂閱Pattern.compile("kafka.*")

使用正則必須指定一個(gè)listener subscribe(Pattern pattern, ConsumerRebalanceListener listener)); 可以重寫這個(gè)接口來實(shí)現(xiàn) 分區(qū)變更時(shí)的邏輯。如果設(shè)置了enable.auto.commit = true 就不用理會(huì)這個(gè)邏輯。

4、然后循環(huán)poll消息(這里的1000是超時(shí)設(shè)定,如果沒有很多數(shù)據(jù),也就等一秒);

5、處理消息(打印了offset key value 這里寫處理邏輯)。

6、關(guān)閉KafkaConsumer(可以傳一個(gè)timeout值 等待秒數(shù) 默認(rèn)是30)。

參數(shù)詳解

bootstrap.server(最好用主機(jī)名不用ip kafka內(nèi)部用的主機(jī)名 除非自己配置了ip)

deserializer 反序列化consumer從broker端獲取的是字節(jié)數(shù)組,還原回對(duì)象類型。

默認(rèn)有十幾種:StringDeserializer LongDeserializer DoubleDeserializer。。

也可以自定義:定義serializer格式 創(chuàng)建自定義deserializer類實(shí)現(xiàn)Deserializer 接口 重寫邏輯

?

除了四個(gè)必傳的 bootstrap.server group.id key.deserializer value.deserializer

還有session.timeout.ms "coordinator檢測(cè)失敗的時(shí)間"

是檢測(cè)consumer掛掉的時(shí)間 為了可以及時(shí)的rebalance 默認(rèn)是10秒 可以設(shè)置更小的值避免消息延遲。

max.poll.interval.ms "consumer處理邏輯最大時(shí)間"

處理邏輯比較復(fù)雜的時(shí)候 可以設(shè)置這個(gè)值 避免造成不必要的 rebalance ,因?yàn)閮纱蝡oll時(shí)間超過了這個(gè)參數(shù),kafka認(rèn)為這個(gè)consumer已經(jīng)跟不上了,會(huì)踢出組,而且不能提交offset,就會(huì)重復(fù)消費(fèi)。默認(rèn)是5分鐘。

auto.offset.reset "無位移或者位移越界時(shí)kafka的應(yīng)對(duì)策略"

所以如果啟動(dòng)了一個(gè)group從頭消費(fèi) 成功提交位移后 重啟后還是接著消費(fèi) 這個(gè)參數(shù)無效

所以3個(gè)值的解釋是:

earliset 當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開始消費(fèi);無提交的offset時(shí),從最早的位移消費(fèi)

latest 當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開始消費(fèi);無提交的offset時(shí),消費(fèi)新產(chǎn)生的該分區(qū)下的數(shù)據(jù) none topic各分區(qū)都存在已提交的offset時(shí),從offset后開始消費(fèi);只要有一個(gè)分區(qū)不存在已提交的offset,則拋出異常

(注意kafka-0.10.1.X版本之前:?auto.offset.reset 的值為smallest,和,largest.(offest保存在zk中) 、

我們這是說的是新版本:kafka-0.10.1.X版本之后:?auto.offset.reset 的值更改為:earliest,latest,和none (offest保存在kafka的一個(gè)特殊的topic名為:__consumer_offsets里面))

enable.auto.commit 是否自動(dòng)提交位移

true 自動(dòng)提交 false需要用戶手動(dòng)提交 有只處理一次需要的 最近設(shè)置為false自己控制。

fetch.max.bytes consumer單次獲取最大字節(jié)數(shù)

max.poll.records 單次poll返回的最大消息數(shù)

默認(rèn)500條 如果消費(fèi)很輕量 可以適當(dāng)提高這個(gè)值 增加消費(fèi)速度。

hearbeat.interval.ms consumer其他組員感知rabalance的時(shí)間

該值必須小于 session.timeout.ms 如果檢測(cè)到 consumer掛掉 也就根本無法感知rabalance了

connections.max.idle.ms 定期關(guān)閉連接的時(shí)間

默認(rèn)是9分鐘 可以設(shè)置為-1 永不關(guān)閉

更多實(shí)時(shí)計(jì)算,Kafka等相關(guān)技術(shù)博文,歡迎關(guān)注實(shí)時(shí)流式計(jì)算

Kafka單線程Consumer及參數(shù)詳解

本文標(biāo)題:Kafka單線程Consumer及參數(shù)詳解
網(wǎng)址分享:http://jinyejixie.com/article28/ggeocp.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供靜態(tài)網(wǎng)站Google軟件開發(fā)、網(wǎng)站排名網(wǎng)站導(dǎo)航、網(wǎng)站內(nèi)鏈

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

成都定制網(wǎng)站建設(shè)
威海市| 诏安县| 阿克陶县| 新绛县| 乳山市| 海口市| 水城县| 日喀则市| 赣榆县| 磐安县| 磐安县| 通辽市| 容城县| 印江| 封开县| 海口市| 阳高县| 定南县| 景谷| 绥芬河市| 保亭| 巫溪县| 四子王旗| 镇远县| 逊克县| 沁阳市| 渭南市| 台北市| 临漳县| 临城县| 二手房| 承德市| 灵丘县| 科技| 大同市| 江源县| 扬中市| 吴江市| 双辽市| 江城| 仙桃市|