成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

使用SpringCloudStream怎么實(shí)現(xiàn)服務(wù)間的通訊

這篇文章將為大家詳細(xì)講解有關(guān)使用SpringCloudStream怎么實(shí)現(xiàn)服務(wù)間的通訊,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。

靖宇網(wǎng)站建設(shè)公司創(chuàng)新互聯(lián)建站,靖宇網(wǎng)站設(shè)計(jì)制作,有大型網(wǎng)站制作公司豐富經(jīng)驗(yàn)。已為靖宇1000+提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\外貿(mào)營(yíng)銷網(wǎng)站建設(shè)要多少錢,請(qǐng)找那個(gè)售后服務(wù)好的靖宇做網(wǎng)站的公司定做!

Spring Cloud Stream

Srping cloud Bus的底層實(shí)現(xiàn)就是Spring Cloud Stream,Spring Cloud Stream的目的是用于構(gòu)建基于消息驅(qū)動(dòng)(或事件驅(qū)動(dòng))的微服務(wù)架構(gòu)。Spring Cloud Stream本身對(duì)Spring Messaging、Spring Integration、Spring Boot Actuator、Spring Boot Externalized Configuration等模塊進(jìn)行封裝(整合)和擴(kuò)展,下面我們實(shí)現(xiàn)兩個(gè)服務(wù)之間的通訊來(lái)演示Spring Cloud Stream的使用方法。

整體概述

服務(wù)要想與其他服務(wù)通訊要定義通道,一般會(huì)定義輸出通道和輸入通道,輸出通道用于發(fā)送消息,輸入通道用于接收消息,每個(gè)通道都會(huì)有個(gè)名字(輸入和輸出只是通道類型,可以用不同的名字定義很多很多通道),不同通道的名字不能相同否則會(huì)報(bào)錯(cuò)(輸入通道和輸出通道不同類型的通道名稱也不能相同),綁定器是操作RabbitMQ或Kafka的抽象層,為了屏蔽操作這些消息中間件的復(fù)雜性和不一致性,綁定器會(huì)用通道的名字在消息中間件中定義主題,一個(gè)主題內(nèi)的消息生產(chǎn)者來(lái)自多個(gè)服務(wù),一個(gè)主題內(nèi)消息的消費(fèi)者也是多個(gè)服務(wù),也就是說(shuō)消息的發(fā)布和消費(fèi)是通過(guò)主題進(jìn)行定義和組織的,通道的名字就是主題的名字,在RabbitMQ中主題使用Exchanges實(shí)現(xiàn),在Kafka中主題使用Topic實(shí)現(xiàn)。

準(zhǔn)備環(huán)境

創(chuàng)建兩個(gè)項(xiàng)目spring-cloud-stream-a和spring-cloud-stream-b,spring-cloud-stream-a我們用Spring Cloud Stream實(shí)現(xiàn)通訊,spring-cloud-stream-b我們用Spring Cloud Stream的底層模塊Spring Integration實(shí)現(xiàn)通訊。

兩個(gè)項(xiàng)目的POM文件依賴都是:

<dependencies>    <dependency>      <groupId>org.springframework.cloud</groupId>      <artifactId>spring-cloud-stream</artifactId>    </dependency>    <dependency>      <groupId>org.springframework.cloud</groupId>      <artifactId>spring-cloud-stream-binder-rabbit</artifactId>    </dependency>    <dependency>      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-starter-test</artifactId>      <scope>test</scope>    </dependency>    <dependency>      <groupId>org.springframework.cloud</groupId>      <artifactId>spring-cloud-stream-test-support</artifactId>      <scope>test</scope>    </dependency>  </dependencies>

spring-cloud-stream-binder-rabbit是指綁定器的實(shí)現(xiàn)使用RabbitMQ。

項(xiàng)目配置內(nèi)容application.properties:

spring.application.name=spring-cloud-stream-aserver.port=9010#設(shè)置默認(rèn)綁定器spring.cloud.stream.defaultBinder = rabbitspring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guest

spring.application.name=spring-cloud-stream-bserver.port=9011#設(shè)置默認(rèn)綁定器spring.cloud.stream.defaultBinder = rabbitspring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guest

啟動(dòng)一個(gè)rabbitmq:

docker pull rabbitmq:3-managementdocker run -d --hostname my-rabbit --name rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

編寫(xiě)A項(xiàng)目代碼

在A項(xiàng)目中定義一個(gè)輸入通道一個(gè)輸出通道,定義通道在接口中使用@Input和@Output注解定義,程序啟動(dòng)的時(shí)候Spring Cloud Stream會(huì)根據(jù)接口定義將實(shí)現(xiàn)類自動(dòng)注入(Spring Cloud Stream自動(dòng)實(shí)現(xiàn)該接口不需要寫(xiě)代碼)。

A服務(wù)輸入通道,通道名稱ChatExchanges.A.Input,接口定義輸入通道必須返回SubscribableChannel:

public interface ChatInput {  String INPUT = "ChatExchanges.A.Input";  @Input(ChatInput.INPUT)  SubscribableChannel input();}

A服務(wù)輸出通道,通道名稱ChatExchanges.A.Output,輸出通道必須返回MessageChannel:

public interface ChatOutput {  String OUTPUT = "ChatExchanges.A.Output";  @Output(ChatOutput.OUTPUT)  MessageChannel output();}

定義消息實(shí)體類:

