1) C++ 獲取消息數(shù)據(jù)
amqp_rpc_reply_t ret;
timeval tvTimeout;
tvTimeout.tv_sec = 1;
tvTimeout.tv_usec = 0;
ret = amqp_consume_message(conn, &envelope, &valTimeOut, 0);
成都創(chuàng)新互聯(lián)公司是一家專(zhuān)注網(wǎng)站建設(shè)、網(wǎng)絡(luò)營(yíng)銷(xiāo)策劃、微信小程序開(kāi)發(fā)、電子商務(wù)建設(shè)、網(wǎng)絡(luò)推廣、移動(dòng)互聯(lián)開(kāi)發(fā)、研究、服務(wù)為一體的技術(shù)型公司。公司成立10年以來(lái),已經(jīng)為近千家生料攪拌車(chē)各業(yè)的企業(yè)公司提供互聯(lián)網(wǎng)服務(wù)?,F(xiàn)在,服務(wù)的近千家客戶與我們一路同行,見(jiàn)證我們的成長(zhǎng);未來(lái),我們一起分享成功的喜悅。
if (AMQP_RESPONSE_NORMAL == ret.reply_type)
{
?? std::string strAMQPMsg((char*)envelope.message.body.bytes, envelope.message.body.len);
}
誤區(qū): std::string strAMQPMsg = char*)envelope.message.body.bytes 存在多余的數(shù)據(jù)
誤區(qū): 沒(méi)有設(shè)置接收超時(shí),而是直接傳遞NULL,導(dǎo)致函數(shù)進(jìn)入死循環(huán)
2)發(fā)送消息的時(shí)候,返回錯(cuò)誤信息:AMQP_STATUS_SOCKET_ERROR
AMQP_STATUS_SOCKET_ERROR = -0x0009,?????????????? /**< A socket error occurred */
服務(wù)器在一定時(shí)間之內(nèi),收到客戶端的消息,就會(huì)主動(dòng)斷開(kāi)連接,因此客戶端需要跟服務(wù)器Broker重新建立連接,如果不想斷開(kāi)連接,需要發(fā)送心跳
3)確認(rèn)數(shù)據(jù)是否已經(jīng)發(fā)送成功
????? 關(guān)于消費(fèi)者就不用代碼來(lái)獲取消息了,直接在RabbitMQ Management點(diǎn)擊某個(gè)隊(duì)列的名字,然后Get Message(s) 即可獲取消息內(nèi)容
4)指定消息的超時(shí)時(shí)間
某些實(shí)際的應(yīng)用場(chǎng)景中會(huì)產(chǎn)生許多過(guò)期的消息時(shí)間,可以通過(guò)設(shè)置amqp_basic_properties_t的超時(shí)時(shí)間參數(shù)expiration來(lái)解決隊(duì)列中的超時(shí)數(shù)據(jù)過(guò)多的問(wèn)題
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_EXPIRATION_FLAG;
props.expiration = amqp_cstring_bytes("10000");//超時(shí)10秒
amqp_basic_publish(conn, channnel, exchange, queue, 0, 0, &props, message);
5)聲明隊(duì)列,返回錯(cuò)誤信息:AMQP_RESPONSE_SERVER_EXCEPTION
原因:1.交換機(jī)是否創(chuàng)建成功 2.聲明的隊(duì)列是否已經(jīng)創(chuàng)建過(guò),并且已經(jīng)存在的隊(duì)列跟現(xiàn)在的隊(duì)列的屬性不一致,例如auto_delete自動(dòng)刪除屬性,或者durable持久化屬性
導(dǎo)致的問(wèn)題:當(dāng)返回該錯(cuò)誤,說(shuō)明跟broker的連接已經(jīng)中斷,必須重新建立連接,否則,繼續(xù)調(diào)用其他函數(shù)接口會(huì)一直阻塞
解決: 通過(guò)web手動(dòng)刪除隊(duì)列
6)只知道隊(duì)列的情況下獲取數(shù)據(jù)
實(shí)際上說(shuō)明聲明的交換機(jī)和隊(duì)列都必須唯一
?amqp_connection_state_t connState = amqp_new_connection();
?amqp_socket_t *pSocket = amqp_tcp_socket_new(connState);
?if (!pSocket) {
??amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
??amqp_destroy_connection(connState);
??std::cout << "跟消息服務(wù)器創(chuàng)建連接失敗" << std::endl;
??return;
?}
?int nConnStatus = amqp_socket_open(pSocket, strIP.c_str(), nPort);
?if (AMQP_STATUS_OK != nConnStatus) {
??amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
??amqp_destroy_connection(connState);
??return;
?}
?amqp_rpc_reply_t? rpcReply = amqp_login(connState, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, strUserName.c_str(), strPassword.c_str());
?if (AMQP_RESPONSE_NORMAL != rpcReply.reply_type)
?{
??std::cout << "登陸消息服務(wù)器失敗" << std::endl;
??return;
?}
?amqp_channel_open(connState, 1);
?amqp_basic_consume(connState, 1, amqp_cstring_bytes("passerby-000001"), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
?amqp_frame_t frame;
?std::cout << "登陸消息服務(wù)器成功,開(kāi)始接收數(shù)據(jù)" << std::endl;
?while (1)
?{
??amqp_envelope_t envelope;
??amqp_maybe_release_buffers(connState);
??timeval tvTimeout;
??tvTimeout.tv_sec = 10;
??tvTimeout.tv_usec = 0;
??amqp_rpc_reply_t ret = amqp_consume_message(connState, &envelope, &tvTimeout, 0);
??if (AMQP_RESPONSE_NORMAL != ret.reply_type)
??{
???if (AMQP_STATUS_SOCKET_ERROR == ret.library_error)
???{
????std::cout << "跟消息服務(wù)器連接中斷,清理資源,重連連接" << std::endl;
????break;
???}
???if (AMQP_STATUS_TIMEOUT == ret.library_error)
???{
????std::cout << "等待消息服務(wù)器消息超時(shí),繼續(xù)等待" << std::endl;
????continue;
???}
???std::cout << "跟消息服務(wù)器連接出現(xiàn)異常,清理資源,重連連接" << std::endl;
???break;
??}
??else
??{
???std::string strRecvMsg((char*)envelope.message.body.bytes, envelope.message.body.len);
???std::cout << "接收到的抓拍信息:" << strRecvMsg<< std::endl;
???amqp_destroy_envelope(&envelope);
???continue;
??}
?}
?amqp_channel_close(connState, 1, AMQP_REPLY_SUCCESS);
?amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
?amqp_destroy_connection(connState);
7)生產(chǎn)者不產(chǎn)生隊(duì)列,消費(fèi)者通過(guò)指定的交換機(jī)和routing-key,創(chuàng)建隊(duì)列,然后將該隊(duì)列綁定到交換機(jī)上
?char const* pszExchange = "passerByExchange";
?char const* pszRoutingKey = "passerby-000001";
?amqp_connection_state_t connState = amqp_new_connection();
?amqp_socket_t* pSocket = amqp_tcp_socket_new(connState);
?if (!pSocket) {
??amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
??amqp_destroy_connection(connState);
??return;
?}
?int nStatus = amqp_socket_open(pSocket, strIP.c_str(), nPort);
?if (nStatus) {
??amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
??amqp_destroy_connection(connState);
??return;
?}
?amqp_rpc_reply_t? replyLogin = amqp_login(connState, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, strUserName.c_str(), strPassword.c_str());
?if (AMQP_RESPONSE_NORMAL != replyLogin.reply_type)
?{
??std::cout << "登陸消息服務(wù)器失敗" << std::endl;
??return;
?}
?amqp_channel_open(connState, 1);
?amqp_queue_declare_ok_t *r = amqp_queue_declare(
??connState, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table);
?amqp_bytes_t queueName = amqp_bytes_malloc_dup(r->queue);
?if (queueName.bytes == NULL)
?{
??amqp_bytes_free(queueName);
??amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
??amqp_destroy_connection(connState);
??return;
?}
?amqp_queue_bind(connState, 1, queueName, amqp_cstring_bytes(pszExchange),
??amqp_cstring_bytes(pszRoutingKey), amqp_empty_table);
?amqp_basic_consume(connState, 1, queueName, amqp_empty_bytes, 0, 1, 0,
??amqp_empty_table);
?amqp_frame_t frame;
?while (1)
?{
??amqp_rpc_reply_t ret;
??amqp_envelope_t envelope;
??amqp_maybe_release_buffers(connState);
??timeval tvTimeout;
??tvTimeout.tv_sec = 10;
??tvTimeout.tv_usec = 0;
??ret = amqp_consume_message(connState, &envelope, &tvTimeout, 0);
??if (AMQP_RESPONSE_NORMAL != ret.reply_type)
??{
???if (AMQP_STATUS_TIMEOUT == ret.library_error)
???{
????std::cout << "接收消息超時(shí)" << std::endl;
????continue;
???}
???std::cout << "連接消息服務(wù)器異常,清理資源退出" << std::endl;
???break;
??}
??else
??{
???std::string strRecvMsg((char*)envelope.message.body.bytes, envelope.message.body.len);
???std::string strGBK = UTF8ToGBK(strRecvMsg.c_str());
???amqp_destroy_envelope(&envelope);
??}
?}
?amqp_channel_close(connState, 1, AMQP_REPLY_SUCCESS);
?amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
?amqp_destroy_connection(connState);
8)amqp_basic_consume函數(shù)不能連續(xù)調(diào)用多次同時(shí)消費(fèi)多個(gè)隊(duì)列
代碼如下:
amqp_basic_consume(connState, 1, amqp_cstring_bytes("alarm"), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
amqp_basic_consume(connState, 1, amqp_cstring_bytes("capture"), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
只能執(zhí)行第一句代碼,第二句代碼會(huì)一直阻塞
9)登陸MQ服務(wù)器,進(jìn)行心跳交互代碼
?amqp_rpc_reply_t? replyLogin = amqp_login(conn, "/", 0, 131072, 120, AMQP_SASL_METHOD_PLAIN, strUserName.c_str(), strPassword.c_str());
第五個(gè)參數(shù),指定了跟服務(wù)器多少秒發(fā)送一次心跳,如果不發(fā)心跳,跟服務(wù)器在連接一段時(shí)間之后,斷開(kāi),當(dāng)然,也要考慮到長(zhǎng)連接也可能在網(wǎng)絡(luò)異常情況下斷開(kāi)
當(dāng)前名稱(chēng):RabbitMQ第五課使用技巧
網(wǎng)站鏈接:http://jinyejixie.com/article0/gpehoo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站策劃、自適應(yīng)網(wǎng)站、軟件開(kāi)發(fā)、虛擬主機(jī)、網(wǎng)站設(shè)計(jì)、手機(jī)網(wǎng)站建設(shè)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)