成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

RabbitMQ第五課使用技巧

1) C++ 獲取消息數(shù)據(jù)
amqp_rpc_reply_t ret;
timeval tvTimeout;
tvTimeout.tv_sec = 1;
tvTimeout.tv_usec = 0;
ret = amqp_consume_message(conn, &envelope, &valTimeOut, 0);

成都創(chuàng)新互聯(lián)公司是一家專(zhuān)注網(wǎng)站建設(shè)、網(wǎng)絡(luò)營(yíng)銷(xiāo)策劃、微信小程序開(kāi)發(fā)、電子商務(wù)建設(shè)、網(wǎng)絡(luò)推廣、移動(dòng)互聯(lián)開(kāi)發(fā)、研究、服務(wù)為一體的技術(shù)型公司。公司成立10年以來(lái),已經(jīng)為近千家生料攪拌車(chē)各業(yè)的企業(yè)公司提供互聯(lián)網(wǎng)服務(wù)?,F(xiàn)在,服務(wù)的近千家客戶與我們一路同行,見(jiàn)證我們的成長(zhǎng);未來(lái),我們一起分享成功的喜悅。

if (AMQP_RESPONSE_NORMAL == ret.reply_type)
{
?? std::string strAMQPMsg((char*)envelope.message.body.bytes, envelope.message.body.len);
}

誤區(qū): std::string strAMQPMsg = char*)envelope.message.body.bytes 存在多余的數(shù)據(jù)
誤區(qū): 沒(méi)有設(shè)置接收超時(shí),而是直接傳遞NULL,導(dǎo)致函數(shù)進(jìn)入死循環(huán)


2)發(fā)送消息的時(shí)候,返回錯(cuò)誤信息:AMQP_STATUS_SOCKET_ERROR
AMQP_STATUS_SOCKET_ERROR = -0x0009,?????????????? /**< A socket error occurred */
服務(wù)器在一定時(shí)間之內(nèi),收到客戶端的消息,就會(huì)主動(dòng)斷開(kāi)連接,因此客戶端需要跟服務(wù)器Broker重新建立連接,如果不想斷開(kāi)連接,需要發(fā)送心跳


3)確認(rèn)數(shù)據(jù)是否已經(jīng)發(fā)送成功

????? 關(guān)于消費(fèi)者就不用代碼來(lái)獲取消息了,直接在RabbitMQ Management點(diǎn)擊某個(gè)隊(duì)列的名字,然后Get Message(s) 即可獲取消息內(nèi)容

4)指定消息的超時(shí)時(shí)間
某些實(shí)際的應(yīng)用場(chǎng)景中會(huì)產(chǎn)生許多過(guò)期的消息時(shí)間,可以通過(guò)設(shè)置amqp_basic_properties_t的超時(shí)時(shí)間參數(shù)expiration來(lái)解決隊(duì)列中的超時(shí)數(shù)據(jù)過(guò)多的問(wèn)題

amqp_basic_properties_t props;
props._flags = AMQP_BASIC_EXPIRATION_FLAG;
props.expiration = amqp_cstring_bytes("10000");//超時(shí)10秒
amqp_basic_publish(conn, channnel, exchange, queue, 0, 0, &props, message);

5)聲明隊(duì)列,返回錯(cuò)誤信息:AMQP_RESPONSE_SERVER_EXCEPTION

原因:1.交換機(jī)是否創(chuàng)建成功 2.聲明的隊(duì)列是否已經(jīng)創(chuàng)建過(guò),并且已經(jīng)存在的隊(duì)列跟現(xiàn)在的隊(duì)列的屬性不一致,例如auto_delete自動(dòng)刪除屬性,或者durable持久化屬性

導(dǎo)致的問(wèn)題:當(dāng)返回該錯(cuò)誤,說(shuō)明跟broker的連接已經(jīng)中斷,必須重新建立連接,否則,繼續(xù)調(diào)用其他函數(shù)接口會(huì)一直阻塞

