paho.mqtt.c开源库中有同步API和异步API接口,本文仅涉及异步API相关代码在linux平台下的使用
1.库头文件
#include "MQTTAsync.h"
2.关键结构体
mqtt全局变量
typedef struct MQTTAsync_struct
{
char* serverURI;//服务端的地址 tcp://xxx.xx.x.x:port
int ssl;//加密证书设置
int websocket;
Clients* c;//支持多客户端,设置客户端链表
/*一下都是全局的 */
MQTTAsync_connectionLost* cl; //连接终端时的回调
MQTTAsync_messageArrived* ma;//收到消息是的回调
MQTTAsync_deliveryComplete* dc;//发送完成时的回调
void* clContext; /* 连接丢失时的context*/
void* maContext; /*有消息 时的context*/
void* dcContext; /*发送完成 时的context*/
MQTTAsync_connected* connected;//连接完成时的回调
void* connected_context; /* 连接回调的context*/
MQTTAsync_disconnected* disconnected;
void* disconnected_context; /* 和断开回调的context*/
MQTTAsync_updateConnectOptions* updateConnectOptions;
void* updateConnectOptions_context;
/* 每次连接时(重连、自动重连)调用 */
MQTTAsync_command connect; /* 连接操作属性 */
MQTTAsync_command disconnect; /* 断连操作属性 */
MQTTAsync_command* pending_write;
List* responses;
unsigned int command_seqno;
MQTTPacket* pack;
/* 离线缓存 */
MQTTAsync_createOptions* createOptions;
int shouldBeConnected;
int noBufferedMessages; /* 本client当前缓存消息数量 */
/* 自动重连相关设置 */
int automaticReconnect;
int minRetryInterval;
int maxRetryInterval;
int serverURIcount;
char** serverURIs;
int connectTimeout;
int currentInterval;
int currentIntervalBase;
START_TIME_TYPE lastConnectionFailedTime;
int retrying;
int reconnectNow;
/* MQTT V5 属性 */
MQTTProperties* connectProps;
MQTTProperties* willProps;
} MQTTAsyncs;
异步调用过程,队列消息结构体,
typedef struct
{
int type;
MQTTAsync_onSuccess* onSuccess;
MQTTAsync_onFailure* onFailure;
MQTTAsync_onSuccess5* onSuccess5;
MQTTAsync_onFailure5* onFailure5;
MQTTAsync_token token;
void* context;
START_TIME_TYPE start_time;
MQTTProperties properties;
union
{
struct
{
int count;
char** topics;
int* qoss;
MQTTSubscribe_options opts;
MQTTSubscribe_options* optlist;
} sub;//订阅
struct
{
int count;
char** topics;
} unsub;//退订
struct
{
char* destinationName;
int payloadlen;
void* payload;
int qos;
int retained;
} pub;//发布
struct
{
int internal;
int timeout;
enum MQTTReasonCodes reasonCode;
} dis;//断开连接
struct
{
int currentURI;
int MQTTVersion; /**当前用来连接的mqtt版本 */
} conn;//连接
} details;
} MQTTAsync_command;
3.重要的锁
pahomqtt.c中异步API为了保证线程安全,用到了好几把全局锁:
mqttasync_mutex:异步设置、操作时,基本都需要加锁操作
socket_mutex:socket操作时使用
mqttcommand_mutex:mqtt内部command消息处理时,几乎都会操作这把锁
log_mutex:调用日志打印函数时,保证线程安全的锁
mutex_type实际上是互斥锁pthread_mutex_t
#define mutex_type pthread_mutex_t*
对锁的操作
CreateMutex:初始化创建锁,主要发生在MQTTAsync_init阶段
CloseHandle:关闭销毁锁
MQTTAsync_lock_mutex:加锁,内部实际调用pthread_mutex_lock
MQTTAsync_unlock_mutex:解锁,内部实际调用pthread_mutex_unlock
4.设置日志水平及回调
MQTTAsync_setTraceCallback()
->Log_setTraceCallback()
-->trace_callback = callback;
在Log_output时会调用trace_callback
MQTTAsync_setTraceLevel()
->Log_setTraceLevel
-->trace_output_level
在调用Log时Log_trace会打印库内部的相关消息,可以方便调试
5.设置客户端参数,创建客户端
MQTTAsync_createOptions_initializer包含了默认配置
{ {'M', 'Q', 'C', 'O'}, 2, 0, 100, MQTTVERSION_DEFAULT, 0, 0, 1, 1}
可以在此基础上进行修改某些参数,如:
.sendWhileDisconnected = 0;
.maxBufferedMessages = MQTT_MAX_BUFFERED_MESSAGES;
设置回调并创建新客户端
MQTTAsync_createWithOptions()
->Log_initialize:
->Socket_outInitialize:初始化socket模块
->Socket_setWriteCompleteCallback:设置写完成回调
->bstate->clients, MQTTAsync_handles, MQTTAsync_commands, m->responses链表的初始化
->创建一个MQTTAsyncs 并设置其中serverURI、,responses、shouldBeConnected,Clients中context、outboundMsgs、messageQueue、inboundMsgs、MQTTVersion
MQTTAsync_setConnected()
->connected_context
->connected
MQTTAsync_setCallbacks()
设置连接丢失、收发消息时回调
6.设置连接参数,开始连接
MQTTAsync_connectOptions_initializer
{ {'M', 'Q', 'T', 'C'}, 8, 60, 1, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_DEFAULT, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL}
conn_opts.serverURIcount = mqtt_async.server_count;
conn_opts.serverURIs = mqtt_async.mqtt_urls;
conn_opts.keepAliveInterval = KEEPALIVE_INTERVAL;
conn_opts.cleansession = 0; // 0开启会话重用,1关闭
conn_opts.onSuccess = onConnect; // onconnetct != connectted
conn_opts.onFailure = onConnectFailure;
conn_opts.automaticReconnect = 1;
MQTTAsync_connect()
->MQTTAsync_sendThread:发送线程
->MQTTAsync_receiveThread:接收线程
->setRetryLoopInterval
->MQTTAsync_addCommand:向MQTTAsync_commands队列添加CONNECT类型命令
7.收发线程
MQTTAsync_sendThread()发送线程
->MQTTAsync_processCommand:不断的从MQTTAsync_commands中取命令
-->如果type为CONNET, MQTTProtocol_connect
--->Socket_new
---->socket(), connect()进行网络连接
-->如果type为PUBLISH, MQTTProtocol_startPublish
--->MQTTProtocol_createMessage()MQTTProtocol_startPublishCommon()
---->MQTTPacket_send_publish
----->MQTTPacket_sends
------>Socket_putdatas,writev() 完成消息的编码,写socket
-->其他类型
MQTTAsync_receiveThread():接收线程
->MQTTAsync_cycle:不断的从连接中读取一条消息
-->MQTTPacket_Factory
--->WebSocket_getch
---->WebSocket_getRawSocketData
----->Socket_getdata, recv()获取socket消息,并进行解码
->MQTTAsync_deliverMessage 调用messageArrived回调进行处理
->pack->header.bits.type,根据type进行不同的处理
-->如CONNACK,m->connect.onSuccess()回调
-->如DISCONNECT, m->disconnected()回调
8.其他API向MQTTAsync_commands添加消息
MQTTAsync_sendMessage
->MQTTAsync_send
-->MQTTAsync_addCommand():添加一个PUBLISH类型到MQTTAsync_commands
MQTTAsync_disconnect()
->MQTTAsync_disconnect1():添加一个DISCONNECT类型到MQTTAsync_commands
9.客户端销毁
MQTTAsync_destroy()
->MQTTAsync_freeResponses():清空res链表中的消息
->MQTTAsync_freeCommands():清空command中消息
->MQTTProtocol_freeClient():释放客户端