云主机开年特惠28.8元/年,0元秒杀等你来抢!
查看详情

活动

天翼云最新优惠活动,涵盖免费试用,产品折扣等,助您降本增效!
热门活动
  • 安全隔离版OpenClaw NEW OpenClaw云服务器专属“龙虾“套餐低至1.5折起
  • 天翼云新春焕新季 NEW 云主机开年特惠28.8元/年,0元秒杀等你来抢!
  • 云上钜惠 爆款云主机全场特惠,2核4G只要1.8折起!
  • 中小企业服务商合作专区 国家云助力中小企业腾飞,高额上云补贴重磅上线
  • 出海产品促销专区 NEW 爆款云主机低至2折,高性价比,不限新老速来抢购!
  • 天翼云奖励推广计划 加入成为云推官,推荐新用户注册下单得现金奖励
免费活动
  • 免费试用中心 HOT 多款云产品免费试用,快来开启云上之旅
  • 天翼云用户体验官 NEW 您的洞察,重塑科技边界

息壤智算

领先开放的智算服务平台,提供算力、平台、数据、模型、应用“五位一体”智算服务体系,构建全流程的AI基础设施能力
AI Store
  • 算力市场
  • 模型市场
  • 应用市场
  • MCP市场
公共算力服务
  • 裸金属
  • 定制裸金属
训推服务
  • 模型开发
  • 训练任务
  • 服务部署
模型推理服务
  • 模型广场
  • 体验中心
  • 服务接入
应用托管
  • 应用实例
科研助手
  • 科研智能体
  • 科研服务
  • 开发机
  • 并行计算
大模型
  • DeepSeek-V3.1
  • DeepSeek-R1-0528
  • DeepSeek-V3-0324
  • Qwen3-235B-A22B
  • Qwen3-32B
智算一体机
  • 智算一体机
模型适配专家服务
  • 模型适配专家服务
算力服务商
  • 入驻算力服务商

应用商城

天翼云精选行业优秀合作伙伴及千余款商品,提供一站式云上应用服务
进入甄选商城进入云市场进入AI Store创新解决方案公有云生态专区智云上海应用生态专区
建站工具
  • 新域名服务
  • SSL证书
  • 翼建站
企业办公
  • 安全邮箱
  • WPS 365 天翼云版
  • 天翼企业云盘(标准服务版)
灾备迁移
  • 云管家2.0
  • 翼备份(SaaS版)

定价

协助您快速了解云产品计费模式、价格详情,轻松预估上云成本
价格计算器
  • 动态测算产品价格
定价策略
  • 快速了解计费模式

合作伙伴

天翼云携手合作伙伴,共创云上生态,合作共赢
天翼云生态合作中心
  • 天翼云生态合作中心
天翼云渠道合作伙伴
  • 天翼云代理渠道合作伙伴
天翼云服务合作伙伴
  • 天翼云集成商交付能力认证
天翼云应用合作伙伴
  • 天翼云云市场合作伙伴
  • 天翼云甄选商城合作伙伴
天翼云技术合作伙伴
  • 天翼云OpenAPI中心
天翼云培训认证
  • 天翼云学堂
  • 天翼云市场商学院
天翼云合作计划
  • 云汇计划
天翼信创云专区
  • 信创云专区
  • 适配互认证

开发者

开发者相关功能入口汇聚
技术社区
  • 专栏文章
  • 互动问答
  • 技术视频
资源与工具
  • OpenAPI中心
培训与认证
  • 天翼云学堂
  • 天翼云认证
开源社区
  • 魔乐社区
  • OpenTeleDB

支持与服务

为您提供全方位支持与服务,全流程技术保障,助您轻松上云,安全无忧
文档与工具
  • 文档中心
  • 新手上云
  • 自助服务
  • OpenAPI中心
定价
  • 价格计算器
  • 定价策略
基础服务
  • 售前咨询
  • 在线支持
  • 在线支持
  • 工单服务
  • 服务保障
  • 会员中心
增值服务
  • 红心服务
  • 首保服务
  • 客户支持计划
  • 专家技术服务
  • 备案管家
我要反馈
  • 建议与反馈
  • 用户体验官
信息公告
  • 客户公告

了解天翼云

天翼云秉承央企使命,致力于成为数字经济主力军,投身科技强国伟大事业,为用户提供安全、普惠云服务
品牌介绍
  • 关于天翼云
  • 智算云
  • 天翼云4.0
  • 新闻资讯
  • 天翼云APP
基础设施
  • 全球基础设施
  • 信任中心
最佳实践
  • 精选案例
  • 超级探访
  • 云杂志
  • 分析师和白皮书
  • 天翼云·创新直播间
市场活动
  • 2025智能云生态大会
  • 2024智算云生态大会
  • 2023云生态大会
  • 2022云生态大会
  • 天翼云中国行
