成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

springkakfa如何集成

小編給大家分享一下spring kakfa如何集成,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

為企業(yè)提供成都網(wǎng)站設計、網(wǎng)站制作、外貿(mào)營銷網(wǎng)站建設、網(wǎng)站優(yōu)化、成都全網(wǎng)營銷推廣、競價托管、品牌運營等營銷獲客服務。創(chuàng)新互聯(lián)建站擁有網(wǎng)絡營銷運營團隊,以豐富的互聯(lián)網(wǎng)營銷經(jīng)驗助力企業(yè)精準獲客,真正落地解決中小企業(yè)營銷獲客難題,做到“讓獲客更簡單”。自創(chuàng)立至今,成功用技術實力解決了企業(yè)“網(wǎng)站建設、網(wǎng)絡品牌塑造、網(wǎng)絡營銷”三大難題,同時降低了營銷成本,提高了有效客戶轉(zhuǎn)化率,獲得了眾多企業(yè)客戶的高度認可!

一、生產(chǎn)端

1.1 kafka-producer.xml配置說明

<!-- spring的屬性加載器,加載多個properties文件中的屬性 ,
 如果只有一個properties文件則用<context />就行了,用了這個加載器過后不用在其他xml中再使用了-->
<bean id="propertyConfigurer"
 class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
        <list>
            <value>classpath:/properties/kafka-producer.properties</value>
        </list>
    </property>
    <property name="fileEncoding" value="utf-8" />
</bean>
<!-- 定義producer的參數(shù) -->
<bean id="producerProperties" class="java.util.HashMap">
    <constructor-arg>
        <map>
            <entry key="bootstrap.servers" value="${bootstrap.servers}" /> //kafka服務集群
            <entry key="group.id" value="${group.id}" /> //分組
            <entry key="retries" value="${retries}" /> //重試次數(shù)
            <entry key="batch.size" value="${batch.size}" /> //批量數(shù)量
            <entry key="linger.ms" value="${linger.ms}" />
            <entry key="buffer.memory" value="${buffer.memory}" />
            <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
            <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
        </map>
    </constructor-arg>
</bean>

<!-- 創(chuàng)建kafkatemplate需要使用的producerfactory bean -->
<bean id="producerFactory"
 class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
    <constructor-arg ref="producerProperties" />
</bean>

<!-- 創(chuàng)建kafkatemplate -->
<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg index="0" ref="producerFactory" />
    <constructor-arg index="1" value="true" />
    <property name="defaultTopic" value="${defaultTopic}" /> //topic名稱
</bean>

<bean id="kafkaProducerServer" class="com.rkhd.ienterprise.kafka.producer.KafkaProducerServer">
    <property name="kafkaTemplate" ref="kafkaTemplate"/>
</bean>

1.2 kafka-producer.properties屬性文件

bootstrap.servers=192.168.0.75:9092,192.168.0.75:9093,192.168.0.75:9094
group.id=testGroup
retries=1
batch.size=16384
linger.ms=1
buffer.memory=33554432
defaultTopic=topic-test

1.3 生產(chǎn)端接口封裝說明:

1)類名:
com.rkhd.ienterprise.kafka.producer.KafkaProducerServer

2)方法:

/**
 * 發(fā)送信息(不分區(qū))
 * @param data 要發(fā)送的數(shù)據(jù)
 * @return 返回一個map。如果成功code為0,其他則為失敗
 */
public Map<String, Object> sendDefault(Object data);
/**
 * 發(fā)送信息(不分區(qū))
 * @param key 要發(fā)送的鍵
 * @param data 要發(fā)送的數(shù)據(jù)
 * @return 返回一個map。如果成功code為0,其他則為失敗
 */
public Map<String, Object> sendDefault(Object key, Object data);
/**
 * 發(fā)送信息(分區(qū))
 * @param partitionNum 分區(qū)數(shù)(大于1),請注意分區(qū)數(shù)是在topic創(chuàng)建的時候就指定了,不能改變了
 * @param key 要發(fā)送的鍵
 * @param data 要發(fā)送的數(shù)據(jù)
 * @return 返回一個map。如果成功code為0,其他則為失敗
 */
