成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

SprintBoot如何基于Redis發(fā)布訂閱實(shí)現(xiàn)異步消息系統(tǒng)的同步調(diào)用?

前言

在很多互聯(lián)網(wǎng)應(yīng)用系統(tǒng)中,請求處理異步化是提升系統(tǒng)性能一種常用的手段,而基于消息系統(tǒng)的異步處理由于具備高可靠性、高吞吐量的特點(diǎn),因而在并發(fā)請求量比較高的互聯(lián)網(wǎng)系統(tǒng)中被廣泛應(yīng)用。與此同時(shí),這種方案也帶來了調(diào)用鏈路處理上的問題,因?yàn)榇蟛糠謶?yīng)用請求都會要求同步響應(yīng)實(shí)時(shí)處理結(jié)果,而由于請求的處理過程已經(jīng)通過消息異步解耦,所以整個(gè)調(diào)用鏈路就變成了異步鏈路,此時(shí)請求鏈路的發(fā)起者如何同步拿到響應(yīng)結(jié)果,就需要進(jìn)行額外的系統(tǒng)設(shè)計(jì)考慮。

創(chuàng)新互聯(lián)是一家業(yè)務(wù)范圍包括IDC托管業(yè)務(wù),網(wǎng)頁空間、主機(jī)租用、主機(jī)托管,四川、重慶、廣東電信服務(wù)器租用,移動服務(wù)器托管,成都網(wǎng)通服務(wù)器托管,成都服務(wù)器租用,業(yè)務(wù)范圍遍及中國大陸、港澳臺以及歐美等多個(gè)國家及地區(qū)的互聯(lián)網(wǎng)數(shù)據(jù)服務(wù)公司。

為了更清晰地理解這個(gè)問題,小碼哥以最近正在做的共享單車的IOT系統(tǒng)為例,給大家來一張圖描述下,如圖所示:

Sprint Boot如何基于Redis發(fā)布訂閱實(shí)現(xiàn)異步消息系統(tǒng)的同步調(diào)用?

在上述系統(tǒng)流程中,終端設(shè)備與服務(wù)端之間通過MQTT協(xié)議相連,而MQTT協(xié)議本質(zhì)上是一種異步消息的連接方式,因此業(yè)務(wù)應(yīng)用(如圖中的訂單系統(tǒng))發(fā)起開鎖請求后,IOT應(yīng)用系統(tǒng)會以MQTT協(xié)議的方式通過物聯(lián)網(wǎng)平臺(此處使用的是AWS IOT服務(wù))向設(shè)備發(fā)起開鎖下行消息,而這一過程在IOT應(yīng)用系統(tǒng)完成與物聯(lián)網(wǎng)平臺的交互后同步調(diào)用鏈路就結(jié)束了,因?yàn)槲锫?lián)網(wǎng)平臺與鎖設(shè)備之間通過MQTT消息服務(wù)異步解耦了,當(dāng)然物聯(lián)網(wǎng)平臺會通過一系列可靠消息機(jī)制來確保開鎖消息能夠發(fā)送到指定設(shè)備的監(jiān)聽MQTT隊(duì)列。而至于鎖設(shè)備是否能夠及時(shí)接收到開鎖下行MQTT消息,則取決于鎖設(shè)備當(dāng)時(shí)的移動網(wǎng)絡(luò)情況。

鎖設(shè)備在收到MQTT開鎖消息后,會通過嵌入式軟件系統(tǒng)觸發(fā)硬件設(shè)備完成開鎖動作,之后就需要通過MQTT上行消息將開鎖結(jié)果反饋到服務(wù)端,從而由服務(wù)端系統(tǒng)判斷是否創(chuàng)建騎行訂單并計(jì)算費(fèi)用。這一過程需要物聯(lián)網(wǎng)平臺監(jiān)聽指定鎖設(shè)備相應(yīng)的MQTT上行消息隊(duì)列,由于物聯(lián)網(wǎng)平臺需要與上萬個(gè)設(shè)備進(jìn)行連接,因而不可能將每一個(gè)鎖設(shè)備所產(chǎn)生的MQTT上行消息都直接轉(zhuǎn)發(fā)給Iot應(yīng)用系統(tǒng),因此在物聯(lián)網(wǎng)平臺可以將一類的設(shè)備MQTT消息轉(zhuǎn)發(fā)至特定的業(yè)務(wù)消息隊(duì)列中,例如開鎖上行消息,所有設(shè)備的MQTT開鎖響應(yīng)上行消息都可以轉(zhuǎn)發(fā)到表示開鎖響應(yīng)的Iot業(yè)務(wù)消息隊(duì)列,如“iot_upstream_lock_response”,這樣Iot業(yè)務(wù)系統(tǒng)則不需要再關(guān)注底層設(shè)備的MQTT消息,可以用更利于業(yè)務(wù)理解的方式去處理開鎖響應(yīng)結(jié)果。