解決: 通過(guò)web手動(dòng)刪除隊(duì)列

6)只知道隊(duì)列的情況下獲取數(shù)據(jù)

實(shí)際上說(shuō)明聲明的交換機(jī)和隊(duì)列都必須唯一

?amqp_connection_state_t connState = amqp_new_connection();
?amqp_socket_t *pSocket = amqp_tcp_socket_new(connState);
?if (!pSocket) {
??amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
??amqp_destroy_connection(connState);
??std::cout << "跟消息服務(wù)器創(chuàng)建連接失敗" << std::endl;
??return;
?}
?int nConnStatus = amqp_socket_open(pSocket, strIP.c_str(), nPort);
?if (AMQP_STATUS_OK != nConnStatus) {
??amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
??amqp_destroy_connection(connState);
??return;
?}

?amqp_rpc_reply_t? rpcReply = amqp_login(connState, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, strUserName.c_str(), strPassword.c_str());
?if (AMQP_RESPONSE_NORMAL != rpcReply.reply_type)
?{
??std::cout << "登陸消息服務(wù)器失敗" << std::endl;
??return;
?}

?amqp_channel_open(connState, 1);
?amqp_basic_consume(connState, 1, amqp_cstring_bytes("passerby-000001"), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);

?amqp_frame_t frame;
?std::cout << "登陸消息服務(wù)器成功,開(kāi)始接收數(shù)據(jù)" << std::endl;

?while (1)
?{
??amqp_envelope_t envelope;

??amqp_maybe_release_buffers(connState);
??timeval tvTimeout;
??tvTimeout.tv_sec = 10;
??tvTimeout.tv_usec = 0;
??amqp_rpc_reply_t ret = amqp_consume_message(connState, &envelope, &tvTimeout, 0);

??if (AMQP_RESPONSE_NORMAL != ret.reply_type)
??{
???if (AMQP_STATUS_SOCKET_ERROR == ret.library_error)
???{
????std::cout << "跟消息服務(wù)器連接中斷,清理資源,重連連接" << std::endl;
????break;
???}
???if (AMQP_STATUS_TIMEOUT == ret.library_error)
???{
????std::cout << "等待消息服務(wù)器消息超時(shí),繼續(xù)等待" << std::endl;
????continue;
???}
???std::cout << "跟消息服務(wù)器連接出現(xiàn)異常,清理資源,重連連接" << std::endl;
???break;
??}
??else
??{
???std::string strRecvMsg((char*)envelope.message.body.bytes, envelope.message.body.len);
???std::cout << "接收到的抓拍信息:" << strRecvMsg<< std::endl;
???amqp_destroy_envelope(&envelope);
???continue;
??}
?}
?amqp_channel_close(connState, 1, AMQP_REPLY_SUCCESS);
?amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
?amqp_destroy_connection(connState);

7)生產(chǎn)者不產(chǎn)生隊(duì)列,消費(fèi)者通過(guò)指定的交換機(jī)和routing-key,創(chuàng)建隊(duì)列,然后將該隊(duì)列綁定到交換機(jī)上

?char const* pszExchange = "passerByExchange";
?char const* pszRoutingKey = "passerby-000001";

?amqp_connection_state_t connState = amqp_new_connection();
?amqp_socket_t* pSocket = amqp_tcp_socket_new(connState);
?if (!pSocket) {
??amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
??amqp_destroy_connection(connState);
??return;
?}

?int nStatus = amqp_socket_open(pSocket, strIP.c_str(), nPort);
?if (nStatus) {
??amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
??amqp_destroy_connection(connState);
??return;
?}

?amqp_rpc_reply_t? replyLogin = amqp_login(connState, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, strUserName.c_str(), strPassword.c_str());
?if (AMQP_RESPONSE_NORMAL != replyLogin.reply_type)
?{
??std::cout << "登陸消息服務(wù)器失敗" << std::endl;
??return;
?}

