消息收发 Eclipse Paho C SDK Golang Eclipse Paho Golang SDK Node.js MQTTJS PHP MosquittoPHP 主题规则 主题形式:父topic/子级topic1/子级topic2…。(父topic需要先创建) 使用MQTT消息队列发消息,会把消息以父Topic主题分类保存在kafka上,应用服务可通过kafka 客户端以父Topic为主题消费消息。 云端应用服务统一发送到kafka topic为mq2mqtt的主题队列上,移动端topic、会话属性Qos和cleansession保存在Record Header中,MQTT设备通过订阅移动端topic,实现云端到移动端通讯。 Kakfa header与mqtt属性映射如下表: Kafka Header MQTT属性 qoslevel Qos cleanSession cleanSession mqttTopic 主题 生产消费消息 MQTT客户端收发 使用MQTT SDK接入终端连接地址进行消息生产消费。 生产消息代码示例: import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class PubMsgTest { // 填入您在mqtt控制台创建的ACL账号密码。 private static final String USERNAME "yourusername"; private static final String AUTHPASSWORD "yourpassword"; public static void main(String[] args) { String topic "topic1/a/b/c"; String content "hello ctgmqtt service"; int qos 2; // 填写mqtt云消息服务的接入点。 String broker "tcp://localhost:1883"; // 指定连接客户端的id,该id可用于查询连接会话信息以及设备轨迹信息。 String clientId "ctgmqttclientpubtest"; MemoryPersistence persistence new MemoryPersistence(); try { MqttClient myClient new MqttClient(broker, clientId, persistence); MqttConnectOptions connOpts new MqttConnectOptions(); connOpts.setCleanSession(true); connOpts.setUserName(USERNAME); connOpts.setPassword(AUTHPASSWORD.toCharArray()); System.out.println("Connecting to broker: " + broker); myClient.connect(connOpts); System.out.println("Connected"); for (int i 0; i < 10; i++) { System.out.println("Publishing message: " + content); MqttMessage message new MqttMessage(content.getBytes()); message.setQos(qos); myClient.publish(topic, message); System.out.println("Message published"); } myClient.disconnect(); System.out.println("Disconnected"); System.exit(0); } catch (MqttException me) { // 打印详细的错误信息。 System.out.println("reason " + me.getReasonCode()); System.out.println("msg " + me.getMessage()); System.out.println("cause " + me.getCause()); System.out.println("excep " + me); me.printStackTrace(); } } } 接收消息代码示例: import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class SubMsgTest { // 填入您在mqtt控制台创建的ACL账号密码。 private static final String USERNAME "yourusername"; private static final String AUTHPASSWORD "yourpassword"; static String topic "topic1/a/b/c"; static int qos 2; public static void main(String[] args) { // 填写mqtt云消息服务的接入点。 String broker "tcp://localhost:1883"; // 指定连接客户端的id,该id可用于查询连接会话信息以及设备轨迹信息。 String clientId "ctgmqttclientsubtest"; MemoryPersistence persistence new MemoryPersistence(); try { MqttClient myClient getMqttClient(broker, clientId, persistence); MqttConnectOptions connOpts new MqttConnectOptions(); connOpts.setCleanSession(true); connOpts.setUserName(USERNAME); connOpts.setPassword(AUTHPASSWORD.toCharArray()); myClient.connect(connOpts); } catch (MqttException me) { // 打印详细的错误信息。 System.out.println("reason " + me.getReasonCode()); System.out.println("msg " + me.getMessage()); System.out.println("cause " + me.getCause()); System.out.println("excep " + me); me.printStackTrace(); } } private static MqttClient getMqttClient(String broker, String clientId, MemoryPersistence persistence) throws MqttException { MqttClient myClient new MqttClient(broker, clientId, persistence); myClient.setCallback(new MqttCallbackExtended() { @Override public void connectComplete(boolean reconnect, String serverURI) { System.out.println("connected to broker: " + broker); try { myClient.subscribe(topic, qos); } catch (MqttException e) { throw new RuntimeException(e); } } @Override public void connectionLost(Throwable cause) { System.out.println("connection lost"); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println("message is :" + message); } @Override public void deliveryComplete(IMqttDeliveryToken token) { } }); return myClient; } }