活动

天翼云最新优惠活动,涵盖免费试用,产品折扣等,助您降本增效!
热门活动
  • 618智算钜惠季 爆款云主机2核4G限时秒杀,88元/年起!
  • 免费体验DeepSeek,上天翼云息壤 NEW 新老用户均可免费体验2500万Tokens,限时两周
  • 云上钜惠 HOT 爆款云主机全场特惠,更有万元锦鲤券等你来领!
  • 算力套餐 HOT 让算力触手可及
  • 天翼云脑AOne NEW 连接、保护、办公,All-in-One!
  • 中小企业应用上云专场 产品组合下单即享折上9折起,助力企业快速上云
  • 息壤高校钜惠活动 NEW 天翼云息壤杯高校AI大赛,数款产品享受线上订购超值特惠
  • 天翼云电脑专场 HOT 移动办公新选择,爆款4核8G畅享1年3.5折起,快来抢购!
  • 天翼云奖励推广计划 加入成为云推官,推荐新用户注册下单得现金奖励
免费活动
  • 免费试用中心 HOT 多款云产品免费试用,快来开启云上之旅
  • 天翼云用户体验官 NEW 您的洞察,重塑科技边界

智算服务

打造统一的产品能力,实现算网调度、训练推理、技术架构、资源管理一体化智算服务
智算云(DeepSeek专区)
科研助手
  • 算力商城
  • 应用商城
  • 开发机
  • 并行计算
算力互联调度平台
  • 应用市场
  • 算力市场
  • 算力调度推荐
一站式智算服务平台
  • 模型广场
  • 体验中心
  • 服务接入
智算一体机
  • 智算一体机
大模型
  • DeepSeek-R1-昇腾版(671B)
  • DeepSeek-R1-英伟达版(671B)
  • DeepSeek-V3-昇腾版(671B)
  • DeepSeek-R1-Distill-Llama-70B
  • DeepSeek-R1-Distill-Qwen-32B
  • Qwen2-72B-Instruct
  • StableDiffusion-V2.1
  • TeleChat-12B

应用商城

天翼云精选行业优秀合作伙伴及千余款商品,提供一站式云上应用服务
进入甄选商城进入云市场创新解决方案
办公协同
  • WPS云文档
  • 安全邮箱
  • EMM手机管家
  • 智能商业平台
财务管理
  • 工资条
  • 税务风控云
企业应用
  • 翼信息化运维服务
  • 翼视频云归档解决方案
工业能源
  • 智慧工厂_生产流程管理解决方案
  • 智慧工地
建站工具
  • SSL证书
  • 新域名服务
网络工具
  • 翼云加速
灾备迁移
  • 云管家2.0
  • 翼备份
资源管理
  • 全栈混合云敏捷版(软件)
  • 全栈混合云敏捷版(一体机)
行业应用
  • 翼电子教室
  • 翼智慧显示一体化解决方案

合作伙伴

天翼云携手合作伙伴,共创云上生态,合作共赢
天翼云生态合作中心
  • 天翼云生态合作中心
天翼云渠道合作伙伴
  • 天翼云代理渠道合作伙伴
天翼云服务合作伙伴
  • 天翼云集成商交付能力认证
天翼云应用合作伙伴
  • 天翼云云市场合作伙伴
  • 天翼云甄选商城合作伙伴
天翼云技术合作伙伴
  • 天翼云OpenAPI中心
  • 天翼云EasyCoding平台
天翼云培训认证
  • 天翼云学堂
  • 天翼云市场商学院
天翼云合作计划
  • 云汇计划
天翼云东升计划
  • 适配中心
  • 东升计划
  • 适配互认证

开发者

开发者相关功能入口汇聚
技术社区
  • 专栏文章
  • 互动问答
  • 技术视频
资源与工具
  • OpenAPI中心
开放能力
  • EasyCoding敏捷开发平台
培训与认证
  • 天翼云学堂
  • 天翼云认证
魔乐社区
  • 魔乐社区

支持与服务

为您提供全方位支持与服务,全流程技术保障,助您轻松上云,安全无忧
文档与工具
  • 文档中心
  • 新手上云
  • 自助服务
  • OpenAPI中心
定价
  • 价格计算器
  • 定价策略
基础服务
  • 售前咨询
  • 在线支持
  • 在线支持
  • 工单服务
  • 建议与反馈
  • 用户体验官
  • 服务保障
  • 客户公告
  • 会员中心
增值服务
  • 红心服务
  • 首保服务
  • 客户支持计划
  • 专家技术服务
  • 备案管家

了解天翼云

天翼云秉承央企使命,致力于成为数字经济主力军,投身科技强国伟大事业,为用户提供安全、普惠云服务
品牌介绍
  • 关于天翼云
  • 智算云
  • 天翼云4.0
  • 新闻资讯
  • 天翼云APP
