活动

天翼云最新优惠活动,涵盖免费试用,产品折扣等,助您降本增效!
热门活动
  • 安全隔离版OpenClaw NEW OpenClaw云服务器专属“龙虾“套餐低至1.5折起
  • 聚力AI赋能 天翼云大模型专项 大模型特惠专区·Token Plan 轻享包低至9.9元起
  • 青云志云端助力计划 NEW 一站式科研助手,海外资源安全访问平台,助力青年翼展宏图,平步青云
  • 企业出海解决方案 NEW 助力您的业务扬帆出海,通达全球!
  • 天翼云信创专区 NEW “一云多芯、一云多态”,国产化软件全面适配,国产操作系统及硬件芯片支持丰富
  • 中小企业服务商合作专区 国家云助力中小企业腾飞,高额上云补贴重磅上线
  • 云上钜惠 爆款云主机全场特惠,2核4G只要1.8折起!
  • 天翼云奖励推广计划 加入成为云推官,推荐新用户注册下单得现金奖励
免费活动
  • 免费试用中心 HOT 多款云产品免费试用,快来开启云上之旅
  • 天翼云用户体验官 NEW 您的洞察,重塑科技边界

息壤智算

领先开放的智算服务平台,提供算力、平台、数据、模型、应用“五位一体”智算服务体系,构建全流程的AI基础设施能力
AI Store
  • 算力市场
  • 模型市场
  • 应用市场
公共算力服务
  • 裸金属
  • 定制裸金属
训推服务
  • 模型开发
  • 训练任务
  • 服务部署
Token服务
  • 模型广场
  • 体验中心
  • 服务接入
应用托管
  • 应用实例
科研助手
  • 科研智能体
  • 科研服务
  • 开发机
  • 并行计算
大模型
  • DeepSeek-V4-Flash
  • GLM-5.1
  • Qwen3.5-122B-A10B
  • DeepSeek-V3.2(旗舰版)
  • GLM-5(正式版)
智算一体机
  • 智算一体机
智能体引擎
  • 智能体引擎
智算安全专区
  • 大模型安全评测
  • 大模型安全护栏
模型适配专家服务
  • 模型适配专家服务
算力服务商
  • 入驻算力服务商

应用商城

天翼云精选行业优秀合作伙伴及千余款商品,提供一站式云上应用服务
进入甄选商城进入云市场进入AI Store创新解决方案公有云生态专区智云上海应用生态专区
建站工具
  • 新域名服务
  • SSL证书
  • 翼建站
企业办公
  • 安全邮箱
  • WPS 365 天翼云版
  • 天翼企业云盘(标准服务版)
灾备迁移
  • 云管家2.0
  • 翼备份(SaaS版)

定价

协助您快速了解云产品计费模式、价格详情,轻松预估上云成本
价格计算器
  • 动态测算产品价格
定价策略
  • 快速了解计费模式

合作伙伴

天翼云携手合作伙伴,共创云上生态,合作共赢
天翼云生态合作中心
  • 天翼云生态合作中心
天翼云渠道合作伙伴
  • 天翼云代理渠道合作伙伴
天翼云服务合作伙伴
  • 天翼云集成商交付能力认证
天翼云应用合作伙伴
  • 天翼云云市场合作伙伴
  • 天翼云甄选商城合作伙伴
天翼云技术合作伙伴
  • 天翼云OpenAPI中心
天翼云培训认证
  • 天翼云学堂
  • 天翼云市场商学院
天翼云合作计划
  • 云汇计划
天翼信创云专区
  • 信创云专区
  • 适配互认证

开发者

开发者相关功能入口汇聚
技术社区
  • 专栏文章
  • 互动问答
  • 技术视频
资源与工具
  • OpenAPI中心
培训与认证
  • 天翼云学堂
  • 天翼云认证
开源社区
  • 魔乐社区
  • OpenTeleDB

支持与服务

为您提供全方位支持与服务,全流程技术保障,助您轻松上云,安全无忧
文档与工具
  • 文档中心
  • 新手上云
  • 自助服务
  • OpenAPI中心
定价
  • 价格计算器
  • 定价策略
基础服务
  • 售前咨询
  • 在线支持
  • 在线支持
  • 工单服务
  • 服务保障
  • 会员中心
增值服务
  • 红心服务
  • 首保服务
  • 客户支持计划
  • 专家技术服务
  • 备案管家
我要反馈
  • 建议与反馈
  • 用户体验官
信息公告
  • 客户公告

了解天翼云

天翼云秉承央企使命,致力于成为数字经济主力军,投身科技强国伟大事业,为用户提供安全、普惠云服务
品牌介绍
  • 关于天翼云
  • 智算云
  • 天翼云4.0
  • 新闻资讯
  • 天翼云APP
基础设施
  • 全球基础设施
  • 信任中心
最佳实践
  • 精选案例
  • 超级探访
  • 云杂志
  • 分析师和白皮书
  • 天翼云·创新直播间
市场活动
  • 2026智能云生态大会
  • 2025智能云生态大会
  • 2024智算云生态大会
  • 2023云生态大会
  • 2022云生态大会
  • 天翼云中国行
