成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

RabbitMQ中簡(jiǎn)單隊(duì)列的實(shí)現(xiàn)原理是什么

今天就跟大家聊聊有關(guān)RabbitMQ中簡(jiǎn)單隊(duì)列的實(shí)現(xiàn)原理是什么,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

在網(wǎng)站制作、成都網(wǎng)站設(shè)計(jì)中從網(wǎng)站色彩、結(jié)構(gòu)布局、欄目設(shè)置、關(guān)鍵詞群組等細(xì)微處著手,突出企業(yè)的產(chǎn)品/服務(wù)/品牌,幫助企業(yè)鎖定精準(zhǔn)用戶,提高在線咨詢和轉(zhuǎn)化,使成都網(wǎng)站營(yíng)銷成為有效果、有回報(bào)的無錫營(yíng)銷推廣。創(chuàng)新互聯(lián)專業(yè)成都網(wǎng)站建設(shè)十余年了,客戶滿意度97.8%,歡迎成都創(chuàng)新互聯(lián)客戶聯(lián)系。

RabbitMQ 簡(jiǎn)述

RabbitMQ是一個(gè)消息代理:它接受并轉(zhuǎn)發(fā)消息。 您可以將其視為郵局:當(dāng)您將要把寄發(fā)的郵件投遞到郵箱中時(shí),您可以確信Postman 先生最終會(huì)將郵件發(fā)送給收件人。 在這個(gè)比喻中,RabbitMQ是一個(gè)郵箱,郵局和郵遞員,用來接受,存儲(chǔ)和轉(zhuǎn)發(fā)二進(jìn)制數(shù)據(jù)塊的消息。

隊(duì)列就像是在RabbitMQ中扮演郵箱的角色。 雖然消息經(jīng)過RabbitMQ和應(yīng)用程序,但它們只能存儲(chǔ)在隊(duì)列中。 隊(duì)列只受主機(jī)的內(nèi)存和磁盤限制的限制,它本質(zhì)上是一個(gè)大的消息緩沖區(qū)。 許多生產(chǎn)者可以發(fā)送到一個(gè)隊(duì)列的消息,許多消費(fèi)者可以嘗試從一個(gè)隊(duì)列接收數(shù)據(jù)。

producer即為生產(chǎn)者,用來產(chǎn)生消息發(fā)送給隊(duì)列。consumer是消費(fèi)者,需要去讀隊(duì)列內(nèi)的消息。producer,consumer和broker(rabbitMQ server)不必駐留在同一個(gè)主機(jī)上;確實(shí)在大多數(shù)應(yīng)用程序中它們是這樣分布的。

簡(jiǎn)單隊(duì)列

簡(jiǎn)單隊(duì)列是最簡(jiǎn)單的一種模式,由生產(chǎn)者、隊(duì)列、消費(fèi)者組成。生產(chǎn)者將消息發(fā)送給隊(duì)列,消費(fèi)者從隊(duì)列中讀取消息完成消費(fèi)。

在下圖中,“P”是我們的生產(chǎn)者,“C”是我們的消費(fèi)者。 中間的框是隊(duì)列 - RabbitMQ代表消費(fèi)者的消息緩沖區(qū)。

RabbitMQ中簡(jiǎn)單隊(duì)列的實(shí)現(xiàn)原理是什么

java 方式

生產(chǎn)者

package com.anqi.mq.nat;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class MyProducer {
  private static final String QUEUE_NAME = "ITEM_QUEUE";

  public static void main(String[] args) throws Exception {
    //1. 創(chuàng)建一個(gè) ConnectionFactory 并進(jìn)行設(shè)置
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setVirtualHost("/");
    factory.setUsername("guest");
    factory.setPassword("guest");

    //2. 通過連接工廠來創(chuàng)建連接
    Connection connection = factory.newConnection();

    //3. 通過 Connection 來創(chuàng)建 Channel
    Channel channel = connection.createChannel();

    //實(shí)際場(chǎng)景中,消息多為json格式的對(duì)象
    String msg = "hello";
    //4. 發(fā)送三條數(shù)據(jù)
    for (int i = 1; i <= 3 ; i++) {
      channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
      System.out.println("Send message" + i +" : " + msg);
    }

    //5. 關(guān)閉連接
    channel.close();
    connection.close();
  }
}
  /**
   * Declare a queue
   * @param queue the name of the queue
   * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
   * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
   * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
   * @param arguments other properties (construction arguments) for the queue
   * @return a declaration-confirm method to indicate the queue was successfully declared
   * @throws java.io.IOException if an error is encountered
   */
  Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;

  /**
   * Publish a message
   * @see com.rabbitmq.client.AMQP.Basic.Publish
   * @param exchange the exchange to publish the message to
   * @param routingKey the routing key
   * @param props other properties for the message - routing headers etc
   * @param body the message body
   * @throws java.io.IOException if an error is encountered
   */
  void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;


  /**
   * Start a non-nolocal, non-exclusive consumer, with
   * a server-generated consumerTag.
   * @param queue the name of the queue
   * @param autoAck true if the server should consider messages
   * acknowledged once delivered; false if the server should expect
   * explicit acknowledgements
   * @param callback an interface to the consumer object
   * @return the consumerTag generated by the server
   * @throws java.io.IOException if an error is encountered
   * @see com.rabbitmq.client.AMQP.Basic.Consume
   * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
   * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
   */
  String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

