成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

PythonMQTT客戶端怎么使用

本篇內(nèi)容介紹了“Python MQTT客戶端怎么使用”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

成都創(chuàng)新互聯(lián)是一家集網(wǎng)站建設(shè),華容企業(yè)網(wǎng)站建設(shè),華容品牌網(wǎng)站建設(shè),網(wǎng)站定制,華容網(wǎng)站建設(shè)報(bào)價(jià),網(wǎng)絡(luò)營銷,網(wǎng)絡(luò)優(yōu)化,華容網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強(qiáng)企業(yè)競爭力。可充分滿足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時(shí)我們時(shí)刻保持專業(yè)、時(shí)尚、前沿,時(shí)刻以成就客戶成長自我,堅(jiān)持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實(shí)用型網(wǎng)站。

paho-mqtt

paho-mqtt 可以說是 Python MQTT 開源客戶端庫中的佼佼者。它由 Eclipse 基金會(huì)主導(dǎo)開發(fā),除了 Python 庫以外,同樣支持各大主流的編程語言,比如 C++、Java、JavaScript、Golang 等。目前 Python 版本已經(jīng)實(shí)現(xiàn)了 3.1 和 3.1.1 MQTT 協(xié)議,在最新開發(fā)版中實(shí)現(xiàn)了 MQTT 5.0。

在基金會(huì)的支持下,以每年一個(gè)版本的速度更新,本文發(fā)布時(shí)的最新版本為 1.5.0(于 2019 年 8 月發(fā)布)。

在 GitHub 主頁上,它提供了從入門的快速實(shí)現(xiàn)到每一個(gè)函數(shù)的詳細(xì)解讀,涵蓋了從初學(xué)者到高級(jí)使用者需要了解的各個(gè)部分。即使遇到超出范圍的問題,在 Google 上搜索,可以得到近 20 萬個(gè)相關(guān)詞條,是目前最為流行的 MQTT 客戶端。

得到如此多的關(guān)注度,除了穩(wěn)定的代碼外,還有其易用性。Paho 的接口使用非常簡單優(yōu)雅,您只需要少量的代碼就能實(shí)現(xiàn) MQTT 的訂閱及消息發(fā)布。

安裝

pip3 install paho-mqtt

或者

git clone https://github.com/eclipse/paho.mqtt.python
cd paho.mqtt.python
python3 setup.py install

訂閱者

import paho.mqtt.client as mqtt

# 連接的回調(diào)函數(shù)
def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")
    client.subscribe("$SYS/#")
    
# 收到消息的回調(diào)函數(shù)
def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))
    
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("broker.emqx.io", 1883, 60)
client.loop_forever()

發(fā)布者

import paho.mqtt.client as mqtt
import time
def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")
    
client = mqtt.Client()
client.on_connect = on_connect
client.connect("broker.emqx.io", 1883, 60)
for i in range(3):
    client.publish('a/b', payload=i, qos=0, retain=False)
    print(f"send {i} to a/b")
    time.sleep(1)

client.loop_forever()

甚至,你可以通過一行代碼,實(shí)現(xiàn)訂閱、發(fā)布。

import paho.mqtt.subscribe as subscribe

# 當(dāng)調(diào)用這個(gè)函數(shù)時(shí),程序會(huì)堵塞在這里,直到有一條消息發(fā)送到 paho/test/simple 主題
msg = subscribe.simple("paho/test/simple", hostname="broker.emqx.io")
print(f"{msg.topic} {msg.payload}")
import paho.mqtt.publish as publish

# 發(fā)送一條消息
publish.single("a/b", "payload", hostname="broker.emqx.io")
# 或者一次發(fā)送多個(gè)消息
msgs = [{'topic':"a/b", 'payload':"multiple 1"}, ("a/b", "multiple 2", 0, False)]
publish.multiple(msgs, hostname="broker.emqx.io")

HBMQTT

HBMQTT 基于 Python asyncio 開發(fā),僅支持 3.1.1 的 MQTT 協(xié)議。由于使用 asyncio 庫,開發(fā)者需要使用 3.4 以上的 Python 版本。

CPU 的速度遠(yuǎn)遠(yuǎn)快于磁盤、網(wǎng)絡(luò)等 IO 操作,而在一個(gè)線程中,無論 CPU 執(zhí)行得再快,遇到 IO 操作時(shí),都得停下來等待讀寫完成,這無疑浪費(fèi)了許多時(shí)間。

