現(xiàn)在,我們將發(fā)送一些字符串,把這些字符串當(dāng)作復(fù)雜的任務(wù)。我們并沒有一個(gè)真實(shí)的復(fù)雜任務(wù),類似于圖片大小被調(diào)整或 pdf 文件被渲染,所以我們通過 sleep () 方法來(lái)模擬這種情況。我們?cè)谧址屑由宵c(diǎn)號(hào)(.)來(lái)表示任務(wù)的復(fù)雜程度,一個(gè)點(diǎn)(.)將會(huì)耗時(shí) 1 秒鐘。比如 “Hello…” 就會(huì)耗時(shí) 3 秒鐘。
成都創(chuàng)新互聯(lián)是一家專業(yè)提供東興企業(yè)網(wǎng)站建設(shè),專注與網(wǎng)站制作、網(wǎng)站設(shè)計(jì)、H5建站、小程序制作等業(yè)務(wù)。10年已為東興眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)站設(shè)計(jì)公司優(yōu)惠進(jìn)行中。
如果您尚未設(shè)置項(xiàng)目,請(qǐng)參閱第一個(gè)教程中的設(shè)置。我們將遵循與第一個(gè)教程相同的模式:創(chuàng)建一個(gè)包(tut2)并創(chuàng)建 Tut2Config、Tut2Receiver 和 Tut2Sender。
代碼整合
首先創(chuàng)建一個(gè)新的包(tut2),我們將在這里放置我們的三個(gè)類。在配置類 Tut2Config 中,我們?cè)O(shè)置了兩個(gè)配置文件 ——tut2 和 work-queues。我們利用 Spring 來(lái)將隊(duì)列 Queue 暴露為一個(gè) bean。我們配置消費(fèi)者,并定義兩個(gè) bean 以對(duì)應(yīng)于上圖中的工作進(jìn)程 receiver1 和 receiver2。
配置類
@Profile({"tut2", "work-queues"}) @Configuration public class Tut2Config { @Bean public Queue queue() { return new Queue("work-queues"); } /** * 定義兩個(gè)消費(fèi)者,并且給了他們不同的標(biāo)識(shí) */ @Profile ("receiver") private static class ReceiverConfig { @Bean public Tut2Receiver receiver1() { return new Tut2Receiver(1); } @Bean public Tut2Receiver receiver2() { return new Tut2Receiver(2); } } @Profile("sender") @Bean public Tut2Sender sender() { return new Tut2Sender(); } }
生產(chǎn)者
我們簡(jiǎn)單修改一下生產(chǎn)者的代碼,以添加點(diǎn)號(hào)(.)的方式來(lái)人為的增加該任務(wù)的時(shí)長(zhǎng),字符串中的每個(gè)點(diǎn)號(hào)(.)都會(huì)增加 1s 的耗時(shí)。
public class Tut2Sender { @Autowired private AmqpTemplate template; @Autowired private Queue queue; int dots = 0; int count = 0; @Scheduled(fixedDelay = 1000, initialDelay = 500) public void send(){ StringBuilder builder = new StringBuilder("Hello"); if (dots++ == 3) { dots = 1; } for (int i = 0; i < dots; i++) { builder.append('.'); } builder.append(Integer.toString(++count)); String message = builder.toString(); template.convertAndSend(queue.getName(), message); System.out.println(" [x] Sent '" + message + "'"); } }
消費(fèi)者
我們的消費(fèi)者 Tut2Receiver 通過 doWork () 方法模擬了一個(gè)耗時(shí)的虛假任務(wù),它需要為消息體中每一個(gè)點(diǎn)號(hào)(.)模擬 1 秒鐘的操作。并且我們?yōu)橄M(fèi)者增加了一個(gè)實(shí)例編號(hào),以知道是哪個(gè)實(shí)例消費(fèi)了消息和處理的時(shí)長(zhǎng)。
@RebbitListener(queues = "work-queues") public class Tut2Receiver { private int instance; public Tut2Receiver(int instance) { this.instance = instance; } @RabbitHandler public void receive(String in) throws InterruptedException { StopWatch watch = new StopWatch(); watch.start(); System.out.println("instance " + this.instance + " [x] Received '" + in + "'"); doWork(in); watch.stop(); System.out.println("instance " + this.instance + " [x] Done in " + watch.getTotalTimeSeconds() + "s"); } private void doWork(String in) throws InterruptedException { for (char ch : in.toCharArray()) { if (ch == '.') { Thread.sleep(1000); } } } }
運(yùn)行
maven 編譯
mvn clean package -Dmaven.test.skip=true
運(yùn)行
java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut2,sender --tutorial.client.duration=60000
java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut2,receiver --tutorial.client.duration=60000
輸出
// Sender
Ready … running for 10000ms
[x] Sent ‘Hello.1’
[x] Sent ‘Hello…2’
[x] Sent ‘Hello…3’
[x] Sent ‘Hello.4’
[x] Sent ‘Hello…5’
[x] Sent ‘Hello…6’
[x] Sent ‘Hello.7’
[x] Sent ‘Hello…8’
[x] Sent ‘Hello…9’
// Receiver
Ready … running for 10000ms
instance 1 [x] Received ‘Hello.1’
instance 2 [x] Received ‘Hello…2’
instance 1 [x] Done in 1.005s
instance 1 [x] Received ‘Hello…3’
instance 2 [x] Done in 2.007s
instance 2 [x] Received ‘Hello.4’
instance 2 [x] Done in 1.005s
instance 1 [x] Done in 3.01s
instance 1 [x] Received ‘Hello…5’
instance 2 [x] Received ‘Hello…6’
instance 1 [x] Done in 2.006s
instance 1 [x] Received ‘Hello.7’
instance 1 [x] Done in 1.002s
instance 1 [x] Received ‘Hello…9’
instance 2 [x] Done in 3.01s
instance 2 [x] Received ‘Hello…8’
prefetch
從消費(fèi)者這端的輸出可以看出來(lái),instance 1 得到的任務(wù)編號(hào)始終是奇數(shù)(Hello.1,Hello…3,Hello…5,Hello.7),而 instance 2 得到的任務(wù)編號(hào)始終是偶數(shù)。了解springcloud架構(gòu)可以加求求:三五三六二四七二五九
如果感覺這次的輸出只是巧合,可以多試幾次或通過 --tutorial.client.duration= 調(diào)整時(shí)長(zhǎng)得到更多的輸出,而結(jié)果肯定都是一樣的。
這里設(shè)計(jì)的問題就是之前在基礎(chǔ)概念里講到的調(diào)度策略的問題了。要實(shí)現(xiàn)公平調(diào)度(Fair dispatch)就是設(shè)置 prefetch 的值,實(shí)現(xiàn)方式有兩種。
全局設(shè)置
在 application.yml 中設(shè)置 spring.rabbitmq.listener.simple.prefetch=1 即可,這會(huì)影響到本 Spring Boot 應(yīng)用中所有使用默認(rèn) SimpleRabbitListenerContainerFactory 的消費(fèi)者。
網(wǎng)上很多人說改配置 pring.rabbitmq.listener.prefetc,實(shí)測(cè)已經(jīng)無(wú)效,應(yīng)該是版本的問題。我所使用的版本(RabbitMQ:3.7.4,Spring Boot: 2.0.1.RELEASE),除了 spring.rabbitmq.listener.simple.prefetch,還有一個(gè) spring.rabbitmq.listener.direct.prefetch 可以配置。
改了配置后再運(yùn)行,可以看到 instance 1 可以獲取到”Hello…6”、”Hello…12” 了。
Ready … running for 60000ms
instance 1 [x] Received ‘Hello.1’
instance 2 [x] Received ‘Hello…2’
instance 1 [x] Done in 1.004s
instance 1 [x] Received ‘Hello…3’
instance 2 [x] Done in 2.008s
instance 2 [x] Received ‘Hello.4’
instance 2 [x] Done in 1.004s
instance 2 [x] Received ‘Hello…5’
instance 1 [x] Done in 3.012s
instance 1 [x] Received ‘Hello…6’
instance 2 [x] Done in 2.007s
instance 2 [x] Received ‘Hello.7’
instance 2 [x] Done in 1.004s
instance 2 [x] Received ‘Hello…8’
instance 1 [x] Done in 3.011s
instance 1 [x] Received ‘Hello…9’
instance 2 [x] Done in 2.007s
instance 2 [x] Received ‘Hello.10’
instance 2 [x] Done in 1.006s
instance 2 [x] Received ‘Hello…11’
instance 1 [x] Done in 3.01s
instance 1 [x] Received ‘Hello…12’
特定消費(fèi)者
上邊是改了全局的消費(fèi)者,如果只針對(duì)特定的消費(fèi)者的話,又怎么處理呢?
我們可以通過自定義 RabbitListenerContainerFactory 來(lái)實(shí)現(xiàn)。
@Bean public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchOneRabbitListenerContainerFactory(ConnectionFactory rabbitConnectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(rabbitConnectionFactory); factory.setPrefetchCount(1); return factory; }
然后在特定的消費(fèi)者上指定 containerFactory
@RebbitListener(queues = "hello", containerFactory = "prefetchTenRabbitListenerContainerFactory") public void receive(String in) { System.out.println(" [x] Received '" + in + "'") }
本文題目:JavaSpringboot整合RabbitMQ(二):工作隊(duì)列(Workqueues)-B2B2C小程序電子商務(wù)
文章鏈接:http://jinyejixie.com/article30/gpidpo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站收錄、商城網(wǎng)站、企業(yè)建站、外貿(mào)建站、品牌網(wǎng)站制作、定制開發(fā)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)