消費(fèi)者

package com.anqi.mq.nat;

import com.rabbitmq.client.*;
import java.io.IOException;

public class MyConsumer {

  private static final String QUEUE_NAME = "ITEM_QUEUE";

  public static void main(String[] args) throws Exception {
    //1. 創(chuàng)建一個(gè) ConnectionFactory 并進(jìn)行設(shè)置
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setVirtualHost("/");
    factory.setUsername("guest");
    factory.setPassword("guest");

    //2. 通過連接工廠來創(chuàng)建連接
    Connection connection = factory.newConnection();

    //3. 通過 Connection 來創(chuàng)建 Channel
    Channel channel = connection.createChannel();

    //4. 聲明一個(gè)隊(duì)列
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    /*
      true:表示自動(dòng)確認(rèn),只要消息從隊(duì)列中獲取,無論消費(fèi)者獲取到消息后是否成功消費(fèi),都會(huì)認(rèn)為消息已經(jīng)成功消費(fèi)
      false:表示手動(dòng)確認(rèn),消費(fèi)者獲取消息后,服務(wù)器會(huì)將該消息標(biāo)記為不可用狀態(tài),等待消費(fèi)者的反饋,如果消費(fèi)者一
      直沒有反饋,那么該消息將一直處于不可用狀態(tài),并且服務(wù)器會(huì)認(rèn)為該消費(fèi)者已經(jīng)掛掉,不會(huì)再給其發(fā)送消息,
      直到該消費(fèi)者反饋。
    */

    //5. 創(chuàng)建消費(fèi)者并接收消息
    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body)
          throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
      }
    };

    //6. 設(shè)置 Channel 消費(fèi)者綁定隊(duì)列
    channel.basicConsume(QUEUE_NAME, true, consumer);

  }
}
Send message1 : hello
Send message2 : hello
Send message3 : hello

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'hello'
 [x] Received 'hello'
 [x] Received 'hello'

當(dāng)我們啟動(dòng)生產(chǎn)者之后查看RabbitMQ管理后臺(tái)可以看到有一條消息正在等待被消費(fèi)。

RabbitMQ中簡(jiǎn)單隊(duì)列的實(shí)現(xiàn)原理是什么

當(dāng)我們啟動(dòng)消費(fèi)者之后再次查看,可以看到積壓的一條消息已經(jīng)被消費(fèi)。

RabbitMQ中簡(jiǎn)單隊(duì)列的實(shí)現(xiàn)原理是什么