為了解決這個(gè)問題,Python 加入了異步 IO 的特性。在 Python 3.4 中,正式將 asyncio 納入標(biāo)準(zhǔn)庫中,并在 Python 3.5 中,加入了 async/await 關(guān)鍵字。用戶可以很輕松的使用在函數(shù)前加入 async 關(guān)鍵字,使函數(shù)變成異步函數(shù)。

HBMQTT 便是建立在 asyncio 標(biāo)準(zhǔn)庫之上。它允許用戶顯示的設(shè)置異步斷點(diǎn),通過異步 IO,MQTT 客戶端在收取消息或發(fā)送消息時(shí),掛載當(dāng)前的任務(wù),繼續(xù)處理下一個(gè)。

不過 HBMQTT 的知名度卻小得多。在 Google 上搜索,關(guān)于 HBMQTT 僅有 6000 多個(gè)詞條,在 Stack Overflow 上只有 10 個(gè)提問數(shù)。這就意味著,如果選擇 HBMQTT 的話你需要很強(qiáng)的解決問題的能力。

有意思的是,HBMQTT 本身也是一個(gè) MQTT 服務(wù)器。你可以通過 hbmqtt 命令一鍵開啟。

$ hbmqtt
[2020-08-28 09:35:56,608] :: INFO - Exited state new
[2020-08-28 09:35:56,608] :: INFO - Entered state starting
[2020-08-28 09:35:56,609] :: INFO - Listener 'default' bind to 0.0.0.0:1883 (max_connections=-1)

安裝

pip3 install hbmqtt

或者

git clone https://github.com/beerfactory/hbmqtt
cd hbmqtt
python3 setup.py install

訂閱者

import logging
import asyncio
from hbmqtt.client import MQTTClient, ClientException
from hbmqtt.mqtt.constants import QOS_1, QOS_2

async def uptime_coro():
    C = MQTTClient()
    await C.connect('mqtt://broker.emqx.io/')
    await C.subscribe([
            ('$SYS/broker/uptime', QOS_1),
            ('$SYS/broker/load/#', QOS_2),
         ])
    try:
        for i in range(1, 100):
            message = await C.deliver_message()
            packet = message.publish_packet
            print(f"{i}:  {packet.variable_header.topic_name} => {packet.payload.data}")
        await C.unsubscribe(['$SYS/broker/uptime', '$SYS/broker/load/#'])
        await C.disconnect()
    except ClientException as ce:
        logging.error("Client exception: %s" % ce)
        
if __name__ == '__main__':
    formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.DEBUG, format=formatter)
    asyncio.get_event_loop().run_until_complete(uptime_coro())

發(fā)布者

import logging
import asyncio
import time
from hbmqtt.client import MQTTClient
from hbmqtt.mqtt.constants import QOS_0, QOS_1, QOS_2

async def test_coro():
    C = MQTTClient()
    await  C.connect('mqtt://broker.emqx.io/')
    tasks = [
        asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0)),
        asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1)),
        asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)),
    ]
    await asyncio.wait(tasks)
    logging.info("messages published")
    await C.disconnect()
    
if __name__ == '__main__':
    formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.DEBUG, format=formatter)
    asyncio.get_event_loop().run_until_complete(test_coro())

更多使用細(xì)節(jié)情參考官方文檔:https://hbmqtt.readthedocs.io/en/latest/。

gmqtt

gmqtt 是由個(gè)人開發(fā)者開源的客戶端庫。默認(rèn)支持 MQTT 5.0 協(xié)議,如果連接的 MQTT 代理不支持 5.0 協(xié)議,則會(huì)降級(jí)到 3.1 并重新進(jìn)行連接。

相較于前兩者,gmqtt 還屬于初級(jí)開發(fā)階段,本文發(fā)布時(shí)的版本號(hào)是 0.6.7。但它是早期支持 MQTT 5.0 的 Python 庫之一,因此在網(wǎng)絡(luò)上知名度尚可。

同樣,它建立在 asyncio 庫上,因此需要使用 Python 3.4 以上的版本。

安裝

pip3 install gmqtt

或者

git clone https://github.com/wialon/gmqtt
cd gmqtt
python3 setup.py install

訂閱者

import asyncio
import os
import signal
import time
from gmqtt import Client as MQTTClient

