成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

KafkaTopicshell操作+基準測試+javaAPI-創(chuàng)新互聯(lián)

Kafka Topic shell操作+基準測試+java API
  • 1- Kafka的相關(guān)使用操作

    創(chuàng)新互聯(lián)公司專業(yè)為企業(yè)提供張家港網(wǎng)站建設、張家港做網(wǎng)站、張家港網(wǎng)站設計、張家港網(wǎng)站制作等企業(yè)網(wǎng)站建設、網(wǎng)頁設計與制作、張家港企業(yè)網(wǎng)站模板建站服務,10年張家港做網(wǎng)站經(jīng)驗,不只是建網(wǎng)站,更提供有價值的思路和整體網(wǎng)絡服務。
    • shell命令使用
    • Java API的使用
  • 2- Kafka的核心原理:

    • 分片和副本機制
1 消息隊列的基本介紹 1.1 消息隊列產(chǎn)生背景

什么是消息隊列呢?

消息: 數(shù)據(jù) 只不過這個數(shù)據(jù)具有一種流動狀態(tài)
隊列: 存儲數(shù)據(jù)的容器 只不過這個容器具有FIFO(先進先出)特性

消息隊列: 數(shù)據(jù)在隊列中, 從隊列的一端傳遞到另一端的過程, 數(shù)據(jù)在整個隊列中產(chǎn)生一種流動狀態(tài)
1.2 常見的消息隊列的產(chǎn)品

常見的消息隊列的產(chǎn)品:

  • 1- ActiveMQ: 出現(xiàn)時間比較早的一款消息隊列的組件, 目前整個社區(qū)活躍度非常低, 此軟件使用人群也在不斷的減少, 此軟件在前幾年中被Java工程師主要使用
  • 2- RabbitMQ: 目前在Java領域中使用非常頻繁的一款消息隊列的產(chǎn)品, 其社區(qū)活躍度相對不錯, 支持多種語言開發(fā)
  • 3- RocketMQ: 是由阿里推出一款的消息隊列的中間件產(chǎn)品, 目前主要是在阿里系范圍內(nèi)使用, 目前支持的開發(fā)語言相對較少一些, 比如成熟的客戶端還是JAVA
  • 4- Kafka: 是一款大數(shù)據(jù)領域下的消息隊列的產(chǎn)品, 主要應用在大數(shù)據(jù)領域在, 在業(yè)務領域中使用較少
  • 5- Pulsar: 最近一兩年新起的一款消息隊列的組件, 也是Aapache頂級開源項目, 目前主要由StreamNative公司進行商業(yè)運營中
1.3 消息隊列的作用是什么
  • 1- 同步轉(zhuǎn)異步
  • 2- 應用解耦合
  • 3- 流量削峰 : 在秒殺場景中, 突然會有龐大的并發(fā)量, 但是過后就沒有了
  • 4- 消息驅(qū)動系統(tǒng)
1.4 消息隊列的兩種消費模型
  • 點對點: 數(shù)據(jù)被生產(chǎn)到容器后, 最終這個數(shù)據(jù)只能被一個消費方來消費數(shù)據(jù)
  • 發(fā)布訂閱: 數(shù)據(jù)被生產(chǎn)到容器后, 可以被多個消費方同時接收到
2 Kafka的基本介紹

? Kafka是Apache旗下的一款開源免費的消息隊列的中間件產(chǎn)品,最早是由領英公司開發(fā)的, 后期共享給Apache, 目前已經(jīng)是Apache旗下的頂級開源的項目, 采用語言為Scala

? 官方網(wǎng)站: http://www.kafka.apache.org

適用場景: 數(shù)據(jù)傳遞工作, 需要將數(shù)據(jù)從一端傳遞到另一端, 此時可以通過Kafka來實現(xiàn), 不局限兩端的程序

? 在實時領域中, 主要是用于流式的數(shù)據(jù)處理工作

3 Kafka的架構(gòu)
Kafka Cluster: kafka集群
broker: kafka的節(jié)點
producer: 生產(chǎn)者
consumer: 消費者
Topic: 主題/話題 理解就是一個大的邏輯容器(管道)
	shard: 分片. 一個Topic可以被分為N多個分片, 分片的數(shù)量與節(jié)點數(shù)據(jù)沒有關(guān)系
	replicas: 副本, 可以對每一個分片構(gòu)建多個副本, 副本數(shù)量最多和節(jié)點數(shù)量一致(包含本身) 保證數(shù)據(jù)不丟失
zookeeper: 存儲管理集群的元數(shù)據(jù)信息
4 Kafka的安裝操作

參考Kafka的集群安裝文檔 完成整個安裝工作即可

如果安裝后, 無法啟動, 可能遇到的問題:

1) 配置文件中忘記修改broker id 
2) 忘記修改監(jiān)聽的地址, 或者修改了但是前面的注釋沒有打開

如何啟動Kafka集群:

