我們都知道,RocketMQ在代碼級別對連接服務(wù)器進行了限制,基本上可以理解為一個JVM進程中只能連接一個NameServer,但實際應(yīng)用場景中,我們可能會在架構(gòu)設(shè)計層面上對RocketMQ進行了職能上的劃分,規(guī)定了A服務(wù)處理A類消息,而B服務(wù)處理B類消息,這時我們應(yīng)該如何解決這個問題呢?
成都創(chuàng)新互聯(lián)公司-專業(yè)網(wǎng)站定制、快速模板網(wǎng)站建設(shè)、高性價比寧江網(wǎng)站開發(fā)、企業(yè)建站全套包干低至880元,成熟完善的模板庫,直接使用。一站式寧江網(wǎng)站制作公司更省心,省錢,快速模板網(wǎng)站建設(shè)找我們,業(yè)務(wù)覆蓋寧江地區(qū)。費用合理售后完善,十年實體公司更值得信賴。我們從代碼層級來分析到底為什么會產(chǎn)生“一個JVM實例只能連接一個NameServer”。
RocketMQ Client有一個核心類MQClientManager
,在我們需要使用MQ Client實例的時候,實際上都是通過它的getAndCreateMQClientInstance
方法進行創(chuàng)建的;名稱比較拗口,同時是Get和Create,這不太符合我們所說的設(shè)計單一性原則,但這不是我們討論的重點,我們看一看這個方法的實現(xiàn)
public MQClientInstance getAndCreateMQClientInstance(ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = (MQClientInstance)this.factoryTable.get(clientId);
if (null == instance) {
instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = (MQClientInstance)this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
代碼不復(fù)雜,我們可以看到它利用客戶的配置信息生成一個固定的clientId,以此去緩存factoryTable中查找,不存在才會創(chuàng)建全新一個實例。
那么,可以理解一個clientID僅能存在一個連接實例了,可這個clientId是怎么產(chǎn)生的呢?繼續(xù)跟蹤看看這段代碼
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}
代碼層面上對clientId進行了約定,格式為“ClientIp@InstanceName”格式,當unitName不為空的時候還會在后面加上“@unitName”。
從代碼分析上我們可以知道,為了創(chuàng)建多實例,我們可以
instanceName從哪來的?
instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
從系統(tǒng)屬性中讀取出來的,也就是一般在JVM啟動時設(shè)定的。。。
可以變嗎?當然,你可以通過代碼去做到,但這么做的話,你會失去讓人理解你代碼的能力的,哈哈
這就是為什么多少RocketMQ Client都只能連接一個服務(wù)器的原因了,它根本不考慮服務(wù)器是誰,僅關(guān)心自己,自私的家伙!
除此之外還有其它解決方案嗎?我仔細從網(wǎng)絡(luò)上翻了一輪,沒看到什么好方法,是大家都沒這個場景還是有其它好辦法解決了呢?歡迎大家討論~
在上一篇博文來自平行世界的救贖里面,我做了個工具sandbox,我提供的方法3就是依托于這個工具。
sandbox通過代碼隔離的方式,將另一份類定義放入沙箱中運行,從而實現(xiàn)多個實例完全隔離的效果。MQClientManager
通過緩存方式,以clientId作為key值存儲到自身實例當中,為了實現(xiàn)多個Client,那么前兩種方法的邏輯是修改clientId實現(xiàn)多個實例,而方法3的邏輯則是“既然你的緩存已經(jīng)有這個key,我就換個緩存”,本質(zhì)就是“你這個鍋不裝我,我就換個鍋”。
這里我使用一個springboot項目作為演示案例。
通過springboot的Configuration
將多個RocketMQ Client進行注冊,再定義一個Controller接收不同請求去發(fā)送MQ消息,最后加上啟動類。
我們先從pom文件中引入包(我沒有推上maven倉庫,各位可以從github/gitee上下載),代碼如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>me.van</groupId>
<artifactId>rocket-mq-multi-client-test</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>測試多個rocketmq client共存</name>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<start-class>me.van.App</start-class>
<java.version>1.8</java.version>
<lombok.version>1.14.8</lombok.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
<dependency>
<groupId>me.van</groupId>
<artifactId>sandbox</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
此處引入了apache的rocketmq-client組件作為mq客戶端,也就是存在前面所說的問題的組件。
package me.van;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
非常的簡單,沒什么好介紹的。
package me.van;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class AppConfig {
@Bean(autowire = Autowire.BY_NAME, value = "producer")
MQProducer producer() throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer();
initProducer(producer, "a.io:9876;b.io:9876");
return producer;
}
@Bean(autowire = Autowire.BY_NAME, value = "producer_sandbox1")
MQProducer producerSandbox1() throws MQClientException, SandboxCannotCreateObjectException {
DefaultMQProducer producer = createProducerInSandbox();
initProducer(producer, "x.io:9876;y.io:9876");
return producer;
}
@Bean(autowire = Autowire.BY_NAME, value = "producer_sandbox2")
MQProducer producerSandbox2() throws MQClientException, SandboxCannotCreateObjectException {
DefaultMQProducer producer = createProducerInSandbox();
initProducer(producer, "1.io:9876;2.io:9876");
return producer;
}
private DefaultMQProducer createProducerInSandbox() throws SandboxCannotCreateObjectException {
Sandbox sandbox = new Sandbox("org.apache.rocketmq.client");
return sandbox.createObject(DefaultMQProducer.class);
}
private void initProducer(DefaultMQProducer producer, String namesrvAddr) throws MQClientException {
producer.setNamesrvAddr(namesrvAddr);
producer.setProducerGroup("test-group");
producer.setRetryAnotherBrokerWhenNotStoreOK(true);
producer.start();
}
}
這里可以看到,producer
對象是直接new 出來的DefaultMQProducer
,而producer_sandbox1
和producer_sandbox2
是通過不同的沙箱創(chuàng)建出來的;三個client分別連接到不同的NameServer中,同時其它屬性保持一致。
package me.van;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TestController {
@Autowired
MQProducer producer;
@Autowired
MQProducer producer_sandbox1;
@Autowired
MQProducer producer_sandbox2;
@GetMapping("/")
public String hello(){
return "hello world";
}
@GetMapping("/send")
public String send(String msg){
if(null == msg) return "msg is null";
String returnMsg = "";
Message message = new Message("topic-test-multi-mq-client", msg.getBytes());
try {
producer.send(message);
returnMsg += "原生producer發(fā)送完成<br/>";
producer_sandbox1.send(message);
returnMsg += "第一個沙箱內(nèi)producer發(fā)送完成<br/>";
producer_sandbox2.send(message);
returnMsg += "第二個沙箱內(nèi)producer發(fā)送完成<br/>";
} catch (MQClientException | InterruptedException | RemotingException | MQBrokerException e) {
returnMsg += "發(fā)送過程出現(xiàn)異常:" + e.getMessage();
}
return returnMsg;
}
}
通過send
方法同時向三個producer
發(fā)送消息。
運行App
,等幾秒鐘啟動完畢,訪問http://localhost:8080/send,返回
msg is null
訪問,http://localhost:8080/send?msg=test
github: https://github.com/vancoo/multi-mq-demo
gitee: https://gitee.com/vancoo/multi-mq-demo
參考文檔: 來自平行世界的救贖
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機、免備案服務(wù)器”等云主機租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。
網(wǎng)站題目:如何在同一個Java進程中連接多個RocketMQ服務(wù)器-創(chuàng)新互聯(lián)
文章起源:http://jinyejixie.com/article30/peeso.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站內(nèi)鏈、網(wǎng)站營銷、商城網(wǎng)站、網(wǎng)站導(dǎo)航、虛擬主機、網(wǎng)站收錄
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容