Python客户端连接配置
更新时间 2025-06-16 11:59:53
最近更新时间: 2025-06-16 11:59:53
安装依赖
pip3 install paho-mqtt
代码示例
import ssl
import random
from paho.mqtt import client as paho_client
# 填入您在mqtt控制台创建的ACL账号密码。
USER_NAME = "your-user-name"
AUTH_PASSWORD = "your-password"
# 是否使用tls加密传输
isTls = True
class MQTTClient:
def __init__(self):
# 填写mqtt云消息服务的接入点,去掉:{端口号}部分
self.broker = "tcp://localhost"
# 指定连接客户端的id
self.client_id = "ctg-mqtt-client-test"
self.client = None
def on_connect(self, client, userdata, flags, rc):
# 连接建立成功
if rc == 0:
print("Connected to MQTT Broker!")
else:
print(f"Failed to connect, return code {rc}")
def on_disconnect(self, client, userdata, rc):
# 连接丢失
print(f"Disconnected with result code {rc}")
# 可以在这里添加重连逻辑
def on_message(self, client, userdata, msg):
# 收到消息的回调
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
def on_publish(self, client, userdata, mid):
# 成功发送消息到服务端
print(f"Message {mid} published successfully")
def connect_mqtt(self):
# 创建客户端实例
self.client = paho_client.Client(client_id=self.client_id)
self.client.username_pw_set(USER_NAME, AUTH_PASSWORD)
# 设置回调函数
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message
self.client.on_publish = self.on_publish
# 设置TLS
if isTls:
# 创建不验证证书的SSL上下文
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
self.client.tls_set_context(context)
# 设置其他连接选项
self.client.connect(self.broker.replace("tcp://", "").replace("ssl://", ""),
port=8883 if isTls else 1883)
self.client.loop_start() # 启动网络循环
return self.client
def run(self):
try:
print(f"Connecting to broker: {self.broker}")
client = self.connect_mqtt()
# 这里可以添加订阅或发布逻辑
# 例如: client.subscribe("topic/test")
# 例如: client.publish("topic/test", "Hello World")
# 保持连接
while True:
pass
except Exception as e:
print(f"Error occurred: {e}")
if self.client:
self.client.disconnect()
print("Disconnected")
raise
if __name__ == '__main__':
mqtt_client = MQTTClient()
mqtt_client.run()