public Map<String, Object> sendDefault(int partitionNum, Object key, Object data);
/**
 * 發(fā)送信息(不分區(qū))
 * @param topic 發(fā)送目的topic名稱,如果topic為null或者是為"",則會使用xml中配置的defaultTopic
 * @param data 要發(fā)送的數(shù)據(jù)
 * @return 返回一個map。如果成功code為0,其他則為失敗
 */
public Map<String, Object> sendMessage(String topic, Object data);
/**
 * 發(fā)送信息(不分區(qū))
 * @param topic 發(fā)送目的topic名稱,如果topic為null或者是為"",則會使用xml中配置的defaultTopic
 * @param key 要發(fā)送的鍵
 * @param data 要發(fā)送的數(shù)據(jù)
 * @return 返回一個map。如果成功code為0,其他則為失敗
 * */
public Map<String, Object> sendMessage(String topic, Object key, Object data);
/**
 * 發(fā)送信息(分區(qū))
 * @param topic 發(fā)送目的topic名稱,如果topic為null或者是為"",則會使用xml中配置的defaultTopic
 * @param partitionNum 分區(qū)數(shù)(大于1),請注意分區(qū)數(shù)是在topic創(chuàng)建的時候就指定了,不能改變了
 * @param data 要發(fā)送的數(shù)據(jù)
 * @return 返回一個map。如果成功code為0,其他則為失敗
 */
public Map<String, Object> sendMessage(String topic, Integer partitionNum, Object data);
/**
 * 發(fā)送信息(分區(qū))
 * @param topic 發(fā)送目的topic名稱,如果topic為null或者是為"",則會使用xml中配置的defaultTopic
 * @param key 要發(fā)送的鍵
 * @param value 要發(fā)送的數(shù)據(jù)
 * @param partitionNum 分區(qū)數(shù)(大于1),請注意分區(qū)數(shù)是在topic創(chuàng)建的時候就指定了,不能改變了
 * @return 返回一個map。如果成功code為0,其他則為失敗
 * */
public Map<String, Object> sendMessage(String topic, int partitionNum, Object key, Object value);

二、消費端

2.1 kafka-consumer.xml配置說明

<!-- 定義consumer的參數(shù) -->
<bean id="consumerProperties" class="java.util.HashMap">
    <constructor-arg>
        <map>
            <entry key="bootstrap.servers" value="${bootstrap.servers}"/> //kafka服務集群
            <entry key="group.id" value="${group.id}"/> //分組
            <entry key="enable.auto.commit" value="${enable.auto.commit}"/> //是否自動提交
            <entry key="auto.commit.interval.ms" value="${auto.commit.interval.ms}"/> //自動提交間隔時間
            <entry key="session.timeout.ms" value="${session.timeout.ms}"/> //session過期時間
            <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
        </map>
    </constructor-arg>
</bean>

<!-- 創(chuàng)建consumerFactory bean -->
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
    <constructor-arg>
        <ref bean="consumerProperties"/>
    </constructor-arg>
</bean>

<!-- 實際執(zhí)行消息消費的類 -->
<bean id="messageListernerConsumerService" class="com.rkhd.ienterprise.mq.client.consumer.generalFormula.GeneralFormulaConsumer"/>

<!-- 消費者容器配置信息 -->
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
    <constructor-arg value="${topicName}"/>
    <property name="messageListener" ref="messageListernerConsumerService"/>
    <!--<property name="ackMode" value="COUNT"/> //手動提交模式分為三種:(1)模式COUNT:數(shù)量達到COUNT時提交;(2)模式TIME:時間達到TIME;(3)模式COUNT_TIME:數(shù)量達到COUNT或時間達到TIME是提交;
 
<property name="ackCount" value="90"/>-->
 <!--<property name="ackMode" value="TIME"/>
 <property name="ackTime" value="5000"/>-->
</bean>

<!-- 創(chuàng)建單實例KafkaMessageListenerContainer-->
<!--<bean id="messageListenerContainer_trade" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
 init-method="doStart">
 <constructor-arg ref="consumerFactory"/>
 <constructor-arg ref="containerProperties_trade"/>
