成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

如何解析Kafka1.0.0多消費者示例

如何解析Kafka 1.0.0 多消費者示例,相信很多沒有經(jīng)驗的人對此束手無策,為此本文總結了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。

網(wǎng)站建設哪家好,找創(chuàng)新互聯(lián)!專注于網(wǎng)頁設計、網(wǎng)站建設、微信開發(fā)、小程序定制開發(fā)、集團企業(yè)網(wǎng)站建設等服務項目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了西寧免費建站歡迎大家使用!

package kafka.demo;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
 * 
 *  <p>Description: kafka 1.0.0</p> 
 * @author guangshihao
 * @date 2018年9月19日 
 *
 */
public class KafkaProduderDemo {
	public static void main(String[] args) {
		Map<String,Object> props = new HashMap<>();
		/*
         * acks,設置發(fā)送數(shù)據(jù)是否需要服務端的反饋,有三個值0,1,-1
		 * 0,意味著producer永遠不會等待一個來自broker的ack,這就是0.7版本的行為。
		 * 這個選項提供了最低的延遲,但是持久化的保證是最弱的,當server掛掉的時候會丟失一些數(shù)據(jù)。
		 * 1,意味著在leader replica已經(jīng)接收到數(shù)據(jù)后,producer會得到一個ack。
		 * 這個選項提供了更好的持久性,因為在server確認請求成功處理后,client才會返回。
		 * 如果剛寫到leader上,還沒來得及復制leader就掛了,那么消息才可能會丟失。
		 * -1,意味著在所有的ISR都接收到數(shù)據(jù)后,producer才得到一個ack。
		 * 這個選項提供了最好的持久性,只要還有一個replica存活,那么數(shù)據(jù)就不會丟失
		 */
		props.put("acks", "1");
		//配置默認的分區(qū)方式
		props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
		//配置topic的序列化類
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		//配置value的序列化類
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /*
		 * kafka broker對應的主機,格式為host1:port1,host2:port2
		 */
		props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092");
		//topic
		String topic = "test7";
		KafkaProducer< String, String> producer = new KafkaProducer< String, String>(props);
		for(int i = 1 ;i <= 100 ; i++) {
			String line = i+" this is a test ";
			ProducerRecord<String,String> record = new ProducerRecord<String,String>(topic,line );
			producer.send(record);
		}
		producer.close();
	}
}
package kafka.demo;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
public class MutilConsumerThread implements Runnable{
	private AtomicBoolean closed = new AtomicBoolean(false);
	KafkaConsumer<String, String> consumer = null;
	String topic = null;
	public MutilConsumerThread(KafkaConsumer<String, String> consumer,List<String> topic) {
		this.consumer=consumer;
		consumer.subscribe(topic);
	}
	public void run() {
		try{
			while(!closed.get()) {
				ConsumerRecords<String, String> records = consumer.poll(1000);
				for(ConsumerRecord<String, String> record: records) {
					//一組consumer的時候每個partition對應的線程是固定的
					System.out.println("Thread-Name:"+Thread.currentThread().getName()+"  "+"partition:"+record.partition()+"  "+record.value());
				}
			}
			
		}catch(WakeupException e ) {
			if(!closed.get()) throw e;
		}finally {
			consumer.close();
		}
	}
	
	public void shutdown() {
		closed.set(true);
		consumer.wakeup();
	}
}
package kafka.demo;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class MutiConsumerTest {
	public static void main(String[] args) throws InterruptedException {
		Properties props = new Properties();
		props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092");
		props.put("group.id", "group_test7");
		//配置topic的序列化類
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		//配置value的序列化類
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		//自動同步offset
        props.put("enable.auto.commit","true");
        //自動同步offset的時間間隔
        props.put("auto.commit.intervals.ms", "2000");
        //當在zookeeper中發(fā)現(xiàn)要消費的topic沒有或者topic的offset不合法時自動設置為最小值,可以設的值為 latest, earliest, none,默認為largest
        props.put("auto.offset.reset", "earliest ");
        String topic = "test7";
        List<MutilConsumerThread> consumers = new ArrayList<>();
        ExecutorService es = Executors.newFixedThreadPool(3);
        for(int i = 0 ;i<=2;i++) {
        	KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        	MutilConsumerThread cThread = new MutilConsumerThread(consumer,Arrays.asList(topic));
        	consumers.add(cThread);
        	es.submit(cThread);
        }
        
        //Thread.sleep(1000L);
        /* 這個方法的意思就是在JVM中增加一個關閉的鉤子,當JVM關閉的時候,
              會執(zhí)行系統(tǒng)中已經(jīng)設置的所有通過方法addShutdownHook添加的鉤子,當系統(tǒng)執(zhí)行完這些鉤子后,
          JVM才會關閉。所以這些鉤子可以在JVM關閉的時候進行內(nèi)存清理、對象銷毀等操作。*/
        Runtime.getRuntime().addShutdownHook(new Thread() {
			@Override
			public void run() {
				for(MutilConsumerThread consumer :consumers ) {
					consumer.shutdown();
				}
			}
        	
        });
        
	}
}

看完上述內(nèi)容,你們掌握如何解析Kafka 1.0.0 多消費者示例的方法了嗎?如果還想學到更多技能或想了解更多相關內(nèi)容,歡迎關注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!

分享標題:如何解析Kafka1.0.0多消費者示例
URL標題:http://jinyejixie.com/article26/ghoicg.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供App設計商城網(wǎng)站、網(wǎng)站策劃、外貿(mào)網(wǎng)站建設、ChatGPT網(wǎng)站導航

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)

搜索引擎優(yōu)化
玉溪市| 白玉县| 大英县| 永丰县| 孝义市| 大悟县| 诸暨市| 和龙市| 乌拉特前旗| 岚皋县| 鹿邑县| 大名县| 湘阴县| 台东县| 曲沃县| 广安市| 黄骅市| 安西县| 双城市| 商水县| 漳平市| 棋牌| 福建省| 邛崃市| 宁波市| 安岳县| 临漳县| 古交市| 和林格尔县| 江都市| 太湖县| 沁阳市| 县级市| 乌苏市| 尖扎县| 织金县| 望奎县| 常熟市| 韩城市| 泸州市| 元江|