小編給大家分享一下spring kakfa如何集成,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
為企業(yè)提供成都網(wǎng)站設計、網(wǎng)站制作、外貿(mào)營銷網(wǎng)站建設、網(wǎng)站優(yōu)化、成都全網(wǎng)營銷推廣、競價托管、品牌運營等營銷獲客服務。創(chuàng)新互聯(lián)建站擁有網(wǎng)絡營銷運營團隊,以豐富的互聯(lián)網(wǎng)營銷經(jīng)驗助力企業(yè)精準獲客,真正落地解決中小企業(yè)營銷獲客難題,做到“讓獲客更簡單”。自創(chuàng)立至今,成功用技術實力解決了企業(yè)“網(wǎng)站建設、網(wǎng)絡品牌塑造、網(wǎng)絡營銷”三大難題,同時降低了營銷成本,提高了有效客戶轉(zhuǎn)化率,獲得了眾多企業(yè)客戶的高度認可!
1.1 kafka-producer.xml配置說明
<!-- spring的屬性加載器,加載多個properties文件中的屬性 , 如果只有一個properties文件則用<context />就行了,用了這個加載器過后不用在其他xml中再使用了--> <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <list> <value>classpath:/properties/kafka-producer.properties</value> </list> </property> <property name="fileEncoding" value="utf-8" /> </bean>
<!-- 定義producer的參數(shù) --> <bean id="producerProperties" class="java.util.HashMap"> <constructor-arg> <map> <entry key="bootstrap.servers" value="${bootstrap.servers}" /> //kafka服務集群 <entry key="group.id" value="${group.id}" /> //分組 <entry key="retries" value="${retries}" /> //重試次數(shù) <entry key="batch.size" value="${batch.size}" /> //批量數(shù)量 <entry key="linger.ms" value="${linger.ms}" /> <entry key="buffer.memory" value="${buffer.memory}" /> <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" /> <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" /> </map> </constructor-arg> </bean> <!-- 創(chuàng)建kafkatemplate需要使用的producerfactory bean --> <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> <constructor-arg ref="producerProperties" /> </bean> <!-- 創(chuàng)建kafkatemplate --> <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate"> <constructor-arg index="0" ref="producerFactory" /> <constructor-arg index="1" value="true" /> <property name="defaultTopic" value="${defaultTopic}" /> //topic名稱 </bean> <bean id="kafkaProducerServer" class="com.rkhd.ienterprise.kafka.producer.KafkaProducerServer"> <property name="kafkaTemplate" ref="kafkaTemplate"/> </bean>
1.2 kafka-producer.properties屬性文件
bootstrap.servers=192.168.0.75:9092,192.168.0.75:9093,192.168.0.75:9094 group.id=testGroup retries=1 batch.size=16384 linger.ms=1 buffer.memory=33554432 defaultTopic=topic-test
1.3 生產(chǎn)端接口封裝說明:
1)類名:
com.rkhd.ienterprise.kafka.producer.KafkaProducerServer
2)方法:
/** * 發(fā)送信息(不分區(qū)) * @param data 要發(fā)送的數(shù)據(jù) * @return 返回一個map。如果成功code為0,其他則為失敗 */ public Map<String, Object> sendDefault(Object data);
/** * 發(fā)送信息(不分區(qū)) * @param key 要發(fā)送的鍵 * @param data 要發(fā)送的數(shù)據(jù) * @return 返回一個map。如果成功code為0,其他則為失敗 */ public Map<String, Object> sendDefault(Object key, Object data);
/** * 發(fā)送信息(分區(qū)) * @param partitionNum 分區(qū)數(shù)(大于1),請注意分區(qū)數(shù)是在topic創(chuàng)建的時候就指定了,不能改變了 * @param key 要發(fā)送的鍵 * @param data 要發(fā)送的數(shù)據(jù) * @return 返回一個map。如果成功code為0,其他則為失敗 */ public Map<String, Object> sendDefault(int partitionNum, Object key, Object data);
/** * 發(fā)送信息(不分區(qū)) * @param topic 發(fā)送目的topic名稱,如果topic為null或者是為"",則會使用xml中配置的defaultTopic * @param data 要發(fā)送的數(shù)據(jù) * @return 返回一個map。如果成功code為0,其他則為失敗 */ public Map<String, Object> sendMessage(String topic, Object data);
/** * 發(fā)送信息(不分區(qū)) * @param topic 發(fā)送目的topic名稱,如果topic為null或者是為"",則會使用xml中配置的defaultTopic * @param key 要發(fā)送的鍵 * @param data 要發(fā)送的數(shù)據(jù) * @return 返回一個map。如果成功code為0,其他則為失敗 * */ public Map<String, Object> sendMessage(String topic, Object key, Object data);
/** * 發(fā)送信息(分區(qū)) * @param topic 發(fā)送目的topic名稱,如果topic為null或者是為"",則會使用xml中配置的defaultTopic * @param partitionNum 分區(qū)數(shù)(大于1),請注意分區(qū)數(shù)是在topic創(chuàng)建的時候就指定了,不能改變了 * @param data 要發(fā)送的數(shù)據(jù) * @return 返回一個map。如果成功code為0,其他則為失敗 */ public Map<String, Object> sendMessage(String topic, Integer partitionNum, Object data);
/** * 發(fā)送信息(分區(qū)) * @param topic 發(fā)送目的topic名稱,如果topic為null或者是為"",則會使用xml中配置的defaultTopic * @param key 要發(fā)送的鍵 * @param value 要發(fā)送的數(shù)據(jù) * @param partitionNum 分區(qū)數(shù)(大于1),請注意分區(qū)數(shù)是在topic創(chuàng)建的時候就指定了,不能改變了 * @return 返回一個map。如果成功code為0,其他則為失敗 * */ public Map<String, Object> sendMessage(String topic, int partitionNum, Object key, Object value);
2.1 kafka-consumer.xml配置說明
<!-- 定義consumer的參數(shù) --> <bean id="consumerProperties" class="java.util.HashMap"> <constructor-arg> <map> <entry key="bootstrap.servers" value="${bootstrap.servers}"/> //kafka服務集群 <entry key="group.id" value="${group.id}"/> //分組 <entry key="enable.auto.commit" value="${enable.auto.commit}"/> //是否自動提交 <entry key="auto.commit.interval.ms" value="${auto.commit.interval.ms}"/> //自動提交間隔時間 <entry key="session.timeout.ms" value="${session.timeout.ms}"/> //session過期時間 <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/> <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/> </map> </constructor-arg> </bean> <!-- 創(chuàng)建consumerFactory bean --> <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> <constructor-arg> <ref bean="consumerProperties"/> </constructor-arg> </bean> <!-- 實際執(zhí)行消息消費的類 --> <bean id="messageListernerConsumerService" class="com.rkhd.ienterprise.mq.client.consumer.generalFormula.GeneralFormulaConsumer"/> <!-- 消費者容器配置信息 --> <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties"> <constructor-arg value="${topicName}"/> <property name="messageListener" ref="messageListernerConsumerService"/> <!--<property name="ackMode" value="COUNT"/> //手動提交模式分為三種:(1)模式COUNT:數(shù)量達到COUNT時提交;(2)模式TIME:時間達到TIME;(3)模式COUNT_TIME:數(shù)量達到COUNT或時間達到TIME是提交;
<property name="ackCount" value="90"/>--> <!--<property name="ackMode" value="TIME"/> <property name="ackTime" value="5000"/>--> </bean> <!-- 創(chuàng)建單實例KafkaMessageListenerContainer-->
<!--<bean id="messageListenerContainer_trade" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart"> <constructor-arg ref="consumerFactory"/> <constructor-arg ref="containerProperties_trade"/> </bean>-->
<!-- 創(chuàng)建多實例ConcurrentMessageListenerContainer-->
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart" > <constructor-arg ref="consumerFactory"/> <constructor-arg ref="containerProperties"/> <property name="concurrency" value="1"/> //配置消費端數(shù)量 </bean>
2.2 kafka-consumer.properties屬性文件
bootstrap.servers=192.168.0.75:9092,192.168.0.75:9093,192.168.0.75:9094 group.id=testGroup enable.auto.commit=false auto.commit.interval.ms=1000 session.timeout.ms=15000 topicName=ahao-test
2.3 消費端接口封裝說明
1)類名:com.rkhd.ienterprise.mq.client.consumer.client.KafkaConsumerClient
2)對外提供抽象方法(根據(jù)不同的業(yè)務實現(xiàn)):
public abstract void onConsumer(ConsumerRecord<String, String> record);
3)實現(xiàn)說明:各業(yè)務線通過繼承該類實現(xiàn)該抽象方法;
3.1 Kafka的特性
高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒
可擴展性:kafka集群支持熱擴展
持久性、可靠性:消息被持久化到本地磁盤,并且支持數(shù)據(jù)備份防止數(shù)據(jù)丟失
容錯性:允許集群中節(jié)點失?。ㄈ舾北緮?shù)量為n,則允許n-1個節(jié)點失敗)
高并發(fā):支持數(shù)千個客戶端同時讀寫
3.2 Kafka架構(gòu)組件
Kafka中發(fā)布訂閱的對象是topic。我們可以為每類數(shù)據(jù)創(chuàng)建一個topic,把向topic發(fā)布消息的客戶端稱作producer,從topic訂閱消息的客戶端稱作consumer。Producers和 consumers可以同時從多個topic讀寫數(shù)據(jù)。一個kafka集群由一個或多個broker服務器組成,它負責持久化和備份具體的kafka消息。
topic:消息存放的目錄即主題
Producer:生產(chǎn)消息到topic的一方
Consumer:訂閱topic消費消息的一方
Broker:Kafka的服務實例就是一個broker
3.3 kafka 應用場景
日志收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統(tǒng)一接口服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
消息系統(tǒng):解耦和生產(chǎn)者和消費者、緩存消息等。
用戶活動跟蹤:Kafka經(jīng)常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網(wǎng)頁、搜索、點擊等活動,這些活動信息被各個服務器發(fā)布到kafka的topic中,然后訂閱者通過訂閱這些topic來做實時的監(jiān)控分析,或者裝載到hadoop、數(shù)據(jù)倉庫中做離線分析和挖掘。
運營指標:Kafka也經(jīng)常用來記錄運營監(jiān)控數(shù)據(jù)。包括收集各種分布式應用的數(shù)據(jù),生產(chǎn)各種操作的集中反饋,比如報警和報告。
流式處理:比如spark streaming和storm
事件源
以上是“spring kakfa如何集成”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學習更多知識,歡迎關注創(chuàng)新互聯(lián)行業(yè)資訊頻道!
分享文章:springkakfa如何集成
網(wǎng)頁URL:http://jinyejixie.com/article36/pgijpg.html
成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供移動網(wǎng)站建設、網(wǎng)站營銷、品牌網(wǎng)站設計、品牌網(wǎng)站建設、云服務器、網(wǎng)站收錄
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)