這篇文章給大家介紹Spring Boot中怎么利用RabbitMQ實(shí)現(xiàn)優(yōu)先級隊(duì)列,內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。
成都創(chuàng)新互聯(lián)公司是一家專業(yè)從事做網(wǎng)站、成都網(wǎng)站制作、網(wǎng)頁設(shè)計(jì)的品牌網(wǎng)絡(luò)公司。如今是成都地區(qū)具影響力的網(wǎng)站設(shè)計(jì)公司,作為專業(yè)的成都網(wǎng)站建設(shè)公司,成都創(chuàng)新互聯(lián)公司依托強(qiáng)大的技術(shù)實(shí)力、以及多年的網(wǎng)站運(yùn)營經(jīng)驗(yàn),為您提供專業(yè)的成都網(wǎng)站建設(shè)、營銷型網(wǎng)站建設(shè)及網(wǎng)站設(shè)計(jì)開發(fā)服務(wù)!
本地運(yùn)行 RabbitMQ
docker run -d \ --name rabbitmq \ --restart always \ -p 5672:5672 \ -p 15672:15672 \ -e RABBITMQ_DEFAULT_USER=user \ -e RABBITMQ_DEFAULT_PASS=password \ rabbitmq:3-management
訪問可視化面板
地址:http://127.0.0.1:15672/
賬號:user
密碼:password
Spring Boot With RabbitMQ
Spring Boot 集成 RabbitMQ
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
基本參數(shù)配置
# host & port spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672
Queue / Exchange / Routing 配置
/** * RabbitMQ 配置 */ @Configuration public class RabbitMQConfig { private static final String EXCHANGE = "priority-exchange"; public static final String QUEUE = "priority-queue"; private static final String ROUTING_KEY = "priority.queue.#"; /** * 定義優(yōu)先級隊(duì)列 */ @Bean Queue queue() { Map<String, Object> args= new HashMap<>(); args.put("x-max-priority", 100); return new Queue(QUEUE, false, false, false, args); } /** * 定義交換器 */ @Bean TopicExchange exchange() { return new TopicExchange(EXCHANGE); } @Bean Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); } }
priority queue 定義參考官方文檔:https://www.rabbitmq.com/priority.html
Spring Boot 應(yīng)用啟動后,會自動創(chuàng)建 Queue 和 Exchange ,并相互綁定,優(yōu)先級隊(duì)列會有如圖所示標(biāo)識。
Spring Boot 相關(guān)配置
# 是否開啟消息發(fā)送到交換器(Exchange)后觸發(fā)回調(diào) spring.rabbitmq.publisher-confirms=false # 是否開啟消息發(fā)送到隊(duì)列(Queue)后觸發(fā)回調(diào) spring.rabbitmq.publisher-returns=false # 消息發(fā)送失敗重試相關(guān)配置 spring.rabbitmq.template.retry.enabled=true spring.rabbitmq.template.retry.initial-interval=3000ms spring.rabbitmq.template.retry.max-attempts=3 spring.rabbitmq.template.retry.max-interval=10000ms spring.rabbitmq.template.retry.multiplier=1
發(fā)送消息
@Component @AllArgsConstructor public class FileMessageSender { private static final String EXCHANGE = "priority-exchange"; private static final String ROUTING_KEY_PREFIX = "priority.queue."; private final RabbitTemplate rabbitTemplate; /** * 發(fā)送設(shè)置有優(yōu)先級的消息 * * @param priority 優(yōu)先級 */ public void sendPriorityMessage(String content, Integer priority) { rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY_PREFIX + "test", content, message -> { message.getMessageProperties().setPriority(priority); return message; }); } }
Spring Boot 相關(guān)配置
# 消息接收確認(rèn),可選模式:NONE(不確認(rèn))、AUTO(自動確認(rèn))、MANUAL(手動確認(rèn)) spring.rabbitmq.listener.simple.acknowledge-mode=AUTO # 最小線程數(shù)量 spring.rabbitmq.listener.simple.concurrency=10 # 最大線程數(shù)量 spring.rabbitmq.listener.simple.max-concurrency=10 # 每個消費(fèi)者可能未完成的最大未確認(rèn)消息數(shù)量 spring.rabbitmq.listener.simple.prefetch=1
消費(fèi)者執(zhí)行耗時(shí)較長的話,建議
spring.rabbitmq.listener.simple.prefetch
設(shè)置為較小數(shù)值,讓優(yōu)先級越高的消息更快加入到消費(fèi)者線程。
監(jiān)聽消息
@Slf4j @Component public class MessageListener { /** * 處理消息 */ @RabbitListener(queues = "priority-queue") public void listen(String message) { log.info(message); } }
1、自定義消息發(fā)送確認(rèn)的回調(diào)
配置如下:
# 開啟消息發(fā)送到交換器(Exchange)后觸發(fā)回調(diào) spring.rabbitmq.publisher-confirms=true # 開啟消息發(fā)送到隊(duì)列(Queue)后觸發(fā)回調(diào) spring.rabbitmq.publisher-returns=true
自定義
RabbitTemplate.ConfirmCallback
實(shí)現(xiàn)類
@Slf4j public class RabbitConfirmCallBack implements RabbitTemplate.ConfirmCallback{ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("消息唯一標(biāo)識: {}", correlationData); log.info("確認(rèn)狀態(tài): {}", ack); log.info("造成原因: {}", cause); } }
自定義
RabbitTemplate.ConfirmCallback
實(shí)現(xiàn)類
@Slf4j public class RabbitReturnCallback implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息主體: {}", message); log.info("回復(fù)編碼: {}", replyCode); log.info("回復(fù)內(nèi)容: {}", replyText); log.info("交換器: {}", exchange); log.info("路由鍵: {}", routingKey); } }
配置 rabbitTemplate
@Component @AllArgsConstructor public class RabbitTemplateInitializingBean implements InitializingBean { private final RabbitTemplate rabbitTemplate; @Override public void afterPropertiesSet() { rabbitTemplate.setConfirmCallback(new RabbitConfirmCallBack()); rabbitTemplate.setReturnCallback(new RabbitReturnCallback()); } }
關(guān)于Spring Boot中怎么利用RabbitMQ實(shí)現(xiàn)優(yōu)先級隊(duì)列就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
文章標(biāo)題:SpringBoot中怎么利用RabbitMQ實(shí)現(xiàn)優(yōu)先級隊(duì)列
文章源于:http://jinyejixie.com/article20/pgedjo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站收錄、網(wǎng)站設(shè)計(jì)公司、企業(yè)網(wǎng)站制作、靜態(tài)網(wǎng)站、網(wǎng)站設(shè)計(jì)、網(wǎng)站導(dǎo)航
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)