STOP = asyncio.Event()

def on_connect(client, flags, rc, properties):
    print('Connected')
    
def on_message(client, topic, payload, qos, properties):
    print(f'RECV MSG: {topic} {payload}')
    
def on_subscribe(client, mid, qos, properties):
    print('SUBSCRIBED')
    
def on_disconnect(client, packet, exc=None):
    print('Disconnected')
    
def ask_exit(*args):
    STOP.set()

async def main(broker_host):
    client = MQTTClient("client-id")
    
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_subscribe = on_subscribe
    client.on_disconnect = on_disconnect
    
    # 連接 MQTT 代理
    await client.connect(broker_host)
    
    # 訂閱主題
    client.subscribe('TEST/#')
    
    # 發(fā)送測試數(shù)據(jù)
    client.publish("TEST/A", 'AAA')
    client.publish("TEST/B", 'BBB')
    
    await STOP.wait()
    await client.disconnect()
    
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    
    loop.add_signal_handler(signal.SIGINT, ask_exit)
    loop.add_signal_handler(signal.SIGTERM, ask_exit)

    host = 'broker.emqx.io'
    loop.run_until_complete(main(host))

發(fā)布者

import asyncio
import os
import signal
import time
from gmqtt import Client as MQTTClient

STOP = asyncio.Event()

def on_connect(client, flags, rc, properties):
    print('Connected')
    client.subscribe('TEST/#', qos=0)
    
def on_message(client, topic, payload, qos, properties):
    print(f'RECV MSG: {topic}, {payload}')
    
def on_disconnect(client, packet, exc=None):
    print('Disconnected')
    
def ask_exit(*args):
    STOP.set()
    
async def main(broker_host):
    client = MQTTClient("client-id")
    
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    
    await client.connect(broker_host)
    
    client.publish('TEST/TIME', str(time.time()), qos=1)
    
    await STOP.wait()
    await client.disconnect()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    
    loop.add_signal_handler(signal.SIGINT, ask_exit)
    loop.add_signal_handler(signal.SIGTERM, ask_exit)
    
    host = 'broker.emqx.io'  
    loop.run_until_complete(main(host))

如何選擇

在介紹完這三款 Python MQTT 客戶端庫之后,我們再來看看如何為自己選擇合適的 MQTT 客戶端庫。這三個(gè)客戶端各有自己的優(yōu)缺點(diǎn):

paho-mqtt 有著最優(yōu)秀的文檔,代碼風(fēng)格易于理解,同時(shí)有著強(qiáng)大的基金會(huì)支持,但目前文檔的版本還不支持 MQTT 5.0。

HBMQTT 使用 asyncio 庫實(shí)現(xiàn),可以優(yōu)化網(wǎng)絡(luò) I/O 帶來的延遲。但是代碼風(fēng)格不友好,同樣不支持 MQTT 5.0。

gmqtt 同樣通過 asyncio 庫實(shí)現(xiàn),相比 HBMQTT ,代碼風(fēng)格友好,最重要的是,它支持 MQTT 5.0。但開發(fā)進(jìn)程慢,未來前景不明。

因此,在選擇時(shí),您可以參考一下的思路:

  • 如果您是正常開發(fā),想要將其運(yùn)用在生產(chǎn)環(huán)境中,paho-mqtt 無疑是最好的選擇,其穩(wěn)定性和代碼易讀性遠(yuǎn)遠(yuǎn)超過其它兩個(gè)庫。在遇到問題時(shí),優(yōu)秀的文檔和互聯(lián)網(wǎng)上大量的詞條,也能幫您找到更多的解決方案。

  • 對(duì)于熟練使用 asyncio 庫的讀者,不妨嘗試一下 HBMQTT 和 gmqtt。

  • 如果您想要學(xué)習(xí)、參與開源項(xiàng)目或者使用 MQTT 5.0, 則不妨試用一下 gmqtt,并嘗試為其共享一份代碼吧。

“Python MQTT客戶端怎么使用”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

本文標(biāo)題:PythonMQTT客戶端怎么使用
本文地址:http://jinyejixie.com/article16/gpssgg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站營銷、定制開發(fā)企業(yè)建站、網(wǎng)站設(shè)計(jì)公司App設(shè)計(jì)、自適應(yīng)網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

商城網(wǎng)站建設(shè)