本篇內(nèi)容介紹了“Python MQTT客戶端怎么使用”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
成都創(chuàng)新互聯(lián)是一家集網(wǎng)站建設(shè),華容企業(yè)網(wǎng)站建設(shè),華容品牌網(wǎng)站建設(shè),網(wǎng)站定制,華容網(wǎng)站建設(shè)報(bào)價(jià),網(wǎng)絡(luò)營銷,網(wǎng)絡(luò)優(yōu)化,華容網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強(qiáng)企業(yè)競爭力。可充分滿足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時(shí)我們時(shí)刻保持專業(yè)、時(shí)尚、前沿,時(shí)刻以成就客戶成長自我,堅(jiān)持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實(shí)用型網(wǎng)站。
paho-mqtt 可以說是 Python MQTT 開源客戶端庫中的佼佼者。它由 Eclipse 基金會(huì)主導(dǎo)開發(fā),除了 Python 庫以外,同樣支持各大主流的編程語言,比如 C++、Java、JavaScript、Golang 等。目前 Python 版本已經(jīng)實(shí)現(xiàn)了 3.1 和 3.1.1 MQTT 協(xié)議,在最新開發(fā)版中實(shí)現(xiàn)了 MQTT 5.0。
在基金會(huì)的支持下,以每年一個(gè)版本的速度更新,本文發(fā)布時(shí)的最新版本為 1.5.0(于 2019 年 8 月發(fā)布)。
在 GitHub 主頁上,它提供了從入門的快速實(shí)現(xiàn)到每一個(gè)函數(shù)的詳細(xì)解讀,涵蓋了從初學(xué)者到高級(jí)使用者需要了解的各個(gè)部分。即使遇到超出范圍的問題,在 Google 上搜索,可以得到近 20 萬個(gè)相關(guān)詞條,是目前最為流行的 MQTT 客戶端。
得到如此多的關(guān)注度,除了穩(wěn)定的代碼外,還有其易用性。Paho 的接口使用非常簡單優(yōu)雅,您只需要少量的代碼就能實(shí)現(xiàn) MQTT 的訂閱及消息發(fā)布。
pip3 install paho-mqtt
或者
git clone https://github.com/eclipse/paho.mqtt.python cd paho.mqtt.python python3 setup.py install
import paho.mqtt.client as mqtt # 連接的回調(diào)函數(shù) def on_connect(client, userdata, flags, rc): print(f"Connected with result code {rc}") client.subscribe("$SYS/#") # 收到消息的回調(diào)函數(shù) def on_message(client, userdata, msg): print(msg.topic+" "+str(msg.payload)) client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message client.connect("broker.emqx.io", 1883, 60) client.loop_forever()
import paho.mqtt.client as mqtt import time def on_connect(client, userdata, flags, rc): print(f"Connected with result code {rc}") client = mqtt.Client() client.on_connect = on_connect client.connect("broker.emqx.io", 1883, 60) for i in range(3): client.publish('a/b', payload=i, qos=0, retain=False) print(f"send {i} to a/b") time.sleep(1) client.loop_forever()
甚至,你可以通過一行代碼,實(shí)現(xiàn)訂閱、發(fā)布。
import paho.mqtt.subscribe as subscribe # 當(dāng)調(diào)用這個(gè)函數(shù)時(shí),程序會(huì)堵塞在這里,直到有一條消息發(fā)送到 paho/test/simple 主題 msg = subscribe.simple("paho/test/simple", hostname="broker.emqx.io") print(f"{msg.topic} {msg.payload}")
import paho.mqtt.publish as publish # 發(fā)送一條消息 publish.single("a/b", "payload", hostname="broker.emqx.io") # 或者一次發(fā)送多個(gè)消息 msgs = [{'topic':"a/b", 'payload':"multiple 1"}, ("a/b", "multiple 2", 0, False)] publish.multiple(msgs, hostname="broker.emqx.io")
HBMQTT 基于 Python asyncio 開發(fā),僅支持 3.1.1 的 MQTT 協(xié)議。由于使用 asyncio 庫,開發(fā)者需要使用 3.4 以上的 Python 版本。
CPU 的速度遠(yuǎn)遠(yuǎn)快于磁盤、網(wǎng)絡(luò)等 IO 操作,而在一個(gè)線程中,無論 CPU 執(zhí)行得再快,遇到 IO 操作時(shí),都得停下來等待讀寫完成,這無疑浪費(fèi)了許多時(shí)間。
為了解決這個(gè)問題,Python 加入了異步 IO 的特性。在 Python 3.4 中,正式將 asyncio 納入標(biāo)準(zhǔn)庫中,并在 Python 3.5 中,加入了 async/await 關(guān)鍵字。用戶可以很輕松的使用在函數(shù)前加入 async 關(guān)鍵字,使函數(shù)變成異步函數(shù)。
HBMQTT 便是建立在 asyncio 標(biāo)準(zhǔn)庫之上。它允許用戶顯示的設(shè)置異步斷點(diǎn),通過異步 IO,MQTT 客戶端在收取消息或發(fā)送消息時(shí),掛載當(dāng)前的任務(wù),繼續(xù)處理下一個(gè)。
不過 HBMQTT 的知名度卻小得多。在 Google 上搜索,關(guān)于 HBMQTT 僅有 6000 多個(gè)詞條,在 Stack Overflow 上只有 10 個(gè)提問數(shù)。這就意味著,如果選擇 HBMQTT 的話你需要很強(qiáng)的解決問題的能力。
有意思的是,HBMQTT 本身也是一個(gè) MQTT 服務(wù)器。你可以通過 hbmqtt 命令一鍵開啟。
$ hbmqtt [2020-08-28 09:35:56,608] :: INFO - Exited state new [2020-08-28 09:35:56,608] :: INFO - Entered state starting [2020-08-28 09:35:56,609] :: INFO - Listener 'default' bind to 0.0.0.0:1883 (max_connections=-1)
pip3 install hbmqtt
或者
git clone https://github.com/beerfactory/hbmqtt cd hbmqtt python3 setup.py install
import logging import asyncio from hbmqtt.client import MQTTClient, ClientException from hbmqtt.mqtt.constants import QOS_1, QOS_2 async def uptime_coro(): C = MQTTClient() await C.connect('mqtt://broker.emqx.io/') await C.subscribe([ ('$SYS/broker/uptime', QOS_1), ('$SYS/broker/load/#', QOS_2), ]) try: for i in range(1, 100): message = await C.deliver_message() packet = message.publish_packet print(f"{i}: {packet.variable_header.topic_name} => {packet.payload.data}") await C.unsubscribe(['$SYS/broker/uptime', '$SYS/broker/load/#']) await C.disconnect() except ClientException as ce: logging.error("Client exception: %s" % ce) if __name__ == '__main__': formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.DEBUG, format=formatter) asyncio.get_event_loop().run_until_complete(uptime_coro())
import logging import asyncio import time from hbmqtt.client import MQTTClient from hbmqtt.mqtt.constants import QOS_0, QOS_1, QOS_2 async def test_coro(): C = MQTTClient() await C.connect('mqtt://broker.emqx.io/') tasks = [ asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0)), asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1)), asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)), ] await asyncio.wait(tasks) logging.info("messages published") await C.disconnect() if __name__ == '__main__': formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.DEBUG, format=formatter) asyncio.get_event_loop().run_until_complete(test_coro())
更多使用細(xì)節(jié)情參考官方文檔:https://hbmqtt.readthedocs.io/en/latest/。
gmqtt 是由個(gè)人開發(fā)者開源的客戶端庫。默認(rèn)支持 MQTT 5.0 協(xié)議,如果連接的 MQTT 代理不支持 5.0 協(xié)議,則會(huì)降級(jí)到 3.1 并重新進(jìn)行連接。
相較于前兩者,gmqtt 還屬于初級(jí)開發(fā)階段,本文發(fā)布時(shí)的版本號(hào)是 0.6.7。但它是早期支持 MQTT 5.0 的 Python 庫之一,因此在網(wǎng)絡(luò)上知名度尚可。
同樣,它建立在 asyncio 庫上,因此需要使用 Python 3.4 以上的版本。
pip3 install gmqtt
或者
git clone https://github.com/wialon/gmqtt cd gmqtt python3 setup.py install
import asyncio import os import signal import time from gmqtt import Client as MQTTClient STOP = asyncio.Event() def on_connect(client, flags, rc, properties): print('Connected') def on_message(client, topic, payload, qos, properties): print(f'RECV MSG: {topic} {payload}') def on_subscribe(client, mid, qos, properties): print('SUBSCRIBED') def on_disconnect(client, packet, exc=None): print('Disconnected') def ask_exit(*args): STOP.set() async def main(broker_host): client = MQTTClient("client-id") client.on_connect = on_connect client.on_message = on_message client.on_subscribe = on_subscribe client.on_disconnect = on_disconnect # 連接 MQTT 代理 await client.connect(broker_host) # 訂閱主題 client.subscribe('TEST/#') # 發(fā)送測試數(shù)據(jù) client.publish("TEST/A", 'AAA') client.publish("TEST/B", 'BBB') await STOP.wait() await client.disconnect() if __name__ == '__main__': loop = asyncio.get_event_loop() loop.add_signal_handler(signal.SIGINT, ask_exit) loop.add_signal_handler(signal.SIGTERM, ask_exit) host = 'broker.emqx.io' loop.run_until_complete(main(host))
import asyncio import os import signal import time from gmqtt import Client as MQTTClient STOP = asyncio.Event() def on_connect(client, flags, rc, properties): print('Connected') client.subscribe('TEST/#', qos=0) def on_message(client, topic, payload, qos, properties): print(f'RECV MSG: {topic}, {payload}') def on_disconnect(client, packet, exc=None): print('Disconnected') def ask_exit(*args): STOP.set() async def main(broker_host): client = MQTTClient("client-id") client.on_connect = on_connect client.on_message = on_message client.on_disconnect = on_disconnect await client.connect(broker_host) client.publish('TEST/TIME', str(time.time()), qos=1) await STOP.wait() await client.disconnect() if __name__ == '__main__': loop = asyncio.get_event_loop() loop.add_signal_handler(signal.SIGINT, ask_exit) loop.add_signal_handler(signal.SIGTERM, ask_exit) host = 'broker.emqx.io' loop.run_until_complete(main(host))
在介紹完這三款 Python MQTT 客戶端庫之后,我們再來看看如何為自己選擇合適的 MQTT 客戶端庫。這三個(gè)客戶端各有自己的優(yōu)缺點(diǎn):
paho-mqtt 有著最優(yōu)秀的文檔,代碼風(fēng)格易于理解,同時(shí)有著強(qiáng)大的基金會(huì)支持,但目前文檔的版本還不支持 MQTT 5.0。
HBMQTT 使用 asyncio 庫實(shí)現(xiàn),可以優(yōu)化網(wǎng)絡(luò) I/O 帶來的延遲。但是代碼風(fēng)格不友好,同樣不支持 MQTT 5.0。
gmqtt 同樣通過 asyncio 庫實(shí)現(xiàn),相比 HBMQTT ,代碼風(fēng)格友好,最重要的是,它支持 MQTT 5.0。但開發(fā)進(jìn)程慢,未來前景不明。
因此,在選擇時(shí),您可以參考一下的思路:
如果您是正常開發(fā),想要將其運(yùn)用在生產(chǎn)環(huán)境中,paho-mqtt 無疑是最好的選擇,其穩(wěn)定性和代碼易讀性遠(yuǎn)遠(yuǎn)超過其它兩個(gè)庫。在遇到問題時(shí),優(yōu)秀的文檔和互聯(lián)網(wǎng)上大量的詞條,也能幫您找到更多的解決方案。
對(duì)于熟練使用 asyncio 庫的讀者,不妨嘗試一下 HBMQTT 和 gmqtt。
如果您想要學(xué)習(xí)、參與開源項(xiàng)目或者使用 MQTT 5.0, 則不妨試用一下 gmqtt,并嘗試為其共享一份代碼吧。
“Python MQTT客戶端怎么使用”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
本文標(biāo)題:PythonMQTT客戶端怎么使用
本文地址:http://jinyejixie.com/article16/gpssgg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站營銷、定制開發(fā)、企業(yè)建站、網(wǎng)站設(shè)計(jì)公司、App設(shè)計(jì)、自適應(yīng)網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)