云主机开年特惠28.8元/年,0元秒杀等你来抢!
查看详情

活动

天翼云最新优惠活动,涵盖免费试用,产品折扣等,助您降本增效!
热门活动
  • 安全隔离版OpenClaw NEW OpenClaw云服务器专属“龙虾“套餐低至1.5折起
  • 天翼云新春焕新季 NEW 云主机开年特惠28.8元/年,0元秒杀等你来抢!
  • 云上钜惠 爆款云主机全场特惠,2核4G只要1.8折起!
  • 中小企业服务商合作专区 国家云助力中小企业腾飞,高额上云补贴重磅上线
  • 出海产品促销专区 NEW 爆款云主机低至2折,高性价比,不限新老速来抢购!
  • 天翼云奖励推广计划 加入成为云推官,推荐新用户注册下单得现金奖励
免费活动
  • 免费试用中心 HOT 多款云产品免费试用,快来开启云上之旅
  • 天翼云用户体验官 NEW 您的洞察,重塑科技边界

息壤智算

领先开放的智算服务平台,提供算力、平台、数据、模型、应用“五位一体”智算服务体系,构建全流程的AI基础设施能力
AI Store
  • 算力市场
  • 模型市场
  • 应用市场
  • MCP市场
公共算力服务
  • 裸金属
  • 定制裸金属
训推服务
  • 模型开发
  • 训练任务
  • 服务部署
模型推理服务
  • 模型广场
  • 体验中心
  • 服务接入
应用托管
  • 应用实例
科研助手
  • 科研智能体
  • 科研服务
  • 开发机
  • 并行计算
大模型
  • DeepSeek-V3.1
  • DeepSeek-R1-0528
  • DeepSeek-V3-0324
  • Qwen3-235B-A22B
  • Qwen3-32B
智算一体机
  • 智算一体机
模型适配专家服务
  • 模型适配专家服务
算力服务商
  • 入驻算力服务商

应用商城

天翼云精选行业优秀合作伙伴及千余款商品,提供一站式云上应用服务
进入甄选商城进入云市场进入AI Store创新解决方案公有云生态专区智云上海应用生态专区
建站工具
  • 新域名服务
  • SSL证书
  • 翼建站
企业办公
  • 安全邮箱
  • WPS 365 天翼云版
  • 天翼企业云盘(标准服务版)
灾备迁移
  • 云管家2.0
  • 翼备份(SaaS版)

定价

协助您快速了解云产品计费模式、价格详情,轻松预估上云成本
价格计算器
  • 动态测算产品价格
定价策略
  • 快速了解计费模式

合作伙伴

天翼云携手合作伙伴,共创云上生态,合作共赢
天翼云生态合作中心
  • 天翼云生态合作中心
天翼云渠道合作伙伴
  • 天翼云代理渠道合作伙伴
天翼云服务合作伙伴
  • 天翼云集成商交付能力认证
天翼云应用合作伙伴
  • 天翼云云市场合作伙伴
  • 天翼云甄选商城合作伙伴
天翼云技术合作伙伴
  • 天翼云OpenAPI中心
天翼云培训认证
  • 天翼云学堂
  • 天翼云市场商学院
天翼云合作计划
  • 云汇计划
天翼信创云专区
  • 信创云专区
  • 适配互认证

开发者

开发者相关功能入口汇聚
技术社区
  • 专栏文章
  • 互动问答
  • 技术视频
资源与工具
  • OpenAPI中心
培训与认证
  • 天翼云学堂
  • 天翼云认证
开源社区
  • 魔乐社区
  • OpenTeleDB

支持与服务

为您提供全方位支持与服务,全流程技术保障,助您轻松上云,安全无忧
文档与工具
  • 文档中心
  • 新手上云
  • 自助服务
  • OpenAPI中心
定价
  • 价格计算器
  • 定价策略
基础服务
  • 售前咨询
  • 在线支持
  • 在线支持
  • 工单服务
  • 服务保障
  • 会员中心
增值服务
  • 红心服务
  • 首保服务
  • 客户支持计划
  • 专家技术服务
  • 备案管家
我要反馈
  • 建议与反馈
  • 用户体验官
信息公告
  • 客户公告

了解天翼云

天翼云秉承央企使命,致力于成为数字经济主力军,投身科技强国伟大事业,为用户提供安全、普惠云服务
品牌介绍
  • 关于天翼云
  • 智算云
  • 天翼云4.0
  • 新闻资讯
  • 天翼云APP
基础设施
  • 全球基础设施
  • 信任中心
最佳实践
  • 精选案例
  • 超级探访
  • 云杂志
  • 分析师和白皮书
  • 天翼云·创新直播间
市场活动
  • 2025智能云生态大会
  • 2024智算云生态大会
  • 2023云生态大会
  • 2022云生态大会
  • 天翼云中国行