現(xiàn)在的問題是通過MQTT協(xié)議的開鎖下行消息、上行消息已經(jīng)完全處于兩條不同的異步網(wǎng)絡(luò)鏈路,而鏈路的發(fā)起者此時(shí)卻需要同步等待開鎖結(jié)果,但是實(shí)際上同步鏈路早已在Iot應(yīng)用系統(tǒng)向物聯(lián)網(wǎng)平臺發(fā)送開鎖消息后就已經(jīng)完成,所以為了滿足調(diào)用方的同步請求/響應(yīng)需要就需要在Iot應(yīng)用系統(tǒng)的下發(fā)開鎖消息后進(jìn)行額外的同步阻塞等待,并監(jiān)聽開鎖響應(yīng)的Iot業(yè)務(wù)消息隊(duì)列“iot_upstream_lock_response”關(guān)于此次開鎖請求的上行消息,之后再結(jié)束掉之前的同步阻塞等待邏輯,從而實(shí)現(xiàn)向業(yè)務(wù)調(diào)用方返回實(shí)時(shí)開鎖響應(yīng)結(jié)果的同步調(diào)用效果。那么在上述流程中如何實(shí)施額外的同步阻塞以及如何進(jìn)行回調(diào)消息的監(jiān)聽呢?在接下來的內(nèi)容中就和大家一起探討具體的實(shí)施方案!

解決方案分析

以上問題在使用消息服務(wù)進(jìn)行異步解耦的應(yīng)用場景中是比較普遍的需求,由于異步調(diào)用鏈路非常長所以通用的解決思路是在調(diào)用鏈的起始端進(jìn)行同步阻塞,而在調(diào)用鏈的結(jié)束端通過回調(diào)的方式來實(shí)現(xiàn),如下圖所示:

Sprint Boot如何基于Redis發(fā)布訂閱實(shí)現(xiàn)異步消息系統(tǒng)的同步調(diào)用?

在上述圖示中,鏈路起始隊(duì)列處在發(fā)送第一次異步消息后會開啟一個(gè)臨時(shí)隊(duì)列并同步阻塞監(jiān)聽該臨時(shí)隊(duì)列的回調(diào)消息,而鏈路的結(jié)束隊(duì)列在完成邏輯處理后需要回調(diào)起始隊(duì)列監(jiān)聽的臨時(shí)隊(duì)列,而由于請求線程一直處于阻塞監(jiān)聽該臨時(shí)隊(duì)列的狀態(tài),所以一旦收到回調(diào)消息就可以結(jié)束阻塞執(zhí)行后續(xù)流程,從而完成整個(gè)鏈路的同步響應(yīng)。
雖然常見的消息中間件都可以實(shí)現(xiàn)以上邏輯,例如小碼哥之前所在的公司就基于RabbitMQ通過臨時(shí)隊(duì)列的方式實(shí)現(xiàn)過消息鏈路的同步調(diào)用,但是基于消息中間件的方式多少還是顯得有些繁瑣,對于常見的消息中間件如RocketMQ、RabbitMQ來說異步消息才是其強(qiáng)項(xiàng),如果以大量臨時(shí)隊(duì)列的創(chuàng)建和銷毀為代價(jià)來實(shí)現(xiàn)消息調(diào)用鏈路的同步,不僅從使用上來說顯得有些麻煩,并且也會對消息中間件的穩(wěn)定性帶來一些不好的影響。

