這篇文章主要介紹了redis中如何使用消息隊(duì)列,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
成都創(chuàng)新互聯(lián)堅(jiān)持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:做網(wǎng)站、成都網(wǎng)站設(shè)計(jì)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿足客戶于互聯(lián)網(wǎng)時(shí)代的廣昌網(wǎng)站設(shè)計(jì)、移動(dòng)媒體設(shè)計(jì)的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!
說(shuō)到消息隊(duì)列中間件,我們都會(huì)想到RabbitMQ、RocketMQ和Kafka,來(lái)給應(yīng)用實(shí)現(xiàn)異步消息傳遞的功能。這些都是專業(yè)的消息隊(duì)列中間件,其特性之多超出了我們的理解能力。
而這些消息中間件使用起來(lái)的是復(fù)雜的,例如RabbitMQ,發(fā)消息之前要?jiǎng)?chuàng)建Exchange,還要?jiǎng)?chuàng)建Queue,然后將Exchange和Queue通過(guò)某種規(guī)則綁定起來(lái),發(fā)送消息的時(shí)候還要制定routing-key,還要 控制頭消息。這僅是生產(chǎn)者,消費(fèi)者在消費(fèi)消息之前也要將上面一系列的繁瑣步驟再操作一遍。
那么對(duì)于那些并不要求百分百可靠,并且希望實(shí)現(xiàn)簡(jiǎn)單的消息隊(duì)列需求時(shí),我們可以通過(guò)Redis將我們從消息隊(duì)列的中間件的繁瑣步驟中解脫出來(lái)。
Redis的消息隊(duì)列不是專業(yè)的消息隊(duì)列,他并沒(méi)有消息隊(duì)列中許多的高級(jí)特性,也沒(méi)有ack保證。如果對(duì)消息的可靠性有著極致的追求,請(qǐng)轉(zhuǎn)向?qū)I(yè)的MQ中間件。
從最簡(jiǎn)單的異步消息隊(duì)列開(kāi)始,Redis的list數(shù)據(jù)結(jié)構(gòu)常用來(lái)作為異步消息隊(duì)列,通過(guò)lrpush/lpush來(lái)操作入列,通過(guò)rpop/lpop來(lái)出列。
對(duì)于pop操作來(lái)說(shuō),當(dāng)消息隊(duì)列空了的時(shí)候,客戶端會(huì)陷入pop的死循環(huán),造成大量的浪費(fèi)生命的空輪詢,導(dǎo)致客戶端CPU拉高,同時(shí)Redis的QPS也被拉高。
對(duì)于以上問(wèn)題的解決辦法就是通過(guò)list結(jié)構(gòu)的blpop/brpop來(lái)操作出列,其中b前綴代表的就是blocking,阻塞讀。對(duì)于阻塞讀在隊(duì)列沒(méi)有數(shù)據(jù)的時(shí)候就會(huì)進(jìn)入休眠狀態(tài),一旦數(shù)據(jù)到來(lái)就會(huì)立刻醒來(lái)。完美的解決了上面這個(gè)問(wèn)題。
阻塞讀的方案看似完美,緊接著引出了另外一個(gè)問(wèn)題:空閑連接。 如果線程一直阻塞在哪哪里,Redis的客戶端連接就變成了空閑連接??臻e時(shí)間過(guò)長(zhǎng),Redis服務(wù)器就會(huì)主動(dòng)斷開(kāi)連接,以減少閑置資源占用。這時(shí)候blpop/brpop就會(huì)拋出異常來(lái)。
所以,我們?cè)诰帉?xiě)客戶端(應(yīng)用程序)消費(fèi)者的時(shí)候需要小心,注意捕獲異常,并進(jìn)行重試。
在Redis的分布式鎖中一般有三種策略來(lái)處理加鎖失敗的情況:
直接拋出異常,前端提醒用戶是否要繼續(xù)操作;
sleep一會(huì)再重試;
將請(qǐng)求放到延時(shí)隊(duì)列中,一會(huì)再重試;
而Redis中延時(shí)隊(duì)列,我們可以通過(guò)zset(有序列表)數(shù)據(jù)結(jié)構(gòu)來(lái)實(shí)現(xiàn)。我們將消息序列化作為一個(gè)字符串作為zse的value,而消息的到期處理時(shí)間(延時(shí)時(shí)間)作為score。然后通過(guò)輪詢zset獲取到期時(shí)間進(jìn)行處理,通過(guò)zrem將key從zset移除代表成功消費(fèi),進(jìn)而處理任務(wù)。
核心代碼如下:
// 生產(chǎn)\ public void delay(T msg) {\ TaskItem task = new TaskItem();\ task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid\ task.msg = msg;\ String s = JSON.toJSONString(task); // fastjson 序列化\ jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延時(shí)隊(duì)列 ,5s 后再試\ }\ // 消費(fèi)\ public void loop() {\ while (!Thread.interrupted()) {\ // zrangeByScore參數(shù)中0, System.currentTimeMills()代表從redis中去score范圍在0到系統(tǒng)當(dāng)前時(shí)間的數(shù)據(jù), 0,1表示從0開(kāi)始取1個(gè) 拓展傳入的score為-inf, +inf 分別表示zset中的最大值和最小值,當(dāng)你不知道zset中的score最值時(shí)就可以使用inf作為參數(shù)變量\ Set values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);\ if (values.isEmpty()) {\ try {\ Thread.sleep(500); // 歇會(huì)繼續(xù)\ }\ catch (InterruptedException e) {\ break;\ }\ continue;\ }\ String s = values.iterator().next(); //消費(fèi)隊(duì)列\(zhòng) if (jedis.zrem(queueKey, s) > 0) { // 搶到了,要考慮到多線程下鎖爭(zhēng)搶的情況,只有rem成功代表成功的消費(fèi)了一條消息。\ TaskItem task = JSON.parseObject(s, TaskType); // fastjson 反序列化\ this.handleMsg(task.msg);\ }\ }\ }
以上的代碼在多線程中對(duì)于同一個(gè)任務(wù)被多個(gè)線程爭(zhēng)搶的情況,雖然能夠通過(guò)zrem后在處理任務(wù)來(lái)避免一個(gè)任務(wù)被多次消費(fèi)的情況。但是對(duì)于那些獲取到了任務(wù)但是沒(méi)有成功消費(fèi)的線程來(lái)說(shuō),都是白白浪費(fèi)時(shí)間取了一次任務(wù)。所以可以考慮通過(guò)lua scripting來(lái)優(yōu)化這個(gè)邏輯。將zrangeByScore和zrem一同挪到服務(wù)器進(jìn)行原子操作,就能夠完美解決了。
感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“Redis中如何使用消息隊(duì)列”這篇文章對(duì)大家有幫助,同時(shí)也希望大家多多支持創(chuàng)新互聯(lián),關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,更多相關(guān)知識(shí)等著你來(lái)學(xué)習(xí)!
網(wǎng)站名稱:Redis中如何使用消息隊(duì)列
網(wǎng)站地址:http://jinyejixie.com/article12/gpecgc.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站導(dǎo)航、全網(wǎng)營(yíng)銷推廣、品牌網(wǎng)站設(shè)計(jì)、域名注冊(cè)、企業(yè)建站、微信小程序
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)
移動(dòng)網(wǎng)站建設(shè)知識(shí)