啟動zookeeper集群: 每個節(jié)點都要執(zhí)行
cd /export/server/zookeeper/bin
./zkServer.sh start

啟動完成后 需要通過 jps -m 查看是否啟動 , 并且還需要通過:
./zkServer.sh status 查看狀態(tài), 必須看到一個leader 兩個follower才認為啟動成功了

啟動Kafka集群: 

單節(jié)點: 每個節(jié)點都需要執(zhí)行
cd /export/server/kafka_2.12-2.4.1/bin
前臺啟動:
	./kafka-server-start.sh ../config/server.properties
后臺啟動:
	nohup ./kafka-server-start.sh ../config/server.properties 2>&1 &
注意: 第一次啟動, 建議先前臺啟動, 觀察是否可以正常啟動, 如果OK, ctrl +C 退出, 然后掛載到后臺

如何停止:

單節(jié)點: 每個節(jié)點都需要執(zhí)行
cd /export/server/kafka_2.12-2.4.1/bin
操作:
	jps 然后通過 kill -9
	或者:
	./kafka-server-stop.sh

配置一鍵化腳本: 僅用于啟動Kafka 不會啟動zookeeper, zookeeper還是需要單獨啟動, 或者配置zookeeper的一鍵化腳本

  • 1- 創(chuàng)建一個onekey目錄: node1節(jié)點
mkdir -p /export/onekey
  • 2- 將資料中提供的一鍵化腳本上傳到此目錄下
cd /export/onekey/
上傳即可
  • 3- 對shell腳本賦執(zhí)行的權(quán)限
cd /export/onekey/
chmod 755 *.sh
  • 4- 通過對應的腳本來執(zhí)行啟動和關(guān)閉即可
5 Kafka的相關(guān)使用

? Kafka是一個消息隊列的中間件產(chǎn)品, 主要的作用: 將數(shù)據(jù)從程序一端傳遞到另一端的操作, 所以說學習Kafka主要學習如何使用Kafka生產(chǎn)數(shù)據(jù), 以及如何使用Kafka消費數(shù)據(jù)

5.1 Kafka的shell命令使用
  • 1- 如何創(chuàng)建Topic : kafka-topic.sh
./kafka-topics.sh  --create --zookeeper node1:2181,node2:2181,node3:2181 --topic test01 --partitions 3 --replication-factor 2
  • 2- 查看當前有那些topic:
./kafka-topics.sh  --list  --zookeeper node1:2181,node2:2181,node3:2181
  • 3- 查看某一個Topic的詳細信息:
./kafka-topics.sh --describe --zookeeper  node1:2181,node2:2181,node3:2181 --topic test01
  • 4- 如何刪除Topic
./kafka-topics.sh --delete --zookeeper node1:2181,node2:2181,node3:2181 --topic test01
注意: 
	默認情況下, 刪除一個topic 僅僅是標記刪除, 主要原因: Kafka擔心直接刪除, 會導致誤刪數(shù)據(jù)
	
	如果想執(zhí)行刪除的時候, 直接將topic完整的刪除掉: 此時需要在server.properties配置中修改一下配置為True
		delete.topic.enable=true
	
	如果本身Topic中的數(shù)據(jù)量非常少, 或者沒有任何的使用, 此時Topic會自動先執(zhí)行邏輯刪除, 然后在物理刪除, 不管是否配置delete.topic.enable
  • 5- 如何修改Topic
Topic 僅允許增大分片, 不允許減少分片 同時也不支持修改副本的數(shù)量

增大分區(qū):
./kafka-topics.sh  --alter --zookeeper node1:2181,node2:2181,node3:2181 --topic test01  --partitions 5
  • 6- 如何模擬生產(chǎn)者: 發(fā)送數(shù)據(jù)
./kafka-console-producer.sh  --broker-list node1:9092,node2:9092,node3:9092 --topic test01
>
  • 7- 如何模擬消費者: 消費數(shù)據(jù)
./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test01

默認從當前的時間開始消費數(shù)據(jù), 如果想從頭開始消費, 可以添加 --from-beginning  參數(shù)即可
5.2 Kafka的基準測試

? Kafka的基準測試: 主要指的是將安裝完成的Kafka集群, 進行測試操作, 測試其能否承載多大的并發(fā)量(讀寫的效率)

? 注意: 在進行Kafka的基準測試的時候, 受Topic的分片和副本的數(shù)量影響會比較大, 一般在測試的時候, 會構(gòu)建多個topic, 每一個topic設置不同的分片和副本的數(shù)量, 比如: 一個設置分片多一些, 副本多一些 一個設置分片多一些, 副本少些…

  • 1- 創(chuàng)建一個Topic
./kafka-topics.sh  --create --zookeeper node1:2181,node2:2181,node3:2181 --topic test02 --partitions 6 --replication-factor 1
  • 2- 測試寫入的數(shù)據(jù)的效率:
cd /export/server/kafka/bin
./kafka-producer-perf-test.sh --topic test02 --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1

