本篇內(nèi)容介紹了“如何利用MQ實(shí)現(xiàn)事務(wù)補(bǔ)償”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
從策劃到設(shè)計(jì)制作,每一步都追求做到細(xì)膩,制作可持續(xù)發(fā)展的企業(yè)網(wǎng)站。為客戶提供成都網(wǎng)站建設(shè)、做網(wǎng)站、網(wǎng)站策劃、網(wǎng)頁設(shè)計(jì)、域名注冊、網(wǎng)頁空間、網(wǎng)絡(luò)營銷、VI設(shè)計(jì)、 網(wǎng)站改版、漏洞修補(bǔ)等服務(wù)。為客戶提供更好的一站式互聯(lián)網(wǎng)解決方案,以客戶的口碑塑造優(yōu)易品牌,攜手廣大客戶,共同發(fā)展進(jìn)步。
rabbitMQ 在互聯(lián)網(wǎng)公司有著大規(guī)模應(yīng)用,本篇將實(shí)戰(zhàn)介紹 springboot 整合 rabbitMQ,同時(shí)也將在具體的業(yè)務(wù)場景中介紹利用 MQ 實(shí)現(xiàn)事務(wù)補(bǔ)償操作。
一、介紹
本篇我們一起來實(shí)操一下SpringBoot整合rabbitMQ,為后續(xù)業(yè)務(wù)處理做鋪墊。
廢話不多說,直奔主題!
二、整合實(shí)戰(zhàn)
2.1、創(chuàng)建一個(gè) maven 工程,引入 amqp 包
<!--amqp 支持--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.2、在全局文件中配置 rabbitMQ 服務(wù)信息
spring.rabbitmq.addresses=197.168.24.206:5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/
其中,spring.rabbitmq.addresses參數(shù)值為 rabbitmq 服務(wù)器地址
2.3、編寫 rabbitmq 配置類
@Slf4j @Configuration public class RabbitConfig { /** * 初始化連接工廠 * @param addresses * @param userName * @param password * @param vhost * @return */ @Bean ConnectionFactory connectionFactory(@Value("${spring.rabbitmq.addresses}") String addresses, @Value("${spring.rabbitmq.username}") String userName, @Value("${spring.rabbitmq.password}") String password, @Value("${spring.rabbitmq.virtual-host}") String vhost) { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(addresses); connectionFactory.setUsername(userName); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(vhost); return connectionFactory; } /** * 重新實(shí)例化 RabbitAdmin 操作類 * @param connectionFactory * @return */ @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ return new RabbitAdmin(connectionFactory); } /** * 重新實(shí)例化 RabbitTemplate 操作類 * @param connectionFactory * @return */ @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory); //數(shù)據(jù)轉(zhuǎn)換為json存入消息隊(duì)列 rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); return rabbitTemplate; } /** * 將 RabbitUtil 操作工具類加入IOC容器 * @return */ @Bean public RabbitUtil rabbitUtil(){ return new RabbitUtil(); } }
2.4、編寫 RabbitUtil 工具類
public class RabbitUtil { private static final Logger logger = LoggerFactory.getLogger(RabbitUtil.class); @Autowired private RabbitAdmin rabbitAdmin; @Autowired private RabbitTemplate rabbitTemplate; /** * 創(chuàng)建Exchange * @param exchangeName */ public void addExchange(String exchangeType, String exchangeName){ Exchange exchange = createExchange(exchangeType, exchangeName); rabbitAdmin.declareExchange(exchange); } /** * 刪除一個(gè)Exchange * @param exchangeName */ public boolean deleteExchange(String exchangeName){ return rabbitAdmin.deleteExchange(exchangeName); } /** * 創(chuàng)建一個(gè)指定的Queue * @param queueName * @return queueName */ public void addQueue(String queueName){ Queue queue = createQueue(queueName); rabbitAdmin.declareQueue(queue); } /** * 刪除一個(gè)queue * @return queueName * @param queueName */ public boolean deleteQueue(String queueName){ return rabbitAdmin.deleteQueue(queueName); } /** * 按照篩選條件,刪除隊(duì)列 * @param queueName * @param unused 是否被使用 * @param empty 內(nèi)容是否為空 */ public void deleteQueue(String queueName, boolean unused, boolean empty){ rabbitAdmin.deleteQueue(queueName,unused,empty); } /** * 清空某個(gè)隊(duì)列中的消息,注意,清空的消息并沒有被消費(fèi) * @return queueName * @param queueName */ public void purgeQueue(String queueName){ rabbitAdmin.purgeQueue(queueName, false); } /** * 判斷指定的隊(duì)列是否存在 * @param queueName * @return */ public boolean existQueue(String queueName){ return rabbitAdmin.getQueueProperties(queueName) == null ? false : true; } /** * 綁定一個(gè)隊(duì)列到一個(gè)匹配型交換器使用一個(gè)routingKey * @param exchangeType * @param exchangeName * @param queueName * @param routingKey * @param isWhereAll * @param headers EADERS模式類型設(shè)置,其他模式類型傳空 */ public void addBinding(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){ Binding binding = bindingBuilder(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers); rabbitAdmin.declareBinding(binding); } /** * 聲明綁定 * @param binding */ public void addBinding(Binding binding){ rabbitAdmin.declareBinding(binding); } /** * 解除交換器與隊(duì)列的綁定 * @param exchangeType * @param exchangeName * @param queueName * @param routingKey * @param isWhereAll * @param headers */ public void removeBinding(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){ Binding binding = bindingBuilder(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers); removeBinding(binding); } /** * 解除交換器與隊(duì)列的綁定 * @param binding */ public void removeBinding(Binding binding){ rabbitAdmin.removeBinding(binding); } /** * 創(chuàng)建一個(gè)交換器、隊(duì)列,并綁定隊(duì)列 * @param exchangeType * @param exchangeName * @param queueName * @param routingKey * @param isWhereAll * @param headers */ public void andExchangeBindingQueue(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){ //聲明交換器 addExchange(exchangeType, exchangeName); //聲明隊(duì)列 addQueue(queueName); //聲明綁定關(guān)系 addBinding(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers); } /** * 發(fā)送消息 * @param exchange * @param routingKey * @param object */ public void convertAndSend(String exchange, String routingKey, final Object object){ rabbitTemplate.convertAndSend(exchange, routingKey, object); } /** * 轉(zhuǎn)換Message對(duì)象 * @param messageType * @param msg * @return */ public Message getMessage(String messageType, Object msg){ MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType(messageType); Message message = new Message(msg.toString().getBytes(),messageProperties); return message; } /** * 聲明交換機(jī) * @param exchangeType * @param exchangeName * @return */ private Exchange createExchange(String exchangeType, String exchangeName){ if(ExchangeType.DIRECT.equals(exchangeType)){ return new DirectExchange(exchangeName); } if(ExchangeType.TOPIC.equals(exchangeType)){ return new TopicExchange(exchangeName); } if(ExchangeType.HEADERS.equals(exchangeType)){ return new HeadersExchange(exchangeName); } if(ExchangeType.FANOUT.equals(exchangeType)){ return new FanoutExchange(exchangeName); } return null; } /** * 聲明綁定關(guān)系 * @param exchangeType * @param exchangeName * @param queueName * @param routingKey * @param isWhereAll * @param headers * @return */ private Binding bindingBuilder(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){ if(ExchangeType.DIRECT.equals(exchangeType)){ return BindingBuilder.bind(new Queue(queueName)).to(new DirectExchange(exchangeName)).with(routingKey); } if(ExchangeType.TOPIC.equals(exchangeType)){ return BindingBuilder.bind(new Queue(queueName)).to(new TopicExchange(exchangeName)).with(routingKey); } if(ExchangeType.HEADERS.equals(exchangeType)){ if(isWhereAll){ return BindingBuilder.bind(new Queue(queueName)).to(new HeadersExchange(exchangeName)).whereAll(headers).match(); }else{ return BindingBuilder.bind(new Queue(queueName)).to(new HeadersExchange(exchangeName)).whereAny(headers).match(); } } if(ExchangeType.FANOUT.equals(exchangeType)){ return BindingBuilder.bind(new Queue(queueName)).to(new FanoutExchange(exchangeName)); } return null; } /** * 聲明隊(duì)列 * @param queueName * @return */ private Queue createQueue(String queueName){ return new Queue(queueName); } /** * 交換器類型 */ public final static class ExchangeType { /** * 直連交換機(jī)(全文匹配) */ public final static String DIRECT = "DIRECT"; /** * 通配符交換機(jī)(兩種通配符:*只能匹配一個(gè)單詞,#可以匹配零個(gè)或多個(gè)) */ public final static String TOPIC = "TOPIC"; /** * 頭交換機(jī)(自定義鍵值對(duì)匹配,根據(jù)發(fā)送消息內(nèi)容中的headers屬性進(jìn)行匹配) */ public final static String HEADERS = "HEADERS"; /** * 扇形(廣播)交換機(jī) (將消息轉(zhuǎn)發(fā)到所有與該交互機(jī)綁定的隊(duì)列上) */ public final static String FANOUT = "FANOUT"; } }
此致, rabbitMQ 核心操作功能操作已經(jīng)開發(fā)完畢!
2.5、編寫隊(duì)列監(jiān)聽類(靜態(tài))
@Slf4j @Configuration public class DirectConsumeListener { /** * 監(jiān)聽指定隊(duì)列,名稱:mq.direct.1 * @param message * @param channel * @throws IOException */ @RabbitListener(queues = "mq.direct.1") public void consume(Message message, Channel channel) throws IOException { log.info("DirectConsumeListener,收到消息: {}", message.toString()); } }
如果你需要監(jiān)聽指定的隊(duì)列,只需要方法上加上@RabbitListener(queues = "")即可,同時(shí)填寫對(duì)應(yīng)的隊(duì)列名稱。
但是,如果你想動(dòng)態(tài)監(jiān)聽隊(duì)列,而不是通過寫死在方法上呢?
請看下面介紹!
2.6、編寫隊(duì)列監(jiān)聽類(動(dòng)態(tài))
重新實(shí)例化一個(gè)SimpleMessageListenerContainer對(duì)象,這個(gè)對(duì)象就是監(jiān)聽容器。
@Slf4j @Configuration public class DynamicConsumeListener { /** * 使用SimpleMessageListenerContainer實(shí)現(xiàn)動(dòng)態(tài)監(jiān)聽 * @param connectionFactory * @return */ @Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setMessageListener((MessageListener) message -> { log.info("ConsumerMessageListen,收到消息: {}", message.toString()); }); return container; } }
如果想向SimpleMessageListenerContainer添加監(jiān)聽隊(duì)列或者移除隊(duì)列,只需通過如下方式即可操作。
@Slf4j @RestController @RequestMapping("/consumer") public class ConsumerController { @Autowired private SimpleMessageListenerContainer container; @Autowired private RabbitUtil rabbitUtil; /** * 添加隊(duì)列到監(jiān)聽器 * @param consumerInfo */ @PostMapping("addQueue") public void addQueue(@RequestBody ConsumerInfo consumerInfo) { boolean existQueue = rabbitUtil.existQueue(consumerInfo.getQueueName()); if(!existQueue){ throw new CommonExecption("當(dāng)前隊(duì)列不存在"); } //消費(fèi)mq消息的類 container.addQueueNames(consumerInfo.getQueueName()); //打印監(jiān)聽容器中正在監(jiān)聽到隊(duì)列 log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames())); } /** * 移除正在監(jiān)聽的隊(duì)列 * @param consumerInfo */ @PostMapping("removeQueue") public void removeQueue(@RequestBody ConsumerInfo consumerInfo) { //消費(fèi)mq消息的類 container.removeQueueNames(consumerInfo.getQueueName()); //打印監(jiān)聽容器中正在監(jiān)聽到隊(duì)列 log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames())); } /** * 查詢監(jiān)聽容器中正在監(jiān)聽到隊(duì)列 */ @PostMapping("queryListenerQueue") public void queryListenerQueue() { log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames())); } }
2.7、發(fā)送消息到交換器
發(fā)送消息到交換器,非常簡單,只需要通過如下方式即可!
先編寫一個(gè)請求參數(shù)實(shí)體類
@Data public class ProduceInfo implements Serializable { private static final long serialVersionUID = 1l; /** * 交換器名稱 */ private String exchangeName; /** * 路由鍵key */ private String routingKey; /** * 消息內(nèi)容 */ public String msg; }
編寫接口api
@RestController @RequestMapping("/produce") public class ProduceController { @Autowired private RabbitUtil rabbitUtil; /** * 發(fā)送消息到交換器 * @param produceInfo */ @PostMapping("sendMessage") public void sendMessage(@RequestBody ProduceInfo produceInfo) { rabbitUtil.convertAndSend(produceInfo.getExchangeName(), produceInfo.getRoutingKey(), produceInfo); } }
當(dāng)然,你也可以直接使用rabbitTemplate操作類,來實(shí)現(xiàn)發(fā)送消息。
rabbitTemplate.convertAndSend(exchange, routingKey, message);
參數(shù)內(nèi)容解釋:
exchange:表示交換器名稱
routingKey:表示路由鍵key
message:表示消息
2.8、交換器、隊(duì)列維護(hù)操作
如果想通過接口對(duì) rabbitMQ 中的交換器、隊(duì)列以及綁定關(guān)系進(jìn)行維護(hù),通過如下方式接口操作,即可實(shí)現(xiàn)!
先編寫一個(gè)請求參數(shù)實(shí)體類
@Data public class QueueConfig implements Serializable{ private static final long serialVersionUID = 1l; /** * 交換器類型 */ private String exchangeType; /** * 交換器名稱 */ private String exchangeName; /** * 隊(duì)列名稱 */ private String queueName; /** * 路由鍵key */ private String routingKey; }
編寫接口api
/** * rabbitMQ管理操作控制層 */ @RestController @RequestMapping("/config") public class RabbitController { @Autowired private RabbitUtil rabbitUtil; /** * 創(chuàng)建交換器 * @param config */ @PostMapping("addExchange") public void addExchange(@RequestBody QueueConfig config) { rabbitUtil.addExchange(config.getExchangeType(), config.getExchangeName()); } /** * 刪除交換器 * @param config */ @PostMapping("deleteExchange") public void deleteExchange(@RequestBody QueueConfig config) { rabbitUtil.deleteExchange(config.getExchangeName()); } /** * 添加隊(duì)列 * @param config */ @PostMapping("addQueue") public void addQueue(@RequestBody QueueConfig config) { rabbitUtil.addQueue(config.getQueueName()); } /** * 刪除隊(duì)列 * @param config */ @PostMapping("deleteQueue") public void deleteQueue(@RequestBody QueueConfig config) { rabbitUtil.deleteQueue(config.getQueueName()); } /** * 清空隊(duì)列數(shù)據(jù) * @param config */ @PostMapping("purgeQueue") public void purgeQueue(@RequestBody QueueConfig config) { rabbitUtil.purgeQueue(config.getQueueName()); } /** * 添加綁定 * @param config */ @PostMapping("addBinding") public void addBinding(@RequestBody QueueConfig config) { rabbitUtil.addBinding(config.getExchangeType(), config.getExchangeName(), config.getQueueName(), config.getRoutingKey(), false, null); } /** * 解除綁定 * @param config */ @PostMapping("removeBinding") public void removeBinding(@RequestBody QueueConfig config) { rabbitUtil.removeBinding(config.getExchangeType(), config.getExchangeName(), config.getQueueName(), config.getRoutingKey(), false, null); } /** * 創(chuàng)建頭部類型的交換器 * 判斷條件是所有的鍵值對(duì)都匹配成功才發(fā)送到隊(duì)列 * @param config */ @PostMapping("andExchangeBindingQueueOfHeaderAll") public void andExchangeBindingQueueOfHeaderAll(@RequestBody QueueConfig config) { HashMap<String, Object> header = new HashMap<>(); header.put("queue", "queue"); header.put("bindType", "whereAll"); rabbitUtil.andExchangeBindingQueue(RabbitUtil.ExchangeType.HEADERS, config.getExchangeName(), config.getQueueName(), null, true, header); } /** * 創(chuàng)建頭部類型的交換器 * 判斷條件是只要有一個(gè)鍵值對(duì)匹配成功就發(fā)送到隊(duì)列 * @param config */ @PostMapping("andExchangeBindingQueueOfHeaderAny") public void andExchangeBindingQueueOfHeaderAny(@RequestBody QueueConfig config) { HashMap<String, Object> header = new HashMap<>(); header.put("queue", "queue"); header.put("bindType", "whereAny"); rabbitUtil.andExchangeBindingQueue(RabbitUtil.ExchangeType.HEADERS, config.getExchangeName(), config.getQueueName(), null, false, header); } }
至此,rabbitMQ 管理器基本的 crud 全部開發(fā)完成!
三、利用 MQ 實(shí)現(xiàn)事務(wù)補(bǔ)償
當(dāng)然,我們花了這么大的力氣,絕不僅僅是為了將 rabbitMQ 通過 web 項(xiàng)目將其管理起來,最重要的是能投入業(yè)務(wù)使用中去!
上面的操作只是告訴我們怎么使用 rabbitMQ!
當(dāng)你仔細(xì)回想整個(gè)過程的時(shí)候,其實(shí)還是回到最初那個(gè)問題,什么時(shí)候使用 MQ ?
以常見的訂單系統(tǒng)為例,用戶點(diǎn)擊【下單】按鈕之后的業(yè)務(wù)邏輯可能包括:支付訂單、扣減庫存、生成相應(yīng)單據(jù)、發(fā)紅包、發(fā)短信通知等等。
在業(yè)務(wù)發(fā)展初期這些邏輯可能放在一起同步執(zhí)行,隨著業(yè)務(wù)的發(fā)展訂單量增長,需要提升系統(tǒng)服務(wù)的性能,這時(shí)可以將一些不需要立即生效的操作拆分出來異步執(zhí)行,比如發(fā)放紅包、發(fā)短信通知等。這種場景下就可以用 MQ ,在下單的主流程(比如扣減庫存、生成相應(yīng)單據(jù))完成之后發(fā)送一條消息到 MQ 讓主流程快速完結(jié),而由另外的單獨(dú)線程拉取 MQ 的消息(或者由 MQ 推送消息),當(dāng)發(fā)現(xiàn) MQ 中有發(fā)紅包或發(fā)短信之類的消息時(shí),執(zhí)行相應(yīng)的業(yè)務(wù)邏輯。
這種是利用 MQ 實(shí)現(xiàn)業(yè)務(wù)解耦,其它的場景包括最終一致性、廣播、錯(cuò)峰流控等等。
利用 MQ 實(shí)現(xiàn)業(yè)務(wù)解耦的過程其實(shí)也很簡單。
當(dāng)主流程結(jié)束之后,將消息推送到發(fā)紅包、發(fā)短信交換器中即可
@Service public class OrderService { @Autowired private RabbitUtil rabbitUtil; /** * 創(chuàng)建訂單 * @param order */ @Transactional public void createOrder(Order order){ //1、創(chuàng)建訂單 //2、調(diào)用庫存接口,減庫存 //3、向客戶發(fā)放紅包 rabbitUtil.convertAndSend("exchange.send.bonus", null, order); //4、發(fā)短信通知 rabbitUtil.convertAndSend("exchange.sms.message", null, order); } }
監(jiān)聽發(fā)紅包操作
/** * 監(jiān)聽發(fā)紅包 * @param message * @param channel * @throws IOException */ @RabbitListener(queues = "exchange.send.bonus") public void consume(Message message, Channel channel) throws IOException { String msgJson = new String(message.getBody(),"UTF-8"); log.info("收到消息: {}", message.toString()); //調(diào)用發(fā)紅包接口 }
監(jiān)聽發(fā)短信操作
/** * 監(jiān)聽發(fā)短信 * @param message * @param channel * @throws IOException */ @RabbitListener(queues = "exchange.sms.message") public void consume(Message message, Channel channel) throws IOException { String msgJson = new String(message.getBody(),"UTF-8"); log.info("收到消息: {}", message.toString()); //調(diào)用發(fā)短信接口 }
既然 MQ 這么好用,那是不是完全可以將以前的業(yè)務(wù)也按照整個(gè)模型進(jìn)行拆分呢?
答案顯然不是!
當(dāng)引入 MQ 之后業(yè)務(wù)的確是解耦了,但是當(dāng) MQ 一旦掛了,所有的服務(wù)基本都掛了,是不是很可怕!
但是沒關(guān)系,俗話說,兵來將擋、水來土掩,這句話同樣適用于 IT 開發(fā)者,有坑填坑!
“如何利用MQ實(shí)現(xiàn)事務(wù)補(bǔ)償”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
本文名稱:如何利用MQ實(shí)現(xiàn)事務(wù)補(bǔ)償
網(wǎng)頁地址:http://jinyejixie.com/article24/igoeje.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供移動(dòng)網(wǎng)站建設(shè)、企業(yè)建站、響應(yīng)式網(wǎng)站、面包屑導(dǎo)航、定制網(wǎng)站、手機(jī)網(wǎng)站建設(shè)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)