成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

SpringBoot中如何使用RabbitMQ延時隊列

這篇文章將為大家詳細(xì)講解有關(guān)SpringBoot中如何使用RabbitMQ延時隊列,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。

十余年的邳州網(wǎng)站建設(shè)經(jīng)驗,針對設(shè)計、前端、開發(fā)、售后、文案、推廣等六對一服務(wù),響應(yīng)快,48小時及時工作處理。全網(wǎng)營銷推廣的優(yōu)勢是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動調(diào)整邳州建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計,從而大程度地提升瀏覽體驗。創(chuàng)新互聯(lián)從事“邳州網(wǎng)站設(shè)計”,“邳州網(wǎng)站推廣”以來,每個客戶項目都認(rèn)真落實執(zhí)行。

1.什么是MQ

MQ,是一種跨進(jìn)程的通信機(jī)制,用于上下游傳遞消息。

在互聯(lián)網(wǎng)架構(gòu)中,MQ是一種非常常見的上下游“邏輯解耦+物理解耦”的消息通信服務(wù)。

使用了MQ之后,消息發(fā)送上游只需要依賴MQ,不用依賴其他服務(wù)。

為什么會產(chǎn)生消息列隊?

  1. 不同進(jìn)程(process)之間傳遞消息時,兩個進(jìn)程之間耦合程度過高,改動一個進(jìn)程,引發(fā)必須修改另一個進(jìn)程,為了隔離這兩個進(jìn)程,在兩進(jìn)程間抽離出一層(一個模塊),所有兩進(jìn)程之間傳遞的消息,都必須通過消息隊列來傳遞,單獨(dú)修改某一個進(jìn)程,不會影響另一個;  不同進(jìn)程(process)之間傳遞消息時,為了實現(xiàn)標(biāo)準(zhǔn)化,將消息的格式規(guī)范化了,并且,某一個進(jìn)程接受的消息太多,一下子無法處理完,并且也有先后順序,必須對收到的消息進(jìn)行排隊,因此誕生了事實上的消息隊列;

延時列隊的使用場景?

  1. 訂單業(yè)務(wù):在淘寶或者京東購買東西,用戶下單后未付款則30分鐘后取消訂單。  短信通知:手機(jī)用戶交完話費(fèi)后,幾分鐘之內(nèi)將會收到繳費(fèi)信息

2.什么是RabbitMQ(這里就做了一下簡單介紹)

RabbitMQ是一種消息隊列 ,用于常見的進(jìn)程通信。支持點對點,請求應(yīng)答和發(fā)布訂閱模式 并且提供多種語言的支持。常見的java,c#,php都支持。

常被用在異步處理,應(yīng)用解耦。流量消鋒等復(fù)雜的業(yè)務(wù)場景中。和java的kafka一樣都屬于消息中間件。

下載地址:

https://www.rabbitmq.com/download.html

進(jìn)入RabbitMQ官網(wǎng)

1.第一步

第二步

下載好后不要著急安裝RabbitMQ,我們這里還需要安裝Erlang

下載地址:http://www.erlang.org/download/otp_win64_17.3.exe

安裝步驟

步驟一

步驟二

步驟三

步驟四

安裝完成

現(xiàn)在安裝RabbitMQ

步驟一

步驟二

步驟三

安裝完成

啟動RabbitMQ管理工具

開始菜單 — 最新添加 — 展開 — 選中雙擊

輸入命令:rabbitmq-plugins enable rabbitmq_management

效果如果圖

在瀏覽器中輸入地址查看:http://127.0.0.1:15672/

出現(xiàn)次頁面代表成功,默認(rèn)用戶和密碼都是guest/ guest

若不出現(xiàn)此頁面,就是安裝失敗了,不要慌,多半問題在系統(tǒng)用戶名必須是中文(放心有解決辦法):

Windows下安裝RabbitMQ后,按正常RabbitMQ會自動注冊服務(wù)并自動啟動,但是如果有的道友不注意中英文目錄就會出現(xiàn)服務(wù)啟動后幾秒鐘自動停止,而且反反復(fù)復(fù)。

出現(xiàn)這種情況一般都是由我們的用戶名是中文,而導(dǎo)致默認(rèn)的DB和log訪問出現(xiàn)問。所以我建議以后大家在使用windows操作系統(tǒng)的時候盡量用英文來命名文件或目錄,這樣會極大的減小以后安裝軟件出現(xiàn)莫名其妙的問題的bug。

接下來我們先卸載我們的RabbitMQ,然后在我們的系統(tǒng)變量里設(shè)置一個RABBITMQ_BASE 的變量路徑為一個不含英文的路徑 比如 E:\rabbit,最后我們重新安裝RabbitMQ即可,然后就會看到RabbitMQ服務(wù)自動注冊了,并且不會自動停止。