屬性說明:
	--num-records : 發(fā)送的總消息量
	--throughput : 指定吞吐量(限流)  -1 不限制
	--record-size: 每條數(shù)據(jù)的大小(字節(jié))
    --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1 : 設置kafka的地址和消息發(fā)送模式
  • 3- 測試讀取消息的效率
cd /export/server/kafka/bin
./kafka-consumer-perf-test.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test02 --fetch-size 1048576 --messages 5000000

屬性說明:
	--fetch-size : 每次從Kafka端拉取數(shù)據(jù)量
	--message: 測試的總消息量
假設Kafka的節(jié)點數(shù)量是無限多的:
	topic分片數(shù)量越多, 理論上讀寫效率越高
	topic副本數(shù)量越多, 整體執(zhí)行效率越差

一般可以將分片的數(shù)量設置為副本數(shù)量的三倍左右 可以測試出比較最佳的性能  副本調(diào)整為1
5.3 Kafka的Java API的操作
  • 1- 創(chuàng)建一個Maven的項目, 導入相關(guān)的依賴
aliyunhttp://maven.aliyun.com/nexus/content/groups/public/true false neverorg.apache.kafkakafka-clients2.4.1org.apache.commonscommons-io1.3.2org.slf4jslf4j-log4j121.7.6log4jlog4j1.2.16 org.apache.maven.plugins maven-compiler-plugin 3.1  1.8 1.8 
  • 2- 創(chuàng)建兩個包結(jié)構(gòu): com.kafka.producer / com.kafka.consumer
5.3.1 演示如何將數(shù)據(jù)生產(chǎn)到Kafka
package com.itheima.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerTest {public static void main(String[] args) {// 第一步: 創(chuàng)建kafka的生產(chǎn)者核心對象: KafkaProducer  傳入相關(guān)的配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producerproducer = new KafkaProducer<>(props);

        //2. 執(zhí)行發(fā)送數(shù)據(jù)操作
        for (int i = 0; i< 10; i++) {ProducerRecordproducerRecord = new ProducerRecord<>(
                    "test01", "張三"+i
            );
            producer.send(producerRecord);
        }

        //3. 執(zhí)行close 釋放資源
        producer.close();

    }
}
5.3.2 演示如何從Kafka消費數(shù)據(jù)
package com.itheima.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerTest {public static void main(String[] args) {// 1- 創(chuàng)建Kafka的消費者的核心對象: KafkaConsumer
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", "test"); // 消費者組的ID
        props.put("enable.auto.commit", "true"); // 是否自動提交偏移量offset
        props.put("auto.commit.interval.ms", "1000"); // 自動提交的間隔時間
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key值的反序列化的類
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value的值反序列化的類

        KafkaConsumerconsumer = new KafkaConsumer<>(props);
        //2. 訂閱topic: 表示消費者從那個topic來消費數(shù)據(jù)  可以指定多個
        consumer.subscribe(Arrays.asList("test01"));

        while (true) {// 3. 從kafka中獲取消息數(shù)據(jù), 參數(shù)表示當kafka中沒有消息的時候, 等待的超時時間, 如果過了等待的時間, 返回空對象(對象存在, 但是內(nèi)部沒有數(shù)據(jù)  相當于空容器)
            ConsumerRecordsrecords = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecordrecord : records) {long offset = record.offset();
                String key = record.key();
                String value = record.value();
                // 偏移量: 每一條數(shù)據(jù) 其實就是一個偏移量 , 每個分片單獨統(tǒng)計消息到達了第幾個偏移量 偏移量從 0 開始的
                System.out.println("消息的偏移量為:"+offset+"; key值為:"+key + "; value的值為:"+ value);
            }
        }

    }

}

你是否還在尋找穩(wěn)定的海外服務器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準確流量調(diào)度確保服務器高可用性,企業(yè)級服務器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧

文章標題:KafkaTopicshell操作+基準測試+javaAPI-創(chuàng)新互聯(lián)
文章轉(zhuǎn)載:http://jinyejixie.com/article28/deoicp.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供靜態(tài)網(wǎng)站、關(guān)鍵詞優(yōu)化品牌網(wǎng)站制作、網(wǎng)站營銷云服務器、域名注冊

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站制作
昂仁县| 鄂尔多斯市| 吉木萨尔县| 开江县| 新巴尔虎右旗| 永宁县| 英超| 虹口区| 玛沁县| 毕节市| 高密市| 呼和浩特市| 宁国市| 太保市| 泰顺县| 荃湾区| 无棣县| 博客| 焦作市| 罗甸县| 扬中市| 武安市| 夏津县| 丹东市| 米泉市| 环江| 湾仔区| 安康市| 元阳县| 屏东县| 丽江市| 旬邑县| 六盘水市| 库车县| 普格县| 雷州市| 宿松县| 沙田区| 大冶市| 英超| 衡阳市|