什么是MQTT协议?
MQTT是一种轻量级的发布/订阅消息传输协议,设计用于低带宽和高延迟的网络环境,非常适合物联网设备之间的通信。其主要特点包括:
- 发布/订阅模型:支持多对多的消息传递。
- 轻量级设计:较低的网络开销。
- 支持QoS等级:提供不同的消息传递可靠性。
项目背景
本文的示例代码实现了一个基于Python的MQTT客户端。以下功能涵盖在代码中:
- 通过SSL安全连接到MQTT代理。
- 支持动态订阅多个主题。
- 异步处理消息,提高性能和扩展性。
- 提供自定义消息处理功能。
核心代码解析
以下是代码中的主要功能与模块解析:
MQTT 客户端类
1
2
3
4
5
6
7
|
class MQTTClient: def __init__( self , broker, port, username, password, ca_cert, topics): self .client = mqtt.Client() self .client.username_pw_set( self .username, self .password) self .client.tls_set(ca_certs = self .ca_cert) self .client.on_connect = self .on_connect self .client.on_message = self .on_message |
tls_set
:启用SSL/TLS以确保通信安全。- 主题订阅:在连接成功时,自动订阅指定的主题。
自定义消息处理
1
2
|
def set_message_handler( self , handler): self .custom_message_handler = handler |
用户可通过该方法传入自定义的回调函数,从而根据业务逻辑处理消息。
异步启动客户端
1
2
3
|
async def start_async( self ): self .connect() await asyncio.get_event_loop().run_in_executor( None , self .client.loop_forever) |
通过异步事件循环确保消息的高效处理,同时避免阻塞主线程。
示例代码集成
在主文件main.py
中,定义了如下流程:
- 初始化MQTT客户端并传入必要的参数。
- 注册一个自定义的消息处理函数。
- 利用
asyncio
实现消息处理和其他任务的并发执行。
1
2
3
4
5
|
async def on_mqtt_message(topic, payload): print (f "Custom handler: {topic} -> {payload}" ) mqtt_client.set_message_handler(on_mqtt_message) await mqtt_client.start_async() |
使用指南
安装依赖
确保安装了paho-mqtt
库:
1
|
pip install paho-mqtt |
配置MQTT代理
更新代码中的代理地址、端口、用户名、密码和证书路径。
运行程序
使用以下命令运行程序:
1
|
python main.py |
总结
快速搭建一个基于MQTT协议的实时通信系统。这种架构不仅适用于物联网场景,也可以在各种需要实时数据推送的应用中发挥作用,例如聊天应用和实时监控系统。
示例代码
mqtt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
import paho.mqtt.client as mqtt from datetime import datetime import asyncio class MQTTClient: def __init__( self , broker, port, username, password, ca_cert, topics): """ 初始化 MQTT 客户端 """ self .broker = broker self .port = port self .username = username self .password = password self .ca_cert = ca_cert self .topics = topics self .client = mqtt.Client() # 配置 MQTT 客户端 self .client.username_pw_set( self .username, self .password) self .client.tls_set(ca_certs = self .ca_cert) self .client.on_connect = self .on_connect self .client.on_message = self .on_message self .custom_message_handler = None # 自定义消息处理器 def set_message_handler( self , handler): """ 设置自定义消息处理回调函数 """ self .custom_message_handler = handler def on_connect( self , client, userdata, flags, rc): """ 连接成功时的回调 """ if rc = = 0 : print ( "SSL连接成功" ) for topic in self .topics: client.subscribe(topic) print (f "已订阅主题: {topic}" ) else : print (f "连接失败,返回码: {rc}" ) def on_message( self , client, userdata, msg): """ 收到消息时的回调 """ current_time = datetime.now() payload = msg.payload.decode() print (f "收到消息: {msg.topic} -> {payload} 时间: {current_time}" ) if self .custom_message_handler and self .event_loop: asyncio.run_coroutine_threadsafe( self .custom_message_handler(msg.topic, payload), self .event_loop ) def connect( self ): """ 连接到 MQTT 服务器 """ self .client.connect( self .broker, self .port, keepalive = 60 ) async def start_async( self ): """ 异步运行 MQTT 客户端 """ self .connect() # 确保连接到 MQTT 服务器 print ( "Starting MQTT client loop..." ) # 异步运行 MQTT 客户端的事件循环 loop = asyncio.get_event_loop() await loop.run_in_executor( None , self .client.loop_forever) |
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
import asyncio from mqtt import MQTTClient # MQTT 配置 MQTT_BROKER = "你的服务器地址" MQTT_PORT = 8883 # 使用 SSL 的端口 MQTT_USERNAME = "用户名" MQTT_PASSWORD = "密码" CA_CERT = "./emqxsl-ca.crt" # CA 证书路径 TOPICS = [ "clients/disconnect" , "uhome/esp32" ] # 订阅的主题列表 async def main(): loop = asyncio.get_running_loop() mqtt_client = MQTTClient( broker = MQTT_BROKER, port = MQTT_PORT, username = MQTT_USERNAME, password = MQTT_PASSWORD, ca_cert = CA_CERT, topics = TOPICS ) async def on_mqtt_message(topic, payload): print (f "Custom handler: {topic} -> {payload}" ) mqtt_client.set_message_handler(on_mqtt_message) mqtt_client.event_loop = loop # 将事件循环传递给 MQTT 客户端 await mqtt_client.start_async() await asyncio.gather(websocket_task, periodic_task) if __name__ = = "__main__" : asyncio.run(main()) |