這篇文章給大家介紹如何使用Golang語(yǔ)言中的kafka和Sarama,內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對(duì)大家能有所幫助。
創(chuàng)新互聯(lián)建站服務(wù)項(xiàng)目包括淶水網(wǎng)站建設(shè)、淶水網(wǎng)站制作、淶水網(wǎng)頁(yè)制作以及淶水網(wǎng)絡(luò)營(yíng)銷策劃等。多年來(lái),我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢(shì)、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,淶水網(wǎng)站推廣取得了明顯的社會(huì)效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到淶水省份的部分城市,未來(lái)相信會(huì)繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!
Apache Kafka 是一款開(kāi)源的消息引擎系統(tǒng)。它在項(xiàng)目中的作用主要是削峰填谷和解耦。本文我們只介紹 Apache Kafka 的 Golang 客戶端庫(kù) Sarama。Sarama 是 MIT 許可的 Apache Kafka 0.8 及更高版本的 Golang 客戶端庫(kù)。
如果讀者朋友對(duì) Apache Kafka 服務(wù)端還不了解,建議先閱讀官方文檔中的入門部分,本文使用的版本是 Apache Kafka 2.8。
我們可以使用 Sarama 庫(kù)的 AsyncProducer 或 SyncProducer 生產(chǎn)消息。在大多數(shù)情況下首選使用 AsyncProducer 生產(chǎn)消息。它通過(guò)一個(gè) channel 接收消息,并在后臺(tái)盡可能高效的異步生產(chǎn)消息。
SyncProducer 發(fā)送 Kafka 消息后阻塞,直到接收到 ACK 確認(rèn)。SyncProducer 有兩個(gè)警告:它通常效率較低,并且實(shí)際的耐用性保證取決于 Producer.RequiredAcks 的配置值。在某些配置中,有時(shí)仍會(huì)丟失由 SyncProducer 確認(rèn)的消息,但是使用比較簡(jiǎn)單。
為了讀者朋友們?nèi)菀桌斫?,本文我們介紹 SyncProducer 作為生產(chǎn)者的使用方式。如果讀者朋友想了解 AsyncProducer 作為生產(chǎn)者的使用方式,請(qǐng)參考官方文檔。
使用 SyncProducer 作為生產(chǎn)者的示例代碼:
func sendMessage (brokerAddr []string, config *sarama.Config, topic string, value sarama.Encoder) { producer, err := sarama.NewSyncProducer(brokerAddr, config) if err != nil { fmt.Println(err) return } defer func() { if err = producer.Close(); err != nil { fmt.Println(err) return } }() msg := &sarama.ProducerMessage{ Topic: topic, Value: value, } partition, offset, err := producer.SendMessage(msg) if err != nil { fmt.Println(err) return } fmt.Printf("partition:%d offset:%d\n", partition, offset) }
閱讀上面這段代碼,我們調(diào)用 NewSyncProducer() 創(chuàng)建一個(gè)新的 SyncProducer,給定 broker 地址和配置信息。調(diào)用 SendMessage() 生產(chǎn)給定的消息,并且僅在生產(chǎn)成功或失敗時(shí)返回。它將返回分區(qū)(Partition)和生產(chǎn)的消息的偏移量(Offset),如果消息生產(chǎn)失敗,則返回錯(cuò)誤。
需要注意的是,為了避免泄露,必須在生產(chǎn)者上調(diào)用 Close(),因?yàn)楫?dāng)它超出范圍時(shí),可能不會(huì)自動(dòng)垃圾回收。
我們可以使用 Sarama 庫(kù)的消費(fèi)者 Consumer 或消費(fèi)者組 ConsumerGroup API 消費(fèi)消息。為了讀者朋友們?nèi)菀桌斫?,本文我們介紹使用 Consumer 消費(fèi)消息。
Consumer 管理 PartitionConsumers,該 PartitionConsumers 處理來(lái)自 brokers 的 Kafka 消息。
Consumer 消費(fèi)消息的示例代碼:
func consumer (brokenAddr []string, topic string, partition int32, offset int64) { consumer, err := sarama.NewConsumer(brokenAddr, nil) if err != nil { fmt.Println(err) return } defer func() { if err = consumer.Close(); err != nil { fmt.Println(err) return } }() partitionConsumer, err := consumer.ConsumePartition(topic, partition, offset) if err != nil { fmt.Println(err) return } defer func() { if err = partitionConsumer.Close(); err != nil { fmt.Println(err) return } }() for msg := range partitionConsumer.Messages() { fmt.Printf("partition:%d offset:%d key:%s val:%s\n", msg.Partition, msg.Offset, msg.Key, msg.Value) } }
閱讀上面這段代碼,我們調(diào)用 NewConsumer() 創(chuàng)建一個(gè)新的 consumer,給定 broker 地址和配置信息。調(diào)用 ConsumePartition() 創(chuàng)建 PartitionConsumer,給定 topic、partition 和 offset。PartitionConsumer 處理來(lái)自給定 topic 和 partition 的 Kafka 消息。
需要注意的是,為了防止泄露,必須調(diào)用 consumer 和 partitionConsumer 的 Close(),因?yàn)楫?dāng)它超出范圍時(shí),可能不會(huì)自動(dòng)垃圾回收。
關(guān)于如何使用Golang語(yǔ)言中的kafka和Sarama就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。
分享名稱:如何使用Golang語(yǔ)言中的kafka和Sarama
文章路徑:http://jinyejixie.com/article46/jjighg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站收錄、Google、云服務(wù)器、品牌網(wǎng)站建設(shè)、網(wǎng)站維護(hù)、商城網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)