總結(jié)

  • 隊(duì)列聲明queueDeclare的參數(shù):第一個(gè)參數(shù)表示隊(duì)列名稱、第二個(gè)參數(shù)為是否持久化(true表示是,隊(duì)列將在服務(wù)器重啟時(shí)生存)、第三個(gè)參數(shù)為是否是獨(dú)占隊(duì)列(創(chuàng)建者可以使用的私有隊(duì)列,斷開后自動(dòng)刪除)、第四個(gè)參數(shù)為當(dāng)所有消費(fèi)者客戶端連接斷開時(shí)是否自動(dòng)刪除隊(duì)列、第五個(gè)參數(shù)為隊(duì)列的其他參數(shù)。

  • basicConsume的第二個(gè)參數(shù)autoAck: 應(yīng)答模式,true:自動(dòng)應(yīng)答,即消費(fèi)者獲取到消息,該消息就會(huì)從隊(duì)列中刪除掉,false:手動(dòng)應(yīng)答,當(dāng)從隊(duì)列中取出消息后,需要程序員手動(dòng)調(diào)用方法應(yīng)答,如果沒有應(yīng)答,該消息還會(huì)再放進(jìn)隊(duì)列中,就會(huì)出現(xiàn)該消息一直沒有被消費(fèi)掉的現(xiàn)象。

  • 這種簡(jiǎn)單隊(duì)列的模式,系統(tǒng)會(huì)為每個(gè)隊(duì)列隱式地綁定一個(gè)默認(rèn)交換機(jī),交換機(jī)名稱為" (AMQP default)",類型為直連 direct,當(dāng)你手動(dòng)創(chuàng)建一個(gè)隊(duì)列時(shí),系統(tǒng)會(huì)自動(dòng)將這個(gè)隊(duì)列綁定到一個(gè)名稱為空的 Direct 類型的交換機(jī)上,綁定的路由鍵 routing key 與隊(duì)列名稱相同,相當(dāng)于channel.queueBind(queue:"QUEUE_NAME", exchange:"(AMQP default)“, routingKey:"QUEUE_NAME");雖然實(shí)例沒有顯式聲明交換機(jī),但是當(dāng)路由鍵和隊(duì)列名稱一樣時(shí),就會(huì)將消息發(fā)送到這個(gè)默認(rèn)的交換機(jī)中。這種方式比較簡(jiǎn)單,但是無法滿足復(fù)雜的業(yè)務(wù)需求,所以通常在生產(chǎn)環(huán)境中很少使用這種方式。

  • The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.默認(rèn)交換機(jī)隱式綁定到每個(gè)隊(duì)列,其中路由鍵等于隊(duì)列名稱。不可能顯式綁定到,或從缺省交換中解除綁定。它也不能被刪除。 ——引自 RabbitMQ 官方文檔

spring-amqp方式

引入 Maven 依賴

<dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.6.0</version>
    </dependency>    
        <dependency>
      <groupId>org.springframework.amqp</groupId>
      <artifactId>spring-rabbit</artifactId>
      <version>2.1.5.RELEASE</version>
    </dependency>

spring 配置文件

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/rabbit
      https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
      http://www.springframework.org/schema/beans
      https://www.springframework.org/schema/beans/spring-beans.xsd">

  <rabbit:connection-factory id="connectionFactory" host="localhost" virtual-host="/"
  username="guest" password="guest"/>
  <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
  <rabbit:admin connection-factory="connectionFactory"/>
  <rabbit:queue name="MY-QUEUE"/>
</beans>

使用測(cè)試

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Main {
  public static void main(String[] args) {
    ApplicationContext app = new ClassPathXmlApplicationContext("spring/rabbit-context.xml");
    AmqpTemplate amqpTemplate = app.getBean(AmqpTemplate.class);
    amqpTemplate.convertAndSend("MY-QUEUE", "Item");
    String msg = (String) amqpTemplate.receiveAndConvert("MY-QUEUE");
    System.out.println(msg);
  }
}

參考方法

void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
/**
   * Receive a message if there is one from a specific queue and convert it to a Java
   * object. Returns immediately, possibly with a null value.
   *
   * @param queueName the name of the queue to poll
   * @return a message or null if there is none waiting
   * @throws AmqpException if there is a problem
   */
@Nullable
Object receiveAndConvert(String queueName) throws AmqpException;

看完上述內(nèi)容,你們對(duì)RabbitMQ中簡(jiǎn)單隊(duì)列的實(shí)現(xiàn)原理是什么有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝大家的支持。

網(wǎng)頁(yè)題目:RabbitMQ中簡(jiǎn)單隊(duì)列的實(shí)現(xiàn)原理是什么
本文路徑:http://jinyejixie.com/article44/ggeeee.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供建站公司、軟件開發(fā)、品牌網(wǎng)站建設(shè)、服務(wù)器托管、網(wǎng)頁(yè)設(shè)計(jì)公司、定制開發(fā)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

綿陽服務(wù)器托管
长春市| 宜阳县| 聂拉木县| 邳州市| 安国市| 洱源县| 扎赉特旗| 大埔区| 定南县| 苍梧县| 墨竹工卡县| 阆中市| 河池市| 新源县| 大方县| 宣武区| 濉溪县| 桦川县| 井陉县| 娱乐| 隆尧县| 洛隆县| 大足县| 汝阳县| 交口县| 英德市| 全椒县| 开阳县| 东平县| 咸阳市| 兰溪市| 富宁县| 鸡东县| 万宁市| 古浪县| 正镶白旗| 商丘市| 黑水县| 嘉峪关市| 偃师市| 高碑店市|