成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

SpringBoot中怎么利用ActiveMQ實現(xiàn)延遲消息

這篇文章將為大家詳細講解有關(guān)SpringBoot中怎么利用ActiveMQ實現(xiàn)延遲消息,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。

讓客戶滿意是我們工作的目標,不斷超越客戶的期望值來自于我們對這個行業(yè)的熱愛。我們立志把好的技術(shù)通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領(lǐng)域值得信任、有價值的長期合作伙伴,公司提供的服務(wù)項目有:域名注冊、網(wǎng)絡(luò)空間、營銷軟件、網(wǎng)站建設(shè)、前郭網(wǎng)站維護、網(wǎng)站推廣。

一、安裝activeMQ

Windows下安裝ActiveMQ:

到官網(wǎng)(http://activemq.apache.org/download-archives.html)下載最新發(fā)布的壓縮包(我下的是5.15.9)到本地后解壓(我解壓到D盤Dev目錄下)即可。進入解壓后的bin目錄,我是64位機器,再進入win64目錄后,雙擊activemq.bat啟動:

wrapper | --> Wrapper Started as Consolewrapper | Launching a JVM...jvm 1 | Wrapper (Version 3.2.3) http://wrapper.tanukisoftware.orgjvm 1 | Copyright 1999-2006 Tanuki Software, Inc. All Rights Reserved.jvm 1 |jvm 1 | Java Runtime: Oracle Corporation 1.8.0_181 C:\Program Files\Java\jre1.8.0_181jvm 1 | Heap sizes: current=125952k free=115299k max=932352kjvm 1 | JVM args: -Dactivemq.home=../.. -Dactivemq.base=../.. -Djavax.net.ssl.keyStorePassword=password -Djavax.net.ssl.trustStorePassword=password -Djavax.net.ssl.keyStore=../../conf/broker.ks -Djavax.net.ssl.trustStore=../../conf/broker.ts -Dcom.sun.management.jmxremote -Dorg.apache.activemq.UseDedicatedTaskRunner=true -Djava.util.logging.config.file=logging.properties -Dactivemq.conf=../../conf -Dactivemq.data=../../data -Djava.security.auth.login.config=../../conf/login.config -Xmx1024m -Djava.library.path=../../bin/win64 -Dwrapper.key=mChNCWMZ2FoXhZ9g -Dwrapper.port=32000 -Dwrapper.jvm.port.min=31000 -Dwrapper.jvm.port.max=31999 -Dwrapper.pid=3500 -Dwrapper.version=3.2.3 -Dwrapper.native_library=wrapper -Dwrapper.cpu.timeout=10 -Dwrapper.jvmid=1jvm 1 | Extensions classpath:jvm 1 | [..\..\lib,..\..\lib\camel,..\..\lib\optional,..\..\lib\web,..\..\lib\extra]jvm 1 | ACTIVEMQ_HOME: ..\..jvm 1 | ACTIVEMQ_BASE: ..\..jvm 1 | ACTIVEMQ_CONF: ..\..\confjvm 1 | ACTIVEMQ_DATA: ..\..\datajvm 1 | Loading message broker from: xbean:activemq.xmljvm 1 | INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1@f0ef68d: startup date [Fri May 24 15:16:21 CST 2019]; root of context hierarchyjvm 1 | INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[D:\Dev\apache-activemq-5.15.9\bin\win64\..\..\data\kahadb]jvm 1 | INFO | PListStore:[D:\Dev\apache-activemq-5.15.9\bin\win64\..\..\data\localhost\tmp_storage] startedjvm 1 | INFO | Apache ActiveMQ 5.15.9 (localhost, ID:wulf00-51057-1558682182909-0:1) is startingjvm 1 | INFO | Listening for connections at: tcp://wulf00:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600jvm 1 | INFO | Connector openwire startedjvm 1 | INFO | Listening for connections at: amqp://wulf00:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600jvm 1 | INFO | Connector amqp startedjvm 1 | INFO | Listening for connections at: stomp://wulf00:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600jvm 1 | INFO | Connector stomp startedjvm 1 | INFO | Listening for connections at: mqtt://wulf00:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600jvm 1 | INFO | Connector mqtt startedjvm 1 | INFO | Starting Jetty serverjvm 1 | INFO | Creating Jetty connectorjvm 1 | WARN | ServletContext@o.e.j.s.ServletContextHandler@17bc7c8a{/,null,STARTING} has uncovered http methods for path: /jvm 1 | INFO | Listening for connections at ws://wulf00:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600jvm 1 | INFO | Connector ws startedjvm 1 | INFO | Apache ActiveMQ 5.15.9 (localhost, ID:wulf00-51057-1558682182909-0:1) startedjvm 1 | INFO | For help or more information please see: http://activemq.apache.orgjvm 1 | WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: D:\Dev\apache-activemq-5.15.9\bin\win64\..\..\data\kahadb only has 92649 mb of usable space. - resetting to maximum available disk space: 92649 mbjvm 1 | INFO | No Spring WebApplicationInitializer types detected on classpathjvm 1 | INFO | ActiveMQ WebConsole available at http://0.0.0.0:8161/jvm 1 | INFO | ActiveMQ Jolokia REST API available at http://0.0.0.0:8161/api/jolokia/jvm 1 | INFO | Initializing Spring FrameworkServlet 'dispatcher'jvm 1 | INFO | No Spring WebApplicationInitializer types detected on classpathjvm 1 | INFO | jolokia-agent: Using policy access restrictor classpath:/jolokia-access.xml

默認端口8161,訪問下http://localhost:8161/admin,用戶名密碼都是admin,進入控制臺頁面:

我們用坐上方的Queues來創(chuàng)建一個叫vboxlog的隊列:

二、修改activeMQ配置文件

broker新增配置信息 schedulerSupport="true"

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true" > <destinationPolicy> <policyMap> <policyEntries> <policyEntry topic=">" > <!-- The constantPendingMessageLimitStrategy is used to prevent slow topic consumers to block producers and affect other consumers by limiting the number of messages that are retained For more information, see: http://activemq.apache.org/slow-consumer-handling.html --> <pendingMessageLimitStrategy> <constantPendingMessageLimitStrategy limit="1000"/> </pendingMessageLimitStrategy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy>

三、創(chuàng)建SpringBoot工程

1、配置ActiveMQ工廠信息,信任包必須配置否則會報錯

package com.example.demoactivemq.config;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.RedeliveryPolicy;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.ArrayList;import java.util.List;/** * @author shanks on 2019-11-12 */@Configurationpublic class ActiveMqConfig { @Bean public ActiveMQConnectionFactory factory(@Value("${spring.activemq.broker-url}") String url){ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); // 設(shè)置信任序列化包集合 List<String> models = new ArrayList<>(); models.add("com.example.demoactivemq.domain"); factory.setTrustedPackages(models); return factory; }}

消息實體類

package com.example.demoactivemq.domain;import lombok.Builder;import lombok.Data;import java.io.Serializable;/** * @author shanks on 2019-11-12 */@Builder@Datapublic class MessageModel implements Serializable { private String titile; private String message;}

生產(chǎn)者

package com.example.demoactivemq.producer;import lombok.extern.slf4j.Slf4j;import org.apache.activemq.ScheduledMessage;import org.apache.activemq.command.ActiveMQQueue;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.autoconfigure.jms.JmsProperties;import org.springframework.jms.core.JmsMessagingTemplate;import org.springframework.stereotype.Service;import javax.jms.*;import java.io.Serializable;/** * 消息生產(chǎn)者 * * @author shanks */@Service@Slf4jpublic class Producer { public static final Destination DEFAULT_QUEUE = new ActiveMQQueue("delay.queue"); @Autowired private JmsMessagingTemplate template; /** * 發(fā)送消息 * * @param destination destination是發(fā)送到的隊列 * @param message message是待發(fā)送的消息 */ public <T extends Serializable> void send(Destination destination, T message) { template.convertAndSend(destination, message); } /** * 延時發(fā)送 * * @param destination 發(fā)送的隊列 * @param data 發(fā)送的消息 * @param time 延遲時間 */ public <T extends Serializable> void delaySend(Destination destination, T data, Long time) { Connection connection = null; Session session = null; MessageProducer producer = null; // 獲取連接工廠 ConnectionFactory connectionFactory = template.getConnectionFactory(); try { // 獲取連接 connection = connectionFactory.createConnection(); connection.start(); // 獲取session,true開啟事務(wù),false關(guān)閉事務(wù) session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 創(chuàng)建一個消息隊列 producer = session.createProducer(destination); producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue()); ObjectMessage message = session.createObjectMessage(data); //設(shè)置延遲時間 message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); // 發(fā)送消息 producer.send(message); log.info("發(fā)送消息:{}", data); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (producer != null) { producer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (Exception e) { e.printStackTrace(); } } }}

消費者

package com.example.demoactivemq.producer;import com.example.demoactivemq.domain.MessageModel;import lombok.extern.slf4j.Slf4j;import org.springframework.jms.annotation.JmsListener;import org.springframework.stereotype.Component;/** * 消費者 */@Component@Slf4jpublic class Consumer { @JmsListener(destination = "delay.queue") public void receiveQueue(MessageModel message) { log.info("收到消息:{}", message); }}

application.yml

spring: activemq: broker-url: tcp://localhost:61616

測試類

package com.example.demoactivemq;import com.example.demoactivemq.domain.MessageModel;import com.example.demoactivemq.producer.Producer;import org.junit.jupiter.api.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest(classes = DemoActivemqApplication.class)@RunWith(SpringRunner.class)class DemoActivemqApplicationTests { /** * 消息生產(chǎn)者 */ @Autowired private Producer producer; /** * 及時消息隊列測試 */ @Test public void test() { MessageModel messageModel = MessageModel.builder() .message("測試消息") .titile("消息000") .build(); // 發(fā)送消息 producer.send(Producer.DEFAULT_QUEUE, messageModel); } /** * 延時消息隊列測試 */ @Test public void test2() { for (int i = 0; i < 5; i++) { MessageModel messageModel = MessageModel.builder() .titile("延遲10秒執(zhí)行") .message("測試消息" + i) .build(); // 發(fā)送延遲消息 producer.delaySend(Producer.DEFAULT_QUEUE, messageModel, 10000L); } try { // 休眠100秒,等等消息執(zhí)行 Thread.currentThread().sleep(100000L); } catch (InterruptedException e) { e.printStackTrace(); } }}

執(zhí)行結(jié)果

2019-11-12 22:18:52.939 INFO 17263 --- [ main] c.e.demoactivemq.producer.Producer : 發(fā)送消息:MessageModel(titile=延遲10秒執(zhí)行, message=測試消息0)2019-11-12 22:18:52.953 INFO 17263 --- [ main] c.e.demoactivemq.producer.Producer : 發(fā)送消息:MessageModel(titile=延遲10秒執(zhí)行, message=測試消息1)2019-11-12 22:18:52.958 INFO 17263 --- [ main] c.e.demoactivemq.producer.Producer : 發(fā)送消息:MessageModel(titile=延遲10秒執(zhí)行, message=測試消息2)2019-11-12 22:18:52.964 INFO 17263 --- [ main] c.e.demoactivemq.producer.Producer : 發(fā)送消息:MessageModel(titile=延遲10秒執(zhí)行, message=測試消息3)2019-11-12 22:18:52.970 INFO 17263 --- [ main] c.e.demoactivemq.producer.Producer : 發(fā)送消息:MessageModel(titile=延遲10秒執(zhí)行, message=測試消息4)2019-11-12 22:19:03.012 INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer : 收到消息:MessageModel(titile=延遲10秒執(zhí)行, message=測試消息0)2019-11-12 22:19:03.017 INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer : 收到消息:MessageModel(titile=延遲10秒執(zhí)行, message=測試消息1)2019-11-12 22:19:03.019 INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer : 收到消息:MessageModel(titile=延遲10秒執(zhí)行, message=測試消息2)2019-11-12 22:19:03.020 INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer : 收到消息:MessageModel(titile=延遲10秒執(zhí)行, message=測試消息3)2019-11-12 22:19:03.021 INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer : 收到消息:MessageModel(titile=延遲10秒執(zhí)行, message=測試消息4)

關(guān)于SpringBoot中怎么利用ActiveMQ實現(xiàn)延遲消息就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

本文題目:SpringBoot中怎么利用ActiveMQ實現(xiàn)延遲消息
分享路徑:http://jinyejixie.com/article14/jopjde.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站制作、網(wǎng)站設(shè)計公司、靜態(tài)網(wǎng)站、網(wǎng)站改版移動網(wǎng)站建設(shè)、ChatGPT

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

網(wǎng)站建設(shè)網(wǎng)站維護公司
赤峰市| 西平县| 临邑县| 即墨市| 定结县| 天津市| 张家口市| 黄梅县| 天镇县| 顺义区| 巨鹿县| 咸阳市| 榆林市| 汨罗市| 扬州市| 泽普县| 汉川市| 汾阳市| 安化县| 清原| 沾化县| 黄大仙区| 巨野县| 沁水县| 嘉禾县| 左权县| 盐城市| 隆德县| 长治县| 枞阳县| 博罗县| 宜兰市| 麦盖提县| 漳浦县| 丰顺县| 黄冈市| 洪湖市| 奉化市| 龙口市| 泰安市| 武邑县|