成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

如何使用Golang語(yǔ)言中的kafka和Sarama

這篇文章給大家介紹如何使用Golang語(yǔ)言中的kafka和Sarama,內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對(duì)大家能有所幫助。

創(chuàng)新互聯(lián)建站服務(wù)項(xiàng)目包括淶水網(wǎng)站建設(shè)、淶水網(wǎng)站制作、淶水網(wǎng)頁(yè)制作以及淶水網(wǎng)絡(luò)營(yíng)銷策劃等。多年來(lái),我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢(shì)、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,淶水網(wǎng)站推廣取得了明顯的社會(huì)效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到淶水省份的部分城市,未來(lái)相信會(huì)繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!

01、介紹

Apache Kafka 是一款開(kāi)源的消息引擎系統(tǒng)。它在項(xiàng)目中的作用主要是削峰填谷和解耦。本文我們只介紹 Apache Kafka 的 Golang  客戶端庫(kù) Sarama。Sarama 是 MIT 許可的 Apache Kafka 0.8 及更高版本的 Golang 客戶端庫(kù)。

如果讀者朋友對(duì) Apache Kafka 服務(wù)端還不了解,建議先閱讀官方文檔中的入門部分,本文使用的版本是 Apache Kafka 2.8。

如何使用Golang語(yǔ)言中的kafka和Sarama

02、生產(chǎn)者

我們可以使用 Sarama 庫(kù)的 AsyncProducer 或 SyncProducer 生產(chǎn)消息。在大多數(shù)情況下首選使用 AsyncProducer  生產(chǎn)消息。它通過(guò)一個(gè) channel 接收消息,并在后臺(tái)盡可能高效的異步生產(chǎn)消息。

SyncProducer 發(fā)送 Kafka 消息后阻塞,直到接收到 ACK 確認(rèn)。SyncProducer  有兩個(gè)警告:它通常效率較低,并且實(shí)際的耐用性保證取決于 Producer.RequiredAcks 的配置值。在某些配置中,有時(shí)仍會(huì)丟失由  SyncProducer 確認(rèn)的消息,但是使用比較簡(jiǎn)單。

為了讀者朋友們?nèi)菀桌斫?,本文我們介紹 SyncProducer 作為生產(chǎn)者的使用方式。如果讀者朋友想了解 AsyncProducer  作為生產(chǎn)者的使用方式,請(qǐng)參考官方文檔。

使用 SyncProducer 作為生產(chǎn)者的示例代碼:

func sendMessage (brokerAddr []string, config *sarama.Config, topic string, value sarama.Encoder) {  producer, err := sarama.NewSyncProducer(brokerAddr, config)  if err != nil {   fmt.Println(err)   return  }  defer func() {   if err = producer.Close(); err != nil {    fmt.Println(err)    return   }  }()  msg := &sarama.ProducerMessage{   Topic: topic,   Value: value,  }  partition, offset, err := producer.SendMessage(msg)  if err != nil {   fmt.Println(err)   return  }  fmt.Printf("partition:%d offset:%d\n", partition, offset) }

閱讀上面這段代碼,我們調(diào)用 NewSyncProducer() 創(chuàng)建一個(gè)新的 SyncProducer,給定 broker 地址和配置信息。調(diào)用  SendMessage()  生產(chǎn)給定的消息,并且僅在生產(chǎn)成功或失敗時(shí)返回。它將返回分區(qū)(Partition)和生產(chǎn)的消息的偏移量(Offset),如果消息生產(chǎn)失敗,則返回錯(cuò)誤。

需要注意的是,為了避免泄露,必須在生產(chǎn)者上調(diào)用 Close(),因?yàn)楫?dāng)它超出范圍時(shí),可能不會(huì)自動(dòng)垃圾回收。

03、消費(fèi)者

我們可以使用 Sarama 庫(kù)的消費(fèi)者 Consumer 或消費(fèi)者組 ConsumerGroup API  消費(fèi)消息。為了讀者朋友們?nèi)菀桌斫?,本文我們介紹使用 Consumer 消費(fèi)消息。

Consumer 管理 PartitionConsumers,該 PartitionConsumers 處理來(lái)自 brokers 的 Kafka  消息。

Consumer 消費(fèi)消息的示例代碼:

func consumer (brokenAddr []string, topic string, partition int32, offset int64) {  consumer, err := sarama.NewConsumer(brokenAddr, nil)  if err != nil {   fmt.Println(err)   return  }  defer func() {   if err = consumer.Close(); err != nil {    fmt.Println(err)    return   }  }()  partitionConsumer, err := consumer.ConsumePartition(topic, partition, offset)  if err != nil {   fmt.Println(err)   return  }  defer func() {   if err = partitionConsumer.Close(); err != nil {    fmt.Println(err)    return   }  }()  for msg := range partitionConsumer.Messages() {   fmt.Printf("partition:%d offset:%d key:%s val:%s\n", msg.Partition, msg.Offset, msg.Key, msg.Value)  } }

閱讀上面這段代碼,我們調(diào)用 NewConsumer() 創(chuàng)建一個(gè)新的 consumer,給定 broker 地址和配置信息。調(diào)用  ConsumePartition() 創(chuàng)建 PartitionConsumer,給定 topic、partition 和  offset。PartitionConsumer 處理來(lái)自給定 topic 和 partition 的 Kafka 消息。

需要注意的是,為了防止泄露,必須調(diào)用 consumer 和 partitionConsumer 的  Close(),因?yàn)楫?dāng)它超出范圍時(shí),可能不會(huì)自動(dòng)垃圾回收。

關(guān)于如何使用Golang語(yǔ)言中的kafka和Sarama就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。

分享名稱:如何使用Golang語(yǔ)言中的kafka和Sarama
文章路徑:http://jinyejixie.com/article46/jjighg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站收錄、Google云服務(wù)器、品牌網(wǎng)站建設(shè)網(wǎng)站維護(hù)、商城網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都定制網(wǎng)站網(wǎng)頁(yè)設(shè)計(jì)
承德县| 阳山县| 常熟市| 方正县| 阜康市| 霍州市| 乐清市| 和静县| 星座| 内黄县| 阿拉尔市| 金华市| 南漳县| 祁门县| 射阳县| 凌源市| 阿拉尔市| 新宾| 宜兰市| 永善县| 永春县| 克拉玛依市| 民县| 皮山县| 东明县| 宝坻区| 金川县| 闽清县| 开远市| 嫩江县| 岳阳县| 涿鹿县| 讷河市| 安远县| 马关县| 渝北区| 搜索| 古交市| 孙吴县| 苏尼特右旗| 仙游县|