天翼云
  • 活动
  • 息壤智算
  • 产品
  • 解决方案
  • 应用商城
  • 定价
  • 合作伙伴
  • 开发者
  • 支持与服务
  • 了解天翼云
      • 文档
      • 控制中心
      • 备案
      • 管理中心
      微消息队列MQTT版_相关内容
      • 【通知】通用型主机规格调整为白名单特性,上线计算增强型主机规格
        为保障更优的性能,分布式消息服务MQTT上线并主推计算增强型主机类型,原通用型规格调整为白名单特性,更多产品类型信息请参阅产品规格。 调整时间 2024年11月9日 影响范围 所有区域 调整影响 新用户默认不开放主机类型通用型规格订购开通,如需要该特性,请联系技术支持开通后使用。 已购买主机类型通用型规格实例的用户,原实例仍可正常使用,续费等操作不受影响。
        来自:
        帮助文档
        分布式消息服务MQTT
        服务公告
        2024年
        【通知】通用型主机规格调整为白名单特性,上线计算增强型主机规格
      • 基于事件流实现RabbitMQ消息路由
        步骤二:测试验证 1. 登录分布式消息服务RabbitMQ控制台。 2. 在左侧导航栏,单击实例列表,选择事件流的源实例。 3. 在队列管理页面,选择源对应队列,点击生产拨测。 4. 在对话框输入想要发送的消息,然后点击发送。 5. 发送消息后返回实例列表,选择事件流的目标实例,进入管理。 6. 在队列管理页面,选择目标对应队列,点击消费拨测,查询目标队列对应的消息。 7. 查看查询到消息体与消息属性是否与预期一致,如图2所示。 图2 在分布式消息服务RabbitMQ管理控制台中查看消息详情
        来自:
        帮助文档
        事件总线
        最佳实践
        基于事件流实现消息路由
        基于事件流实现RabbitMQ消息路由
      • 管理死信队列
        导出死信消息 1. 登录分布式消息服务RocketMQ控制台。 2. 单击RocketMQ实例的名称,进入实例详情页面。 3. 在左侧导航栏,单击“死信队列”,进入“死信队列”页面。 4. 在待导出的死信消息所在行,单击“导出消息”。 导出JSON格式的文件。 说明:如果需要批量导出死信消息,勾选待导出的多条死信消息,单击“批量导出消息”。 导出的消息字段说明如表1所示。 表1 消息字段说明 消息字段 字段说明 msgid 消息ID。 instanceid 实例ID。 topic Topic名称。 storetimestamp 存储消息的时间。 borntimestamp 产生消息的时间。 reconsumetimes 重试次数。 body 消息体。 bodycrc 消息体校验和。 storesize 存储大小。 propertylist 消息属性列表。lname:属性名称。lvalue:属性值。 bornhost 产生消息的主机IP。 storehost 存储消息的主机IP。 queueid 队列ID。 queueoffset 在队列中的偏移量。 重新投递死信消息 死信消息由于某些原因无法正常被消费者消费,请排查相关原因并解决,然后在控制台重新投递死信消息给消费者消费。 1. 登录分布式消息服务RocketMQ控制台。 2. 单击RocketMQ实例的名称,进入实例详情页面。 3. 在左侧导航栏,单击“死信队列”,进入“死信队列”页面。 4. 选择以下任意一种方法重新投递死信消息。 在待重新投递的死信消息所在行,单击“重投”。 如需批量重新投递死信消息,勾选待重新投递的死信消息,单击“批量重投”。 须知: 死信消息重新投递成功后,此死信消息依然存在死信队列中,不会被删除。避免多次重复投递,造成重复消费。
        来自:
        帮助文档
        分布式消息服务RocketMQ
        用户指南
        管理死信队列
      • 死信和TTL
        介绍分布式消息服务RabbitMQ死信和TTL功能。 死信 称为死信的信息,需要如下几个条件: 消息被消费者拒绝(通过basic.reject 或者 back.nack),并且设置 requeuefalse。 消息过期,队列设置了TTL(Time To Live)时间并且消息过期。 超过了队列的长度限制消息被丢弃。 为队列配置死信交换机,并在申明队列时指定“xdeadletterexchange”和“xdeadletterroutingkey”参数。队列根据“xdeadletterexchange”将死信消息发送到死信交换机中,并根据“xdeadletterroutingkey”为死信消息设置死信路由Key。 以下示例演示在Java客户端配置死信交换机和路由 channel.exchangeDeclare("some.exchange.name", "direct"); Map args new HashMap (); args.put("xdeadletterexchange", "some.exchange.name"); args.put("xdeadletterroutingkey", "someroutingkey"); channel.queueDeclare("myqueue", false, false, false, args); TTL RabbitMQ可以对消息和队列设置TTL. 目前有两种方法可以设置。第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息进行单独设置,每条消息TTL可以不同。如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message, 消费者将无法再收到该消息。 设置队列TTL 通过队列属性设置消息TTL的方法是在queue.declare方法中加入xmessagettl参数,单位为ms. 以下示例演示在Java客户端设置队列TTL。 Map argss new HashMap (); argss.put("xmessagettl",6000); channel.queueDeclare(queueName, durable, exclusive, autoDelete, argss);
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        用户指南
        高级特性
        死信和TTL
      • 连接实例
        介绍分布式消息服务RabbitMQ连接实例操作内容。 场景描述 连接RabbitMQ实例的场景包括: 1. 消息队列通信:连接RabbitMQ实例可以用于构建分布式系统中的消息队列通信。不同的应用程序或服务可以通过RabbitMQ实例发送和接收消息,实现解耦和异步通信。 2. 任务队列:连接RabbitMQ实例可以用于构建任务队列,将任务提交到RabbitMQ中,然后由消费者从队列中获取任务并进行处理。这样可以实现任务的分发和负载均衡,提高系统的处理能力和可伸缩性。 3. 发布/订阅模式:连接RabbitMQ实例可以用于实现发布/订阅模式,其中发布者将消息发布到交换器,然后订阅者可以从交换器中订阅感兴趣的消息。这样可以实现消息的广播和多个消费者的并行处理。 4. 日志收集:连接RabbitMQ实例可以用于实现日志收集系统,应用程序可以将日志消息发送到RabbitMQ中,然后由日志消费者从队列中获取日志消息并进行处理和存储。 5. 系统集成:连接RabbitMQ实例可以用于实现不同系统之间的集成,通过将消息发送到RabbitMQ中,其他系统可以从队列中获取消息并进行处理,实现系统之间的数据交换和通信。 总之,连接RabbitMQ实例可以应用于各种场景,包括消息队列通信、任务队列、发布/订阅模式、日志收集和系统集成,提供了一种可靠和灵活的消息传递机制。 操作步骤 RabbitMQ是一个开源的消息队列中间件,支持生产者和消费者之间的异步通信。在上述资源准备完成后,接下来需要编译工程生产消费,主要分以下几个步骤: 1、编写生产者代码:使用编程语言编写一个生产者程序。该程序将连接到RabbitMQ服务器,并将消息发送到队列中。 2、编写消费者代码:同样使用编程语言编写一个消费者程序。该程序将连接到RabbitMQ服务器,并从队列中接收消息。 3、运行生产者和消费者:运行生产者程序,它将发送消息到队列中。然后运行消费者程序,它将从队列中接收并处理消息。 4、验证结果:检查生产者和消费者程序的输出,确保消息被正确发送和接收。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        用户指南
        实例管理
        连接实例
      • 环境准备
        本文为您分布式消息服务MQTT的环境准备内容。 在创建天翼云分布式消息服务MQTT实例之前,您需要做一些准备工作。 首先,您需要设置一个虚拟私有云(VPC),这是一个隔离的网络环境,用于托管MQTT实例。 接下来,您需要创建一个子网,它是VPC内部的一个子网络,用于划分不同的部分和区域。 最后,您需要配置一个安全组,用于控制入站和出站的流量规则,以保证MQTT实例的安全性。 每个分布式消息服务MQTT实例都会被部署在特定的VPC中,并与特定的子网和安全组相关联。这种方式可以让您自主配置和管理MQTT实例的网络环境,并提供安全保护策略。如果您已经有了现成的VPC、子网和安全组,可以重复使用它们,无需重复创建。这样可以节省时间和资源,并确保一致性和可靠性。 VPC和子网 VPC和子网可重复使用,您也可以使用不同的VPC和子网来配置MQTT实例,您可根据实际需要进行配置。在创建VPC和子网时应注意如下要求: VPC与使用的天翼云分布式消息服务MQTT服务应在相同的区域。 如无特殊需求,创建VPC和子网的配置参数使用默认配置即可。 创建VPC和子网的操作请参考虚拟私有云创建VPC、子网搭建私有网络。 若需要在已有VPC上创建和使用新的子网,请参考虚拟私有云子网管理创建子网。
        来自:
        帮助文档
        分布式消息服务MQTT
        快速入门
        环境准备
      • 产品定义
        开源对比 相较于开源自建RocketMQ,分布式消息服务RocketMQ在自动化部署、运维监控、增强能力、延迟消息/定时消息、ACL访问控制等方面更具优势。更多信息请参见开源对比。 支持的消息类型 分布式消息服务RocketMQ支持的消息类型包括普通消息、顺序消息、事务消息与延时消息。 普通消息:RocketMQ中无特性的消息,普通消息主要包含同步消息和异步消息两种。 顺序消息:指消费消息的顺序要同发送消息的顺序一致,在RocketMQ中,主要有两种有序消息:全局有序消息和局部有序消息(又叫普通有序消息、分区有序消息)。 事务消息:提供类似X/Open XA的分布式事务功能来确保业务发送方和MQ消息的最终一致性,其本质是通过半消息(prepare消息和commit消息)的方式把分布式事务放在MQ端来处理。 延时消息:生产者将消息发送到消息队列RocketMQ服务端,设计消费时延,在预设的时间后才可以被消费者消费。 更多信息请参见功能特性。 功能特性 分布式消息服务RocketMQ的功能特性主要体现在以下几个方面: 访问接口 支持通过API调用,创建队列、查询消息监控指标、查询消息内容等。 队列能力 支持多种消息类型,包括普通队列(高并发场景)、FIFO有序队列(顺序消息场景)、严格有序队列。 消息能力 支持消息过滤、消息复用、消息重试、消息回溯、消息数据主动删除以及消息广播等能力。 安全防护 提供云审计进行租户管理操作的记录。 运维监控 提供主题、订阅组、生产者、消费者、队列的管理;同时支持集群、主题、队列多维度指标监控。 更多信息请参见功能特性。
        来自:
        帮助文档
        分布式消息服务RocketMQ
        产品简介
        产品定义
      • 【下线公告】关于天翼云二类节点分布式消息服务RabbitMQ下线部分插件的公告
        天翼云计划于2025年12月10日 00:00(北京时间)下线二类节点分布式消息服务RabbitMQ的部分插件,具体下线插件如下: rabbitmqamqp10 rabbitmqmqtt rabbitmqsharding rabbitmqstomp rabbitmqtracing rabbitmqwebmqtt rabbitmqwebstomp 下线范围 下线范围为天翼云二类节点:北京2、成都3、福州、广州4、贵州、杭州、昆明、南昌、内蒙3、青岛、上海4、深圳、石家庄、苏州、太原、芜湖、武汉2、西安2、华北、长沙2、兰州、重庆、郑州、乌鲁木齐。 下线原因 开启上述插件后,容易引起业务过载风险、影响业务系统稳定。 rabbitmqwebstomp插件通过临时队列提供WebSocket能力,同时其连接和释放依赖于心跳机制,过多的临时队列以及过多连接数会导致内存高水位,从而导致节点状态异常。 下线影响 未开启待下线插件的实例,插件列表中将不再显示这些插件。 已开启待下线插件的实例,仍然可以使用。如果您在控制台关闭这些插件后,也将不再显示。 如有任何问题可通过天翼云官网(www.ctyun.cn)提交工单或服务热线(4008109889)与我们联系。 消减措施 已开启插件的RabbitMQ实例,建议参考设置RabbitMQ告警规则,配置相应的告警,提前预知并消减风险。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        服务公告
        【下线公告】关于天翼云二类节点分布式消息服务RabbitMQ下线部分插件的公告
      • 续费、到期与欠费
        本文为您介绍分布式消息服务MQTT续费、到期与欠费内容。 到期前续费 手动续订:对于包年式消息服务MQTT,用户在资源到期前进行续费操作,可以延长原有资源到期时间,避免资源到期后冻结或超过保留期后被系统回收。详细操作请参考费用中心续订管理手动续订。 自动续订:自动续订仅针对采用包月、包年计费模式的资源,详细操作请参考费用中心续订管理自动续订。 到期处理 到期后,分布式消息服务MQTT进入保留期,您将不能正常访问及使用天翼云分布式消息服务MQTT,但对于您存储在分布式消息服务MQTT中的数据予以保留。 若您在到期后15天内续费,自资源续订解冻开始,计算新的服务有效期,按照新的服务有效期计算费用; 若到期15天后您仍未续费,存储在分布式消息服务MQTT中的数据将被删除。 欠费原因 在按需计费的模式下帐号的余额不足。 按需欠费资源冻结规则 欠费后,资源进入保留期,您将不能正常访问及使用分布式消息服务MQTT,已开通的实例资源将予以保留。 若您在保留期内充值,充值后系统会自动扣减欠费金额。 若保留期到期您仍未充值,MQTT实例资源将被释放。
        来自:
        帮助文档
        分布式消息服务MQTT
        计费说明
        续费、到期与欠费
      • 操作类
        本节介绍了分布式消息服务RabbitMQ产品常见操作类问题。 无法被路由的消息,去了哪里? 如果没有任何设置,无法路由的消息会被直接丢弃。 无法路由的情况:Routing key不正确。 解决方案: 1.使用mandatorytrue配合ReturnListener,实现消息回发。 2.声明交换机时,指定备份交换机。 多个消费者监听一个队列时,消息如何分发? 1.RoundRobin(轮询) 默认的策略,消费者轮流、平均地收到消息。 2.Fair dispatch(公平分发) 如果要实现根据消费者的处理能力来分发消息,给空闲地消费者发送更多消息,可以用basicQos(int prefetchcount)来设置。prefetchcount含义:当消费者有多条消息没有响应ACK时,不再给这个消费者发送消息。 消息在什么时候会变成Dead Letter(死信)? 1.消息被拒绝并且没有设置重新入队:(NACK Reject ) && requeue false 2.消息过期(消息或者队列的TTL设置) 3.消息堆积,并且队列达到最大长度,先入队的消息编程DL。 解决方案:可以在声明队列时,指定一个Dead Letter Exchange,来实现Dead Letter的转发,保证消息不会丢失。 如何进行消息持久化? 所谓持久化,就是RabbitMQ将内存中的数据(比如交换机、队列、消息等)固化到磁盘,以防止异常情况的发生时造成数据丢失。 持久化分类 说明 交换机持久化 在创建Exchange时设置durable参数参数。channel.exchangeDeclare(EXCHANGENAME, "direct", true); 队列持久化 同样也是设置设置durable参数。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息。channel.queueDeclare(QUEUENAME, true, false, false, null); 消息持久化 即使交换机、队列持久化不会因为重启丢失,但是存储在队列中的消息仍然会丢失。解决的办法就是设置消息的投递模式为2,即代表持久化(JAVA)。理论上,可以将所有的消息都设置为持久化,但是这会严重影响RabbitMQ性能,因为写入到磁盘的速度可比写入到内存的速度慢非常多。因此,在选择是否要持久化消息时,需要在可靠性和吞吐量之间做一个权衡。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        常见问题
        操作类
      • 分布式消息产品选型
        特性 Kafka RabbitMQ RocketMQ 功能 支持功能较少,不支持延迟发送,消息重试等功能 功能丰富,支持多个队列种类(优先级队列、延迟队列、死信队列镜像队列等),提供丰富的策略分配 功能完善,支持事务消息、定时消息、事务消息等 单机吞吐量 十万级 万级 几万级 稳定性 队列/分区多时性能不稳定 消息堆积时,性能不稳定 队列较多、消息堆积时性能保持稳定 可用性 非常高(分布式)具有主备故障自动切换 较高,基于主从架构实现高可用性 非常高(分布式)具有主备故障自动切换 选型建议 性能要求高,数据量大,适合产生大量数据的互联网服务的数据收集业务,如日志采集处理、需对接大数据应用等,kafka是首选 数据量少,吞吐量需求不大;数据可靠性要求较高,对功能丰富性要求极高 可靠性要求很高且性能要求较高的场景以及业务削峰场景,如电商、订单处理等
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        产品简介
        分布式消息产品选型
      • 生产消费
        消费消息 消费者需要创建一个连接到RabbitMQ服务器,然后创建一个通道(Channel)来进行消息的订阅。在订阅消息之前,消费者通常需要先声明一个队列,以确保能够正确地接收和处理消息。 一旦连接和通道建立完成,消费者可以使用basicConsume()方法来订阅指定的队列,并注册一个回调函数来处理接收到的消息。当有消息到达队列时,RabbitMQ会将消息推送给消费者,消费者的回调函数将被调用,从而可以对消息进行处理。 消费者可以根据自己的需求设置消息的确认机制。在默认情况下,消费者在接收到消息后,会自动向RabbitMQ发送一个确认(ack)消息,表示已成功接收并处理该消息。如果消费者在处理消息时发生错误,可以选择不发送确认消息,从而使消息重新进入队列,以便其他消费者重新处理。 通过使用RabbitMQ,消费者可以实现解耦,即它们可以独立地进行开发和部署。消费者可以根据自己的处理能力和负载来接收和处理消息,从而实现负载均衡和水平扩展。 代码示例: import com.rabbitmq.client.; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class RabbitmqConsumer { //队列名称 private final static String QUEUENAME "Hello,RabbitMQ"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory new ConnectionFactory(); //设置主机ip factory.setHost("YOUR HOST IP"); //设置amqp的端口号 factory.setPort(YOUR PORT); //设置用户名密码 factory.setUsername("YOUR USER NAME"); factory.setPassword("YOUR USER PASSWORD"); //设置Vhost,需要在控制台先创建 factory.setVirtualHost("YOUR VHOST"); //基于网络环境合理设置超时时间 factory.setConnectionTimeout(30 1000); factory.setHandshakeTimeout(30 1000); factory.setShutdownTimeout(0); Connection connection factory.newConnection(); Channel channel connection.createChannel(); //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。 channel.queueDeclare(QUEUENAME, false, false, false, null); System.out.println(" [] Waiting for messages. To exit press CTRL+C"); Consumer consumer new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message new String(body, StandardCharsets.UTF8); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUENAME, true, consumer); } } 完成上述步骤后,可以在控制台查看消费者是否启动成功。 完成以上所有步骤后,就成功接入了RabbitMQ服务,可以用消息队列进行消息发送和订阅了。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        快速入门
        生产消费
      • 与其他服务关系
        本文为您介绍分布式消息服务MQTT与其他服务关系。 虚拟私有云(CTVPC ,Virtual Private Cloud) 虚拟私有云为分布式消息服务MQTT提供一个逻辑隔离的区域,构建一个安全可靠、可配置和管理的虚拟网络环境。更多信息请参见虚拟私有云。 弹性云主机(CTECS,Elastic Cloud Server) 弹性云主机由 CPU、内存、镜像、云硬盘组成,同时结合VPC、安全组、数据多副本保存等能力,打造一个既高效又可靠安全的计算环境,确保分布式消息服务MQTT持久稳定运行。更多信息请参见弹性云主机。 云硬盘(CTEVS,Elastic Volume Service) 云硬盘是一种可弹性扩展的块存储设备,可以为分布式消息服务MQTT提供高性能、高可靠的块存储服务。更多信息请参见云硬盘。 弹性IP(Elastic IP,EIP) 弹性IP是可以独立申请的公网 IP 地址,包括公网IP地址与公网出口带宽服务。可以与分布式消息服务MQTT动态绑定和解绑,实现云资源的互联网访问。针对需要公网访问分布式消息服务MQTT实例的需求,用户可开通弹性IP后,在MQTT实例页面进行绑定。更多信息请参见弹性IP。
        来自:
        帮助文档
        分布式消息服务MQTT
        产品介绍
        与其他服务关系
      • 分布式消息产品选型
        特性 Kafka RabbitMQ RocketMQ 功能 支持功能较少,不支持延迟发送,消息重试等功能 功能丰富,支持多个队列种类(优先级队列、延迟队列、死信队列镜像队列等),提供丰富的策略分配 功能完善,支持事务消息、定时消息、事务消息等 单机吞吐量 十万级 万级 几万级 稳定性 队列/分区多时性能不稳定 消息堆积时,性能不稳定 队列较多、消息堆积时性能保持稳定 可用性 非常高(分布式)具有主备故障自动切换 较高,基于主从架构实现高可用性 非常高(分布式)具有主备故障自动切换 选型建议 性能要求高,数据量大,适合产生大量数据的互联网服务的数据收集业务,如日志采集处理、需对接大数据应用等,kafka是首选。 数据量少,吞吐量需求不大;数据可靠性要求较高,对功能丰富性极高 可靠性要求很高且性能要求较高的场景以及业务削峰场景,如电商、订单处理等。
        来自:
        帮助文档
        分布式消息服务Kafka
        产品简介
        分布式消息产品选型
      • 续订和退订
        本文为您介绍分布式消息服务MQTT续订和退订内容。 场景描述 分布式消息服务MQTT为用户提供全面周到的服务,支持用户续订和退订的需求。 续订:针对包周期消息实例服务,用户可在到期前进行服务周期延长操作,即续订操作。 退订:用户如不需要继续使用该分布式消息服务MQTT实例,可进行删除实例操作,即退订操作。 续订 1、登录MQTT消息控制台,可以看到当前租户下面的实例列表。 2、点击需要变更实例栏 > 更多 > 续订。 3、 进入到续订页面,在弹出来的确认窗口选择续订时长,点击确认即可。 退订 1、登录MQTT消息控制台,可以看到当前租户下面的实例列表。 2、点击需要变更实例栏 > 更多 > 退订。 3、 进入到退订页面,在弹出来的确认窗口点击确认即可。 注意 退订的实例处于冻结状态,请务必在实例退订前停止全部的应用。 在申请退订前,请做好数据备份工作,退订后数据将保留15个自然日,15天后相关数据将不予保留,且不会进行备份,务必谨慎操作。
        来自:
        帮助文档
        分布式消息服务MQTT
        用户指南
        实例管理
        续订和退订
      • 典型应用场景
        本文主要介绍分布式消息服务RabbitMQ的典型应用场景。 RabbitMQ作为一款热门的消息队列中间件,具备高效可靠的消息异步传递机制,主要用于不同系统间的数据交流和传递,在企业解决方案、金融支付、电信、电子商务、社交、即时通信、视频、物联网、车联网等众多领域都有广泛应用。 异步通信 将业务中属于非核心或不重要的流程部分,使用消息异步通知的方式发给目标系统,这样主业务流程无需同步等待其他系统的处理结果,从而达到系统快速响应的目的。 如网站的用户注册场景,在用户注册成功后,还需要发送注册邮件与注册短信,这两个流程使用RabbitMQ消息服务通知邮件发送系统与短信发送系统,从而提升注册流程的响应速度。 图1 串行发送注册邮件与短信流程 图2 借助消息队列异步发送注册邮件与短信流程 错峰流控与流量削峰 在电子商务系统或大型网站中,上下游系统处理能力存在差异,处理能力高的上游系统的突发流量可能会对处理能力低的某些下游系统造成冲击,需要提高系统的可用性的同时降低系统实现的复杂性。电商大促销等流量洪流突然来袭时,可以通过队列服务堆积缓存订单等信息,在下游系统有能力处理消息的时候再处理,避免下游订阅系统因突发流量崩溃。消息队列提供亿级消息堆积能力,3天的默认保留时长,消息消费系统可以错峰进行消息处理。 另外,在商品秒杀、抢购等流量短时间内暴增场景中,为了防止后端应用被压垮,可在前后端系统间使用RabbitMQ消息队列传递请求。 图3 消息队列应对秒杀大流量场景
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        产品简介
        典型应用场景
      • 基于消息队列RocketMQ实现全链路灰度
        本章节介绍如何基于消息队列RocketMQ实现全链路灰度 概述 本文介绍在使用消息队列(RocketMQ)这种异步场景下,可以在不修改业务代码的情况下,实现异步场景的灰度,从而实现全链路灰度。本文介绍基于消息队列RocketMQ实现全链路灰度。 背景介绍 在大多数业务场景中对于消息的灰度并没有RPC调用那么严格,但是当全链路灰度调用中涉及到消息消费时,如果消息消费没有按照全链路流量规则路由,则会导致通过消息产生的流量逃逸,从而破坏全链路规则,导致出现一些不符合预期的情况。 如下图所示,本文分别部署网关、appa、appagray、appb、appbgray、appc、appcgray以及RocketMQ,模拟一个真实的全链路灰度场景。 通过网关调用appa应用的接口,当满足路由规则后,灰度流量会被路由到appagray,appagray又会调用appbgray,随后由appbgray发送灰度消息,appcgray将会收到灰度消息,而appc不会收到灰度消息。 前提条件 1. 用户已开通微服务治理中心企业版。 2. 用户已开通云容器引擎。 3. 用户已部署RocketMQ,且RocketMQ版本在4.5.0以上,broker.conf中已配置enablePropertyFiltertrue。 部署Demo应用 准备自建入口网关msgczuul,准备应用msgcappa,msgcappb和msgcappc。调用过程是msgcappa –> msgcappb > msgcappc。 步骤1:在云容器引擎中安装微服务治理插件: 1. 登录“云容器引擎”控制台。 2. 在左侧菜单栏选择“集群”,点击目标集群。 3. 在集群管理页面点击“插件”“插件市场”,选择“cubems”插件安装。 步骤2:为应用开启微服务治理能力: 1. 登录“云容器引擎”控制台。 2. 左侧菜单栏选择“集群”,点击目标集群。 3. 在集群管理页面点击“工作负载”“无状态”,选择目标命名空间。 4. 在Deployment列表页选择指定Deployment,并点击“全量替换”,进入Deployment编辑页。 5. 在Deployment编辑页点击“显示高级设置”,新增“Pod标签”: mseCubeMsAutoEnable:on。 6. 在发布应用时,配置指定环境变量,可指定注入微服务治理中心的应用名、命名空间和标签等信息。 环境变量配置如下: 环境变量名 环境变量值 MSEAPPNAME 接入到微服务治理中心的应用名。 MSESERVICETAG 应用标签信息,如灰度应用可配置gray。 MSENAMESPACE(选填) 接入到微服务治理中心的命名空间,默认为:default。 7. 完成编辑后点击“提交”,重新发布容器即可接入。 appa应用的配置 基线: apiVersion: "apps/v1" kind: "Deployment" metadata: name: "appa" namespace: "default" spec: progressDeadlineSeconds: 600 replicas: 1 revisionHistoryLimit: 10 selector: matchLabels: name: "appa" template: metadata: labels: mseCubeMsAutoEnable: "on" name: "appa" spec: containers: env: name: "MSEAPPNAME" value: "appa" image: "镜像仓库域名/xxx/appa:latest" imagePullPolicy: "Always" name: "appa" ports: containerPort: 26160 livenessProbe: tcpSocket: port: 26160 initialDelaySeconds: 10 periodSeconds: 30 resources: limits: cpu: "1" memory: "1Gi" requests: cpu: "1" memory: "1Gi" 灰度: apiVersion: "apps/v1" kind: "Deployment" metadata: name: "appa" namespace: "default" spec: progressDeadlineSeconds: 600 replicas: 1 revisionHistoryLimit: 10 selector: matchLabels: name: "appa" template: metadata: labels: mseCubeMsAutoEnable: "on" name: "appa" spec: containers: env: name: "MSEAPPNAME" value: "appa" name: "MSESERVICETAG" value: "gray" image: "镜像仓库域名/xxx/appa:latest" imagePullPolicy: "Always" name: "appa" ports: containerPort: 26160 livenessProbe: tcpSocket: port: 26160 initialDelaySeconds: 10 periodSeconds: 30 resources: limits: cpu: "1" memory: "1Gi" requests: cpu: "1" memory: "1Gi" appb应用的配置 基线: apiVersion: "apps/v1" kind: "Deployment" metadata: name: "appb" namespace: "default" spec: progressDeadlineSeconds: 600 replicas: 1 revisionHistoryLimit: 10 selector: matchLabels: name: "appb" template: metadata: labels: mseCubeMsAutoEnable: "on" name: "appb" spec: containers: env: name: "MSEAPPNAME" value: "appb" image: "镜像仓库域名/xxx/appb:latest" imagePullPolicy: "Always" name: "appb" ports: containerPort: 26160 livenessProbe: tcpSocket: port: 26160 initialDelaySeconds: 10 periodSeconds: 30 resources: limits: cpu: "1" memory: "1Gi" requests: cpu: "1" memory: "1Gi" 灰度: apiVersion: "apps/v1" kind: "Deployment" metadata: name: "appb" namespace: "default" spec: progressDeadlineSeconds: 600 replicas: 1 revisionHistoryLimit: 10 selector: matchLabels: name: "appb" template: metadata: labels: mseCubeMsAutoEnable: "on" name: "appb" spec: containers: env: name: "MSEAPPNAME" value: "appb" name: "MSESERVICETAG" value: "gray" image: "镜像仓库域名/xxx/appb:latest" imagePullPolicy: "Always" name: "appb" ports: containerPort: 26160 livenessProbe: tcpSocket: port: 26160 initialDelaySeconds: 10 periodSeconds: 30 resources: limits: cpu: "1" memory: "1Gi" requests: cpu: "1" memory: "1Gi" appc应用的配置 基线: apiVersion: "apps/v1" kind: "Deployment" metadata: name: "appc" namespace: "default" spec: progressDeadlineSeconds: 600 replicas: 1 revisionHistoryLimit: 10 selector: matchLabels: name: "appc" template: metadata: labels: mseCubeMsAutoEnable: "on" name: "appc" spec: containers: env: name: "MSEAPPNAME" value: "appc" image: "镜像仓库域名/xxx/appc:latest" imagePullPolicy: "Always" name: "appc" ports: containerPort: 26160 livenessProbe: tcpSocket: port: 26160 initialDelaySeconds: 10 periodSeconds: 30 resources: limits: cpu: "1" memory: "1Gi" requests: cpu: "1" memory: "1Gi" 灰度: apiVersion: "apps/v1" kind: "Deployment" metadata: name: "appc" namespace: "default" spec: progressDeadlineSeconds: 600 replicas: 1 revisionHistoryLimit: 10 selector: matchLabels: name: "appc" template: metadata: labels: mseCubeMsAutoEnable: "on" name: "appc" spec: containers: env: name: "MSEAPPNAME" value: "appc" name: "MSESERVICETAG" value: "gray" image: "镜像仓库域名/xxx/appc:latest" imagePullPolicy: "Always" name: "appc" ports: containerPort: 26160 livenessProbe: tcpSocket: port: 26160 initialDelaySeconds: 10 periodSeconds: 30 resources: limits: cpu: "1" memory: "1Gi" requests: cpu: "1" memory: "1Gi" zuul应用的配置: apiVersion: "apps/v1" kind: "Deployment" metadata: name: "zuul" namespace: "default" spec: progressDeadlineSeconds: 600 replicas: 1 revisionHistoryLimit: 10 selector: matchLabels: name: "zuul" template: metadata: labels: mseCubeMsAutoEnable: "on" name: "zuul" spec: containers: env: name: "MSEAPPNAME" value: "zuul" image: "镜像仓库域名/xxx/zuul:latest" imagePullPolicy: "Always" name: "zuul" ports: containerPort: 26160 livenessProbe: tcpSocket: port: 26160 initialDelaySeconds: 10 periodSeconds: 30 resources: limits: cpu: "1" memory: "1Gi" requests: cpu: "1" memory: "1Gi"
        来自:
        帮助文档
        微服务引擎
        最佳实践
        基于消息队列RocketMQ实现全链路灰度
      • RabbitMQ队列迁移
        本文主要介绍分布式消息服务RabbitMQ的队列迁移最佳实践。 在RabbitMQ集群上,队列在各个节点分布不均衡会导致部分节点压力过大,无法更有效的利用集群。这可能是扩容节点、删除队列等原因导致的。 设置队列负载均衡的方法如下: 删除队列重建 通过Policy修改master节点方式 删除队列重建 1. 登录RabbitMQ WebUI页面。 2. 在“Overview”页签中,单击“Download broker definitions”,导出元数据。 3. 停止生产,等待数据消费完,然后删除原有队列。 在“Overview”页签中,确认数据是否已消费完。 可消费消息数(Ready)和未确认的消息数(Unacked)都为0时,说明消费完成。 等数据消费完后,删除原有队列。 在“Queues”页签,单击需要删除的队列名称,进入队列详情页面。 单机“Delete Queue”,删除队列。 4. 在“Overview”页签中,上传2中导出的元数据。 在“Overview”页签中,单击“选择文件”,选择2中导出的元数据。 单击“Upload broker definitions”,上传元数据。 上传成功后,显示如下信息。 实例会自动将队列均衡创建在各个节点上,在“Queues”页签中查看队列分布详情。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        最佳实践
        RabbitMQ队列迁移
      • 消息持久化
        介绍分布式消息服务RabbitMQ消息持久化功能。 使用场景 在生产过程中,难免会发生服务器宕机的事情,RabbitMQ也不例外,可能由于某种特殊情况下的异常而导致RabbitMQ宕机从而重启,那么这个时候对于消息队列里的数据,包括交换机、队列以及队列中存在消息恢复就显得尤为重要了。RabbitMQ本身带有持久化机制,包括交换机、队列以及消息的持久化。持久化的主要机制就是将信息写入磁盘,当RabbtiMQ服务宕机重启后,从磁盘中读取存入的持久化信息,恢复数据。 设置交换器持久化 (1)登录管理控制台。 (2)进入RabbitMQ管理控制台。 (3)在实例列表页在操作列,目标实例行点击“管理”。 (4)点击“交换器管理”后,点击“新建”按钮。 (5)点击“新建”后出现以下窗口,是否持久化选择是。 设置队列持久化 (1)登录管理控制台。 (2)进入RabbitMQ管理控制台。 (3)在实例列表页在操作列,目标实例行点击“管理”。 (4)点击“队列管理”后,点击“新建”按钮。 (5)点击“新建”后出现以下窗口,是否持久化选择是。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        用户指南
        高级特性
        消息持久化
      • 路由到分布式消息服务RabbitMQ
        结果验证 1. 使用事件源触发一个事件事件。 2. 您可以在分布式消息RabbitMQ 控制台确认是否接收到事件。 1. 登录分布式消息服务RabbitMQ控制台,然后在左侧导航栏选择实例列表。 2. 在实例列表页面单击目标实例名称。 3. 在选择队列管理,进入目标队列详情,进入消息消费拨测页面。 4. 在消息查询页面,点击消费即可查看消息内容, 如图2所示。 图2 在分布式消息服务RabbitMQ管理控制台中查看消息详情
        来自:
        帮助文档
        事件总线
        用户指南
        事件流
        事件目标
        路由到分布式消息服务RabbitMQ
      • 分布式消息服务RabbitMQ事件目标
        结果验证 1. 使用事件总线发布消息功能,发送一个自定义事件。 2. 您可以在分布式消息RabbitMQ 控制台确认是否接收到事件,如图2所示。 1. 登录分布式消息服务RabbitMQ控制台,然后在左侧导航栏选择实例列表。 2. 在实例列表页面单击目标实例名称。 3. 在选择队列管理,进入目标队列详情,进入消息消费拨测页面。 4. 在消息查询页面,点击消费即可查看消息内容。 图2 在分布式消息服务RabbitMQ管理控制台中查看消息详情
        来自:
        帮助文档
        事件总线
        用户指南
        事件总线
        事件规则
        事件目标
        目标服务类型
        分布式消息服务RabbitMQ事件目标
      • 查看监控数据
        本章节主要介绍如何查看分布式消息服务RabbitMQ的监控数据。 操作场景 云监控对RabbitMQ实例的运行状态进行日常监控,可以通过控制台直观的查看RabbitMQ实例各项监控指标。 前提条件 已创建RabbitMQ专享版实例,且实例中有可消费的消息。 操作步骤 1. 登录管理控制台。 2. 在管理控制台右上角单击,选择区域。 说明 此处请选择与您的应用服务相同的区域。 3. 在管理控制台左上角单击,选择“企业中间件”>“分布式消息服务”>“RabbitMQ专享版”,进入分布式消息服务RabbitMQ专享版页面。 4. 通过以下任意一种方法,查看监控数据。 在RabbitMQ实例名称后,单击“查看监控数据”。跳转到云监控页面,查看实例、节点和队列的监控数据,数据更新周期为1分钟。 单击RabbitMQ实例名称,进入实例详情页。在左侧导航栏单击“监控”,进入监控页面,查看实例、节点和队列的监控数据,数据更新周期为1分钟。 在“队列”的监控页面中,如果队列在默认Vhost下,会直接显示队列名。如果队列不在默认Vhost下,队列名称显示为:队列所在的Vhost名称队列名。例如:队列test01在Vhost13142708中,此时监控中显示的队列名为“Vhost13142708test01”。 图1 队列的监控页面
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        用户指南
        监控
        查看监控数据
      • 产品优势
        灵活及时 队列处理能力支持按需自动扩展,及时且方便地完成系统扩展,消息投递时间可至毫秒级,从而保证消息及时性。分布式消息服务RocketMQ具有很高的灵活性,可以满足各种不同的业务需求。主要体现: 支持多种消息模型:RocketMQ支持多种消息模型,包括消息队列模型和发布/订阅模型。在消息队列模型中,消息发送方将消息发送到一个队列,消息接收方从队列中读取消息。这种模型适用于顺序消息和事务性消息等场景。而在发布/订阅模型中,消息发送方将消息发布到一个主题,所有订阅该主题的消费者都会收到消息。这种模型适用于实时通知、数据分发等场景。 支持灵活的消息过滤机制:RocketMQ可以通过对消息的属性进行过滤,只有满足条件的消息才会被消费者接收。这样可以实现消息的动态路由和选择性消费,提升系统的灵活性和效率。 支持消息延迟发送和定时消费:RocketMQ可以设置消息的延迟时间,使消息在指定的时间后才能被消费者接收。这对于实现定时任务和延迟处理非常有用。 良好的可扩展性:RocketMQ采用了分布式架构,并且支持主从复制和消息分区机制。可以根据业务需求,动态扩展消息生产者、消息消费者和消息存储节点的数量,以满足大规模消息处理和高并发访问的要求。 高可靠 集群节点采用主备模式,具有主备故障自动切换功能;并且提供对消息的持久化能力,多副本冗余;提供消息数据自动删除功能。分布式消息服务RocketMQ具有高可靠性的特点,以下是RocketMQ实现高可靠性的关键特性: 主从复制:RocketMQ采用了主从复制的架构,在生产者发送消息时,消息会首先写入主节点,并异步复制到多个从节点。这样即使主节点发生故障,从节点也能够接管并继续提供服务。 可靠消息存储:RocketMQ使用Write Ahead Log (WAL)技术来保证消息的可靠存储。在消息写入之前,会先将消息写入磁盘的顺序文件中,然后再写入内存。当RocketMQ重启时,可以通过检查磁盘上的文件来恢复之前未被消费的消息。 消息可重复存储:RocketMQ使用消息的唯一ID来确保消息的幂等性。如果一条消息因为网络问题或其他原因发送失败,RocketMQ可以根据消息的ID判断是否已经成功发送过,并避免消息的重复发送。 容灾备份:RocketMQ支持Broker集群模式和多数据中心的部署方式,可以将消息数据进行容灾备份。当某个Broker节点发生故障时,其他节点可以继续提供服务,确保系统的可用性。 高可用性设计:RocketMQ采用了多个Broker节点组成的集群,并通过主从复制和故障切换来实现高可用性。当某个Broker节点发生故障时,其他节点会自动接管其工作,保证消息的正常处理。
        来自:
        帮助文档
        分布式消息服务RocketMQ
        产品简介
        产品优势
      • RocketMQ .NET SDK
        说明 分布式消息服务RocketMQ兼容了社区版 HTTP SDK,您可以使用社区版 HTTP SDK接入分布式消息服务RocketMQ。 前提条件 1. 下载社区 C SDK到本地并解压。 2. 使用Visual Studio打开sln文件导入工程。 发送普通消息 using System; using Aliyun.MQ; using Aliyun.MQ.Model; namespace MQ.Sample { public class Producer { // 填写分布式消息服务RocketMQ控制台HTTP接入点 private const string endpoint "${HTTPENDPOINT}"; // 填写AccessKey,在管理控制台创建 private const string accessKeyId "${ACCESSKEY}"; // 填写SecretKey 在管理控制台创建 private const string secretAccessKey "${SECRETKEY}"; // 消息所属的Topic,在消息队列RocketMQ版控制台创建。 private const string topicName "${TOPIC}"; // Topic所属实例ID,默认实例为空 private const string instanceId "${INSTANCEID}"; private static MQClient client new Aliyun.MQ.MQClient(accessKeyId, secretAccessKey, endpoint); static MQProducer producer client.GetProducer(instanceId, topicName); static void Main(string[] args) { try { // 循环发送4条消息。 for (int i 0; i messages null; try { messages consumer.ConsumeMessage( 3, // 一次最多消费3条(最多可设置为16条) 3 // 长轮询时间3秒(最多可设置为30秒) ); } catch (Exception exp1) { if (exp1 is MessageNotExistException) { Console.WriteLine(Thread.CurrentThread.Name + " No new message, " + ((MessageNotExistException)exp1).RequestId); continue; } Console.WriteLine(exp1); Thread.Sleep(2000); } if (messages null) { continue; } List handlers new List<>(); Console.WriteLine(Thread.CurrentThread.Name + " Receive Messages:"); // 处理业务逻辑 foreach (Message message in messages) { Console.WriteLine(message); handlers.Add(message.ReceiptHandle); } // Message.nextConsumeTime前若不确认消息消费成功,则消息会重复消费 // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样 try { consumer.AckMessage(handlers); Console.WriteLine("Ack message success:"); foreach (string handle in handlers) { Console.Write("t" + handle); } Console.WriteLine(); } catch (Exception exp2) { // 某些消息的句柄可能超时了会导致确认不成功 if (exp2 is AckMessageException) { AckMessageException ackExp (AckMessageException)exp2; Console.WriteLine("Ack message fail, RequestId:" + ackExp.RequestId); foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems) { Console.WriteLine("tErrorHandle:" + errorItem.ReceiptHandle + ",ErrorCode:" + errorItem.ErrorCode + ",ErrorMsg:" + errorItem.ErrorMessage); } } } } catch (Exception ex) { Console.WriteLine(ex); Thread.Sleep(2000); } } } }}
        来自:
        帮助文档
        分布式消息服务RocketMQ
        SDK参考
        RocketMQ .NET SDK
      • Python客户端连接配置
        安装依赖 plaintext pip3 install pahomqtt 代码示例 python import ssl import random from paho.mqtt import client as pahoclient 填入您在mqtt控制台创建的ACL账号密码。 USERNAME "yourusername" AUTHPASSWORD "yourpassword" 是否使用tls加密传输 isTls True class MQTTClient: def init(self): 填写mqtt云消息服务的接入点,去掉:{端口号}部分 self.broker "tcp://localhost" 指定连接客户端的id self.clientid "ctgmqttclienttest" self.client None def onconnect(self, client, userdata, flags, rc): 连接建立成功 if rc 0: print("Connected to MQTT Broker!") else: print(f"Failed to connect, return code {rc}") def ondisconnect(self, client, userdata, rc): 连接丢失 print(f"Disconnected with result code {rc}") 可以在这里添加重连逻辑 def onmessage(self, client, userdata, msg): 收到消息的回调 print(f"Received {msg.payload.decode()} from {msg.topic} topic") def onpublish(self, client, userdata, mid): 成功发送消息到服务端 print(f"Message {mid} published successfully") def connectmqtt(self): 创建客户端实例 self.client pahoclient.Client(clientidself.clientid) self.client.usernamepwset(USERNAME, AUTHPASSWORD) 设置回调函数 self.client.onconnect self.onconnect self.client.ondisconnect self.ondisconnect self.client.onmessage self.onmessage self.client.onpublish self.onpublish 设置TLS if isTls: 创建不验证证书的SSL上下文 context ssl.createdefaultcontext() context.checkhostname False context.verifymode ssl.CERTNONE self.client.tlssetcontext(context) 设置其他连接选项 self.client.connect(self.broker.replace("tcp://", "").replace("ssl://", ""), port8883 if isTls else 1883) self.client.loopstart()
        来自:
        帮助文档
        分布式消息服务MQTT
        最佳实践
        Python客户端连接配置
      • 查询消息
        查询消息轨迹 1. 登录分布式消息服务RocketMQ控制台。 2. 单击RocketMQ实例的名称,进入实例详情页面。 3. 在左侧导航栏,单击“消息查询”,进入“消息查询”页面。 4. 选择以下任意一种方法,查询消息。 按Topic查询:“Topic”选择待查询消息的Topic名称,“队列”选择待查询消息的队列,“存储时间”选择待查询消息的时间段,单击“查询”。 按Message ID查询:“Topic”选择待查询消息所在的Topic名称,“消息ID”输入待查询消息的Message ID,单击“查询”。 按Message Key查询:“Topic”选择待查询消息所在的Topic名称,“消息ID”输入待查询消息的Message Key,单击“查询”。 5. 在待查询消息所在行,单击“消息轨迹”,查看消息的轨迹,确定是否生产/消费成功。 消息轨迹的参数说明如表1所示。 表1 消息轨迹的参数说明 参数 参数说明 生产者状态 生产者状态如下: 1.发送成功:消息发送成功,服务端已经成功存储消息。 2.提交成功:允许消费者消费此事务消息。 3.回滚:事务消息将被丢弃,不允许消费者消费此事务消息。 4.未知,待确认:事务消息状态暂时无法确定,等待固定时间后,服务端向生产者进行消息回查。 生产耗时 生产者发送消息的耗时。 生产地址 生产者的IP地址和端口号。 消费者状态 消费者状态如下: 1.消费成功消费超时消费异常 2.消费返回NULL消费失败 消费时间 消费消息的时间。 消费耗时 消费者消费消息的耗时。 消费地址 消费者的IP地址和端口号。
        来自:
        帮助文档
        分布式消息服务RocketMQ
        用户指南
        消息查询
        查询消息
      • 删除队列
        本章节指导您通过RabbitMQ WebUI页面或调用API方式删除队列。 本章节指导您通过RabbitMQ WebUI页面或调用API方式删除队列。 方法一:在WebUI页面删除单个队列:在WebUI页面的“Queues”页签中,删除单个队列。 方法二:通过Policy批量删除队列:新增与待删除队列的前缀名称相同、且队列过期时间(TTL)为1毫秒的策略,通过此策略实现批量删除队列。 方法三:调用API删除单个队列:在RabbitMQ实例未开启SSL时,通过调用API删除单个队列。 方法四:调用API批量删除队列:在RabbitMQ实例未开启SSL时,通过编写Shell脚本循环调用API执行删除命令,实现批量删除队列。 说明 删除队列前,请确保队列中的消息已经全部被消费,否则未消费的消息将和队列一起被删除。 方法一:在WebUI页面删除单个队列 步骤 1 登录RabbitMQ WebUI页面。 步骤 2 在“Queues”页签,单击需要删除的队列名称,进入队列详情页面。 图1 队列列表 步骤 3 单击“Delete Queue”,删除单个队列。 图2 删除单个队列 方法二:通过Policy批量删除队列 新增与待删除队列的前缀名称相同、且队列TTL为1毫秒的策略,通过此策略实现批量删除队列。 步骤 1 登录RabbitMQ WebUI页面。 步骤 2 在“Admin > Policies”页面中,新增一条策略。 图3 通过Policy批量删除队列 Name:填写策略名称。 Pattern:队列匹配模式,填写队列名称,会匹配前缀同名的队列。例如:设置为“.”时,表示匹配所有队列。设置为“.queuename”时,表示匹配队列名前缀为queuename的所有队列。 Apply to:选择“Queues”。 Priority:可选参数,策略优先级,数字越大,优先级越高。 Definition:定义TTL,单位为毫秒。填写“expires”参数,值设置为“1”,表示队列过期时间为1毫秒。 步骤 3 单击“Add policy”。 在“Queues”页签,查看队列是否成功删除。 步骤 4 队列成功删除后,在“Admin > Policies”页面中,在步骤2中新增的策略后,单击“Clear”,删除策略。 如果保留此策略,它对后续新建的队列依然生效,可能会出现误删除队列的情况。 图4 删除策略
        来自:
      • 延迟消息
        总结 开源 RabbitMQ 不提供原生延时语义 可通过两种方式实现延时消息能力: 死信队列 + TTL:通用、稳定,但灵活性有限 延时消息插件:精度高、使用简单,但依赖部署环境 实际选型需综合考虑: 延迟精度要求 运维复杂度 消息规模与资源成本
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        最佳实践
        延迟消息
      • 实例类(1)
        RabbitMQ实例是否支持持久化,如何定时备份数据? RabbitMQ支持消息数据持久化,可从客户端连接RabbitMQ并设置消息持久化,也可在RabbitMQ集群管理工具界面创建队列时设置消息持久化。 不支持客户自定义定时备份数据,或从界面触发备份数据。 RabbitMQ实例开启SSL后,证书怎么获取? RabbitMQ实例开启SSL后只做单向认证,不需要证书。 RabbitMQ实例的SSL开关是否支持修改? 不支持动态修改,即如果实例创建时没有选择开启,创建完成之后,不支持修改,建议在实例创建时将开关打开。 RabbitMQ实例是否支持扩容? 单机版本的RabbitMQ实例支持扩大存储空间,以及扩容代理规格,暂不支持缩容。 集群版本的RabbitMQ实例支持扩大存储空间和代理个数,以及扩容代理规格,暂不支持缩容。 如何清空队列数据? 1. 登录Web UI。 2. 在“Queues”页签,单击需要清空数据的队列名称,进入队列详情页面。 3. 单击“Purge Messages”,清空队列数据。 RabbitMQ支持双向认证吗? 不支持。 RabbitMQ支持升级CPU和内存吗? RabbitMQ支持扩容/缩容代理规格,具体请参见变更实例规格。 如何关闭RabbitMQ的WebUI? 创建RabbitMQ实例后,如果想要关闭RabbitMQ的WebUI,只要您在安全组入方向中不开放15672端口(实例未开启SSL时的端口)或者15671(实例开启SSL时的端口),此时就无法登录WebUI界面。
        来自:
      • 使用限制
        介绍分布式消息服务RabbitMQ的功能使用限制。 限制项 约束和限制 描述 版本 当前服务端版本有3.8.9和3.8.35 兼容AMQP 091协议的客户端版本。 连接数 RabbitMQ单机和集群实例,不同实例规格的连接数上限不一致,具体限制,请参考产品规格。 。 通道数 2GB 磁盘剩余空间低于2GB会触发磁盘高水位,生产者流程被阻塞。 clusterpartitionhandling pauseminority 当集群发生网络分区时,代理会检查自己是否处于“少数派”(存储分区的代理数小于等于总代理数的一半称为少数派)。少数派中的代理将会自动关闭服务并定期检测网络状态,待分区恢复之后重新启动服务。如果未开启镜像队列,发生分区时少数派上的队列将无法生产消费。此策略相当于放弃了可用性而选择了数据一致性。 rabbitmqdelayedmessageexchange 插件延迟时间存在1%左右的误差,可能提前或者推迟发送消息给消费者。 实例是否开启消息延迟功能。 延时消息最大延时时间 7天 延时消息的最大延时时间。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        产品简介
        使用限制
      • 实例类
        RabbitMQ实例是否支持持久化,如何定时备份数据? RabbitMQ支持消息数据持久化,可从客户端连接RabbitMQ并设置消息持久化,也可在RabbitMQ集群管理工具界面创建队列时设置消息持久化。 不支持客户自定义定时备份数据,或从界面触发备份数据。 RabbitMQ实例开启SSL后,证书怎么获取? RabbitMQ实例开启SSL后只做单向认证,不需要证书。 RabbitMQ实例的SSL开关是否支持修改? 不支持动态修改,即如果实例创建时没有选择开启,创建完成之后,不支持修改,建议在实例创建时将开关打开。 RabbitMQ实例是否支持扩容? 单机版本的RabbitMQ实例支持扩大存储空间,以及扩容代理规格,暂不支持缩容。 集群版本的RabbitMQ实例支持扩大存储空间和代理个数,以及扩容代理规格,暂不支持缩容。 如何清空队列数据? 1. 登录Web UI。 2. 在“Queues”页签,单击需要清空数据的队列名称,进入队列详情页面。 3. 单击“Purge Messages”,清空队列数据。 RabbitMQ支持双向认证吗? 不支持。 RabbitMQ支持升级CPU和内存吗? RabbitMQ支持扩容/缩容代理规格,具体请参见变更实例规格。 如何关闭RabbitMQ的WebUI? 创建RabbitMQ实例后,如果想要关闭RabbitMQ的WebUI,只要您在安全组入方向中不开放15672端口(实例未开启SSL时的端口)或者15671(实例开启SSL时的端口),此时就无法登录WebUI界面。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        常见问题
        实例类
      • 计费项
        本文对分布式消息服务RocketMQ的计费项进行说明。 4.0版本 以下适用于华东1、华北2、西南1、华南2、上海36、青岛20、长沙42、南昌5、武汉41、杭州7、西南2贵州、太原4、郑州5、西安7、呼和浩特3 节点。 分布式消息服务RocketMQ费用计算分为两部分,一部分为实例费用,一部分为存储空间费用。 计费项 含义 适用的计费模式 实例费用 用户选择的选择的实例规格节点数计费,实例规格单价请参考产品资费,节点数为Name Server集群固定3节点+Broker(代理) 2节点。 按需/包周期 存储空间费用 用户选择的实例存储空间计费(每个实例规格您都可以选择高IO、超高IO等多种不同的云硬盘类型以满足您的业务需求)。 按需/包周期 3.0版本 以下适用于南京3、上海7、重庆2、乌鲁木齐27、石家庄20、内蒙6、晋中、北京5 节点。 分布式消息服务RocketMQ按照消息队列的规格版本可分为基础版、中级版和高级版。 计费项:消息队列的规格版本。
        来自:
        帮助文档
        分布式消息服务RocketMQ
        计费说明
        计费项
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • ...
      • 275
      跳转至
      推荐热词
      天翼云运维管理审计系统天翼云云服务平台云服务备份云日志服务应用运维管理云手机云电脑天翼云云hbase数据库电信云大数据saas服务电信云大数据paas服务轻量型云主机天翼云客户服务电话应用编排服务天翼云云安全解决方案云服务总线CSB天翼云服务器配置天翼云联邦学习产品天翼云云安全天翼云企业上云解决方案天翼云产品天翼云视频云存储

      天翼云最新活动

      安全隔离版OpenClaw

      OpenClaw云服务器专属“龙虾“套餐低至1.5折起

      聚力AI赋能 天翼云大模型专项

      大模型特惠专区·Token Plan 轻享包低至9.9元起

      青云志云端助力计划

      一站式科研助手,海外资源安全访问平台,助力青年翼展宏图,平步青云

      企业出海解决方案

      助力您的业务扬帆出海,通达全球!

      天翼云信创专区

      “一云多芯、一云多态”,国产化软件全面适配,国产操作系统及硬件芯片支持丰富

      中小企业服务商合作专区

      国家云助力中小企业腾飞,高额上云补贴重磅上线

      云上钜惠

      爆款云主机全场特惠,2核4G只要1.8折起!

      天翼云奖励推广计划

      加入成为云推官,推荐新用户注册下单得现金奖励

      产品推荐

      多活容灾服务 MDR

      GPU云主机

      轻量型云主机

      弹性高性能计算 E-HPC

      天翼云CTyunOS系统

      训推服务

      AI Store

      知识库问答

      人脸检测

      推荐文档

      产品优势

      删除备份数据

      产品详情

      慢查询日志

      • 7*24小时售后
      • 无忧退款
      • 免费备案
      • 专家服务
      售前咨询热线
      400-810-9889转1
      关注天翼云
      • 旗舰店
      • 天翼云APP
      • 天翼云微信公众号
      服务与支持
      • 备案中心
      • 售前咨询
      • 智能客服
      • 自助服务
      • 工单管理
      • 客户公告
      • 涉诈举报
      账户管理
      • 管理中心
      • 订单管理
      • 余额管理
      • 发票管理
      • 充值汇款
      • 续费管理
      快速入口
      • 天翼云旗舰店
      • 文档中心
      • 最新活动
      • 免费试用
      • 信任中心
      • 天翼云学堂
      云网生态
      • 甄选商城
      • 渠道合作
      • 云市场合作
      了解天翼云
      • 关于天翼云
      • 天翼云APP
      • 服务案例
      • 新闻资讯
      • 联系我们
      热门产品
      • 云电脑
      • 弹性云主机
      • 云电脑政企版
      • 天翼云手机
      • 云数据库
      • 对象存储
      • 云硬盘
      • Web应用防火墙
      • 息壤智算平台
      • CDN加速
      热门推荐
      • 云服务备份
      • 边缘安全加速平台
      • 全站加速
      • 安全加速
      • 云服务器
      • 云主机
      • 智能边缘云
      • 应用编排服务
      • 微服务引擎
      • 共享流量包
      更多推荐
      • web应用防火墙
      • 密钥管理
      • 等保咨询
      • 安全专区
      • 应用运维管理
      • 云日志服务
      • 文档数据库服务
      • 云搜索服务
      • 数据湖探索
      • 数据仓库服务
      友情链接
      • 中国电信集团
      • 天翼云国际站
      • 189邮箱
      • 天翼企业云盘
      • 天翼云盘
      ©2026 天翼云科技有限公司版权所有 增值电信业务经营许可证A2.B1.B2-20090001
      公司地址:北京市东城区青龙胡同甲1号、3号2幢2层205-32室
      • 用户协议
      • 隐私政策
      • 个人信息保护
      • 法律声明
      备案 京公网安备11010802043424号 京ICP备 2021034386号