天翼云
  • 活动
  • 息壤智算
  • 产品
  • 解决方案
  • 应用商城
  • 定价
  • 合作伙伴
  • 开发者
  • 支持与服务
  • 了解天翼云
      • 文档
      • 控制中心
      • 备案
      • 管理中心
      消息队列RabbitMQ版_相关内容
      • 自动续订
        本文主要介绍了自动续订规则及操作流程。 为避免由于未及时对资源采取续订操作,资源被到期冻结或超期释放,客户购买包月包年产品后,可设置开通自动续订。开通自动续订后,系统将在资源到期前自动续订,无需客户再手动操作。 适用范围 自动续订仅针对采用包月、包年计费模式的资源。 已到期资源不支持设置/修改自动续订。 自动续订仅适于 付费用户 ,不适用于非付费用户。 目前支持设置自动续订的产品有:弹性云主机、GPU云主机、物理机、云桌面、云硬盘、对象存储经典版、云主机备份、专属云计算独享型、专属云存储独享型、弹性IP、共享带宽、天翼云SDWAN、云间高速、关系数据库MySQL版、关系数据库Postgre SQL版、分布式关系数据库、分布式缓存服务Redis版、分布式缓存服务Memcache、文档数据库服务、云HBASE数据库、分布式消息服务RocketMQ、分布式消息服务RabbitMQ、分布式消息服务Kafka、Web应用防火墙、服务器安全卫士、域名无忧、DDos高防IP、云解析、登录保护、网站安全监测、内容安全。 单次最多支持20个资源实例批量续订。 开通、变更、关闭自动续订 用户在续订管理页可开通自动续订功能,变更自动续约周期,或关闭自动续订。 不关闭自动续订的情况下,只要预付费账户余额充足,或为后付费客户,系统将持续按设定的周期自动续订下去。 预付费用户可在官网自主控制自动续订功能的开通、变更、关闭。后付费用户需要客户经理协助开启自动续订权限后才可以自主管理。
        来自:
        帮助文档
        费用中心
        订单管理
        续订管理
        自动续订
      • 产品定义
        本文主要介绍 产品定义。 Kafka是一个拥有高吞吐、可持久化、可水平扩展,支持流式数据处理等多种特性的分布式消息流处理中间件,采用分布式消息发布与订阅机制,在日志收集、流式数据传输、在线/离线系统分析、实时监控等领域有广泛的应用。 分布式消息服务Kafka是一款基于开源社区版Kafka提供的消息队列服务,向用户提供计算、存储和带宽资源独占式的Kafka专享实例。使用分布式消息服务Kafka,资源按需申请,即买即用,您将有更多精力专注于业务快速开发,不用考虑部署和运维。 关于Kafka的帮助手册阅读指引 受限于篇幅,我们提供的Kafka帮助手册重点描述产品相关的内容,以及与开源社区版Kafka的差异部分,例如Kafka的产品规格、控制台操作、客户端对接等。 如果您需要了解Kafka入门知识或消息生产、消费等方面的技术细节,请查阅Kafka官网资料。
        来自:
        帮助文档
        分布式消息服务Kafka
        产品简介
        产品定义
      • 消息幂等
        如果消息重复消费会影响您的业务处理,要对消息做幂等处理。本文介绍消息幂等的概念、适用场景以及处理方法。 概念 在消息领域,幂等是指Consumer重复消费某条消息时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响。 例如,在支付场景下,Consumer消费扣款消息,对一笔订单执行扣款操作,扣款金额为500元。如果因网络不稳定等原因导致扣款消息重复投递,Consumer重复消费了该扣款消息,但最终的业务结果是只扣款一次,扣费500元,且用户的扣款记录中对应的订单只有一条扣款流水,不会多次扣除费用。那么这次扣款操作是符合要求的,整个消费过程实现了消息幂等。 适用场景 在互联网应用中,尤其在网络不稳定的情况下,分布式消息服务RabbitMQ的消息有可能会出现重复。如果消息重复消费会影响您的业务处理,请对消息做幂等处理。消息重复的可能原因如下: 发送时消息重复 当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时Producer意识到消息发送失败并尝试再次发送消息,Consumer后续会收到两条内容相同并且Message ID也相同的消息。 投递时消息重复 消息消费的场景下,消息已投递到Consumer并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,分布式消息服务RabbitMQ的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,Consumer后续会收到两条内容相同并且Message ID也相同的消息。 负载均衡时消息重复(包括但不限于网络抖动、服务端重启以及Consumer应用重启) 当分布式消息服务RabbitMQ的服务端或客户端重启、扩容或缩容时,会触发Rebalance,此时Consumer可能会收到重复消息。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        最佳实践
        消息幂等
      • Kafka触发器
        Kafka触发器可以订阅天翼云提供的分布式消息队列Kafka实例,并根据消息触发关联的函数,借此能力,使得函数可以消费指定Topic的消息,执行自定义处理逻辑。 注意事项 Kafka触发器订阅的Kafka实例必须和函数计算的函数实例在相同地域。 前提条件 已创建函数。 已开通分布式消息Kafka实例(KAFKA引擎版),详情请参考创建分布式消息服务Kafka实例。 已创建Topic,创建GroupID(可选)。 操作步骤 1. 登录函数计算控制台,点击目标函数,进入函数详情。 2. 选择详情下顶部的配置选项卡。 3. 在配置选项卡 中,选择左边的触发器选项卡。 4. 点击创建触发器 ,在弹出的右抽屉中选择Kafka触发器,配置参数解释如下。 配置项 操作 示例 触发器类型 选择Kafka触发器。 Kafka触发器 名称 填写自定义的触发器名称。 kafkatrigger 版本或别名 默认值为LATEST,支持选择任意函数版本或函数别名。 LATEST Kafka 实例 选择已创建的Kafka实例。 Topic 选择已创建的Kafka实例的Topic。 Group ID 快速创建:推荐方案。自动创建以 GROUPFCTrigger{triggername}{uuid} 命名的 Group ID。 使用已有:选择Kafka实例已有的GroupID,请您注意不要与已有的业务混用GroupID,否则会影响已有的消息收发。 消费任务并发数 消费者的并发数量,有效取值范围为[1,20],建议不超过Topic的分区数。该值同时影响投递到函数的并发数。 消费位点 选择消息的消费位点,即触发器从kafka消息队列开始拉取消息的位置。 最早位点 :从最早位点开始消费。 最新位点:从最新位点开始消费。 最新位点 调用方式 选择函数调用方式。 同步调用 :指触发器消费topic消息后投递到函数是同步调用,会等待函数响应后继续下一个消息投递。但消费任务并发数大于1时,多个消费者有可能会并发消费消息并投递,并发的情况视Topic队列本身积存的消息而定。 异步调用:指触发器消费topic消息后投递到函数是异步调用,不会等待函数响应,可以快速消费事件。 同步调用 触发器启用状态 创建触发器后是否立即启用。默认选择开启,即创建触发器后立即启用触发器。 启用 推送配置 批量推送条数:批量推送的最大值,积压值达到后立刻推送,取值范围为 [1, 10000]。 批量推送间隔:批量推送的最大时间间隔,达到后立刻推送,单位秒,取值[0,15]。默认0无需等待,数据直接推送。 推送格式:函数收到的事件格式,详情请查阅触发器事件消息格式。 重试策略 消息推送函数失败后重试的策略,共两种: 指数退避:指数退避重试,重试5次,重试周期为2,4,8,16,32(秒)。 线性退避:线性退避重试,重试5次,重试周期为1,2,3,4,5(秒)。 容错策略 当重试次数耗尽后仍然失败时的处理方式: 允许容错:当异常发生并超过重试策略配置时直接丢弃。 禁止容错:当异常发生并超过重试策略配置时继续阻塞执行。 死信队列 当容错策略为:允许容错时,可以额外开启死信队列。当开启死信队列时且异常发生并超过重试策略配置时,消息会被投递到指定的消息队列里,当前只支持投递到kafka和rocketmq
        来自:
        帮助文档
        函数计算
        用户指南
        事件触发
        Kafka触发器
      • 产品规格
        本节介绍分布式消息服务RocketMQ相关的产品规格,以便您正确理解和使用。 (1)以下适用于华东1、华北2、西南1、华南2、上海36、青岛20、长沙42、南昌5、武汉41、杭州7、西南2贵州、太原4、郑州5、西安7、呼和浩特3 节点。 注意 通用型规格已调整为白名单特性,如需了解该规格参数请联系技术支持。 ctgmq引擎已调整为白名单特性,如需了解该引擎请联系技术支持。 单机版实例面向用户体验和业务测试场景,无法保证性能和高可用。如果需要在生产环境使用RocketMQ实例,建议购买集群版实例。 天翼云分布式消息服务RocketMQ版产品规格由以下五个维度定义: 资源规格:定义使用的弹性云服务器的规格类型。 代理个数:即Broker数量,定义实例的规模,天翼云分布式消息RocketMQ每个Broker由一个Master节点(主节点)和一个Slave节点(备节点)组成,详细见产品架构。 存储容量:定义单个代理可以保存的存储容量。 单个代理TPS:定义单个代理的TPS性能。 单个代理消费组数上限:定义单个代理可以创建的消费组数量。 天翼云分布式消息服务RocketMQ支持的产品规格如下所示: 说明 TPS(Transaction per second)是指每秒可以生产消息和消费消息的总次数,可以理解为对应规格每秒生产消息和消费消息的总吞吐量。
        来自:
        帮助文档
        分布式消息服务RocketMQ
        产品简介
        产品规格
      • 查看云审计日志
        本章节主要介绍如何查看分布式消息服务RabbitMQ的云审计日志。 查看RabbitMQ云审计日志,请参考《云审计服务 用户指南》的“查看追踪事件”章节。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        用户指南
        云审计服务支持的关键操作
        查看云审计日志
      • 产品优势
        分布式消息服务Kafka版完全兼容开源社区版本,旨在为用户提供便捷高效的消息队列。业务无需改动即可快速迁移上云,为您节省维护和使用成本。 一键式部署,免去集群搭建烦恼 专享实例只需要在实例管理界面选好规格配置,提交订单。后台将自动创建部署完成一整套Kafka实例。 兼容开源,业务零改动迁移上云 兼容社区版Kafka的API,具备原生Kafka的所有消息处理特性。 业务系统基于开源的Kafka进行开发,只需加入少量认证安全配置,即可使用分布式消息服务Kafka,做到无缝迁移。 说明:Kafka专享实例兼容开源社区Kafka 1.1.0和2.3.0版本。在客户端使用上,推荐使用和服务端版本一致的版本。 安全保证 独有的安全加固体系,提供业务操作云端审计,消息存储加密等有效安全措施。 在网络通信方面,除了提供SASL认证,还借助虚拟私有云(VPC)和安全组等加强网络访问控制。 数据高可靠 Kafka专享实例支持消息持久化,多副本存储机制。副本间消息同步、异步复制,数据同步或异步落盘多种方式供您自由选择。 集群架构与跨AZ部署,服务高可用 Kafka后台为多集群部署,支持故障自动迁移和容错,保证业务的可靠运行。 Kafka专享实例支持跨AZ部署,节点部署在不同的AZ,进一步保障服务高可用。 无忧运维 公有云提供一整套完整的监控告警等运维服务,故障自动发现和告警,避免724小时人工值守。Kafka专享实例自动上报相关监控指标,如分区数、主题数、堆积消息数等,并支持配置监控数据发送规则,您可以在第一时间通过短信、邮件等获得业务消息队列的运行使用和负载状态。 海量消息堆积与弹性扩容 内建的分布式集群技术,使得服务具有高度扩展性。分区数可配置多达100个,存储空间弹性扩展,保证在高并发、高性能和大规模场景下的访问能力,轻松实现百亿级消息的堆积和访问能力。 多规格灵活选择 Kafka专享实例的带宽与存储资源可灵活配置,并且自定义Topic的分区数。
        来自:
        帮助文档
        专属云分布式消息服务Kafka
        产品简介
        产品优势
      • 创建消息通知主题
        本章节主要介绍创建消息通知主题。 操作场景 确定创建消息通知主题后,您可在消息通知服务的“主题管理”页面中,对对应的主题“添加订阅”,选择不同方式(例如短信或者邮件等)进行订阅;订阅成功后,若作业失败,则系统将会自动发送消息到您指定的订阅终端。 操作步骤 1.在“资源管理 > 队列管理”页面,单击左上角“创建消息通知主题”。 2.选择队列,单击“确定”。 说明 选择队列时,可以选择单个队列,也可以选择所有队列。 如果单个队列和所有队列的终端不一致,当选择了单个队列,同时选择了所有队列进行订阅时,在所有队列的消息通知中将不包含该队列的消息。 创建消息通知主题后,只有在订阅队列上创建的Spark作业失败时才会收到消息通知。 3.单击“主题管理”,跳转至消息通知服务“主题管理”页面。 4.在对应主题的“操作”栏中,单击“添加订阅”,选择“协议”,确定订阅方式。 5.通过单击邮件中的链接确认后,将收到“订阅成功”的信息。 6.在消息通知服务的“订阅”页面,对应的订阅状态为“已确认”,表示订阅成功。
        来自:
        帮助文档
        数据湖探索
        用户指南
        队列管理
        创建消息通知主题
      • 心跳检测
        本章节主要介绍分布式消息服务RabbitMQ的心跳检测特性。 RabbitMQ实例提供了心跳功能,以确保应用程序层及时发现中断的连接和完全无响应的对端。心跳还可以防止某些网络设备在一段时间内由于没有活动而中断TCP连接。 心跳超时时间 心跳超时时间定义了对等TCP连接在多长时间后被服务端和客户端视为关闭。 在RabbitMQ服务端和客户端分别设置心跳超时时间,服务端和客户端会对配置的心跳超时时间进行协商,客户端必须配置该值来发送心跳。RabbitMQ官方团队维护的3个客户端(Java、.NET、Erlang语言)的心跳超时时间协商逻辑如下: 服务端和客户端设置的心跳超时时间都不为0时,两者间较小的值生效。 服务端和客户端任意一端设置的心跳超时时间为0,另一端不为0时,非0的值生效。 服务端和客户端的心跳超时时间都设置为0时,表示禁用心跳。 更多关于心跳检测的说明,请参考Detecting Dead TCP Connections with Heartbeats and TCP Keepalives。 心跳帧 心跳帧发送时间间隔为心跳超时时间/2,该值有时也被称为心跳间隔。客户端在两次错过心跳后,会被认为是不可达的。不同的客户端会以不同的方式显示这一点,但TCP连接将被关闭。当客户端检测到服务端由于心跳而无法访问时,需要重新连接。 任何流量(如协议操作、消息发布、消息确认、心跳帧等)都会被认为是有效的心跳。如果连接上有其他流量,客户端可以选择发送心跳帧,也可以选择不发送。如果连接上没有其他流量,客户端必须发送心跳帧。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        用户指南
        高级特性
        心跳检测
      • 基于消息队列RocketMQ实现全链路灰度
        本章节介绍如何基于消息队列RocketMQ实现全链路灰度 概述 本文介绍在使用消息队列(RocketMQ)这种异步场景下,可以在不修改业务代码的情况下,实现异步场景的灰度,从而实现全链路灰度。本文介绍基于消息队列RocketMQ实现全链路灰度。 背景介绍 在大多数业务场景中对于消息的灰度并没有RPC调用那么严格,但是当全链路灰度调用中涉及到消息消费时,如果消息消费没有按照全链路流量规则路由,则会导致通过消息产生的流量逃逸,从而破坏全链路规则,导致出现一些不符合预期的情况。 如下图所示,本文分别部署网关、appa、appagray、appb、appbgray、appc、appcgray以及RocketMQ,模拟一个真实的全链路灰度场景。 通过网关调用appa应用的接口,当满足路由规则后,灰度流量会被路由到appagray,appagray又会调用appbgray,随后由appbgray发送灰度消息,appcgray将会收到灰度消息,而appc不会收到灰度消息。 前提条件 1. 用户已开通微服务治理中心企业版。 2. 用户已开通云容器引擎。 3. 用户已部署RocketMQ,且RocketMQ版本在4.5.0以上,broker.conf中已配置enablePropertyFiltertrue。 部署Demo应用 准备自建入口网关msgczuul,准备应用msgcappa,msgcappb和msgcappc。调用过程是msgcappa –> msgcappb > msgcappc。 步骤1:在云容器引擎中安装微服务治理插件: 1. 登录“云容器引擎”控制台。 2. 在左侧菜单栏选择“集群”,点击目标集群。 3. 在集群管理页面点击“插件”“插件市场”,选择“cubems”插件安装。 步骤2:为应用开启微服务治理能力: 1. 登录“云容器引擎”控制台。 2. 左侧菜单栏选择“集群”,点击目标集群。 3. 在集群管理页面点击“工作负载”“无状态”,选择目标命名空间。 4. 在Deployment列表页选择指定Deployment,并点击“全量替换”,进入Deployment编辑页。 5. 在Deployment编辑页点击“显示高级设置”,新增“Pod标签”: mseCubeMsAutoEnable:on。 6. 在发布应用时,配置指定环境变量,可指定注入微服务治理中心的应用名、命名空间和标签等信息。 环境变量配置如下: 环境变量名 环境变量值 MSEAPPNAME 接入到微服务治理中心的应用名。 MSESERVICETAG 应用标签信息,如灰度应用可配置gray。 MSENAMESPACE(选填) 接入到微服务治理中心的命名空间,默认为:default。 7. 完成编辑后点击“提交”,重新发布容器即可接入。 appa应用的配置 基线: apiVersion: "apps/v1" kind: "Deployment" metadata: name: "appa" namespace: "default" spec: progressDeadlineSeconds: 600 replicas: 1 revisionHistoryLimit: 10 selector: matchLabels: name: "appa" template: metadata: labels: mseCubeMsAutoEnable: "on" name: "appa" spec: containers: env: name: "MSEAPPNAME" value: "appa" image: "镜像仓库域名/xxx/appa:latest" imagePullPolicy: "Always" name: "appa" ports: containerPort: 26160 livenessProbe: tcpSocket: port: 26160 initialDelaySeconds: 10 periodSeconds: 30 resources: limits: cpu: "1" memory: "1Gi" requests: cpu: "1" memory: "1Gi" 灰度: apiVersion: "apps/v1" kind: "Deployment" metadata: name: "appa" namespace: "default" spec: progressDeadlineSeconds: 600 replicas: 1 revisionHistoryLimit: 10 selector: matchLabels: name: "appa" template: metadata: labels: mseCubeMsAutoEnable: "on" name: "appa" spec: containers: env: name: "MSEAPPNAME" value: "appa" name: "MSESERVICETAG" value: "gray" image: "镜像仓库域名/xxx/appa:latest" imagePullPolicy: "Always" name: "appa" ports: containerPort: 26160 livenessProbe: tcpSocket: port: 26160 initialDelaySeconds: 10 periodSeconds: 30 resources: limits: cpu: "1" memory: "1Gi" requests: cpu: "1" memory: "1Gi" appb应用的配置 基线: apiVersion: "apps/v1" kind: "Deployment" metadata: name: "appb" namespace: "default" spec: progressDeadlineSeconds: 600 replicas: 1 revisionHistoryLimit: 10 selector: matchLabels: name: "appb" template: metadata: labels: mseCubeMsAutoEnable: "on" name: "appb" spec: containers: env: name: "MSEAPPNAME" value: "appb" image: "镜像仓库域名/xxx/appb:latest" imagePullPolicy: "Always" name: "appb" ports: containerPort: 26160 livenessProbe: tcpSocket: port: 26160 initialDelaySeconds: 10 periodSeconds: 30 resources: limits: cpu: "1" memory: "1Gi" requests: cpu: "1" memory: "1Gi" 灰度: apiVersion: "apps/v1" kind: "Deployment" metadata: name: "appb" namespace: "default" spec: progressDeadlineSeconds: 600 replicas: 1 revisionHistoryLimit: 10 selector: matchLabels: name: "appb" template: metadata: labels: mseCubeMsAutoEnable: "on" name: "appb" spec: containers: env: name: "MSEAPPNAME" value: "appb" name: "MSESERVICETAG" value: "gray" image: "镜像仓库域名/xxx/appb:latest" imagePullPolicy: "Always" name: "appb" ports: containerPort: 26160 livenessProbe: tcpSocket: port: 26160 initialDelaySeconds: 10 periodSeconds: 30 resources: limits: cpu: "1" memory: "1Gi" requests: cpu: "1" memory: "1Gi" appc应用的配置 基线: apiVersion: "apps/v1" kind: "Deployment" metadata: name: "appc" namespace: "default" spec: progressDeadlineSeconds: 600 replicas: 1 revisionHistoryLimit: 10 selector: matchLabels: name: "appc" template: metadata: labels: mseCubeMsAutoEnable: "on" name: "appc" spec: containers: env: name: "MSEAPPNAME" value: "appc" image: "镜像仓库域名/xxx/appc:latest" imagePullPolicy: "Always" name: "appc" ports: containerPort: 26160 livenessProbe: tcpSocket: port: 26160 initialDelaySeconds: 10 periodSeconds: 30 resources: limits: cpu: "1" memory: "1Gi" requests: cpu: "1" memory: "1Gi" 灰度: apiVersion: "apps/v1" kind: "Deployment" metadata: name: "appc" namespace: "default" spec: progressDeadlineSeconds: 600 replicas: 1 revisionHistoryLimit: 10 selector: matchLabels: name: "appc" template: metadata: labels: mseCubeMsAutoEnable: "on" name: "appc" spec: containers: env: name: "MSEAPPNAME" value: "appc" name: "MSESERVICETAG" value: "gray" image: "镜像仓库域名/xxx/appc:latest" imagePullPolicy: "Always" name: "appc" ports: containerPort: 26160 livenessProbe: tcpSocket: port: 26160 initialDelaySeconds: 10 periodSeconds: 30 resources: limits: cpu: "1" memory: "1Gi" requests: cpu: "1" memory: "1Gi" zuul应用的配置: apiVersion: "apps/v1" kind: "Deployment" metadata: name: "zuul" namespace: "default" spec: progressDeadlineSeconds: 600 replicas: 1 revisionHistoryLimit: 10 selector: matchLabels: name: "zuul" template: metadata: labels: mseCubeMsAutoEnable: "on" name: "zuul" spec: containers: env: name: "MSEAPPNAME" value: "zuul" image: "镜像仓库域名/xxx/zuul:latest" imagePullPolicy: "Always" name: "zuul" ports: containerPort: 26160 livenessProbe: tcpSocket: port: 26160 initialDelaySeconds: 10 periodSeconds: 30 resources: limits: cpu: "1" memory: "1Gi" requests: cpu: "1" memory: "1Gi"
        来自:
        帮助文档
        微服务引擎
        最佳实践
        基于消息队列RocketMQ实现全链路灰度
      • 产品规格
        本文为您介绍分布式消息服务MQTT产品规格。 注意 通用型规格已调整为白名单特性,如需了解该规格参数请联系技术支持。 单机版实例面向用户体验和业务测试场景,无法保证性能和高可用。如果需要在生产环境使用MQTT实例,建议购买集群版实例。 集群版 Intel计算增强型 实例规格 代理个数 单个代理最大连接数 单个代理最大支持消息读写TPS总和 单个连接最大订阅数 mqtt.2u4g.cluster 39 10000 8000 30 mqtt.4u8g.cluster 39 20000 16000 30 mqtt.8u16g.cluster 39 40000 32000 30 mqtt.12u24g.cluster 39 60000 48000 30 mqtt.16u32g.cluster 39 80000 64000 30 单机版 Intel计算增强型 实例规格 代理个数 单个代理最大连接数 单个代理最大支持消息读写TPS总和 单个连接最大订阅数 mqtt.2u4g.single 1 10000 8000 30 mqtt.4u8g.single 1 20000 16000 30 mqtt.8u16g.single 1 40000 32000 30
        来自:
        帮助文档
        分布式消息服务MQTT
        产品介绍
        产品规格
      • 安全分析
        生产者 是用来构建并传输数据到服务端的逻辑概念,负责把数据放入消息队列。 订阅器 用于订阅态势感知(专业版)管道消息,一个管道可由多个订阅器进行订阅,态势感知(专业版)通过订阅器进行消息分发。 消费者 是用来接收并处理数据的运行实体,负责通过订阅器把态势感知(专业版)管道中的消息进行消费并处理。 消息队列 是数据存储和传输的实际容器。 威胁检测模型 是一种被训练的AI智能识别算法模型。能针对特定威胁,自动化的完成数据汇聚、分析和报警,这种检测模式具备较好的泛化能力,防躲避能力强,可在不同业务系统中发挥同等效果,应对复杂的新型攻击。
        来自:
        帮助文档
        态势感知(专业版)(新版)
        产品介绍
        基本概念
        安全分析
      • 使用IAM授权的云服务
        云终端 云服务名称 区域 控制台 OpenAPI 系统策略 云服务名称 区域 支持IAM 支持企业项目 支持IAM 支持企业项目 系统策略 天翼云电脑(政企版) 全局 是 是 是 是 有 天翼云手机 全局 是 是 是 是 有 CDN与边缘 云服务类别 区域 控制台 OpenAPI 系统策略 云服务类别 区域 支持IAM 支持企业项目 支持IAM 支持企业项目 系统策略 边缘安全加速平台 全局 是 是 是 是 有 容器与中间件 云服务名称 区域 控制台 OpenAPI 系统策略 云服务名称 区域 支持IAM 支持企业项目 支持IAM 支持企业项目 系统策略 分布式消息服务Kafka 全局 是 是 是 是 有 分布式消息服务RabbitMQ 全局 是 是 是 是 有 分布式消息服务RocketMQ 全局 是 是 是 是 有 微服务云应用平台 全局 是 是 是 是 有 云容器引擎 全局 是 是 是 是 有 弹性容器实例 全局 是 是 是 是 有 函数计算 全局 是 是 是 是 有 云日志服务LTS 全局 是 是 是 是 有 容器镜像服务 全局 是 是 是 是 有 分布式消息服务MQTT 全局 是 是 是 是 有 应用服务网格 全局 是 是 是 是 有 微服务引擎 全局 是 是 是 是 有
        来自:
        帮助文档
        统一身份认证(一类节点)
        产品概述
        使用IAM授权的云服务
      • 查看客户端连接地址
        本章节主要介绍如何查看客户端连接地址。 分布式消息服务RabbitMQ支持通过Web界面查看客户端连接地址。 说明 客户端处于连接RabbitMQ实例时,才可以查看客户端连接地址。 操作步骤 步骤 1 登录RabbitMQ Web界面。 步骤 2 在导航栏单击“Connections”,进入“Connections”页面。 步骤 3 查看客户端连接地址,如图1所示。 图1 客户端连接地址 同一个客户端可以作为生产者生产消息,也可以作为消费者消费消息,连接IP地址是相同的,如图1,此时我们无法区分哪个是生产者IP地址,哪个是消费者IP地址。如果想要直观体现生产者/消费者IP地址,您可以在客户端中设置“clientProperties”参数,通过此参数来标明生产者/消费者IP地址,示例如下。 //配置客户端连接参数 HashMap clientProperties new HashMap<>(); clientProperties.put("connectionname", "producer"); connectionFactory.setClientProperties(clientProperties); //创建连接 Connection connection connectionFactory.newConnection(); 设置“clientProperties”参数后,连接地址显示如图2所示。 图2 客户端连接地址(分区生产者/消费者IP地址)
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        用户指南
        连接实例
        查看客户端连接地址
      • 普通集成
        产品分类 产品名称 存储 混合存储网关、云硬盘、弹性文件服务、媒体存储、云备份、云硬盘备份、对象存储、云主机备份、海量文件服务 OceanFS、云容灾、对象存储(经典版)I型、并行文件 网络与CDN 弹性IP、天翼云SDWAN、NAT网关、VPN连接、云间高速、虚拟私有云、弹性负载均衡、共享流量包、智能边缘云、云专线CDA、应用加速、边缘安全加速平台、CDN加速、智能DNS、VPC终端节点、全站加速、科研助手 计算 弹性云主机、天翼云电脑(政企版)、函数计算、弹性高性能计算、镜像服务、弹性伸缩服务、物理机、云骁智算平台、数据加密 数据库 关系数据库SQL Server版、关系数据库PostgreSQL版、分布式关系型数据库、关系数据库MySQL版、数据传输服务、文档数据库服务、数据管理服务、分布式缓存服务Redis版、时序数据库Influx版、分布式融合数据库HTAP、云数据库ClickHouse 监控与管理 云日志服务、应用性能监控 应用服务 API网关、EasyCoding敏捷开发平台、软件开发生产线CodeArts 数据库 关系型数据库MySQL版(CTRDS)、关系型数据库PostgreSQL版、关系型数据库SQL Server版、云数据库GaussDB、分布式关系型数据库、分布式融合数据库HTAP、文档数据库服务、时序数据库Influx版、分布式缓存服务Redis版 人工智能 AI能力开放平台、慧聚一站式智算服务平台 大数据 云搜索服务、大数据管理平台 DataWings、翼MapReduce 容器与中间件 应用服务网格、弹性容器实例、分布式容器云平台、容器镜像服务、分布式消息服务RabbitMQ、分布式消息服务RocketMQ、微服务应用平台MSAP、云容器引擎、分布式消息服务Kafka、微服务引擎注册配置中心、微服务引擎MSE、应用性能监控apm、分布式容器云平台CCSE ONE 安全及管理 云防火墙(原生版)、统一身份认证、密钥管理、服务器安全卫士(原生版)、云监控、Web应用防火墙(原生版)、证书管理服务、DDoS高防(边缘云版)、云堡垒机、Web应用防火墙(边缘云版)、云审计 企业应用 云通信短信 视频 视频直播、智能视图服务、云点播 专属云 专属云(计算独享型) 价格 账务 其他 数据库审计、云日志服务、轻量型云主机、云原生网关、API网关
        来自:
        帮助文档
        函数计算
        用户指南
        云工作流
        工作流集成
        普通集成
      • 收发定时/延时消息
        分布式消息服务RocketMQ版支持任意时间的定时消息,最大推迟时间可达到40天。 定时消息即生产者生产消息到分布式消息服务RocketMQ版后,消息不会立即被消费,而是延迟到设定的时间点后才会发送给消费者进行消费。 收发消息前,请参考收集连接信息收集RocketMQ所需的连接信息。 准备环境 1. 在命令行输入python,检查是否已安装Python。得到如下回显,说明Python已安装。 PS C:> python Python 3.9.9 (tags/v3.9.9:ccb0e6a, Nov 15 2021, 18:08:50) [MSC v.1929 64 bit (AMD64)] on win32 Type "help", "copyright", "credits" or "license" for more information. 如果未安装Python,请使用以下命令安装: pip install rocketmqclientpython 2. 安装librocketmq库和rocketmqclientpython。 说明 建议下载rocketmqclientcpp2.2.0,获取librocketmq库。 3. 将librocketmq.so添加到系统动态库搜索路径。 1. 查找librocketmq.so的路径。 find / name librocketmq.so 2. 将librocketmq.so添加到系统动态库搜索路径。 ln s /查找到的librocketmq.so路径/librocketmq.so /usr/lib sudo ldconfig 以下示例代码中的参数说明如下,请参考收集连接信息获取参数值。 GROUP:表示消费组名称。 ENDPOINT:表示实例连接地址和端口。 TOPIC:表示Topic名称。 发送消息 参考如下示例代码。 import datetime from rocketmq.client import Producer, Message from rocketmq.exceptions import RocketMQException endpoint "${ENDPOINT}" 填写分布式消息服务RocketMQ控制台Namesrv接入点 accesskey "${ACCESSKEY}" 填写AccessKey 在分布式消息服务RocketMQ控制台用户管理菜单中创建的用户ID accesssecret "${SECRETKEY}" 填写SecretKey 在分布式消息服务RocketMQ控制台用户管理菜单中创建的用户密钥 topic "${TOPIC}" 填写Topic,在管理控制台创建 producergroup "${GROUP}" 生产者组group 初始化生产者 producer Producer(producergroup) producer.setnamesrvaddr(endpoint) producer.setsessioncredentials(accesskey, accesssecret, "") 启动生产者 try: producer.start() except RocketMQException as e: print('start producer error:', e) exit(1) msg Message(topic) msg.setbody("Hello RocketMQ") delaytime 10 发送任意延迟消息,时间单位为毫秒,如下所示:消息将在10s后投递 delaytimestamp int((datetime.datetime.now() + datetime.timedelta(secondsdelaytime)).timestamp() 1000) msg.setproperty('STARTDELIVERTIME', str(delaytimestamp)) 发送消息 try: result producer.sendsync(msg) print('send result:', result) except RocketMQException as e: print('send message error:', e) producer.shutdown() exit(1) 关闭生产者实例,释放资源 producer.shutdown()
        来自:
        帮助文档
        分布式消息服务RocketMQ
        开发指南
        Python
        收发定时/延时消息
      • 监控告警类
        本章节主要介绍监控告警类问题。 云监控无法展示RabbitMQ监控数据 监控数据无法展示,可能原因:队列名称开头包含特殊字符,例如点号“.”、下划线“”,建议删除带特殊字符的队列。 云监控显示通道数一直上升报警有影响吗? 一个连接最大通道数是2047,超过后再创建通道数会失败,建议排查是否为资源没有释放导致的。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        常见问题
        监控告警类
      • 节点重启后消费者如何重连
        本文介绍节点重启后消费者如何重连,以Java中使用的RabbitMQ客户端amqpclient为例。 amqpclient自带重连机制,但是自带的重连机制只会重试一次,一次连不上后就不会再执行了,这时如果消费者没有做额外的重试机制,那么这个消费者就彻底丧失的消费能力。 amqpclient在节点断连后,根据与通道建立的节点不同,产生不同的错误。 如果通道连接的是队列所在的节点,消费者就会收到一个shutdown信号,这时amqpclient的重连机制就会生效,尝试重新连接服务端。如果连上了,这个通道就会继续连接消费。如果连不上,就会执行channel.close方法,关闭这个通道。 如果通道连接的不是队列所在的节点,消费者不会触发关闭动作,而是由服务端发送的一个取消动作,这个动作对amqpclient来说并不是异常行为,所以日志上不会有明显的报错,但是连接最终还是会关闭。 amqpclient出现上面两种错误时,会分别回调handleShutdownSignal以及handleCancel方法,您可以通过重写这两种方法,在回调时执行重写的重连逻辑,就能在通道关闭后重新创建消费者的新通道继续消费。 以下提供一个简单的代码示例,能够解决上面的两种错误,实现消费者的持续消费。 import com.rabbitmq.client.; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class MyRabbitConsumer { public static void main(String... args) throws IOException, TimeoutException { ConnectionFactory factory new ConnectionFactory(); factory.setHost("192.168.x.x"); factory.setPort(5672); factory.setUsername("name"); factory.setPassword("password"); Connection connection factory.newConnection(); createNewConnection(connection); } public static void createNewConnection(Connection connection) { try { Channel channel connection.createChannel(); channel.basicQos(64); channel.basicConsume("queue1", false, new CustomConsumer(channel, connection)); } catch (Exception e) { createNewConnection(connection); } } static class CustomConsumer implements Consumer { private final Channel channel; private final Connection connection; public CustomConsumer(Channel channel, Connection connection) { channel channel; connection connection; } @Override public void handleConsumeOk(String consumerTag) {} @Override public void handleCancelOk(String consumerTag) {} @Override public void handleCancel(String consumerTag) throws IOException { createNewConnection(connection); } @Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { createNewConnection(connection); } @Override public void handleRecoverOk(String consumerTag) {} @Override public void handleDelivery(String consumerTag, Envelope env, AMQP.BasicProperties prop, byte[] body) throws IOException { String message new String(body, StandardCharsets.UTF8); System.out.println("收到消息: " + message); channel.basicAck(env.getDeliveryTag(), false); } } }
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        最佳实践
        节点重启后消费者如何重连
      • 事件目标概述
        事件目标是事件规则的一部分,负责消费经事件规则过滤与转换后的事件。 事件目标 事件总线EventBridge支持以下事件目标: 函数计算 分布式消息服务RocketMQ 分布式消息服务RabbitMQ 分布式消息服务Kafka HTTP、HTTPS地址
        来自:
        帮助文档
        事件总线
        用户指南
        事件总线
        事件规则
        事件目标
        事件目标概述
      • 产品定义
        本节主要介绍分布式消息服务Kafka的产品简介 分布式消息服务Kafka 是一个分布式、高吞吐量、高可用的消息队列服务,针对开源的 Kafka 提供全托管服务,解决开源产品长期以来的痛点,用户只需专注于业务开发,无需部署运维,低成本、更弹性、更可靠,广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,是大数据生态中不可或缺的产品之一。 关于Kafka的帮助手册阅读指引 考虑到篇幅的限制,我们提供的Kafka用户手册主要描述了产品相关的信息,以及与开源社区版Kafka的差异,如天翼云Kafka的产品规格、控制台操作、API接口调用,以及客户端对接等方面。 如果您需要了解Kafka的基础入门知识或者消息的生产和消费等技术细节,请查阅Kafka官网资料。 产品架构 Broker:消息中间件处理结点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。 Topic:主题 一类消息的集合。 Partition:分区,topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。 Segment:partition物理上由多个segment组成。 offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息。 Producer:消息和数据生成者,一般为应用调用API进行消息生产,并向Kafka的Topic发布消息。 Consumer:消息订阅者,也成为消息消费者,负责向 Kafka Broker 读取消息并进行消费。 Consumer Group:一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致,Consumer Group 和 Topic 的关系是 N:N,同一个 Consumer Group 可以订阅多个 Topic,同一个 Topic 也可以被多个 Consumer Group 订阅。 更多信息请参见名词解释。
        来自:
        帮助文档
        分布式消息服务Kafka
        产品简介
        产品定义
      • 旧资费
        说明 分布式消息服务RocketMQ旧资费根据实例规格分为基础版、中级版和高级版,按照不同版本收费。 目前在 上海7、南京3、乌鲁木齐27、北京5、晋中、内蒙6 资源池开放订购。 实例资费 实例类型 标准资费(元/月) 按需标准资费(元/小时) 实例说明 基础版 1546 3.23 TPS:5000条/秒, Topic数上限:50,存储空间:200GB 中级版 2276 4.75 TPS:10000条/秒, Topic数上限:200,存储空间:500GB 高级版 3792 7.90 TPS:20000条/秒, Topic数上限:500,存储空间:1000GB
        来自:
        帮助文档
        分布式消息服务RocketMQ
        计费说明
        产品资费
        旧资费
      • 分布式消息服务RocketMQ事件源
        参数 说明 示例 实例名称 前提条件中已创建的分布式消息服务RocketMQ版实例。 xxx Topic 当前实例中的Topic。 topic1 Group 消费组名。 快速创建 :自动创建以GIDEVENTBRIDGExxx命名的Group ID。 使用已有 :选择当前实例中已创建的Group,请不要与已有业务的Group混用,以免影响已有的消息收发。 group1 消费位点 开始消费的位置。 最新位点 :从最新位点开始消费。 最新位点 Tag 用于过滤消息的Tag值,非必填。 tag1
        来自:
        帮助文档
        事件总线
        用户指南
        事件流
        事件源
        分布式消息服务RocketMQ事件源
      • 收发事务消息
        分布式消息服务RocketMQ版的事务消息支持在业务逻辑与发送消息之间提供事务保证,通过两阶段的方式提供对事务消息的支持,事务消息交互流程如图1所示。 图1 事务消息交互流程 事务消息生产者首先发送半消息,然后执行本地事务。如果执行成功,则发送事务提交,否则发送事务回滚。服务端在一段时间后如果一直收不到提交或回滚,则发起回查,生产者在收到回查后重新发送事务提交或回滚。消息只有在提交之后才投递给消费者,消费者对回滚的消息不可见。 收发事务消息前,请参考收集连接信息收集RocketMQ所需的连接信息。 准备环境 1. 在命令行输入python,检查是否已安装Python。得到如下回显,说明Python已安装。 PS C:> python Python 3.9.9 (tags/v3.9.9:ccb0e6a, Nov 15 2021, 18:08:50) [MSC v.1929 64 bit (AMD64)] on win32 Type "help", "copyright", "credits" or "license" for more information. 如果未安装Python,请使用以下命令安装: pip install rocketmqclientpython 2. 安装librocketmq库和rocketmqclientpython。 说明 建议下载rocketmqclientcpp2.2.0,获取librocketmq库。 3. 将librocketmq.so添加到系统动态库搜索路径。 1. 查找librocketmq.so的路径。 find / name librocketmq.so 2. 将librocketmq.so添加到系统动态库搜索路径。 ln s /查找到的librocketmq.so路径/librocketmq.so /usr/lib sudo ldconfig 以下示例代码中的参数说明如下,请参考收集连接信息获取参数值。 GROUP:表示消费组名称。 ENDPOINT:表示实例连接地址和端口。 TOPIC:表示Topic名称。
        来自:
        帮助文档
        分布式消息服务RocketMQ
        开发指南
        Python
        收发事务消息
      • 支持的监控指标
        主题监控指标 表主题支持的监控指标 指标ID 指标名称 指标含义 取值范围 测量对象 监控周期(原始指标) topicbytesinrate 生产流量 该指标用于统计每秒生产的字节数。单位:Byte/s、KB/s、MB/s、GB/s 说明 在“主题”页签,当“监控类型”为“基本监控”时,才包含该指标。 0~500000000 Kafka实例队列 1分钟 topicbytesoutrate 消费流量 该指标用于统计每秒消费的字节数。单位:Byte/s、KB/s、MB/s、GB/s 说明 在“主题”页签,当“监控类型”为“基本监控”时,才包含该指标。 0~500000000 Kafka实例队列 1分钟 topicdatasize 队列数据容量 该指标用于统计队列当前的消息数据大小。单位:Byte、KB、MB、GB、TB、PB 说明 在“主题”页签,当“监控类型”为“基本监控”时,才包含该指标。 0~5000000000000 Kafka实例队列 1分钟 topicmessages 队列消息总数 该指标用于统计队列当前的消息总数。单位:Count 说明 在“主题”页签,当“监控类型”为“基本监控”时,才包含该指标。 ≥ 0 Kafka实例队列 1分钟 topicmessagesinrate 消息生产速率 该指标用于统计每秒生产的消息数量。单位:Count/s 说明 在“主题”页签,当“监控类型”为“基本监控”时,才包含该指标。 0~500000 Kafka实例队列 1分钟 partitionmessages 分区消息数 该指标用于统计分区中当前的消息个数。单位:Count 说明 在“主题”页签,当“监控类型”为“分区监控”时,才包含该指标。 ≥ 0 Kafka实例队列 1分钟 producedmessages 生产消息数 该指标用于统计目前生产的消息总数。单位:Count 说明 在“主题”页签,当“监控类型”为“分区监控”时,才包含该指标。 ≥ 0 Kafka实例队列 1分钟
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        监控
        支持的监控指标
      • RocketMQ PHP SDK
        说明 分布式消息服务RocketMQ兼容了社区版 HTTP SDK,您可以使用社区版 HTTP SDK接入分布式消息服务RocketMQ。 前提条件: 1. 在PHP安装目录下的composer.json文件中加入社区PHP SDK 依赖。 2. 使用Composer安装依赖。 composer install 发送普通消息 client new MQClient( // 填写分布式消息服务RocketMQ控制台HTTP接入点 "${HTTPENDPOINT}", // 填写AccessKey,在管理控制台创建 "${ACCESSKEY}", // 填写SecretKey 在管理控制台创建 "${SECRETKEY}" ); // 所属的 Topic $topic "${TOPIC}"; // Topic所属实例ID,默认实例为空NULL $instanceId "${INSTANCEID}"; $this>producer $this>client>getProducer($instanceId, $topic); } public function run() { try { for ($i 1; $i putProperty("a", $i); // 设置消息KEY $publishMessage>setMessageKey("MessageKey"); $result $this>producer>publishMessage($publishMessage); print "Send mq message success. msgId is:" . $result>getMessageId() . ", bodyMD5 is:" . $result>getMessageBodyMD5() . "n"; } } catch (Exception $e) { printr($e>getMessage() . "n"); } }}$instance new NormalProducerExample();$instance>run();?> 消费普通消息 client new MQClient( // 填写分布式消息服务RocketMQ控制台HTTP接入点 "${HTTPENDPOINT}", // 填写AccessKey,在管理控制台创建 "${ACCESSKEY}", // 填写SecretKey 在管理控制台创建 "${SECRETKEY}" ); // 所属的 Topic $topic "${TOPIC}"; // 您在控制台创建的 Consumer ID(Group ID) $groupId "${GROUPID}"; // Topic所属实例ID,默认实例为空NULL $instanceId "${INSTANCEID}"; $this>consumer $this>client>getConsumer($instanceId, $topic, $groupId, "TagA"); } public function run() { // 在当前线程循环消费消息,建议是多开个几个线程并发消费消息 while (True) { try { // 长轮询消费消息 // 长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回 $messages $this>consumer>consumeMessage( 3, // 一次最多消费3条(最多可设置为16条) 3 // 长轮询时间3秒(最多可设置为30秒) ); } catch (MQExceptionMessageResolveException $e) { // 当出现消息Body存在不合法字符,无法解析的时候,会抛出此异常。 // 可以正常解析的消息列表。 $messages $e>getPartialResult()>getMessages(); // 无法正常解析的消息列表。 $failMessages $e>getPartialResult()>getFailResolveMessages(); $receiptHandles array(); foreach ($messages as $message) { // 处理业务逻辑。 $receiptHandles[] $message>getReceiptHandle(); printf("MsgID %sn", $message>getMessageId()); } foreach ($failMessages as $failMessage) { // 处理存在不合法字符,无法解析的消息。 $receiptHandles[] $failMessage>getReceiptHandle(); printf("Fail To Resolve Message. MsgID %sn", $failMessage>getMessageId()); } $this>ackMessages($receiptHandles); continue; } catch (Exception $e) { if ($e instanceof MQExceptionMessageNotExistException) { // 没有消息可以消费,接着轮询 printf("No message, contine long polling!RequestId:%sn", $e>getRequestId()); continue; } printr($e>getMessage() . "n"); sleep(3); continue; } print "consume finish, messages:n"; // 处理业务逻辑 $receiptHandles array(); foreach ($messages as $message) { $receiptHandles[] $message>getReceiptHandle(); printf("MessageID:%s TAG:%s BODY:%s nPublishTime:%d, FirstConsumeTime:%d, nConsumedTimes:%d, NextConsumeTime:%d,MessageKey:%sn", $message>getMessageId(), $message>getMessageTag(), $message>getMessageBody(), $message>getPublishTime(), $message>getFirstConsumeTime(), $message>getConsumedTimes(), $message>getNextConsumeTime(), $message>getMessageKey()); printr($message>getProperties()); } // $message>getNextConsumeTime()前若不确认消息消费成功,则消息会重复消费 // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样 printr($receiptHandles); try { $this>ackMessages($receiptHandles); } catch (Exception $e) { if ($e instanceof MQExceptionAckMessageException) { // 某些消息的句柄可能超时了会导致确认不成功 printf("Ack Error, RequestId:%sn", $e>getRequestId()); foreach ($e>getAckMessageErrorItems() as $errorItem) { printf("tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%sn", $errorItem>getReceiptHandle(), $errorItem>getErrorCode(), $errorItem>getErrorCode()); } } } print "ack finishn"; } } public function ackMessages($receiptHandles) { try { $this>consumer>ackMessage($receiptHandles); } catch (Exception $e) { if ($e instanceof MQExceptionAckMessageException) { // 某些消息的句柄可能超时,会导致消费确认失败。 printf("Ack Error, RequestId:%sn", $e>getRequestId()); foreach ($e>getAckMessageErrorItems() as $errorItem) { printf("tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%sn", $errorItem>getReceiptHandle(), $errorItem>getErrorCode(), $errorItem>getErrorCode()); } } } }}$instance new ConsumerExample();$instance>run();?>
        来自:
        帮助文档
        分布式消息服务RocketMQ
        SDK参考
        RocketMQ PHP SDK
      • 支持的监控指标
        指标ID 指标名称 指标含义 取值范围 测量对象 监控周期(原始指标) topicproducemsg 消息生产数 Topic一分钟收到的消息数单位:Count >0 RocketMQ实例队列 1分钟 topicconsumemsg 消息消费数 Topic一分钟被消费的消息数单位:Count >0 RocketMQ实例队列 1分钟 topicproducerate 消息生产速率 Topic每秒收到的消息数单位:Count/s >0 RocketMQ实例队列 1分钟 topicconsumerate 消息消费速率 Topic每秒被消费的消息数单位:Count/s >0 RocketMQ实例队列 1分钟 topicbytesinrate 生产流量 当前主题的生产流量单位:Byte/s说明:2022年5月16号及以后购买的实例,支持此监控项。 >0 RocketMQ实例队列 1分钟 topicbytesoutrate 消费流量 当前主题的消费流量单位:Byte/s说明:2022年5月16号及以后购买的实例,支持此监控项。 >0 RocketMQ实例队列 1分钟
        来自:
        帮助文档
        分布式消息服务RocketMQ
        用户指南
        监控
        支持的监控指标
      • 接入方式
        本节介绍了RabbitMQ 接入方式。 安全接入点 RabbitMQ 安全接入点支持 "PLAIN"、"AMQPLAIN" 授权机制。 1、访问控制 RabbitMQ "PLAIN"、"AMQPLAIN"授权机制需要创建用户,从而获得对应虚拟主机的访问权限。 2、接入步骤 (1)新建用户(集群管理>用户>新建用户) (2)运行demo 客户端关键参数设置 "PLAIN"、"AMQPLAIN" 授权机制的客户端关键参数配置 String host "192.168.0.0"; //安全接入点ip Integer port 5672; //安全接入点port String username "xxx"; //集群管理用户列表的用户名 String password "xxx"; String vhost "/"; ConnectionFactory connectionFactory new ConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(vhost); SSL接入点 RabbitMQ 安全接入点支持 "EXTERNAL" 授权机制 1、访问控制 无 2、接入步骤 (1)下载SSL证书(实例概览>导出服务>下载SSL文件) (2)运行demo 客户端关键参数设置 "EXTERNAL" 授权机制的客户端关键参数配置 java String host "192.168.0.0"; //SSL接入点ip int port 5671; //SSL接入点port //以下2个ssl文件可通过控制台获取安装包, 具体的获取方式可以查看2.2.1接入步骤的第二小节 String ksFile "D:tmpsslclientrabbitmqkey.p12"; String tksFile "D:tmpssltruststore"; String vhost "/"; char[] keyPassphrase "W3zT98Zz9Io".toCharArray(); KeyStore ks KeyStore.getInstance("PKCS12"); ks.load(new FileInputStream(ksFile), keyPassphrase); KeyManagerFactory kmf KeyManagerFactory.getInstance("SunX509"); kmf.init(ks, keyPassphrase); char[] trustPassphrase null; trustPassphrase "W3zT98Zz9Io".toCharArray(); KeyStore tks KeyStore.getInstance("JKS"); tks.load(new FileInputStream(tksFile), trustPassphrase); TrustManagerFactory tmf TrustManagerFactory.getInstance("SunX509"); tmf.init(tks); SSLContext c SSLContext.getInstance("tlsv1.2"); c.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); ConnectionFactory connectionFactory new ConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setVirtualHost(vhost); connectionFactory.setSaslConfig(DefaultSaslConfig.EXTERNAL); connectionFactory.useSslProtocol(c);
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        最佳实践
        RabbitMQ接入
        接入方式
      • 数据采集
        本文主要介绍数据采集 在使用APM服务过程中用户开启APM数据采集开关后,APM仅采集应用性能指标及调用链相关数据,不涉及个人隐私数据。所采集的数据仅用于应用的性能分析和故障诊断,不会用于其他商业目的。 数据类型 采集数据 传输方式 存储方式 数据用途 时限 性能指标数据 JVM相关数据、异常、数据库、SQL语句以及中间件调用相关的数据 通过WSS方式传输 APM服务端按照租户隔离存储 指标查看页面展示 免费版7天,企业版30天,到期彻底删除 调用链数据 调用链event数据,包含中间件调用的相关数据 通过WSS方式传输 APM服务端按照租户隔离存储 调用链前台查询展示 免费版7天,企业版30天,到期彻底删除 资源信息 服务类型、服务名称、创建时间、删除时间、所在节点地址和服务发布端口 通过WSS方式传输 APM服务端按照租户隔离存储 资源库前台查询展示 免费版7天,企业版30天,到期彻底删除 资源属性 系统类型、系统启动事件、CPU个数、服务执行用户名称、服务进程id、服务的PodID、CPU标志、系统版本、服务使用的Web框架、JVM版本、时区、系统名称、采集器版本以及LastMail的Url 通过WSS方式传输 APM服务端按照租户隔离存储 资源库前台查询展示 免费版7天,企业版30天,到期彻底删除 表 采集项限制说明 采集项名称 最大值 监控项默认最大行数 500行 SQL默认长度限制 2000字符 SQL Result Body体默认采集数量限制 100个 SQL Result Body体默认采集内容大小限制 999字符 Redis Body体默认长度限制 100字符 Mongo最大集群数 10个 Mongo command默认长度限制 2000字符 Hbase command默认长度限制 500字符 Es RestClient上限 10个 Cassandra CQL默认长度限制 2000字符 Cassandra Session上限 10个 Kafka Mbean采集ObjectName上限 100个 Kafka ClientId对应IP缓存上限 100个 RabbitMq连接地址上限 20个 RabbitMq每个地址最大缓存连接数 100个 RabbitMq Consumer上限 500个 RabbitMq每个Consumer最大缓存Channel数 100个 RabbitMq每个Channel没有ACK的消息数 3000条 RabbitMq缓存的手动ACK Consumer个数 20个 RocketMq PID上限 20个 RocketMq ClientId上限 20个 Jetcd Tag最大长度 500字符 HttpClient连接池上限 10条 连接池调用链默认上报时间阈值 1毫秒 Dubbo Invocation长度限制 500字符 Dubbo Attachment长度限制 500字符 URL Body体默认长度限制 9999字符 URL采集应用code body长度限制 0字符 Java Method Body体长度限制 8192字符
        来自:
        帮助文档
        应用性能管理
        产品介绍
        数据采集
      • 服务内联委托管理
        可信云服务可以通过IAM委托的方式访问其他云服务的资源。可信实体为天翼云服务的IAM委托,包括普通云服务委托和云服务关联委托。本文介绍事件总线EventBridge的服务内联委托。 什么是服务内联委托 在某些场景下,事件总线EventBridge为了完成自身的某个功能,需要获取其他云服务的访问权限,因此,事件总线EventBridge创建了与云服务内联委托,即服务内联委托CtyunAssumeRoleForEventBridge。 使用事件总线EventBridge,系统提供的服务内联委托及其包含的系统权限策略如下: 服务内联委托:CtyunAssumeRoleForEventBridge 系统权限策略:CtyunAssumePolicyForEventBridge CtyunAssumeRoleForEventBridge 服务内联委托CtyunAssumeRoleForEventBridge具有获取访函数列表、函数详情以及调用函数的权限;具有对分布式消息服务Kafka、分布式消息服务RocketMQ、分布式消息服务MQTT与分布式消息服务RabbitMQ的管理员权限;具有专有网络VPC、VPCE的管理员权限。 服务内联委托CtyunAssumeRoleForEventBridge被授予权限策略CtyunAssumePolicyForEventBridge,该权限策略的内容如下: plaintext { "Version": "1.1", "Statement": [ { "Action": [ "cf:inst:InvokeFunction", "cf:inst:GetFunction", "cf:inst:ListFunctions", "KAFKA::", "MQ2::", "mqtt::", "AMQP::", "vpce::", "vpc::" ], "Resource": [ "" ], "Effect": "Allow" } ] } 以下是使用事件总线EventBridge时,需要使用服务内联委托的场景: 建立函数计算规则时,需要委托事件总线EventBridge具有获取访函数列表、函数详情以及调用函数的权限。 建立消息中间件事件源与事件目标时,需要委托事件总线EventBridge具有对分布式消息服务Kafka、分布式消息服务RocketMQ、分布式消息服务MQTT与分布式消息服务RabbitMQ的管理员权限。 建立网络端点时,需要委托事件总线EventBridge具有专有网络VPC、VPCE的管理员权限。
        来自:
        帮助文档
        事件总线
        产品简介
        服务内联委托管理
      • 操作类常见问题
        本节介绍分布式消息服务Kafka操作类常见问题 消息在kafka保留多长时间? 消息保存72小时,超过72小时的消息将会被删除。 Kafka可以创建多少个主题? Kafka基础版可以创建50个主题、Kafka高级版可以创建100个主题。 如果想消费已经被消费过的数据? consumer是底层采用的是一个阻塞队列,只要一有producer生产数据,那consumer就会将数据消费。当然这里会产生一个很严重的问题,如果你重启一消费者程序,那你连一条数据都抓不到,但是log文件中明明可以看到所有数据都好好的存在。换句话说,一旦你消费过这些数据,那你就无法再次用同一个groupid消费同一组数据了。针对这种情况,你可在控制台重置消费组消费点(3天内)。 是否需要预先创建消费组 消费组和消费组订阅主题关系虽然业务应用客户端接入时可自动创建,但建议都先预先创建做好管理。 出现“Not authorized to access group”的错误信息 没有创建消费组时会遇到此报错信息,创建消费组可解决此问题。 为什么PHP发送延时比较长? PHP发送延时比较长是PHP的语言特性导致的。PHP每次发送时,都会重新初始化一个KafkaProducer对象,这个初始化会进行各种操作,包括连接各个Broker、更新元数据等,在VPC内耗时100ms以上,在公网可能耗时500ms以上。相比之下,Java会复用KafkaProducer,发送延迟较低。 哪里可以找到生产消费消息的示例 最佳实践 生产者实践、消费者实践。
        来自:
        帮助文档
        分布式消息服务Kafka
        常见问题
        操作类
        操作类常见问题
      • 自定义分布式消息服务RocketMQ事件源
        本文介绍如何在事件总线EventBridge管理控制台中添加分布式消息服务RocketMQ类型的自定义事件源。 前提条件 事件总线EventBridge 开通事件总线EventBridge并委托授权。 创建自定义总线。 分布式消息服务RocketMQ 开通分布式消息服务RocketMQ。 创建实例,并创建对应主题与消费组。 操作步骤 1. 登录事件总线管理控制台。 2. 在左侧导航栏,单击事件总线。 3. 在事件总线页面,单击目标总线名称。 4. 在左侧导航栏,单击事件源。 5. 在事件源 页面,单击添加事件源。 6. 在添加自定义事件源 面板,输入名称 和描述 ,事件提供方 选择分布式消息服务RocketMQ,选择实例、主题等配置然后单击确认,如图1所示。 图1 创建事件源时事件提供方选择分布式消息服务RocketMQ 参数说明 参数 说明 示例 名称 事件源名。 source1 实例名称 选择前提条件中已创建的分布式消息服务RocketMQ版实例。 xxx Topic 选择当前实例中的Topic。 topic1 Group 快速创建:自动创建以GIDEVENTBRIDGExxx命名的Group ID。 使用已有:选择当前实例中已创建的Group,请不要与已有业务的Group混用,以免影响已有的消息收发。 group1 消费位点 开始消费的位置。仅支持从最早位点进行消费。 最早位点:从最初位点开始消费。 最早位点
        来自:
        帮助文档
        事件总线
        用户指南
        事件总线
        事件源
        自定义事件源类型
        自定义分布式消息服务RocketMQ事件源
      • 基于事件流实现RocketMQ消息路由
        本文介绍如何应用事件总线EventBridge的事件流功能实现分布式消息服务RocketMQ的消息路由。 前提条件 开通事件总线EventBridge并授权。 开通分布式消息服务RocketMQ并创建最少两个主题。 背景信息 事件流作为更轻量、实时端到端的流式事件通道,提供轻量级的流式数据的过滤和转换的能力,在不同的数据仓库之间、数据处理程序之间、数据分析和处理系统之间进行数据同步。源端分布式消息服务RocketMQ生产的消息可以通过事件流这个通道被路由到目标端的分布式消息服务RocketMQ。 步骤一:创建事件流 1. 登录事件总线EventBridge控制台。 2. 在左侧导航栏,单击事件流。 3. 在事件流页面,单击创建事件流。 4. 在创建事件流面板,设置任务名称和描述,配置以下参数,然后单击保存。 a.在Source(源)配置向导,选择数据提供方为分布式消息服务RocketMQ,设置以下参数,然后单击下一步。 参数 说明 示例 实例名称 前提条件中已创建的分布式消息服务RocketMQ版实例。 xxx Topic 当前实例中的Topic。 topic1 Group 消费组名。 快速创建:自动创建以GIDEVENTBRIDGExxx命名的Group ID。 使用已有:选择当前实例中已创建的Group,请不要与已有业务的Group混用,以免影响已有的消息收发。 group1 消费位点 开始消费的位置。 最新位点:从最新位点开始消费。 最新位点 Tag 用于过滤消息的Tag值,非必填。 tag1 b.在Filtering(过滤)配置向导,设置事件过滤规则,单击下一步。 c.在Sink(目标)配置向导,选择服务类型为分布式消息服务RocketMQ,配置以下参数,单击保存,如图1所示。 参数 说明 示例 实例 选择分布式消息服务RocketMQ实例。 instancexxx Topic 选择RocketMQ实例的Topic。 topic1 消息体 选择消息体(Body)的内容,更多参考“事件内容转换”。 完整事件 自定义属性 选择自定义属性(Properties)的内容,更多参考“事件内容转换”。 空 索引 选择索引(Keys)的内容,更多参考“事件内容转换”。 空 标签 选择标签(Tags)的内容,更多参考“事件内容转换”。 空 图1 创建事件流时选择服务类型为分布式消息服务RocketMQ的事件目标 5. 创建事件流后,会有30秒~60秒的延迟时间,您可以在事件流页面的状态栏查看启动进度。
        来自:
        帮助文档
        事件总线
        最佳实践
        基于事件流实现消息路由
        基于事件流实现RocketMQ消息路由
      • 1
      • ...
      • 5
      • 6
      • 7
      • 8
      • 9
      • ...
      • 241
      跳转至
      推荐热词
      天翼云运维管理审计系统天翼云云服务平台云服务备份云日志服务应用运维管理云手机云电脑天翼云云hbase数据库电信云大数据saas服务电信云大数据paas服务轻量型云主机天翼云客户服务电话应用编排服务天翼云云安全解决方案云服务总线CSB天翼云服务器配置天翼云联邦学习产品天翼云云安全天翼云企业上云解决方案天翼云产品天翼云视频云存储

      天翼云最新活动

      安全隔离版OpenClaw

      OpenClaw云服务器专属“龙虾“套餐低至1.5折起

      天翼云新春焕新季

      云主机开年特惠28.8元/年,0元秒杀等你来抢!

      云上钜惠

      爆款云主机全场特惠,2核4G只要1.8折起!

      中小企业服务商合作专区

      国家云助力中小企业腾飞,高额上云补贴重磅上线

      出海产品促销专区

      爆款云主机低至2折,高性价比,不限新老速来抢购!

      天翼云奖励推广计划

      加入成为云推官,推荐新用户注册下单得现金奖励

      产品推荐

      弹性云主机 ECS

      物理机 DPS

      多活容灾服务

      GPU云主机

      轻量型云主机

      AI Store

      公共算力服务

      应用托管

      科研助手

      推荐文档

      天翼云最佳实践②:toa模块安装方法

      文档下载

      登录

      云课堂 第六课:如何让云主机不放在同一个篮子里

      应用数据管理

      • 7*24小时售后
      • 无忧退款
      • 免费备案
      • 专家服务
      售前咨询热线
      400-810-9889转1
      关注天翼云
      • 旗舰店
      • 天翼云APP
      • 天翼云微信公众号
      服务与支持
      • 备案中心
      • 售前咨询
      • 智能客服
      • 自助服务
      • 工单管理
      • 客户公告
      • 涉诈举报
      账户管理
      • 管理中心
      • 订单管理
      • 余额管理
      • 发票管理
      • 充值汇款
      • 续费管理
      快速入口
      • 天翼云旗舰店
      • 文档中心
      • 最新活动
      • 免费试用
      • 信任中心
      • 天翼云学堂
      云网生态
      • 甄选商城
      • 渠道合作
      • 云市场合作
      了解天翼云
      • 关于天翼云
      • 天翼云APP
      • 服务案例
      • 新闻资讯
      • 联系我们
      热门产品
      • 云电脑
      • 弹性云主机
      • 云电脑政企版
      • 天翼云手机
      • 云数据库
      • 对象存储
      • 云硬盘
      • Web应用防火墙
      • 服务器安全卫士
      • CDN加速
      热门推荐
      • 云服务备份
      • 边缘安全加速平台
      • 全站加速
      • 安全加速
      • 云服务器
      • 云主机
      • 智能边缘云
      • 应用编排服务
      • 微服务引擎
      • 共享流量包
      更多推荐
      • web应用防火墙
      • 密钥管理
      • 等保咨询
      • 安全专区
      • 应用运维管理
      • 云日志服务
      • 文档数据库服务
      • 云搜索服务
      • 数据湖探索
      • 数据仓库服务
      友情链接
      • 中国电信集团
      • 天翼云国际站
      • 189邮箱
      • 天翼企业云盘
      • 天翼云盘
      ©2026 天翼云科技有限公司版权所有 增值电信业务经营许可证A2.B1.B2-20090001
      公司地址:北京市东城区青龙胡同甲1号、3号2幢2层205-32室
      • 用户协议
      • 隐私政策
      • 个人信息保护
      • 法律声明
      备案 京公网安备11010802043424号 京ICP备 2021034386号