public class ChatMessage implements Serializable {  private String name;  private String message;  private Date chatDate;  //沒(méi)有無(wú)參數(shù)的構(gòu)造函數(shù)并行化會(huì)出錯(cuò)  private ChatMessage(){}  public ChatMessage(String name,String message,Date chatDate){    this.name = name;    this.message = message;    this.chatDate = chatDate;  }  public String getName(){    return this.name;  }  public String getMessage(){    return this.message;  }  public Date getChatDate() { return this.chatDate; }  public String ShowMessage(){    return String.format("聊天消息:%s的時(shí)候,%s說(shuō)%s。",this.chatDate,this.name,this.message);  }}

在業(yè)務(wù)處理類上用@EnableBinding注解綁定輸入通道和輸出通道,這個(gè)綁定動(dòng)作其實(shí)就是創(chuàng)建并注冊(cè)輸入和輸出通道的實(shí)現(xiàn)類到Bean中,所以可以直接是使用@Autowired進(jìn)行注入使用,另外消息的串行化默認(rèn)使用application/json格式(com.fastexml.jackson),最后用@StreamListener注解進(jìn)行指定通道消息的監(jiān)聽(tīng):

//ChatInput.class的輸入通道不在這里綁定,監(jiān)聽(tīng)到數(shù)據(jù)會(huì)找不到AClient類的引用。//Input和Output通道定義的名字不能一樣,否則程序啟動(dòng)會(huì)拋異常。@EnableBinding({ChatOutput.class,ChatInput.class})public class AClient {  private static Logger logger = LoggerFactory.getLogger(AClient.class);  @Autowired  private ChatOutput chatOutput;  //StreamListener自帶了Json轉(zhuǎn)對(duì)象的能力,收到B的消息打印并回復(fù)B一個(gè)新的消息。  @StreamListener(ChatInput.INPUT)  public void PrintInput(ChatMessage message) {    logger.info(message.ShowMessage());    ChatMessage replyMessage = new ChatMessage("ClientA","A To B Message.", new Date());    chatOutput.output().send(MessageBuilder.withPayload(replyMessage).build());  }}

到此A項(xiàng)目代碼編寫(xiě)完成。

編寫(xiě)B(tài)項(xiàng)目代碼

B項(xiàng)目使用Spring Integration實(shí)現(xiàn)消息的發(fā)布和消費(fèi),定義通道時(shí)我們要交換輸入通道和輸出通道的名稱:

public interface ChatProcessor {  String OUTPUT = "ChatExchanges.A.Input";  String INPUT = "ChatExchanges.A.Output";  @Input(ChatProcessor.INPUT)  SubscribableChannel input();  @Output(ChatProcessor.OUTPUT)  MessageChannel output();}

消息實(shí)體類:

public class ChatMessage {  private String name;  private String message;  private Date chatDate;  //沒(méi)有無(wú)參數(shù)的構(gòu)造函數(shù)并行化會(huì)出錯(cuò)  private ChatMessage(){}  public ChatMessage(String name,String message,Date chatDate){    this.name = name;    this.message = message;    this.chatDate = chatDate;  }  public String getName(){    return this.name;  }  public String getMessage(){    return this.message;  }  public Date getChatDate() { return this.chatDate; }  public String ShowMessage(){    return String.format("聊天消息:%s的時(shí)候,%s說(shuō)%s。",this.chatDate,this.name,this.message);  }}

業(yè)務(wù)處理類用@ServiceActivator注解代替@StreamListener,用@InboundChannelAdapter注解發(fā)布消息:

@EnableBinding(ChatProcessor.class)public class BClient {  private static Logger logger = LoggerFactory.getLogger(BClient.class);  //@ServiceActivator沒(méi)有Json轉(zhuǎn)對(duì)象的能力需要借助@Transformer注解  @ServiceActivator(inputChannel=ChatProcessor.INPUT)  public void PrintInput(ChatMessage message) {    logger.info(message.ShowMessage());  }  @Transformer(inputChannel = ChatProcessor.INPUT,outputChannel = ChatProcessor.INPUT)  public ChatMessage transform(String message) throws Exception{    ObjectMapper objectMapper = new ObjectMapper();    return objectMapper.readValue(message,ChatMessage.class);  }  //每秒發(fā)出一個(gè)消息給A  @Bean  @InboundChannelAdapter(value = ChatProcessor.OUTPUT,poller = @Poller(fixedDelay="1000"))  public GenericMessage<ChatMessage> SendChatMessage(){    ChatMessage message = new ChatMessage("ClientB","B To A Message.", new Date());    GenericMessage<ChatMessage> gm = new GenericMessage<>(message);    return gm;  }}

關(guān)于使用SpringCloudStream怎么實(shí)現(xiàn)服務(wù)間的通訊就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。

網(wǎng)站題目:使用SpringCloudStream怎么實(shí)現(xiàn)服務(wù)間的通訊
URL地址:http://jinyejixie.com/article24/poecce.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供外貿(mào)建站、用戶體驗(yàn)網(wǎng)站制作、網(wǎng)站設(shè)計(jì)搜索引擎優(yōu)化、Google

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

微信小程序開(kāi)發(fā)
额尔古纳市| 万荣县| 邢台市| 射洪县| 镶黄旗| 安阳县| 灌阳县| 宜昌市| 镇雄县| 博湖县| 余干县| 黔西县| 商城县| 都江堰市| 都兰县| 陆川县| 乐东| 克拉玛依市| 灯塔市| 喀喇沁旗| 万荣县| 凉城县| 共和县| 合川市| 庆元县| 通河县| 宁强县| 洛宁县| 绍兴县| 什邡市| 杨浦区| 抚宁县| 广安市| 平昌县| 龙川县| 军事| 沂水县| 苏尼特右旗| 务川| 金湖县| 瑞昌市|