因此在前面提到的IOT系統(tǒng)中,我們采用了基于redis的發(fā)布/訂閱功能來實(shí)現(xiàn)異步消息鏈路的同步化調(diào)用。而由于Redis的高性能以及Redis的應(yīng)用場景非常豐富,并且非常適合數(shù)據(jù)頻繁變動的場景,在系統(tǒng)中既可以作為NoSql數(shù)據(jù)庫來使用,同時(shí)還支持分布式鎖等功能,因而維護(hù)的性價(jià)比也相對較高。接下來我們就基于Spring Boot的開發(fā)框架來演示如何利用Redis的發(fā)布/訂閱來實(shí)現(xiàn)異步消息鏈路的同步回調(diào)!

Redis發(fā)布訂閱機(jī)制

Redis本身可以通過發(fā)布訂閱機(jī)制實(shí)現(xiàn)一定的消息隊(duì)列功能,在Redis中通過subscribe/publish等命令可以實(shí)現(xiàn)發(fā)布訂閱功能,基于此原先的IOT系統(tǒng)處理示意圖如下:

Sprint Boot如何基于Redis發(fā)布訂閱實(shí)現(xiàn)異步消息系統(tǒng)的同步調(diào)用?

如上圖所示,在IOT應(yīng)用端發(fā)送異步MQTT消息后會以消息ID組成的Key作為頻道**,并保持請求線程對該頻道的同步監(jiān)聽**,直到收到Iot業(yè)務(wù)消息隊(duì)列的開鎖結(jié)果上行消息后,在消息隊(duì)列的消費(fèi)端將該上行消息發(fā)布至同樣以消息requestId組成的頻道中,從而實(shí)現(xiàn)基于Redis發(fā)布訂閱機(jī)制的異步消息系統(tǒng)同步調(diào)用效果。

Spring Boot代碼實(shí)現(xiàn)

下面我們基于Spring Boot演示如何通過代碼進(jìn)行實(shí)現(xiàn),創(chuàng)建Spring Boot工程后引入Spring Boot Redis集成依賴包,如下:

 <!--Redis依賴-->
 <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
 </dependency>

之后在項(xiàng)目的配置文件中添加Redis服務(wù)連接信息,如下所示:

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password: 123456

此時(shí)項(xiàng)目就具備了訪問Redis的能力,接下來我們通過具體的代碼實(shí)現(xiàn)來進(jìn)行功能演示。訂閱監(jiān)聽代碼如下:

@RestController
@RequestMapping("/iot")
public class IotController {

    //注入Redis消息容器對象
    @Autowired
    RedisMessageListenerContainer redisMessageListenerContainer;

    @RequestMapping(value = "/unLock", method = RequestMethod.POST)
    public boolean unLock(@RequestParam(value = "thingName") String thingName,
            @RequestParam(value = "requestId") String requestId)
            throws InterruptedException, ExecutionException, TimeoutException {

        //此處實(shí)現(xiàn)異步消息調(diào)用處理....

        //生成監(jiān)聽頻道Key
        String key = "IOT_" + thingName + "_" + requestId;
        //創(chuàng)建監(jiān)聽Topic
        ChannelTopic channelTopic = new ChannelTopic(key);
        //創(chuàng)建消息任務(wù)對象
        IotMessageTask iotMessageTask = new IotMessageTask();
        //任務(wù)對象及監(jiān)聽Topic添加到消息監(jiān)聽容器
        try {
            redisMessageListenerContainer.addMessageListener(new IotMessageListener(iotMessageTask), channelTopic);
            System.out.println("start redis subscribe listener->" + key);
            //進(jìn)入同步阻塞等待,超時(shí)時(shí)間設(shè)置為60秒
            Message message = (Message) iotMessageTask.getIotMessageFuture().get(60000, TimeUnit.MILLISECONDS);
            System.out.println("receive redis callback message->" + message.toString());
        } finally {
            //銷毀消息監(jiān)聽對象
            if (iotMessageTask != null) {
                redisMessageListenerContainer.removeMessageListener(iotMessageTask.getMessageListener());
            }
        }
        return true;
    }
}

在上述代碼中,我們模擬了一個(gè)開鎖請求,在完成異步消息處理后會開啟Redis訂閱監(jiān)聽,為了實(shí)現(xiàn)異步阻塞還需要我們創(chuàng)建消息任務(wù)對象,代碼如下:

public class IotMessageTask<T> {