SpringBoot整合RabbitMQ

1.添加依賴

pom.xml中添加 spring-boot-starter-amqp的依賴

<!-- spring-boot-starter-amqp的依賴 -->      <dependency>   <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

其他依賴

<dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-web</artifactId>      </dependency>        <dependency>        <groupId>org.projectlombok</groupId>        <artifactId>lombok</artifactId>        <optional>true</optional>      </dependency>        <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-test</artifactId>        <scope>test</scope>        <exclusions>          <exclusion>            <groupId>org.junit.vintage</groupId>            <artifactId>junit-vintage-engine</artifactId>          </exclusion>        </exclusions>      </dependency>            <dependency>        <groupId>junit</groupId>        <artifactId>junit</artifactId>        <version>4.12</version>        <scope>test</scope>      </dependency>

application.yml文件中配置rabbitmq相關(guān)內(nèi)容

spring:  rabbitmq:   host: localhost   port: 5672   username: guest   password: guest

這里我們環(huán)境就搭建起來了

2.具體編碼實現(xiàn)

配置列隊

package com.example.spring_boot_rabbitmq;        import lombok.extern.slf4j.Slf4j;  import org.springframework.amqp.core.*;  import org.springframework.context.annotation.Bean;  import org.springframework.context.annotation.Configuration;    import java.util.HashMap;  import java.util.Map;    /**   * @author:zq   * @date: Greated in 2019/12/19 11:46   * 配置隊列   */    @Configuration  @Slf4j  public class DelayRabbitConfig {      /**     * 延遲隊列 TTL 名稱     */    private static final String ORDER_DELAY_QUEUE = "user.order.delay.queue";    /**     * DLX,dead letter發(fā)送到的 exchange     * 延時消息就是發(fā)送到該交換機(jī)的     */    public static final String ORDER_DELAY_EXCHANGE = "user.order.delay.exchange";    /**     * routing key 名稱     * 具體消息發(fā)送在該 routingKey 的     */    public static final String ORDER_DELAY_ROUTING_KEY = "order_delay";      public static final String ORDER_QUEUE_NAME = "user.order.queue";    public static final String ORDER_EXCHANGE_NAME = "user.order.exchange";    public static final String ORDER_ROUTING_KEY = "order";      /**     * 延遲隊列配置     * <p>     * 1、params.put("x-message-ttl", 5 * 1000);     * 第一種方式是直接設(shè)置 Queue 延遲時間 但如果直接給隊列設(shè)置過期時間,這種做法不是很靈活,(當(dāng)然二者是兼容的,默認(rèn)是時間小的優(yōu)先)     * 2、rabbitTemplate.convertAndSend(book, message -> {     * message.getMessageProperties().setExpiration(2 * 1000 + "");     * return message;     * });     * 第二種就是每次發(fā)送消息動態(tài)設(shè)置延遲時間,這樣我們可以靈活控制     **/    @Bean    public Queue delayOrderQueue() {      Map<String, Object> params = new HashMap<>();      // x-dead-letter-exchange 聲明了隊列里的死信轉(zhuǎn)發(fā)到的DLX名稱,      params.put("x-dead-letter-exchange", ORDER_EXCHANGE_NAME);      // x-dead-letter-routing-key 聲明了這些死信在轉(zhuǎn)發(fā)時攜帶的 routing-key 名稱。      params.put("x-dead-letter-routing-key", ORDER_ROUTING_KEY);      return new Queue(ORDER_DELAY_QUEUE, true, false, false, params);    }    /**     * 需要將一個隊列綁定到交換機(jī)上,要求該消息與一個特定的路由鍵完全匹配。     * 這是一個完整的匹配。如果一個隊列綁定到該交換機(jī)上要求路由鍵 “dog”,則只有被標(biāo)記為“dog”的消息才被轉(zhuǎn)發(fā),     * 不會轉(zhuǎn)發(fā)dog.puppy,也不會轉(zhuǎn)發(fā)dog.guard,只會轉(zhuǎn)發(fā)dog。     * @return DirectExchange     */    @Bean    public DirectExchange orderDelayExchange() {      return new DirectExchange(ORDER_DELAY_EXCHANGE);    }    @Bean    public Binding dlxBinding() {      return BindingBuilder.bind(delayOrderQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY);    }      @Bean    public Queue orderQueue() {      return new Queue(ORDER_QUEUE_NAME, true);    }    /**     * 將路由鍵和某模式進(jìn)行匹配。此時隊列需要綁定要一個模式上。     * 符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會匹配到“audit.irs”。     **/    @Bean    public TopicExchange orderTopicExchange() {      return new TopicExchange(ORDER_EXCHANGE_NAME);    }      @Bean    public Binding orderBinding() {      // TODO 如果要讓延遲隊列之間有關(guān)聯(lián),這里的 routingKey 和 綁定的交換機(jī)很關(guān)鍵      return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(ORDER_ROUTING_KEY);    }    }

創(chuàng)建一個Order實體類

package com.example.spring_boot_rabbitmq.pojo;  import lombok.Data;  import java.io.Serializable;  /**  * @author:zq  * @date: Greated in 2019/12/19 11:49  */ @Data public class Order implements Serializable {   private static final long serialVersionUID = -2221214252163879885L;    private String orderId; // 訂單id    private Integer orderStatus; // 訂單狀態(tài) 0:未支付,1:已支付,2:訂單已取消    private String orderName; // 訂單名字  }

接收者

package com.example.spring_boot_rabbitmq;  import com.example.spring_boot_rabbitmq.pojo.Order; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;  import java.util.Date;  /**  * @author:zq  * @date: Greated in 2019/12/19 11:53  * 接收者  */  @Component @Slf4j public class DelayReceiver {   @RabbitListener(queues = {DelayRabbitConfig.ORDER_QUEUE_NAME})   public void orderDelayQueue(Order order, Message message, Channel channel) {     log.info("###########################################");     log.info("【orderDelayQueue 監(jiān)聽的消息】 - 【消費(fèi)時間】 - [{}]- 【訂單內(nèi)容】 - [{}]", new Date(), order.toString());     if(order.getOrderStatus() == 0) {       order.setOrderStatus(2);       log.info("【該訂單未支付,取消訂單】" + order.toString());     } else if(order.getOrderStatus() == 1) {       log.info("【該訂單已完成支付】");     } else if(order.getOrderStatus() == 2) {       log.info("【該訂單已取消】");     }     log.info("###########################################");   }  }

發(fā)送者

package com.example.spring_boot_rabbitmq;   import com.example.spring_boot_rabbitmq.pojo.Order; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;  import java.util.Date;  /** * @author:zq * @date: Greated in 2019/12/19 11:55 * 發(fā)送者 */ @Component @Slf4j public class DelaySender {   @Autowired   private AmqpTemplate amqpTemplate;    public void sendDelay(Order order) {     log.info("【訂單生成時間】" + new Date().toString() +"【1分鐘后檢查訂單是否已經(jīng)支付】" + order.toString() );     this.amqpTemplate.convertAndSend(DelayRabbitConfig.ORDER_DELAY_EXCHANGE, DelayRabbitConfig.ORDER_DELAY_ROUTING_KEY, order, message -> {       // 如果配置了 params.put("x-message-ttl", 5 * 1000); 那么這一句也可以省略,具體根據(jù)業(yè)務(wù)需要是聲明 Queue 的時候就指定好延遲時間還是在發(fā)送自己控制時間       message.getMessageProperties().setExpiration(1 * 1000 * 60 + "");       return message;     });   }  }

測試,訪問http://localhost:8080/sendDelay查看日志輸出

package com.example.spring_boot_rabbitmq;import com.example.spring_boot_rabbitmq.pojo.Order;import org.springframework.web.bind.annotation.RestController;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;/** * @author:zq * @date: Greated in 2019/12/19 11:57 * 測試 */@RestControllerpublic class TestController {  @Autowired  private DelaySender delaySender;  @GetMapping("/sendDelay")  public Object sendDelay() {    Order order1 = new Order();    order1.setOrderStatus(0);    order1.setOrderId("123456");    order1.setOrderName("小米6");    Order order2 = new Order();    order2.setOrderStatus(1);    order2.setOrderId("456789");    order2.setOrderName("小米8");    delaySender.sendDelay(order1);    delaySender.sendDelay(order2);    return "ok";  }}

關(guān)于SpringBoot中如何使用RabbitMQ延時隊列就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

當(dāng)前題目:SpringBoot中如何使用RabbitMQ延時隊列
轉(zhuǎn)載來源:http://jinyejixie.com/article14/pggdge.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站排名、微信小程序網(wǎng)頁設(shè)計公司、Google、做網(wǎng)站、商城網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都網(wǎng)頁設(shè)計公司
松江区| 龙岩市| 新泰市| 眉山市| 镇远县| 双城市| 昌宁县| 鸡泽县| 故城县| 昌邑市| 醴陵市| 太仓市| 兴山县| 东台市| 新绛县| 鸡泽县| 普安县| 呈贡县| 南阳市| 苏尼特右旗| 双城市| 陇西县| 邵东县| 巨鹿县| 黎平县| 屏东县| 陆丰市| 湟源县| 梅河口市| 革吉县| 康平县| 封开县| 科尔| 邳州市| 安义县| 永平县| 藁城市| 通化市| 龙江县| 密山市| 广平县|