?amqp_channel_open(connState, 1);
?amqp_queue_declare_ok_t *r = amqp_queue_declare(
??connState, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table);
?amqp_bytes_t queueName = amqp_bytes_malloc_dup(r->queue);
?if (queueName.bytes == NULL)
?{
??amqp_bytes_free(queueName);
??amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
??amqp_destroy_connection(connState);
??return;
?}

?amqp_queue_bind(connState, 1, queueName, amqp_cstring_bytes(pszExchange),
??amqp_cstring_bytes(pszRoutingKey), amqp_empty_table);
?amqp_basic_consume(connState, 1, queueName, amqp_empty_bytes, 0, 1, 0,
??amqp_empty_table);
?amqp_frame_t frame;

?while (1)
?{
??amqp_rpc_reply_t ret;
??amqp_envelope_t envelope;

??amqp_maybe_release_buffers(connState);
??timeval tvTimeout;
??tvTimeout.tv_sec = 10;
??tvTimeout.tv_usec = 0;
??ret = amqp_consume_message(connState, &envelope, &tvTimeout, 0);

??if (AMQP_RESPONSE_NORMAL != ret.reply_type)
??{
???if (AMQP_STATUS_TIMEOUT == ret.library_error)
???{
????std::cout << "接收消息超時(shí)" << std::endl;
????continue;
???}
???std::cout << "連接消息服務(wù)器異常,清理資源退出" << std::endl;
???break;
??}
??else
??{
???std::string strRecvMsg((char*)envelope.message.body.bytes, envelope.message.body.len);
???std::string strGBK = UTF8ToGBK(strRecvMsg.c_str());
???amqp_destroy_envelope(&envelope);
??}
?}
?amqp_channel_close(connState, 1, AMQP_REPLY_SUCCESS);
?amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
?amqp_destroy_connection(connState);

8)amqp_basic_consume函數(shù)不能連續(xù)調(diào)用多次同時(shí)消費(fèi)多個(gè)隊(duì)列

代碼如下:

amqp_basic_consume(connState, 1, amqp_cstring_bytes("alarm"), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);

amqp_basic_consume(connState, 1, amqp_cstring_bytes("capture"), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);

只能執(zhí)行第一句代碼,第二句代碼會(huì)一直阻塞

9)登陸MQ服務(wù)器,進(jìn)行心跳交互代碼

?amqp_rpc_reply_t? replyLogin = amqp_login(conn, "/", 0, 131072, 120, AMQP_SASL_METHOD_PLAIN, strUserName.c_str(), strPassword.c_str());

第五個(gè)參數(shù),指定了跟服務(wù)器多少秒發(fā)送一次心跳,如果不發(fā)心跳,跟服務(wù)器在連接一段時(shí)間之后,斷開(kāi),當(dāng)然,也要考慮到長(zhǎng)連接也可能在網(wǎng)絡(luò)異常情況下斷開(kāi)

當(dāng)前名稱(chēng):RabbitMQ第五課使用技巧
網(wǎng)站鏈接:http://jinyejixie.com/article0/gpehoo.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站策劃、自適應(yīng)網(wǎng)站軟件開(kāi)發(fā)、虛擬主機(jī)、網(wǎng)站設(shè)計(jì)、手機(jī)網(wǎng)站建設(shè)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

h5響應(yīng)式網(wǎng)站建設(shè)
喀喇| 岢岚县| 杭锦旗| 林口县| 哈尔滨市| 常宁市| 克山县| 偏关县| 壤塘县| 十堰市| 玛多县| 龙游县| 田东县| 外汇| 从化市| 河津市| 吉安市| 江北区| 阿勒泰市| 宜春市| 潞西市| 老河口市| 武鸣县| 安仁县| 武宁县| 潞西市| 青河县| 凤冈县| 庆安县| 永仁县| 巴彦淖尔市| 页游| 宜黄县| 德钦县| 唐河县| 娄烦县| 宜城市| 靖边县| 正镶白旗| 白玉县| 楚雄市|