天翼云
  • 活动
  • 息壤智算
  • 产品
  • 解决方案
  • 应用商城
  • 定价
  • 合作伙伴
  • 开发者
  • 支持与服务
  • 了解天翼云
      • 文档
      • 控制中心
      • 备案
      • 管理中心
      分布式消息服务RabbitMQ_相关内容
      • 延迟消息
        前提条件 RabbitMQ 版本 ≥ 3.8 启用官方插件: rabbitmqplugins enable rabbitmqdelayedmessageexchange 天翼云RabbitMQ默认启用延迟交换器交换器插件,无需操作 使用步骤 1. 声明 xdelayedmessage 类型交换机 2. 指定底层路由类型(xdelayedtype) 3. 发送消息时在 Header 中设置 xdelay(毫秒) 代码示例 java //声明延时交换机 Map args new HashMap<>(); args.put("xdelayedtype","direct"); channel.exchangeDeclare("delayedexchange","xdelayedmessage",true,false,args); // 声明并绑定队列 channel.queueDeclare("targetqueue",true,false,false,null); channel.queueBind("targetqueue","delayedexchange","routingkey"); //发送延时消息(延迟 5 秒) Map headers new HashMap<>(); headers.put("xdelay",5000); AMQP.BasicProperties props new AMQP.BasicProperties.Builder() .headers(headers) .build(); channel.basicPublish("delayedexchange","routingkey",props,"延时消息".getBytes(StandardCharsets.UTF8)); 特点说明 延迟精度高,使用方式直观 延迟逻辑在 Exchange 层完成 依赖插件及 RabbitMQ 版本兼容性 注意事项 精度与可靠性 DLX + TTL 可能存在毫秒级到秒级误差 插件方案 延迟精度更高,更适合严格定时场景 资源消耗 延时消息在到期前会占用 Broker 内存 大量长延时消息需重点监控: 队列堆积 内存水位 磁盘使用率 消息顺序 延时消息可能打乱原始发送顺序 对顺序敏感的业务需在消费端自行处理
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        最佳实践
        延迟消息
      • 连接已开启SSL的RabbitMQ实例
        本章节主要介绍如何连接已开启SSL的RabbitMQ实例。 创建实例时开启SASLSSL访问,则数据加密传输,安全性更高。 DMS的RabbitMQ实例兼容开源协议,请参考RabbitMQ官网提供的不同语言的连接和使用向导:< 本节以DMS提供的demo为例,介绍VPC内访问与使用RabbitMQ的方法,假设RabbitMQ客户端部署在弹性云主机上。 前提条件 参考创建实例章节创建RabbitMQ示例,并记录创建时输入的用户名和密码。 创建完成后,单击实例名称,查看并记录实例详情中的“连接地址”。 已创建弹性云主机,并且弹性云主机的VPC、子网、安全组与RabbitMQ实例的VPC、子网、安全组保持一致。 已完成JDK安装及环境变量配置。 命令行模式连接实例 以下操作命令以Linux系统为例进行说明。 1、下载RabbitMQTutorialSSL.zip示例工程代码。 2、解压RabbitMQTutorialSSL.zip压缩包。 $ unzip RabbitMQTutorialSSL.zip 3、进入RabbitMQTutorialSSL目录,该目录下包含预编译好的jar文件。 $ cd RabbitMQTutorialSSL 4、运行生产消息示例。 $ java cp .:rabbitmqtutorialsll.jar Send host port user password 其中,host表示RabbitMQ实例的连接地址,port为RabbitMQ实例的监听端口(默认为5671),user表示RabbitMQ用户名,password表示用户名对应的密码。 图1 生产消息示例 使用Ctrl+C命令退出。 5、运行消费消息示例。 $ java cp .:rabbitmqtutorialsll.jar Recv host port user password 其中,host表示RabbitMQ实例的连接地址,port为RabbitMQ实例的监听端口(默认为5671),user表示RabbitMQ用户名,password表示用户名对应的密码。 图2 消费消息示例 如需停止消费使用Ctrl+C命令退出。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        用户指南
        连接实例
        连接已开启SSL的RabbitMQ实例
      • 购买类
        本节介绍了分布式消息服务RabbitMQ产品常见购买类问题。 到期后如何续费? 在集群列表中点击“续费”,进入购买时长页面,购买成功续费成功。 产品订购时可选资源池节点不一致? 已上线资源池节点的剩余容量达到一定比例后,为确保老客户权益,将不再面向新客户开放,产品订购时的可选资源节点范围以实际为准。 收费依据有哪些? 根据规格和磁盘容量收费。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        常见问题
        购买类
      • 仪表盘
        使用场景 RabbitMQ仪表盘支持实时监控消息流、排查生产消费异常、配置与维护队列 / 交换机 / 绑定、权限管理及集群状态查看,是 RabbitMQ 运维与开发调试的核心工具。 指标说明 指标 指标含义 可消费消息数 队列中已经准备好等待消费者去获取和处理的消息数量。 连接数 当前与 RabbitMQ 服务器建立的 TCP 连接总数。每个生产者或消费者客户端都需要与 RabbitMQ 建立一个 TCP 连接。 信道数 在所有 TCP 连接上打开的 AMQP 信道总数。信道是在 TCP 连接内部建立的虚拟连接,用于多路复用,以减少建立和关闭 TCP 连接的开销。 消费者数 当前所有队列上注册的消费者总数。一个消费者可以监听一个或多个队列。 交换器生产速率 单位时间内发送到某个交换器的消息数量。 交换器消费速率 单位时间内从某个交换器路由到其绑定队列的消息数量。 队列生产速率 单位时间内发送到某个队列的消息数量。 队列消费速率 单位时间内从某个队列成功投递给消费者并收到确认(ACK)的消息数量。 队列可消费消息数 某个队列中当前处于 "Ready" 状态的消息数量。 队列消费者数 当前正在监听某个队列的消费者数量。 VHost连接数 当前连接到某个特定 VHost 的 TCP 连接总数。 VHost信道数 当前在某个特定 VHost 的所有连接上打开的信道总数。 VHost每个连接的信道数 某个 VHost 的信道总数除以其连接总数得到的平均值。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        用户指南
        仪表盘
      • 【优惠】正式开放2年7折,3年5折包年折扣
        面向长期稳定客户,我们特别推出了更加优惠的包年订阅选项,旨在通过深度折扣,帮助客户显著降低资源单位成本,优化整体支出。 自2024年11月9日起订购和续订分布式消息服务RabbitMQ产品更长包周期即可享受2年7折,3年5折优惠。 注意 本次包年优惠适用于新资费产品范围,具体支持资源池请参阅
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        服务公告
        2024年
        【优惠】正式开放2年7折,3年5折包年折扣
      • 使用SSL证书连接
        本文主要介绍使用SSL证书连接。 创建实例时开启SASLSSL访问,则数据加密传输,安全性更高。 DMS的RabbitMQ实例兼容开源协议,请参考RabbitMQ官网提供的不同语言的连接和使用向导:< 本节以DMS提供的demo为例,介绍VPC内访问与使用RabbitMQ的方法,假设RabbitMQ客户端部署在弹性云主机上。 前提条件 参考创建RabbitMQ实例章节创建RabbitMQ示例,并记录创建时输入的用户名和密码。 创建完成后,单击实例名称,查看并记录实例详情中的“连接地址”。 已创建弹性云主机,并且弹性云主机的VPC、子网、安全组与RabbitMQ实例的VPC、子网、安全组保持一致。 已完成JDK安装及环境变量配置,具体操作请参考准备环境。 命令行模式连接实例 以下操作命令以Linux系统为例进行说明。 1、下载RabbitMQTutorialSSL.zip示例工程代码。 2、解压RabbitMQTutorialSSL.zip压缩包。 $ unzip RabbitMQTutorialSSL.zip 3、进入RabbitMQTutorialSSL目录,该目录下包含预编译好的jar文件。 $ cd RabbitMQTutorialSSL 4、运行生产消息示例。 $ java cp .:rabbitmqtutorialsll.jar Send host port user password 其中,host表示RabbitMQ实例的连接地址,port为RabbitMQ实例的监听端口(默认为5671),user表示RabbitMQ用户名,password表示用户名对应的密码。 图1 生产消息示例 使用Ctrl+C命令退出。 5、运行消费消息示例。 $ java cp .:rabbitmqtutorialsll.jar Recv host port user password 其中,host表示RabbitMQ实例的连接地址,port为RabbitMQ实例的监听端口(默认为5671),user表示RabbitMQ用户名,password表示用户名对应的密码。 图2 消费消息示例 如需停止消费使用Ctrl+C命令退出。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        快速入门
        步骤三:连接实例生产消费消息
        使用SSL证书连接
      • 【通知】通用型主机规格调整为白名单特性
        分布式消息服务RabbitMQ主机类型通用型规格调整为白名单特性,更多了解请查看计费项产品规格说明。 调整时间 2024年6月25日 影响范围 所有区域 调整影响 新用户默认不开放主机类型通用型规格订购开通,如需要该特性,请联系技术支持开通后使用。 已购买主机类型通用型规格实例的用户,原实例仍可正常使用,续费、扩容等操作不受影响。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        服务公告
        2024年
        【通知】通用型主机规格调整为白名单特性
      • 如何实现RabbitMQ的高性能
        本节介绍了如何实现RabbitMQ的高性能。 使用较小的队列长度 队列中存在大量消息时,会给内存使用带来沉重的负担,为了释放内存,RabbitMQ会将消息刷新到磁盘。这个过程通常需要时间,由于需要重建索引,重启包含大量消息的集群非常耗时。当刷盘的消息过多时,会阻塞队列处理消息,从而降低队列速度,对RabbitMQ节点的性能产生负面影响。 要获得最佳性能,应尽可能缩短队列。建议始终保持队列消息堆积的数量在0左右。 对于经常受到消息峰值影响的应用程序,和对吞吐量要求较高的应用程序,建议在队列上设置最大长度。这样可以通过丢弃队列头部的消息来保持队列长度,队列长度永远不会大于最大长度设置。 在队列声明时使用对应参数设置。 java //创建队列 HashMap map new HashMap<>(); //设置队列最大长度 map.put("xmaxlength",10 ); //设置队列溢出方式保留前10 map.put("xoverflow","rejectpublish" ); channel.queueDeclare(queueName,false,false,false,map); 当队列长度超过设置的最大长度时,RabbitMQ的默认做法是将队列头部的信息(队列中最老的消息)丢弃或变成死信。可以通过设置不同的overflow值来改变这种方式,如果overflow值设置为drophead,表示从队列前面丢弃或deadletter消息,保存后n条消息。如果overflow值设置为rejectpublish,表示最近发布的消息将被丢弃,即保存前n条消息。 自动删除不再使用的队列 客户端可能连接失败导致队列被残留,大量的残留队列会影响实例的性能。RabbitMQ提供三种自动删除队列的方法: 在队列中设置TTL策略:例如TTL策略设置为28天,当持续28天队列未被使用时,此队列将被删除。 使用autodelete队列:当最后一个消费者退出或通道/连接关闭(或与服务器的TCP连接丢失)时,autodelete队列会被删除。 使用exclusive queue:exclusive queue只能在创建它的连接中使用,当此连接关闭或消失时,exclusive queue会被删除。 设置方法如下: java boolean exclusive true; boolean autoDelete true; channel.queueDeclare(QUEUENAME, durable, exclusive, autoDelete, arguments);
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        最佳实践
        如何实现RabbitMQ的高性能
      • 【通知】产品订购、续订包周期2年和3年选项调整为白名单特性
        尊敬的天翼云客户,分布式消息服务RabbitMQ自2025年12月27日起,订购和续订2年、3年选项默认不开放,调整为白名单特性。 调整时间 2025年12月27日 影响范围 所有区域 调整影响 新订购和续订的实例默认不开放2年、3年选项,您可以选择1年包年选项,如仍需要23年包周期选项,请联系技术支持开通后使用。 已购买2年、3年且还在服务期间的实例仍可继续正常使用不受影响。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        服务公告
        2025年
        【通知】产品订购、续订包周期2年和3年选项调整为白名单特性
      • 网络异常自动恢复
        本文主要介绍在网络异常时如何自动恢复分布式消息服务RabbitMQ。 本章介绍客户端与服务端由于服务端重启、网络抖动等原因造成网络连接断开时,如何在客户端设置网络自动恢复。4.0.0及以上版本的Java客户端默认支持网络自动恢复,无需设置。 说明 如果应用程序使用Connection.Close方法关闭连接,则不会启用或触发网络自动恢复。 触发自动恢复的场景 以下场景,会触发网络自动恢复: 在连接的I/O循环中抛出未处理的异常 检测到Socket读取超时 检测到服务端心跳丢失 重试连接示例代码 客户端和服务端的初始连接失败,不会触发自动恢复,建议您编写对应的应用程序代码,通过重试连接来解决初始连接失败的问题。 以下示例演示了使用Java客户端通过重试连接解决初始连接失败的问题。 ConnectionFactory factory new ConnectionFactory(); // enable automatic recovery if using RabbitMQ Java client library prior to version 4.0.0. factory.setAutomaticRecoveryEnabled(true); // configure various connection settings try { Connection conn factory.newConnection(); } catch (java.net.ConnectException e) { Thread.sleep(5000); // apply retry logic }
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        最佳实践
        网络异常自动恢复
      • 如何实现RabbitMQ的高性能
        本文主要介绍如何实现RabbitMQ的高性能最佳实践。 本章节基于吞吐量和可靠性两个指标,指导您通过设置队列长度、集群负载均衡、优先队列数量等参数,实现RabbitMQ的高性能。 使用较小的队列长度 队列中存在大量消息时,会给内存使用带来沉重的负担,为了释放内存,RabbitMQ会将消息刷新到磁盘。这个过程通常需要时间,由于需要重建索引,重启包含大量消息的集群非常耗时。当刷盘的消息过多时,会阻塞队列处理消息,从而降低队列速度,对RabbitMQ节点的性能产生负面影响。 要获得最佳性能,应尽可能缩短队列。建议始终保持队列消息堆积的数量在0左右。 对于经常受到消息峰值影响的应用程序,和对吞吐量要求较高的应用程序,建议在队列上设置 最大长度 。这样可以通过丢弃队列头部的消息来保持队列长度,队列长度永远不会大于最大长度设置。 最大长度可以通过Policy设置,也可以通过在队列声明时使用对应参数设置。 在Policy中设置 在队列声明时使用对应参数设置 //创建队列 HashMap map new HashMap<>(); //设置队列最大长度 map.put("xmaxlength",10 ); //设置队列溢出方式保留前10 map.put("xoverflow","rejectpublish" ); channel.queueDeclare(queueName,false,false,false,map); 当队列长度超过设置的最大长度时,RabbitMQ的默认做法是将队列头部的信息(队列中最老的消息)丢弃或变成死信。可以通过设置不同的overflow 值来改变这种方式,如果overflow 值设置为 drophead ,表示从队列前面丢弃或deadletter消息,保存后n条消息。如果overflow 值设置为 rejectpublish ,表示最近发布的消息将被丢弃,即保存前n条消息。 说明 如果同时使用以上两种方式设置队列的最大长度,两者中较小的值将被使用。 超过队列最大长度的消息会被丢弃,请谨慎使用。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        最佳实践
        如何实现RabbitMQ的高性能
      • 手工部署RabbitMQ(CentOS 7.4)
        本文介绍了如何在天翼云上使用弹性云主机的Linux实例部署RabbitMQ。 本文介绍了如何在天翼云上使用弹性云主机的Linux实例部署RabbitMQ。RabbitMQ是采用Erlang语言实现AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息。RabbitMQ凭借其高可靠、易扩展、高可用及丰富的功能特性成为目前非常热门的一款消息中间件。 前提条件 弹性云主机所在安全组添加了如下表所示的安全组规则,具体步骤参见为安全组添加安全组规则。 方向 类型 协议 端口/范围 源地址 入方向 IPv4 TCP 5672 0.0.0.0/0 入方向 IPv4 TCP 15672 0.0.0.0/0 操作步骤 1. 安装相关依赖包和perl。 登录弹性云主机。 执行以下命令,安装相关依赖包。 plaintext yum y install make gcc gccc++ m4 ncursesdevel openssldevel unixODBCdevel 执行如下命令,安装perl。 plaintext yum install perl 2. 安装erlang。 执行如下命令,下载erlang安装包。 plaintext wget 执行如下命令,解压缩安装包。 plaintext tar xzf otpsrc19.3.tar.gz 解压后生成一个“otpsrc19.3”文件夹。 执行如下命令,创建文件夹“erlang”。 plaintext mkdir /usr/local/erlang 执行如下命令,进入解压后生成的文件夹“otpsrc19.3”。 plaintext cd otpsrc19.3 执行如下命令,检查系统是否符合安装要求。 plaintext ./configure prefix/usr/local/erlang withoutjavac 执行如下命令,编译并安装Erlang。 plaintext make && make install 执行如下命令,配置erlang环境变量。 执行如下命令打开配置文件"profile"。 plaintext vi /etc/profile 按i键进入编辑模式。 修改打开的“profile”文件,将如下内容添加到文件末尾。 plaintext export PATH$PATH:/usr/local/erlang/bin 按Esc键退出编辑模式,并输入:wq保存后退出。 执行如下命令,使环境变量生效。 plaintext source /etc/profile 执行如下命令,检查安装结果。 plaintext erl version 回显类似如下信息,说明erlang安装成功。 plaintext [root@ecsrabbitmq ~]
        来自:
        帮助文档
        弹性云主机 ECS
        最佳实践
        搭建网站/应用最佳实践
        手工部署RabbitMQ(CentOS 7.4)
      • 【通知】云原生引擎调整为白名单特性
        分布式消息服务RabbitMQ引擎类型云原生引擎调整为白名单特性,更多了解请查看快速入门购买实例引擎类型说明。 调整时间 2024年7月5日 影响范围 所有区域 调整影响 新用户默认不开放云原生引擎订购开通,如需要该特性,请联系技术支持开通后使用。 已购买云原生引擎实例的用户,原实例仍可正常使用,续费、扩容等操作不受影响。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        服务公告
        2024年
        【通知】云原生引擎调整为白名单特性
      • 分布式消息服务RocketMQ(1)
        分布式消息服务RocketMQ是一款低成本、高可靠、高性能的消息中间件产品,兼容开源RocketMQ客户端,提供高效可靠的消息传递服务,解决分布式应用系统之间的消息数据通信难题,用于系统间的解耦,用户只需专注业务,无需部署运维,适用于电商、金融、政企等多样业务场景。
        来自:
      • 服务等级目标(SLO)
        产品名称 可用性SLO 弹性云主机 99.995% 物理机 99% 弹性伸缩服务 99.95% 云硬盘 99.999% 弹性文件服务 99.9% 对象存储(经典I型) 99.99% 对象存储(原生版)I型 99.95% 媒体存储 99.99% 弹性IP 99% 弹性负载均衡 99.95% VPN连接 99.95% 分布式缓存Redis 99.95% 分布式消息RocketMQ 99.95% 分布式消息RabbitMQ 99.95% 分布式消息Kafka 99.95% CDN加速 99.90%
        来自:
        帮助文档
        服务等级目标
        天翼云服务等级目标
        服务等级目标(SLO)
      • 步骤四:配置必须的监控告警
        本文主要介绍如何配置分布式消息服务RabbitMQ必须的监控告警。 本章节主要介绍部分监控指标的告警策略,以及配置操作。在实际业务中,建议按照表1告警策略,配置监控指标的告警规则。 表1 RabbitMQ实例配置告警的指标 指标名称 告警策略 指标说明 解决方案 内存高水位状态 告警阈值:原始值>1 连续触发次数:1 告警级别:致命 告警阈值为1表示触发内存高水位,会阻塞消息生产 加快消费 采用生产者确认的发送模式,并监控生产端消息生产速度和时长,当消息生产时长有明显增加时进行流控措施 磁盘高水位状态 告警阈值:原始值>1 连续触发次数:1 告警级别:致命 告警阈值为1表示触发磁盘高水位,会阻塞消息生产 减少惰性队列的消息堆积 减少持久化队列的消息堆积 删除队列 内存使用率 告警阈值:原始值>业务预期使用率(推荐30%) 连续触发次数:连续3~5个周期 告警级别:重要 该指标需要分别为每个节点设置内存使用率告警,避免触发内存高水位阻塞生产 加快消费 采用生产者确认的发送模式,并监控生产端消息生产速度和时长,当消息生产时长有明显增加时进行流控措施 CPU使用率 告警阈值:原始值>业务预期使用率(推荐70%) 连续触发次数:连续3~5个周期 告警级别:重要 该指标需要分别为每个节点设置CPU使用率告警,CPU使用率过高可能会影响生产速度 减少镜像队列个数 对于集群实例,建议扩容节点个数,然后进行节点间重平衡 可消费消息数 告警阈值:原始值>业务预期可消费消息数 连续触发次数:1 告警级别:重要 可消费消息数过多表示消息堆积 请参考消息堆积的解决办法 未确认消息数 告警阈值:原始值>业务预期未确认消息数 连续触发次数:1 告警级别:重要 未确认消息数过多可能会导致消息堆积 检查消费者是否异常 检查消费者逻辑是否消耗时间过长 连接数 告警阈值:原始值>业务预期连接数 连续触发次数:1 告警级别:重要 连接数突增可能是流量变大的预警 需检查业务是否正常,可参考其他告警 通道数 告警阈值:原始值>业务预期通道数 连续触发次数:1 告警级别:重要 通道数突增可能是流量变大的预警 需检查业务是否正常,可参考其他告警 Erlang进程数 告警阈值:原始值>业务预期进程数 连续触发次数:1 告警级别:重要 进程数突增可能是流量变大的预警 需检查业务是否正常,可参考其他告警
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        快速入门
        步骤四:配置必须的监控告警
      • Python
        本文介绍Python版本的RabbitMQ客户端连接指导,包括RabbitMQ客户端安装,以及生产、消费消息。 使用前请参考收集连接信息收集RabbitMQ所需的连接信息。 准备环境 1. 安装Python3 2. 安装kombu pip3 install kombu 生产消息 SSL认证方式 import ssl import sys from kombu import Connection, Exchange, Producer def Main(argv): cacertfile "certscacertificate.pem" certfile "certsclientrabbitmqcertificate.pem" privatekey "certsclientrabbitmqkey.pem" conn Connection('amqp://XXX:xxx/', loginmethod'EXTERNAL', ssl{ 'cacerts': cacertfile, 'keyfile': privatekey, 'certfile': certfile, 'certreqs': ssl.CERTREQUIRED, 'sslversion': ssl.PROTOCOLTLSv12, }) channel conn.channel() exchange Exchange("exampleexchange", type"direct") producer Producer(exchangeexchange, channelchannel, routingkey"testkey") message "Hello Rabbimtq" producer.publish(message, retryTrue) print('send message: %s' % message) if name 'main': Main(sys.argv) 非SSL认证方式 import sys from kombu import Connection, Exchange, Producer def Main(argv): rabbitmqurl 'amqp://USERNAME:PASSWORD@XXX:xxx/' conn Connection(rabbitmqurl, loginmethod'PLAIN') channel conn.channel() exchange Exchange("exampleexchange", type"direct") producer Producer(exchangeexchange, channelchannel, routingkey"testkey") message "Hello Rabbimtq" producer.publish(message, retryTrue) print('send message: %s' % message) if name 'main': Main(sys.argv) 消费消息 SSL认证方式 import sys import ssl from kombu import Connection, Exchange, Queue, Consumer def Main(argv): cacertfile "certscacertificate.pem" certfile "certsclientrabbitmqcertificate.pem" privatekey "certsclientrabbitmqkey.pem" conn Connection('amqp://XXX:xxx/', loginmethod'EXTERNAL', ssl{ 'cacerts': cacertfile, 'keyfile': privatekey, 'certfile': certfile, 'certreqs': ssl.CERTREQUIRED, 'sslversion': ssl.PROTOCOLTLSv12, }) exchange Exchange("exampleexchange", type"direct") queue Queue(name"examplequeue", exchangeexchange, routingkey"testkey") def processmessage(body, message): print("receive message: %s" % format(body)) message.ack() with Consumer(conn, queuesqueue, callbacks[processmessage], accept["text/plain"]): conn.drainevents(timeout2) if name 'main': Main(sys.argv)
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        开发指南
        Python
      • 分布式消息服务RocketMQ
        分布式消息服务RocketMQ是一款低成本、高可靠、高性能的消息中间件产品,兼容开源RocketMQ客户端,提供高效可靠的消息传递服务,解决分布式应用系统之间的消息数据通信难题,用于系统间的解耦,用户只需专注业务,无需部署运维,适用于电商、金融、政企等多样业务场景。
        来自:
        帮助文档
        分布式消息服务RocketMQ
      • 虚拟主机限制管理
        本节介绍了在RabbitMQ实例中如何创建、修改和删除虚拟主机限制策略。 背景信息 RabbitMQ虚拟主机限制管理是一种用于管理RabbitMQ虚拟主机的功能,它允许管理员对虚拟主机的资源和权限进行限制和管理。 虚拟主机是RabbitMQ中的逻辑隔离单位,它允许在同一台RabbitMQ服务器上创建多个独立的消息队列环境。每个虚拟主机都有自己的队列、交换机、绑定和权限设置。通过虚拟主机限制管理,管理员可以对RabbitMQ服务器上的虚拟主机进行细粒度的控制和管理。这有助于确保不同的应用程序或用户之间的隔离性、安全性和资源利用率,提高整个消息队列系统的可靠性和性能。 操作步骤 1.登录管理控制台。 2.进入RabbitMQ管理控制台。 3.在实例列表页在操作列,目标实例行点击“管理”。 4.点击“集群管理”后点击“虚拟主机限制”到达虚拟主机页面,点击“新建”按钮。 5.点击“新建”后出现以下界面,选择虚拟主机,添加限制策略内容。 限制项参数 说明 maxconnections 最大TCP连接。 maxqueues 最大队列数。 6.在目标虚拟主机限制行点击“删除”或“修改”,即可删除当前虚拟主机策略。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        用户指南
        虚拟主机管理
        虚拟主机限制管理
      • Java
        使用Maven方式引入依赖 com.rabbitmq amqpclient 5.7.0 生产消息 import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class RabbitmqProducer { // private final static String EXCHANGENAME "exchangeTest"; private final static String QUEUENAME "helloMQ"; // private final static String ROUTINGKEY "test"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { // 创建连接工厂 ConnectionFactory factory new ConnectionFactory(); // 设置主机ip factory.setHost("127.0.0.1"); // 设置amqp的tcp端口号 factory.setPort(5672); // 设置用户名密码 factory.setUsername("YOUR USERNAME"); factory.setPassword("YOUR PASSWORD"); // 设置Vhost,需要在控制台先创建 factory.setVirtualHost("/"); // 基于网络环境合理设置超时时间 factory.setConnectionTimeout(30 1000); factory.setHandshakeTimeout(30 1000); factory.setShutdownTimeout(0); // 创建一个连接 Connection connection factory.newConnection(); // 创建一个频道 Channel channel connection.createChannel(); // 指定一个队列 channel.queueDeclare(QUEUENAME, false, false, false, null); for (int i 0; i < 100; i++) { // 发送的消息 String message "Hello rabbitMQ!" + i; // 往队列中发送一条消息,使用默认的交换器 channel.basicPublish("", QUEUENAME, null, message.getBytes(StandardCharsets.UTF8)); System.out.println(" Sent message: '" + message + "'"); TimeUnit.MILLISECONDS.sleep(100); } //关闭频道和连接 channel.close(); connection.close(); } }
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        开发指南
        Java
      • 创建Vhost
        本章节主要介绍如何创建Vhost。 操作场景 每个Vhost(Virtual Hosts)相当于一个相对独立的RabbitMQ服务器。Vhost用作逻辑隔离,分别管理Exchange、Queue和Binding,使得应用安全地运行在不同的Vhost上,相互之间不会干扰。一个实例下可以有多个Vhost,一个Vhost里可以有若干个Exchange和Queue。生产者和消费者连接RabbitMQ实例时,需要指定一个Vhost。Vhost的相关介绍,请参考官网文档Virtual Hosts。 本章节主要介绍创建Vhost的操作,有以下几种方式,您可以根据实际情况选择任意一种方式: 方式一:在控制台创建 方式二:使用RabbitMQ WebUI创建 方式三:调用API创建 方式一:在控制台创建 步骤 1 登录管理控制台。 步骤 2 在管理控制台右上角单击,选择区域。 说明 此处请选择RabbitMQ实例所在的区域。 步骤 3 在管理控制台左上角单击,选择“企业中间件”>“分布式消息服务”>“RabbitMQ专享版”,进入分布式消息服务RabbitMQ专享版页面。 步骤 4 单击实例名称,进入实例详情页面。 步骤 5 在左侧导航栏选择“Vhost列表”,进入Vhost列表页面。 步骤 6 单击“创建Vhost”,弹出“创建Vhost”对话框。 步骤 7 设置Vhost的名称,单击“确定”。 创建成功后,在Vhost列表页面显示创建成功的Vhost。 图1 Vhost列表(控制台) “tracing”表示是否开启消息追踪功能。开启消息追踪后,您可以跟踪消息的转发路径。 说明 Vhost创建成功后,无法修改名称。 实例创建后,会自动创建一个名为“/”的Vhost。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        用户指南
        Vhost管理
        创建Vhost
      • 产品定义
        本文带您了解什么是分布式消息服务RocketMQ产品。 分布式消息服务RocketMQ是一款低成本、高可靠、高性能的消息中间件产品,兼容开源RocketMQ客户端,提供高效可靠的消息传递服务,解决分布式应用系统之间的消息数据通信难题,用于系统间的解耦,用户只需专注业务,无需部署运维,适用于电商、金融、政企等多样业务场景。 产品示意图 分布式消息服务RocketMQ发布订阅基本流程如下: 1、Producer连接Nameserver,产生数据放入不同的Topic; 2、对于RocketMQ,一个Topic可以分布在各个Broker上,我们可以把一个Topic分布在一个Broker上的子集定义为一个Topic分片; 3、将Topic分片再切分为若干等分,其中的一份就是一个Queue。每个Topic分片等分的Queue的数量可以不同,由用户在创建Topic时指定; 4、Consumer连接Nameserver,根据Broker分配的Queue来消费数据。 核心组件 分布式消息服务RocketMQ主要由Producer、Broker、Consumer三部分组成,其中Producer负责生产消息,Consumer负责消费消息,Broker负责存储消息。Broker在实际部署过程中对应1台或者多台服务器,每个Broker可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的Broker。Message Queue用于存储消息的物理地址,每个Topic中的消息地址存储于多个Message Queue中。ConsumerGroup由多个Consumer实例构成。更多信息请参见产品架构。
        来自:
        帮助文档
        分布式消息服务RocketMQ
        产品简介
        产品定义
      • 环境准备
        本文主要介绍分布式消息服务RabbitMQ所需的环境准备。 创建RabbitMQ实例前,您需要提前准备相关依赖资源,包括虚拟私有云(Virtual Private Cloud,以下简称VPC)、子网和安全组,并配置安全组策略。每个RabbitMQ实例都部署在某个VPC中,并绑定具体的子网和安全组,通过这样的方式为RabbitMQ提供一个隔离的、用户自主配置和管理的虚拟网络环境以及安全保护策略,提升实例的安全性。 如果用户已有VPC,可重复使用,不需要多次创建。 创建VPC 1、登录管理控制台。 2、在管理控制台右上角单击,选择区域。 说明 此处请选择与您的应用服务相同的区域。 3、在管理控制台左上角单击,选择“网络 > 虚拟私有云”。 4、单击“创建虚拟私有云”,进入“创建虚拟私有云”界面。 5、根据界面提示创建虚拟私有云。如无特殊需求,界面参数均可保持默认。关于创建VPC的详细信息可以参考《虚拟私有云用户指南》。 创建虚拟私有云时,会同时创建子网,若需要额外创建子网,请参考6。如果不需要额外创建子网,请执行7。 6、在左侧导航栏,单击“子网”,进入“子网”界面。单击“创建子网”。根据界面提示创建子网。如无特殊需求,界面参数均可保持默认。 关于创建子网的详细信息可以参考《虚拟私有云用户指南》。 7、在左侧导航栏,选择“访问控制 > 安全组”,进入“安全组”界面。根据界面提示创建安全组。如无特殊需求,界面参数均可保持默认。 关于创建安全组的详细信息可以参考《虚拟私有云用户指南》。 使用RabbitMQ实例前,添加表1所示安全组规则,其他规则请根据实际需要添加。 说明 创建安全组后,系统默认添加入方向“允许安全组内的弹性云主机彼此通信”规则和出方向“放通全部流量”规则,此时使用内网通过同一个VPC访问RabbitMQ实例,无需添加表1的规则。 表1 安全组规则 方向 协议 端口 源地址 说明 入方向 TCP 5672 0.0.0.0/0 访问RabbitMQ实例(关闭SSL) 入方向 TCP 5671 0.0.0.0/0 访问RabbitMQ实例(开启SSL) 入方向 TCP 15672 0.0.0.0/0 访问Web界面UI地址(关闭SSL加密) 入方向 TCP 15671 0.0.0.0/0 访问Web界面UI地址(开启SSL加密)
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        用户指南
        环境准备
      • 消息堆积对业务的影响及解决办法
        本文主要介绍消息堆积对业务的影响及解决办法。 消息堆积对业务的影响 过量的消息堆积可能会引起内存或磁盘告警,从而造成所有connection阻塞,进而影响到其他队列的使用,导致整体服务质量的下降。 消息堆积产生的原因 1. 一般来说消息堆积是由于生产消息的速率远大于消费消息的速率所导致的。比如某个时间段消费端处理消息异常缓慢,发送一条消息只要3秒钟,而消费一条消息需要1分钟,每分钟发送20个消息,只有一个消息被消费端处理,这样队列中就会产生大量的消息堆积。 2. 消费者出现异常,生产者一直在发送消息,但是消费者不能消费,造成消息积压。 3. 消费者没有出现异常,但是消费者与队列间的订阅可能出现了异常,也会导致消息无法被消费从而造成堆积的情况。 4. 消费者正常,与队列间的订阅也正常,但是消费端的代码本身逻辑耗费时间长导致了消费能力降低,这时候就会出现1中的情况从而导致消息堆积。 解决消息堆积的办法 1. 生产速率较快,消费速率较慢 :您可以通过以下方法解决。 增加消费者数量提高消费速率。 采用生产者确认的发送模式,并监控生产端消息生产速度和时长,当消息生产时长有明显增加时进行流控措施。 2. 消费者异常 :建议排查消费者逻辑是不是有问题,优化程序。 3. 消费者与队列间的订阅异常 :建议排查队列和消费者之间的订阅是否正常。 4. 消费端的代码本身逻辑耗费时间长 :建议给消息设置过期时间,设置方法如下: 在生产消息时,设置消息过期时间。消息过期时间以expiration值体现。 在properties中设置expiration值,单位为毫秒(ms)。 AMQP.BasicProperties properties new AMQP.BasicProperties().builder() .deliveryMode(2) .contentEncoding("UTF8") .expiration("10000") .build(); String message "hello rabbitmq"; channel.basicPublish(exchange, routingKey, properties, message.getBytes(StandardCharsets.UTF8)); 在Web界面中设置expiration值,单位为毫秒(ms)。 登录Web界面,在“Exchanges”页签,单击Exchange名称,进入Exchange详情页。在“Publish message”区域,设置expiration值,如下图所示。 设置队列过期时间。队列过期时间以xmessagettl值体现。从消息进入队列开始计算,超过了配置的队列过期时间,消息会自动被删除。 在客户端代码中设置xmessagettl值,单位为毫秒(ms)。 Map arguments new HashMap (); arguments.put("xmessagettl", 10000); channel.queueDeclare(queueName, true, false, false, arguments); 在Web界面新建队列时,设置xmessagettl值,单位为毫秒(ms)。 登录Web界面,在“Exchanges”页签,新建队列时,设置xmessagettl值,如下图所示。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        常见问题
        复杂操作问题
        消息堆积对业务的影响及解决办法
      • 用户管理
        本节介绍了在RabbitMQ实例中如何创建、修改和删除用户。 背景信息 客户端访问分布式消息服务RabbitMQ 版服务端时,需要传入用户名和密码进行权限认证,认证通过才允许访问服务端。 操作步骤 创建用户 1.登录管理控制台。 2.进入RabbitMQ管理控制台。 3.在实例列表页在操作列,目标实例行点击“管理”。 4.点击“集群管理”后点击“用户”到达用户管理页面,点击“新建”按钮。 5.点击“新建”后出现以下画面,输入用户密码后点击“确定”即可创建。 修改用户 1.在用户管理页面,在目标用户行点击“修改”,即可重置用户密码。 删除用户 1.在用户管理页面,在目标用户行点击“删除”,即可删除用户。 获取用户token 仅云原生引擎支持 (1)登录管理控制台。 (2)进入RabbitMQ管理控制台。 (3)在实例列表页在操作列,目标实例行点击“管理”。 (4)点击“集群管理”后点击“用户”到达用户管理页面,点击“Token”按钮。 在RabbitMQ中,获取用户令牌(token)的作用如下: 认证和授权:用户令牌用于认证和授权用户对RabbitMQ的访问权限。通过获取有效的用户令牌,可以验证用户的身份,并根据其权限配置来限制或授予其对虚拟主机、队列、交换机等资源的访问权限。 安全性:通过使用用户令牌进行认证,可以增加RabbitMQ系统的安全性。只有具有有效令牌的用户才能访问和执行相应的操作,从而减少了未经授权的访问和潜在的安全风险。 资源管理:用户令牌可以用于管理和限制用户对RabbitMQ资源的使用。通过为每个用户分配独立的令牌,可以控制其对虚拟主机、队列、交换机等资源的使用情况,避免资源滥用或过度消耗。 追踪和审计:通过用户令牌,可以追踪和记录用户对RabbitMQ的操作和访问历史。这对于安全审计、故障排查和性能优化等方面非常有用,可以帮助管理员了解系统的使用情况和问题定位。 总之,获取用户令牌是为了实现认证、授权和安全性,以及对RabbitMQ资源的管理和追踪。通过令牌,可以确保只有经过授权的用户才能访问和操作RabbitMQ,提高系统的安全性和可管理性。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        用户指南
        用户管理
      • 云审计服务支持的RabbitMQ操作列表
        本页面主要介绍分布式消息服务RabbitMQ对接的云审计服务使用和查看方法。 操作场景 本服务现已对接天翼云云审计服务,云审计服务提供对各种云资源操作的记录和查询功能,用于支撑合规审计、安全分析、操作追踪和问题定位等场景,同时提供事件跟踪功能,将操作日志转储至对象存储实现永久保存。 云审计可提供的功能服务具体如下: ● 记录审计日志:支持用户通过管理控制台或API接口发起的操作,以及各服务内部自触发的操作。 ● 审计日志查询:支持在管理控制台对7天内操作记录按照事件类型、事件来源、资源类型、筛选类型、操作用户和事件级别等多个维度进行组合查询。 使用限制 ● 云审计服务本身免费,包括时间记录以及7天内时间的存储和检索。 ● 用户通过云审计能查询到多久前的操作事件:7天。 ● 用户操作后多久可以通过云审计查询到数据:5分钟。 ● 其它限制请参考使用限制云审计。 操作步骤 1. 开通云审计服务。 参见开通云审计服务云审计。 2. 查看云审计事件。 参见查看审计事件云审计。 3. 在事件列表中,选择事件来源为“容器与中间件”,资源类型选择“分布式消息服务Kafka”,上方时间选择需要筛选的时间段。点击查询即可。 4. 在审计事件右侧点击详情,可以看到更详细的事件信息。 更多云审计相关使用说明和常见问题请参考用户指南、常见问题。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        用户指南
        云审计服务支持的关键操作
        云审计服务支持的RabbitMQ操作列表
      • 事件目标概述
        事件目标是事件规则的一部分,负责消费经事件规则过滤与转换后的事件。 事件目标 事件总线EventBridge支持以下事件目标: 函数计算 分布式消息服务RocketMQ 分布式消息服务RabbitMQ 分布式消息服务Kafka HTTP、HTTPS地址
        来自:
        帮助文档
        事件总线
        用户指南
        事件总线
        事件规则
        事件目标
        事件目标概述
      • 消息幂等
        如果消息重复消费会影响您的业务处理,要对消息做幂等处理。本文介绍消息幂等的概念、适用场景以及处理方法。 概念 在消息领域,幂等是指Consumer重复消费某条消息时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响。 例如,在支付场景下,Consumer消费扣款消息,对一笔订单执行扣款操作,扣款金额为500元。如果因网络不稳定等原因导致扣款消息重复投递,Consumer重复消费了该扣款消息,但最终的业务结果是只扣款一次,扣费500元,且用户的扣款记录中对应的订单只有一条扣款流水,不会多次扣除费用。那么这次扣款操作是符合要求的,整个消费过程实现了消息幂等。 适用场景 在互联网应用中,尤其在网络不稳定的情况下,分布式消息服务RabbitMQ的消息有可能会出现重复。如果消息重复消费会影响您的业务处理,请对消息做幂等处理。消息重复的可能原因如下: 发送时消息重复 当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时Producer意识到消息发送失败并尝试再次发送消息,Consumer后续会收到两条内容相同并且Message ID也相同的消息。 投递时消息重复 消息消费的场景下,消息已投递到Consumer并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,分布式消息服务RabbitMQ的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,Consumer后续会收到两条内容相同并且Message ID也相同的消息。 负载均衡时消息重复(包括但不限于网络抖动、服务端重启以及Consumer应用重启) 当分布式消息服务RabbitMQ的服务端或客户端重启、扩容或缩容时,会触发Rebalance,此时Consumer可能会收到重复消息。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        最佳实践
        消息幂等
      • 服务内联委托管理
        可信云服务可以通过IAM委托的方式访问其他云服务的资源。可信实体为天翼云服务的IAM委托,包括普通云服务委托和云服务关联委托。本文介绍事件总线EventBridge的服务内联委托。 什么是服务内联委托 在某些场景下,事件总线EventBridge为了完成自身的某个功能,需要获取其他云服务的访问权限,因此,事件总线EventBridge创建了与云服务内联委托,即服务内联委托CtyunAssumeRoleForEventBridge。 使用事件总线EventBridge,系统提供的服务内联委托及其包含的系统权限策略如下: 服务内联委托:CtyunAssumeRoleForEventBridge 系统权限策略:CtyunAssumePolicyForEventBridge CtyunAssumeRoleForEventBridge 服务内联委托CtyunAssumeRoleForEventBridge具有获取访函数列表、函数详情以及调用函数的权限;具有对分布式消息服务Kafka、分布式消息服务RocketMQ、分布式消息服务MQTT与分布式消息服务RabbitMQ的管理员权限;具有专有网络VPC、VPCE的管理员权限。 服务内联委托CtyunAssumeRoleForEventBridge被授予权限策略CtyunAssumePolicyForEventBridge,该权限策略的内容如下: plaintext { "Version": "1.1", "Statement": [ { "Action": [ "cf:inst:InvokeFunction", "cf:inst:GetFunction", "cf:inst:ListFunctions", "KAFKA::", "MQ2::", "mqtt::", "AMQP::", "vpce::", "vpc::" ], "Resource": [ "" ], "Effect": "Allow" } ] } 以下是使用事件总线EventBridge时,需要使用服务内联委托的场景: 建立函数计算规则时,需要委托事件总线EventBridge具有获取访函数列表、函数详情以及调用函数的权限。 建立消息中间件事件源与事件目标时,需要委托事件总线EventBridge具有对分布式消息服务Kafka、分布式消息服务RocketMQ、分布式消息服务MQTT与分布式消息服务RabbitMQ的管理员权限。 建立网络端点时,需要委托事件总线EventBridge具有专有网络VPC、VPCE的管理员权限。
        来自:
        帮助文档
        事件总线
        产品简介
        服务内联委托管理
      • 自定义分布式消息服务MQTT事件源
        本文介绍如何在事件总线EventBridge管理控制台添加分布式消息服务MQTT类型的自定义事件源。 前提条件 事件总线EventBridge 开通事件总线EventBridge并授权。 创建自定义总线。 分布式消息服务MQTT 开通分布式消息服务MQTT并创建实例。 操作步骤 1. 登录事件总线管理控制台。 2. 在左侧导航栏,单击事件总线。 3. 在事件总线页面,单击目标总线名称。 4. 在左侧导航栏,单击事件源。 5. 在事件源 页面,单击添加事件源。 6. 在添加自定义事件源 面板,输入名称 和描述 ,事件提供方 选择分布式消息服务MQTT,选择或填入主题等配置然后单击确认,如图1所示。 图1 创建事件源时事件提供方选择分布式服务MQTT 参数说明 参数 说明 示例 名称 事件源名。 test MQTT实例 选择MQTT实例。 instancexxx MQTT主题 选择或输入MQTT的主题。 topicxxx 事件示例 plaintext { "id": "b5771f766cdf48edb1bad15418c", "source": "sourcetest", "specversion": "1.0", "subject": "ctyun:mqtt:bb9fdb42056f1xxxxxx2ac110002:dab4124xxxxxx7eb2774f45c6a6db69:topic/
        来自:
        帮助文档
        事件总线
        用户指南
        事件总线
        事件源
        自定义事件源类型
        自定义分布式消息服务MQTT事件源
      • RocketMQ触发器
        RocketMQ触发器 RocketMQ触发器可以订阅分布式消息服务RocketMQ并根据消息触发关联的工作流,借此能力,使得工作流可以消费指定topic的消息,执行自定义处理逻辑。 注意事项 RocketMQ触发器订阅的RocketMQ实例必须和工作流在相同地域。 前提条件 已创建工作流。 开通分布式消息服务RocketMQ实例(RocketMQ引擎类型),详情请参考开通RocketMQ实例。 创建Topic和GroupID。 创建用户,且默认Topic权限设置为:PUBSUB,默认消费组权限为SUB。详情请参考创建用户。 触发消息格式 有两种消息格式:RawData和CloudEvent格式,可在触发器配置里选择。 CloudEvent格式: plaintext [ { "id": "21000777109E05EF04B574B8A1DF0001", "source": "ctyun.faas.trigger.rocketmq", "specversion": "1.0", "type": "rocketmq:topic:sendmessage", "datacontenttype": "application/json", "subject": ":mqfunchckzeddbxjrockettest:testforfaas", "time": "573610703T16:18:39Z", "data": { "topic": "testforfaas", "properties": { "CLUSTER": "1dafcb4049ba42df96d80b7dd2f99c5e", "CONSUMESTARTTIME": "1747987057130", "KEYS": "webtest", "MAXOFFSET": "2", "MINOFFSET": "0", "TAGS": "17479870570970", "UNIQKEY": "21000777109E05EF04B574B8A1DF0001" }, "data": "WebTestTools174798gjkS" } } ] 参数 类型 示例值 描述 id string 21000777109E05EF04B574B8A1DF0001 事件ID。标识事件的唯一值。提取自RocketMQ消息。 source string ctyun.faas.trigger.rocketmq 事件源。RocketMQ触发器固定为ctyun.faas.trigger.rocketmq。 specversion string 1.0 CloudEvents协议版本。 type string rocketmq:topic:sendmessage 事件类型。 datacontenttype string application/json 参数data的内容形式。 subject string mqfunchckzeddbxjrockettest:testforfaas 事件主体。 time string 20250522T02:04:16Z 消息被触发的时间。 data object RocketMQ触发器独有消息格式,详细参见下文RawData描述。 RawData格式 是CloudEvent格式的子集,只包含原始rabbitmq消息的信息,消息结构相当于CloudEvent的data字段,具体如下: plaintext [ { "topic": "testforfaas", "properties": { "CLUSTER": "1dafcb4049ba42df96d80b7dd2f99c5e", "CONSUMESTARTTIME": "1747987204637", "KEYS": "webtest", "MAXOFFSET": "3", "MINOFFSET": "0", "TAGS": "17479872046050", "UNIQKEY": "2100077510A605EF04B574BAE2080001" }, "data": "WebTestTools17v6fg0J" } ] 参数 类型 示例 描述 topic string testforfaas Topic名称。 properties map 消息自定义属性。 properties.CLUSTER string 1dafcb4049ba42df96d80b7dd2f99c5e RocketMQ实例ID。 properties.CONSUMESTARTTIME string 1747987204637 Unix时间戳,毫秒。 properties.KEYS string webtest 消息的key。 properties.MAXOFFSET string 3 消息队列中的最大偏移量。 properties.MINOFFSET string 0 消息队列中的最小偏移量。 properties.TAGS string 17479872046050 消息标签。 properties.UNIQKEY string 2100077510A605EF04B574BAE2080001 消息唯一键。 data string WebTestTools17v6fg0J 消息体内容。
        来自:
        帮助文档
        函数计算
        用户指南
        云工作流
        控制台操作
        工作流调度
        RocketMQ触发器
      • 代码示例
        本节介绍了RabbitMQ接入的代码示例。 安全接入点(PLAIN、AMQPLAIN授权机制) java import com.rabbitmq.client.; import java.io.IOException; public class RabbitmqAmqpDemo { public static void main(String[] args) throws Exception { String host "192.168.0.0"; //安全接入点ip Integer port 5672; //安全接入点port String username "xxx"; //集群管理用户列表的用户名 String password "xxx"; //集群管理用户列表的密码 String vhost "/"; String exchangeName "extest"; String queueName "qutest"; ConnectionFactory connectionFactory new ConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(vhost); Connection connection connectionFactory.newConnection(); Channel channel connection.createChannel(); channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, "test"); String message "Hello Aop"; for (int i 0; i < 10; i++) { channel.basicPublish(exchangeName, "test", null, message.getBytes()); System.out.println("消息发送成功"); } Channel consumeChannel connection.createChannel(); Consumer consumer new DefaultConsumer(consumeChannel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String messageGet new String(body, "UTF8"); if (messageGet.equals(message)) { System.out.println("消息消费成功"); } } }; consumeChannel.setDefaultConsumer(consumer); consumeChannel.basicConsume(queueName, false, consumer); Thread.sleep(10000); } } SSL接入点(EXTERNAL授权机制) java import com.rabbitmq.client.; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManagerFactory; import java.io.FileInputStream; import java.io.IOException; import java.security.KeyStore; public class RabbitmqExternalDemo { public static void main(String[] args) throws Exception { String host "192.168.0.0"; //SSL接入点ip int port 5671; //SSL接入点port //以下2个ssl文件可通过控制台获取安装包, 具体的获取方式可以查看2.2.1接入步骤的第二小节 String ksFile "D:tmpsslclientrabbitmqkey.p12"; String tksFile "D:tmpssltruststore"; String vhost "/"; String exchangeName "extest"; String queueName "qutest"; char[] keyPassphrase "W3zT98Zz9Io".toCharArray(); KeyStore ks KeyStore.getInstance("PKCS12"); ks.load(new FileInputStream(ksFile), keyPassphrase); KeyManagerFactory kmf KeyManagerFactory.getInstance("SunX509"); kmf.init(ks, keyPassphrase); char[] trustPassphrase null; trustPassphrase "W3zT98Zz9Io".toCharArray(); KeyStore tks KeyStore.getInstance("JKS"); tks.load(new FileInputStream(tksFile), trustPassphrase); TrustManagerFactory tmf TrustManagerFactory.getInstance("SunX509"); tmf.init(tks); SSLContext c SSLContext.getInstance("tlsv1.2"); c.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); ConnectionFactory connectionFactory new ConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setVirtualHost(vhost); connectionFactory.setSaslConfig(DefaultSaslConfig.EXTERNAL); connectionFactory.useSslProtocol(c); Connection connection connectionFactory.newConnection(); Channel channel connection.createChannel(); channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, "test"); String message "Hello Aop"; for (int i 0; i < 10; i++) { channel.basicPublish(exchangeName, "test", null, message.getBytes()); System.out.println("消息发送成功"); } Channel consumeChannel connection.createChannel(); Consumer consumer new DefaultConsumer(consumeChannel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String messageGet new String(body, "UTF8"); if (messageGet.equals(message)) { System.out.println("消息消费成功"); } } }; consumeChannel.setDefaultConsumer(consumer); consumeChannel.basicConsume(queueName, false, consumer); Thread.sleep(10000); } }
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        最佳实践
        RabbitMQ接入
        代码示例
      • 1
      • ...
      • 3
      • 4
      • 5
      • 6
      • 7
      • ...
      • 531
      跳转至
      推荐热词
      天翼云运维管理审计系统天翼云云服务平台云服务备份云日志服务应用运维管理云手机云电脑天翼云云hbase数据库电信云大数据saas服务电信云大数据paas服务轻量型云主机天翼云客户服务电话应用编排服务天翼云云安全解决方案云服务总线CSB天翼云服务器配置天翼云联邦学习产品天翼云云安全天翼云企业上云解决方案天翼云产品天翼云视频云存储

      天翼云最新活动

      安全隔离版OpenClaw

      OpenClaw云服务器专属“龙虾“套餐低至1.5折起

      天翼云新春焕新季

      云主机开年特惠28.8元/年,0元秒杀等你来抢!

      云上钜惠

      爆款云主机全场特惠,2核4G只要1.8折起!

      中小企业服务商合作专区

      国家云助力中小企业腾飞,高额上云补贴重磅上线

      出海产品促销专区

      爆款云主机低至2折,高性价比,不限新老速来抢购!

      天翼云奖励推广计划

      加入成为云推官,推荐新用户注册下单得现金奖励

      产品推荐

      物理机 DPS

      多活容灾服务

      GPU云主机

      轻量型云主机

      弹性伸缩服务 AS

      弹性高性能计算 E-HPC

      天翼云CTyunOS系统

      训推服务

      AI Store

      推荐文档

      域名实名认证

      创建云间高速

      云间高速的优势

      分布式消息服务的功能

      • 7*24小时售后
      • 无忧退款
      • 免费备案
      • 专家服务
      售前咨询热线
      400-810-9889转1
      关注天翼云
      • 旗舰店
      • 天翼云APP
      • 天翼云微信公众号
      服务与支持
      • 备案中心
      • 售前咨询
      • 智能客服
      • 自助服务
      • 工单管理
      • 客户公告
      • 涉诈举报
      账户管理
      • 管理中心
      • 订单管理
      • 余额管理
      • 发票管理
      • 充值汇款
      • 续费管理
      快速入口
      • 天翼云旗舰店
      • 文档中心
      • 最新活动
      • 免费试用
      • 信任中心
      • 天翼云学堂
      云网生态
      • 甄选商城
      • 渠道合作
      • 云市场合作
      了解天翼云
      • 关于天翼云
      • 天翼云APP
      • 服务案例
      • 新闻资讯
      • 联系我们
      热门产品
      • 云电脑
      • 弹性云主机
      • 云电脑政企版
      • 天翼云手机
      • 云数据库
      • 对象存储
      • 云硬盘
      • Web应用防火墙
      • 服务器安全卫士
      • CDN加速
      热门推荐
      • 云服务备份
      • 边缘安全加速平台
      • 全站加速
      • 安全加速
      • 云服务器
      • 云主机
      • 智能边缘云
      • 应用编排服务
      • 微服务引擎
      • 共享流量包
      更多推荐
      • web应用防火墙
      • 密钥管理
      • 等保咨询
      • 安全专区
      • 应用运维管理
      • 云日志服务
      • 文档数据库服务
      • 云搜索服务
      • 数据湖探索
      • 数据仓库服务
      友情链接
      • 中国电信集团
      • 天翼云国际站
      • 189邮箱
      • 天翼企业云盘
      • 天翼云盘
      ©2026 天翼云科技有限公司版权所有 增值电信业务经营许可证A2.B1.B2-20090001
      公司地址:北京市东城区青龙胡同甲1号、3号2幢2层205-32室
      • 用户协议
      • 隐私政策
      • 个人信息保护
      • 法律声明
      备案 京公网安备11010802043424号 京ICP备 2021034386号