</bean>-->
<!-- 創(chuàng)建多實例ConcurrentMessageListenerContainer-->
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer"
 init-method="doStart" >
    <constructor-arg ref="consumerFactory"/>
    <constructor-arg ref="containerProperties"/>
    <property name="concurrency" value="1"/> //配置消費端數(shù)量
</bean>

2.2 kafka-consumer.properties屬性文件

bootstrap.servers=192.168.0.75:9092,192.168.0.75:9093,192.168.0.75:9094
group.id=testGroup
enable.auto.commit=false
auto.commit.interval.ms=1000
session.timeout.ms=15000
topicName=ahao-test

2.3 消費端接口封裝說明

1)類名:com.rkhd.ienterprise.mq.client.consumer.client.KafkaConsumerClient

2)對外提供抽象方法(根據(jù)不同的業(yè)務實現(xiàn)):

public abstract void onConsumer(ConsumerRecord<String, String> record);

3)實現(xiàn)說明:各業(yè)務線通過繼承該類實現(xiàn)該抽象方法;

三、Kafka技術概覽

3.1 Kafka的特性

  • 高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒

  • 可擴展性:kafka集群支持熱擴展

  • 持久性、可靠性:消息被持久化到本地磁盤,并且支持數(shù)據(jù)備份防止數(shù)據(jù)丟失

  • 容錯性:允許集群中節(jié)點失?。ㄈ舾北緮?shù)量為n,則允許n-1個節(jié)點失敗)

  • 高并發(fā):支持數(shù)千個客戶端同時讀寫

3.2 Kafka架構(gòu)組件

       Kafka中發(fā)布訂閱的對象是topic。我們可以為每類數(shù)據(jù)創(chuàng)建一個topic,把向topic發(fā)布消息的客戶端稱作producer,從topic訂閱消息的客戶端稱作consumer。Producers和                consumers可以同時從多個topic讀寫數(shù)據(jù)。一個kafka集群由一個或多個broker服務器組成,它負責持久化和備份具體的kafka消息。

  • topic:消息存放的目錄即主題

  • Producer:生產(chǎn)消息到topic的一方

  • Consumer:訂閱topic消費消息的一方

  • Broker:Kafka的服務實例就是一個broker

spring kakfa如何集成

3.3  kafka 應用場景

  • 日志收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統(tǒng)一接口服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。

  • 消息系統(tǒng):解耦和生產(chǎn)者和消費者、緩存消息等。

  • 用戶活動跟蹤:Kafka經(jīng)常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網(wǎng)頁、搜索、點擊等活動,這些活動信息被各個服務器發(fā)布到kafka的topic中,然后訂閱者通過訂閱這些topic來做實時的監(jiān)控分析,或者裝載到hadoop、數(shù)據(jù)倉庫中做離線分析和挖掘。

  • 運營指標:Kafka也經(jīng)常用來記錄運營監(jiān)控數(shù)據(jù)。包括收集各種分布式應用的數(shù)據(jù),生產(chǎn)各種操作的集中反饋,比如報警和報告。

  • 流式處理:比如spark streaming和storm

  • 事件源

以上是“spring kakfa如何集成”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學習更多知識,歡迎關注創(chuàng)新互聯(lián)行業(yè)資訊頻道!

分享文章:springkakfa如何集成
網(wǎng)頁URL:http://jinyejixie.com/article36/pgijpg.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供移動網(wǎng)站建設、網(wǎng)站營銷、品牌網(wǎng)站設計、品牌網(wǎng)站建設、云服務器、網(wǎng)站收錄

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

手機網(wǎng)站建設
栾川县| 富平县| 江门市| 昭苏县| 禄劝| 左权县| 定远县| 周宁县| 四川省| 周宁县| 五台县| 康马县| 鄢陵县| 新竹市| 拜城县| 丹江口市| 衡阳市| 女性| 武冈市| 嘉黎县| 股票| 汤阴县| 米易县| 浑源县| 阳山县| 玉林市| 瑞丽市| 镇远县| 博湖县| 定结县| 南丹县| 田阳县| 利川市| 青岛市| 绍兴市| 黎川县| 合肥市| 高淳县| 克拉玛依市| 土默特左旗| 茂名市|