基础设施
  • 全球基础设施
  • 信任中心
最佳实践
  • 精选案例
  • 超级探访
  • 云杂志
  • 分析师和白皮书
  • 天翼云·创新直播间
市场活动
  • 2025智能云生态大会
  • 2024智算云生态大会
  • 2023云生态大会
  • 2022云生态大会
  • 天翼云中国行
天翼云
  • 活动
  • 智算服务
  • 产品
  • 解决方案
  • 应用商城
  • 合作伙伴
  • 开发者
  • 支持与服务
  • 了解天翼云
      • 文档
      • 控制中心
      • 备案
      • 管理中心
      文档中心

      分布式消息服务MQTT

      分布式消息服务MQTT

        • 产品动态
        • 服务公告
        • 2024
        • 【优惠】正式开放2年7折,3年5折包年折扣
        • 【通知】通用型主机规格调整为白名单特性,上线计算增强型主机规格
        • 【优惠】分布式消息服务MQTT新增包年优惠和产品资费价格下调
        • 产品介绍
        • 产品定义
        • 产品优势
        • 功能特性
        • 应用场景
        • 产品规格
        • 使用限制
        • 安全方案
        • 名词解释
        • 与其他服务关系
        • 计费说明
        • 产品资费
        • 计费项
        • 计费模式
        • 续费、到期与欠费
        • 退订
        • 快速入门
        • 入门指引
        • 环境准备
        • 创建实例
        • 连接实例
        • 消息收发
        • 用户指南
        • 实例管理
        • 创建实例
        • 实例列表
        • 实例详情
        • 续订和退订
        • 认证授权
        • 连接查询
        • 设备轨迹
        • 资源报表
        • 监控告警
        • API参考
        • 调用前须知
        • 概述
        • 如何调用API
        • 认证鉴权
        • 信息的获取
        • 基本签名流程
        • 公共参数
        • API
        • 设备客户端
        • 查设备轨迹
        • 客户端下线
        • 查设备连接信息
        • 批量查询设备在线状态
        • 查询设备订阅信息
        • 查询设备是否在线
        • 退订客户端主题
        • 认证授权
        • 认证用户列表
        • 创建认证用户
        • 删除认证用户
        • 查认证用户ACL
        • 认证用户授权
        • 回收认证用户权限
        • 修改认证用户密码
        • 消息发送
        • 从服务端应用向消息队列MQTT版发送单条消息
        • 第三方认证授权
        • 配置MySQL认证授权
        • 启用MySQL认证授权
        • 停用MySQL认证授权
        • 更新MySQL认证授权
        • 查询MySQL认证授权配置
        • 最佳实践
        • Java客户端连接配置
        • 常见问题
        • 计费类
        • 购买类
        • 操作类
        • 操作类常见问题
        • 如何配置MQTT开源Java SDK的客户端自动连接?
        • keepalive一般设置为多少秒比较好?
          无相关产品

          本页目录

          帮助中心分布式消息服务MQTT快速入门消息收发
          消息收发
          更新时间 2024-12-09 14:35:41
          • 新浪微博
          • 微信
            扫码分享
          • 复制链接
          最近更新时间: 2024-12-09 14:35:41
          分享文章
          • 新浪微博
          • 微信
            扫码分享
          • 复制链接
          本文为您分布式消息服务MQTT消息收发内容。

          消息数据存储

          终端消息数据按父topic存储至kafka队列,需先在控制台创建父topic;对未创建父topic的消息可正常收发,但不会存储至kafka队列。

          Kafka存储内容格式:

          {
          "clientId": 设备clientId,
          
          "topic": 主题,
          
          "payload": 消息内容,
          
          "ts": 发送的时间戳
          }
          

          会话机制

          终端 clean session=true,断线后会话信息清除,再次上线后之前所有的订阅关系以及离线消息丢失。 clean session=false断开连接的情况下,MQTT Broker也会为断连客户端保存一个会话,默认2小时,超期未重连订购关系清除;对于clean session=false的客户端断线重连后可接收Qos>0的离线消息。对于客户端因网络等各种原因断线,需要加上重连和订购关系重新订购机制。

          离线消息

          对于clean session=false的客户端,在未超出会话失效期,断线重连后可接收Qos>0的离线消息。

          系统主题

          系统主题 说明
          mq2mqtt 用于云端服务向终端发送消息。发往该主题消息会转发至MQTT Broker实现云端与移动端互通
          mqtt-device-connect 设备上线主题,内容 {"clientid":客户端ID,"ts":上线时间戳 }
          mqtt-device-disconnect 设备下线主题,内容 {"clientid":客户端ID,"ts":下线时间戳 }

          SDK支持

          分布式消息服务MQTT支持标准的MQTT协议,理论上适配所有的MQTT客户端SDK。

          推荐对应的第三方 SDK 如下表:

          语言/平台 推荐的第三方SDK
          Java Eclipse Paho SDK
          iOS MQTT-Client-Framework
          Android Eclipse Paho SDK
          JavaScript Eclipse Paho JavaScript
          Python Eclipse Paho Python SDK
          C Eclipse Paho C SDK
          C# Eclipse Paho C# SDK
          Golang Eclipse Paho Golang SDK
          Node.js MQTT-JS
          PHP Mosquitto-PHP

          主题规则

          主题形式:父topic/子级topic1/子级topic2…。(父topic需要先创建)

          使用MQTT消息队列发消息,会把消息以父Topic主题分类保存在kafka上,应用服务可通过kafka 客户端以父Topic为主题消费消息。

          云端应用服务统一发送到kafka topic为mq2mqtt的主题队列上,移动端topic、会话属性Qos和cleansession保存在Record Header中,MQTT设备通过订阅移动端topic,实现云端到移动端通讯。

          Kakfa header与mqtt属性映射如下表:

          Kafka Header MQTT属性
          qoslevel Qos
          cleanSession cleanSession
          mqttTopic 主题

          生产消费消息

          MQTT客户端收发

          使用MQTT SDK接入终端连接地址进行消息生产消费。

          生产消息代码示例:

          import org.eclipse.paho.client.mqttv3.MqttClient;
          import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
          import org.eclipse.paho.client.mqttv3.MqttException;
          import org.eclipse.paho.client.mqttv3.MqttMessage;
          import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
          
          public class PubMsgTest {
              // 填入您在mqtt控制台创建的ACL账号密码。
              private static final String USER_NAME = "your-user-name";
              private static final String AUTH_PASSWORD = "your-password";
          
              public static void main(String[] args) {
          
                  String topic = "topic-1/a/b/c";
                  String content = "hello ctg-mqtt service";
                  int qos = 2;
                  // 填写mqtt云消息服务的接入点。
                  String broker = "tcp://localhost:1883";
                  // 指定连接客户端的id,该id可用于查询连接会话信息以及设备轨迹信息。
                  String clientId = "ctg-mqtt-client-pub-test";
                  MemoryPersistence persistence = new MemoryPersistence();
          
                  try {
                      MqttClient myClient = new MqttClient(broker, clientId, persistence);
                      MqttConnectOptions connOpts = new MqttConnectOptions();
                      connOpts.setCleanSession(true);
                      connOpts.setUserName(USER_NAME);
                      connOpts.setPassword(AUTH_PASSWORD.toCharArray());
                      System.out.println("Connecting to broker: " + broker);
                      myClient.connect(connOpts);
                      System.out.println("Connected");
                      for (int i = 0; i < 10; i++) {
                          System.out.println("Publishing message: " + content);
                          MqttMessage message = new MqttMessage(content.getBytes());
                          message.setQos(qos);
                          myClient.publish(topic, message);
                          System.out.println("Message published");
                      }
                      myClient.disconnect();
                      System.out.println("Disconnected");
                      System.exit(0);
                  } catch (MqttException me) {
                      // 打印详细的错误信息。
                      System.out.println("reason " + me.getReasonCode());
                      System.out.println("msg " + me.getMessage());
                      System.out.println("cause " + me.getCause());
                      System.out.println("excep " + me);
                      me.printStackTrace();
                  }
              }
          }
          

          接收消息代码示例:

          import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
          import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
          import org.eclipse.paho.client.mqttv3.MqttClient;
          import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
          import org.eclipse.paho.client.mqttv3.MqttException;
          import org.eclipse.paho.client.mqttv3.MqttMessage;
          import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
          
          public class SubMsgTest {
          
              // 填入您在mqtt控制台创建的ACL账号密码。
              private static final String USER_NAME = "your-user-name";
              private static final String AUTH_PASSWORD = "your-password";
          
              static String topic = "topic-1/a/b/c";
              static int qos = 2;
          
              public static void main(String[] args) {
          
                  // 填写mqtt云消息服务的接入点。
                  String broker = "tcp://localhost:1883";
                  // 指定连接客户端的id,该id可用于查询连接会话信息以及设备轨迹信息。
                  String clientId = "ctg-mqtt-client-sub-test";
                  MemoryPersistence persistence = new MemoryPersistence();
                  try {
                      MqttClient myClient = getMqttClient(broker, clientId, persistence);
                      MqttConnectOptions connOpts = new MqttConnectOptions();
                      connOpts.setCleanSession(true);
                      connOpts.setUserName(USER_NAME);
                      connOpts.setPassword(AUTH_PASSWORD.toCharArray());
                      myClient.connect(connOpts);
                  } catch (MqttException me) {
                      // 打印详细的错误信息。
                      System.out.println("reason " + me.getReasonCode());
                      System.out.println("msg " + me.getMessage());
                      System.out.println("cause " + me.getCause());
                      System.out.println("excep " + me);
                      me.printStackTrace();
                  }
              }
          
              private static MqttClient getMqttClient(String broker, String clientId, MemoryPersistence persistence) throws MqttException {
                  MqttClient myClient = new MqttClient(broker, clientId, persistence);
                  myClient.setCallback(new MqttCallbackExtended() {
                      @Override
                      public void connectComplete(boolean reconnect, String serverURI) {
                          System.out.println("connected to broker: " + broker);
                          try {
                              myClient.subscribe(topic, qos);
                          } catch (MqttException e) {
                              throw new RuntimeException(e);
                          }
                      }
          
                      @Override
                      public void connectionLost(Throwable cause) {
                          System.out.println("connection lost");
                      }
          
                      @Override
                      public void messageArrived(String topic, MqttMessage message) throws Exception {
                          System.out.println("message is :" + message);
                      }
          
                      @Override
                      public void deliveryComplete(IMqttDeliveryToken token) {
                      }
                  });
                  return myClient;
              }
          }
          

          MQTT发送Kafka接收

          终端设备使用MQTT SDK接入终端连接地址进行消息发布,云端应用服务使用Kafka sdk接入

          服务端连接地址按父主题进行数据消费。

          MQTT发送顺序消息kafka接收顺序消息

          创建父主题,类型分区顺序,父主题以orderMsg2mq-开头。

          终端设备使用MQTT SDK接入终端连接地址进行消息发布,云端应用服务使用kafka sdk接入

          服务端连接地址按父主题进行分区顺序数据消费

          Kafka发送MQTT接收

          云端应用服务使用kafka sdk接入服务端连接地址往系统主题mq2mqtt下发指令,终端设备使用MQTT SDK接入终端连接地址接收;

          示例:

          import org.apache.kafka.clients.producer.KafkaProducer;
          import org.apache.kafka.clients.producer.Producer;
          import org.apache.kafka.clients.producer.ProducerRecord;
          import org.apache.kafka.common.header.internals.RecordHeader;
          import org.apache.kafka.common.header.internals.RecordHeaders;
          
          import java.util.Properties;
          
          public class PubMsgTest {
          
              public static void main(String[] args) {
                  Properties props = new Properties();
                  // 填入您在mqtt控制台查看到的kafka接入点信息。
                  props.put("bootstrap.servers", "localhost:9092");
                  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          
                  Producer<Object, String> producer = new KafkaProducer<>(props);
          
                  // 这里需要指定系统内置的流转到mqtt服务端的特殊kafka主题,一般命名为:mq2mqtt。
                  String topic = "mq2mqtt";
                  // kafka消息需要在header中指定需要发往mqtt订阅的主题以及一些会话属性、qos等级等信息。
                  RecordHeaders headers = new RecordHeaders();
                  headers.add(new RecordHeader("qosLevel", "2".getBytes()));
                  headers.add(new RecordHeader("cleanSession", "true".getBytes()));
                  // 这里需要指定您的mqtt客户端订阅的主题,支持topic filter。
                  headers.add(new RecordHeader("mqttTopic", "topic-1/a/b/c".getBytes()));
                  byte[] payload = new byte[1026 * 1024];
                  for (int i = 0; i < 1026 * 1024; i++) {
                      payload[i] = 'x';
                  }
                  try {
                      for (int i = 0; i < 1000; i++) {
                          String message = "Message- " + i + " " + new String(payload);
                          producer.send(new ProducerRecord<>(topic, null, null, null, message, headers));
          //                System.out.println("Sent: " + message);
                          System.out.println("sent successfully");
                      }
                  } catch (Exception e) {
                      e.printStackTrace();
                  } finally {
                      producer.close();
                  }
              }
          }
          
          文档反馈

          建议您登录后反馈,可在建议与反馈里查看问题处理进度

          鼠标选中文档,精准反馈问题

          选中存在疑惑的内容,即可快速反馈问题,我们会跟进处理

          知道了

          上一篇 :  连接实例
          下一篇 :  用户指南
          搜索 关闭
          ©2025 天翼云科技有限公司版权所有 增值电信业务经营许可证A2.B1.B2-20090001
          公司地址:北京市东城区青龙胡同甲1号、3号2幢2层205-32室
          备案 京公网安备11010802043424号 京ICP备 2021034386号
          ©2025天翼云科技有限公司版权所有
          京ICP备 2021034386号
          备案 京公网安备11010802043424号
          增值电信业务经营许可证A2.B1.B2-20090001
          用户协议 隐私政策 法律声明