    //聲明線程異步阻塞對象(JDK 1.8新提供Api)
    private CompletableFuture<T> iotMessageFuture = new CompletableFuture<>();

    //聲明消息監(jiān)聽對象
    private MessageListener messageListener;

    //聲明超時(shí)時(shí)間
    private boolean isTimeout;

    public IotMessageTask() {
    }
    public CompletableFuture<T> getIotMessageFuture() {
        return iotMessageFuture;
    }
    public void setIotMessageFuture(CompletableFuture<T> iotMessageFuture) {
        this.iotMessageFuture = iotMessageFuture;
    }
    public MessageListener getMessageListener() {
        return messageListener;
    }
    public void setMessageListener(MessageListener messageListener) {
        this.messageListener = messageListener;
    }
    public boolean isTimeout() {
        return isTimeout;
    }
    public void setTimeout(boolean timeout) {
        isTimeout = timeout;
    }
}

在消息任務(wù)對象中我們通過JDK1.8新提供的CompletableFuture類實(shí)現(xiàn)線程阻塞效果,并通過定義消息監(jiān)聽對象及超時(shí)時(shí)間完善處理機(jī)制。此外根據(jù)Controller層代碼還需要自定義定義消息監(jiān)聽處理對象,代碼如下:

public class IotMessageListener implements MessageListener {

    IotMessageTask iotMessageTask;

    public IotMessageListener(IotMessageTask iotMessageTask) {
        this.iotMessageTask = iotMessageTask;
    }

    //實(shí)現(xiàn)消息發(fā)布監(jiān)聽處理方法
    @Override
    public void onMessage(Message message, byte[] bytes) {
        System.out.println("subscribe redis iot task response:{}" + message.toString());
        //線程阻塞完成
        iotMessageTask.getIotMessageFuture().complete(message);
    }
}

此時(shí)就完成了Redis服務(wù)訂閱這部分邏輯的編寫,在后續(xù)的邏輯處理中需要完成消息的發(fā)布才能正常結(jié)束此處的阻塞等待,接下來我們寫一段代碼來模擬消息發(fā)布,代碼如下:

@RestController
@RequestMapping("/iot")
public class IotCallBackController {

    //引入Redis客戶端操作對象
    @Autowired
    StringRedisTemplate stringRedisTemplate;

    @RequestMapping(value = "/unLockCallBack", method = RequestMethod.POST)
    public boolean unLockCallBack(@RequestParam(value = "thingName") String thingName,
            @RequestParam(value = "requestId") String requestId) {
        //生成監(jiān)聽頻道Key
        String key = "IOT_" + thingName + "_" + requestId;
        //模擬實(shí)現(xiàn)消息回調(diào)
        stringRedisTemplate.convertAndSend(key, "this is a redis callback");
        return true;
    }
}

此時(shí)啟動Spring Boot應(yīng)用調(diào)用開鎖模擬接口,邏輯就會暫時(shí)處于訂閱等待狀態(tài);之后再模擬調(diào)用開鎖回調(diào)Redis消息發(fā)布邏輯,之前的阻塞等待就會因?yàn)楸O(jiān)聽回調(diào)而完成同步返回。

本文標(biāo)題:SprintBoot如何基于Redis發(fā)布訂閱實(shí)現(xiàn)異步消息系統(tǒng)的同步調(diào)用?
路徑分享:http://jinyejixie.com/article36/pgidpg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站設(shè)計(jì)、品牌網(wǎng)站建設(shè)、服務(wù)器托管網(wǎng)站內(nèi)鏈、全網(wǎng)營銷推廣虛擬主機(jī)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

成都做網(wǎng)站
邛崃市| 客服| 东乌珠穆沁旗| 布拖县| 岳阳县| 衡阳县| 积石山| 芦溪县| 汨罗市| 溆浦县| 台中市| 鄄城县| 宁海县| 天门市| 修文县| 南江县| 长汀县| 额济纳旗| 昌平区| 饶阳县| 饶阳县| 鄂州市| 湘潭县| 自治县| 龙山县| 通海县| 河南省| 南乐县| 和静县| 吴江市| 腾冲县| 开远市| 巴青县| 定边县| 南投市| 松原市| 汤阴县| 鹤山市| 衡东县| 孙吴县| 镶黄旗|