云主机开年特惠28.8元/年,0元秒杀等你来抢!
查看详情

活动

天翼云最新优惠活动,涵盖免费试用,产品折扣等,助您降本增效!
热门活动
  • 安全隔离版OpenClaw NEW OpenClaw云服务器专属“龙虾“套餐低至1.5折起
  • 天翼云新春焕新季 NEW 云主机开年特惠28.8元/年,0元秒杀等你来抢!
  • 云上钜惠 爆款云主机全场特惠,2核4G只要1.8折起!
  • 中小企业服务商合作专区 国家云助力中小企业腾飞,高额上云补贴重磅上线
  • 出海产品促销专区 NEW 爆款云主机低至2折,高性价比,不限新老速来抢购!
  • 天翼云奖励推广计划 加入成为云推官,推荐新用户注册下单得现金奖励
免费活动
  • 免费试用中心 HOT 多款云产品免费试用,快来开启云上之旅
  • 天翼云用户体验官 NEW 您的洞察,重塑科技边界

息壤智算

领先开放的智算服务平台,提供算力、平台、数据、模型、应用“五位一体”智算服务体系,构建全流程的AI基础设施能力
AI Store
  • 算力市场
  • 模型市场
  • 应用市场
  • MCP市场
公共算力服务
  • 裸金属
  • 定制裸金属
训推服务
  • 模型开发
  • 训练任务
  • 服务部署
模型推理服务
  • 模型广场
  • 体验中心
  • 服务接入
应用托管
  • 应用实例
科研助手
  • 科研智能体
  • 科研服务
  • 开发机
  • 并行计算
大模型
  • DeepSeek-V3.1
  • DeepSeek-R1-0528
  • DeepSeek-V3-0324
  • Qwen3-235B-A22B
  • Qwen3-32B
智算一体机
  • 智算一体机
模型适配专家服务
  • 模型适配专家服务
算力服务商
  • 入驻算力服务商

应用商城

天翼云精选行业优秀合作伙伴及千余款商品,提供一站式云上应用服务
进入甄选商城进入云市场进入AI Store创新解决方案公有云生态专区智云上海应用生态专区
建站工具
  • 新域名服务
  • SSL证书
  • 翼建站
企业办公
  • 安全邮箱
  • WPS 365 天翼云版
  • 天翼企业云盘(标准服务版)
灾备迁移
  • 云管家2.0
  • 翼备份(SaaS版)

定价

协助您快速了解云产品计费模式、价格详情,轻松预估上云成本
价格计算器
  • 动态测算产品价格
定价策略
  • 快速了解计费模式

合作伙伴

天翼云携手合作伙伴,共创云上生态,合作共赢
天翼云生态合作中心
  • 天翼云生态合作中心
天翼云渠道合作伙伴
  • 天翼云代理渠道合作伙伴
天翼云服务合作伙伴
  • 天翼云集成商交付能力认证
天翼云应用合作伙伴
  • 天翼云云市场合作伙伴
  • 天翼云甄选商城合作伙伴
天翼云技术合作伙伴
  • 天翼云OpenAPI中心
天翼云培训认证
  • 天翼云学堂
  • 天翼云市场商学院
天翼云合作计划
  • 云汇计划
天翼信创云专区
  • 信创云专区
  • 适配互认证

开发者

开发者相关功能入口汇聚
技术社区
  • 专栏文章
  • 互动问答
  • 技术视频
资源与工具
  • OpenAPI中心
培训与认证
  • 天翼云学堂
  • 天翼云认证
开源社区
  • 魔乐社区
  • OpenTeleDB

支持与服务

为您提供全方位支持与服务,全流程技术保障,助您轻松上云,安全无忧
文档与工具
  • 文档中心
  • 新手上云
  • 自助服务
  • OpenAPI中心
定价
  • 价格计算器
  • 定价策略
基础服务
  • 售前咨询
  • 在线支持
  • 在线支持
  • 工单服务
  • 服务保障
  • 会员中心
增值服务
  • 红心服务
  • 首保服务
  • 客户支持计划
  • 专家技术服务
  • 备案管家
我要反馈
  • 建议与反馈
  • 用户体验官
信息公告
  • 客户公告

了解天翼云

天翼云秉承央企使命,致力于成为数字经济主力军,投身科技强国伟大事业,为用户提供安全、普惠云服务
品牌介绍
  • 关于天翼云
  • 智算云
  • 天翼云4.0
  • 新闻资讯
  • 天翼云APP
基础设施
  • 全球基础设施
  • 信任中心
最佳实践
  • 精选案例
  • 超级探访
  • 云杂志
  • 分析师和白皮书
  • 天翼云·创新直播间
市场活动
  • 2025智能云生态大会
  • 2024智算云生态大会
  • 2023云生态大会
  • 2022云生态大会
  • 天翼云中国行
天翼云
  • 活动
  • 息壤智算
  • 产品
  • 解决方案
  • 应用商城
  • 定价
  • 合作伙伴
  • 开发者
  • 支持与服务
  • 了解天翼云
      • 文档
      • 控制中心
      • 备案
      • 管理中心
      分布式消息服务MQTT_相关内容
      • 使用限制
        分布式消息服务Kafka在某些功能做了约束和限制,如下表: 限制项 约束和限制 描述 Kafka Zookeeper 不对外暴露 Kafka实例的Zookeeper目前仅处于自用,不对外提供服务,为Kafka内部使用。 版本 当前服务端版本为1.1.0和2.3.0 兼容0.10以上的客户端版本,推荐使用和服务端一致的版本。 消息大小 生产消息的最大长度为10M 消息长度不要超过10M,否则生产失败。 登录Kafka节点所在机器 不能登录 无 限制Kafka Topic总分区数 限制 Kafka以分区为粒度管理消息,分区多导致生产、存储、消费都碎片化,影响性能稳定性。在使用过程中,当Topic的总分区数达到上限后,用户就无法继续创建Topic。 是否支持自动创建Topic 支持 在创建实例时候,您可以选择是否开启。 当您选择开启,表示生产或消费一个未创建的Topic时,会自动创建一个包含3个分区和3个副本的Topic。 是否需要创建消费组、消费者、生产者 不需要 不需要单独创建消费组、生产者和消费者,在使用时自动生成,实例创建后,直接使用即可 减少分区数 不支持 按照开源Kafka现有逻辑,不支持减少分区数。
        来自:
        帮助文档
        专属云分布式消息服务Kafka
        产品简介
        使用限制
      • 分布式消息产品选型
        介绍分布式消息产品选型对比项。 特性 Kafka RabbitMQ RocketMQ 功能 支持功能较少,不支持延迟发送,消息重试等功能 功能丰富,支持多个队列种类(优先级队列、延迟队列、死信队列镜像队列等),提供丰富的策略分配 功能完善,支持事务消息、定时消息、事务消息等 单机吞吐量 十万级 万级 几万级 稳定性 队列/分区多时性能不稳定 消息堆积时,性能不稳定 队列较多、消息堆积时性能保持稳定 可用性 非常高(分布式)具有主备故障自动切换 较高,基于主从架构实现高可用性 非常高(分布式)具有主备故障自动切换 选型建议 性能要求高,数据量大,适合产生大量数据的互联网服务的数据收集业务,如日志采集处理、需对接大数据应用等,kafka是首选 数据量少,吞吐量需求不大;数据可靠性要求较高,对功能丰富性极高 可靠性要求很高且性能要求较高的场景以及业务削峰场景,如电商、订单处理等
        来自:
        帮助文档
        分布式消息服务RocketMQ
        产品简介
        分布式消息产品选型
      • 查看监控数据
        本文主要介绍查看监控数据。 操作场景 云监控对Kafka实例的运行状态进行日常监控,可以通过控制台直观的查看Kafka实例各项监控指标。 前提条件 已创建Kafka实例,且实例中有可消费的消息。 操作步骤 步骤 1 登录管理控制台。 步骤 2 在管理控制台右上角单击,选择区域。 说明 请选择Kafka实例所在的区域。 步骤 3 在管理控制台左上角单击,选择“企业中间件”“分布式消息服务”“Kafka专享版”,进入分布式消息服务Kafka专享版页面。 步骤 4 通过以下任意一种方法,查看监控数据。 在Kafka实例名称后,单击。跳转到云监控页面,查看实例、节点、队列和消费组的监控数据,数据更新周期为1分钟。 单击Kafka实例名称,进入实例详情页。在左侧导航栏单击“监控”,进入监控页面,查看实例、节点、队列和消费组的监控数据,数据更新周期为1分钟。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        监控
        查看监控数据
      • 分布式消息服务Kafka服务等级协议(新)
        自2021年11月11日起,新版分布式消息服务Kafka服务等级协议(SLA)生效。详情请参见这里。
        来自:
        帮助文档
        分布式消息服务Kafka
        相关协议
        分布式消息服务Kafka服务等级协议(新)
      • 计费类
        如何为购买的消息队列RocketMQ实例续费? 为防止资源到期或者浪费,已经购买包月实例的用户,可执行续费操作,延长 RocketMQ实例的有效期,也可以设置到期自动续费的相关操作。 开通的RocketMQ实例资源包何时生效? 天翼云分布式消息服务RocketMQ资源包在客户支付成功之后,系统经过初始化完成后即可生效使用。
        来自:
        帮助文档
        分布式消息服务RocketMQ
        常见问题
        计费类
      • 实例常见问题
        本节介绍分布式消息服务Kafka实例常见问题 为什么可用区不能选择2个? Kafka 通常选择三个可用区而不是两个的原因在于数据的容错性和高可用性。通过将副本分布在三个可用区中,Kafka 能够实现更高级别的容错性和可用性,即使一个可用区发生故障,系统仍然可以继续正常运行。这种三个可用区的配置是为了提供更强大的冗余和容错能力,确保 Kafka 集群在面对故障时仍能保持数据的可靠性和可用性。 创建实例时为什么无法查看子网和安全组等信息? 创建实例时,如果无法查看虚拟私有云、子网、安全组、弹性IP,可能原因是该用户还没创建相关网络实例,需要到对应天翼云网络产品购买相应的网络产品实例。 如何选择实例硬盘大小? 磁盘大小:流量均值×存储时长×3(备份),建议在迁移上云过程中优化Topic以降低成本。 Kafka实例的超高IO和高IO如何选择? 选择Kafka磁盘类型主要取决于应用的需求和预算。通常,对于Kafka来说,高IO磁盘是更常见的选择,因为它提供了更好的性能和吞吐量。更多信息请参考计费及购买类问题Kafka磁盘选择超高IO还是高IO? Kafka服务端支持版本是多少? 分布式消息服务Kafka支持2.132.8.2版本和2.133.6.2版本服务端。 Kafka实例是否支持修改访问端口? 分布式消息服务Kafka支持在实例创建时指定访问端口。
        来自:
        帮助文档
        分布式消息服务Kafka
        常见问题
        实例问题
        实例常见问题
      • 使用DNAT访问Kafka实例
        步骤四:在Kafka控制台绑定弹性公网IP地址 步骤 1 在管理控制台左上角单击,选择“应用服务 > 分布式消息服务 Kafka”,进入分布式消息服务Kafka专享版页面。 步骤 2 单击Kafka实例名称,进入实例详情页面。 步骤 3 在“基本信息”页面的“高级配置”区域,单击“修改”。 步骤 4 将“advertised.listeners IP/域名”改为DNAT规则中的弹性公网IP,内网连接地址和弹性公网IP的对应关系与添加DNAT规则中记录的对应关系保持一致,单击“保存”。 图修改advertised.listeners IP(使用DNAT访问) 步骤五:验证接口连通性 参考连接未开启SASL的Kafka实例或者连接已开启SASL的Kafka实例,测试是否可以生产和消费消息。 测试接口连通性时,注意以下几点: 连接Kafka实例的地址为“advertised.listeners IP:9011”,以上图为例,连接Kafka实例的地址为“124.xxx.xxx.167:9011,124.xxx.xxx.174:9011,124.xxx.xxx.57:9011”。 在Kafka实例安全组的入方向规则中放通9011端口。 连接Kafka实例的客户端已开启公网访问功能。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        连接Kafka实例
        使用DNAT访问Kafka实例
      • 功能特性
        本节主要介绍分布式消息服务Kafka的功能特性 消息收发 消息发送,支持指定分区发送、同步发送、异步发送、分区批量累积发送;支持身份认证,客户端连接Broker时使用SSL或SASL进行验证,数据传加密传输;支持ACL机制,提供客户端读、写权限认证。支持通过压缩算法实现消息体压缩,减少网络传输数据量,提高Kafka的消息发送吞吐量。 消息消费,支持指定Partition消费、指定分区的offset消费;采用poll方式,支持批量消费,支持广播消费。 消息有序性 针对消息有序的业务需求,分为全局有序和局部有序。 全局有序:一个Topic下的所有消息都需要按照生产顺序消费。 局部有序:一个Topic下的消息,只需要满足同一业务字段的要按照生产顺序消费。例如:Topic消息是订单的流水表,包含订单orderId,业务要求同一个orderId的消息需要按照生产顺序进行消费。 由于Kafka的一个Topic可以分为了多个Partition,Producer发送消息的时候,是分散在不同 Partition的。当Producer按顺序发消息给Broker,但进入Kafka之后,这些消息就不一定进到哪个Partition,会导致顺序是乱的。因此要满足全局有序,需要1个Topic只能对应1个Partition。 要满足局部有序,只需要在发消息的时候指定Partition Key,Kafka对其进行Hash计算,根据计算结果决定放入哪个Partition。这样Partition Key相同的消息会放在同一个Partition。此时,Partition的数量仍然可以设置多个,提升Topic的整体吞吐量。
        来自:
        帮助文档
        分布式消息服务Kafka
        产品简介
        功能特性
      • 使用限制
        数据库双向同步 应用容灾多活 通过集成数据传输服务 产品能力,支持数据库双向同步功能。您需要到DTS产品控制台开通实例 和创建数据同步任务,并在多活产品控制台进行集成管理。 消息双向同步 应用容灾多活 通过集成分布式消息服务RocketMQ产品能力,支持消息双向同步功能。您可以在多活产品控制台进行集成配置,也可以到分布式消息服务RocketMQ产品控制台进行任务管理。 注册中心同步 应用容灾多活 支持RPC请求的本地优先和异地定向等跨集群调用策略,您需要到微服务引擎 产品控制台配置注册中心双向同步任务。 配额说明 配额是指您在容灾多活产品中可创建的资源数量限制,具体的资源配额如下表所示。 资源 配额 说明 ::: 应用系统 10 单个租户支持的应用系统数量。 站点 2 单个应用系统支持的站点数量。 单元 2 单个站点支持单元数量。 单元组 3 单个应用系统支持的单元组数量。 应用节点 100 单个应用系统支持的应用节点数量。
        来自:
        帮助文档
        应用高可用
        产品简介
        应用容灾多活
        使用限制
      • Java
        消费消息 import com.rabbitmq.client.; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class RabbitmqConsumer { //队列名称 private final static String QUEUENAME "helloMQ"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory new ConnectionFactory(); //设置主机ip factory.setHost("127.0.0.1"); //设置amqp的tcp端口号 factory.setPort(5672); //设置用户名密码 factory.setUsername("YOUR USERNAME"); factory.setPassword("YOUR PASSWORD"); //设置Vhost factory.setVirtualHost("/"); //基于网络环境合理设置超时时间 factory.setConnectionTimeout(30 1000); factory.setHandshakeTimeout(30 1000); factory.setShutdownTimeout(0); Connection connection factory.newConnection(); Channel channel connection.createChannel(); //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。 channel.queueDeclare(QUEUENAME, false, false, false, null); System.out.println(" [] Waiting for messages. To exit press CTRL+C"); Consumer consumer new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message new String(body, StandardCharsets.UTF8); System.out.println("Received message: '" + message + "'"); } }; channel.basicConsume(QUEUENAME, true, consumer); } } SSL生产消息 import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultSaslConfig; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManagerFactory; import java.io.FileInputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.security.; import java.security.cert.CertificateException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class RabbitmqProducerSsl { // private final static String EXCHANGENAME "exchangeTest"; private final static String QUEUENAME "helloMQ"; // private final static String ROUTINGKEY "test"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { // 创建连接工厂 ConnectionFactory factory new ConnectionFactory(); // 设置主机ip factory.setHost("127.0.0.1"); // 设置amqp的ssl端口号 factory.setPort(5671); String ksFile "/sslpath/ssl/clientrabbitmqkey.p12"; String tksFile "/sslpath/ssl/truststore"; SSLContext c null; try { char[] keyPassphrase "YOUR PASSPHRASE".toCharArray(); KeyStore ks KeyStore.getInstance("PKCS12"); ks.load(new FileInputStream(ksFile), keyPassphrase); KeyManagerFactory kmf KeyManagerFactory.getInstance("SunX509"); kmf.init(ks, keyPassphrase); char[] trustPassphrase "YOUR PASSPHRASE".toCharArray(); KeyStore tks KeyStore.getInstance("JKS"); tks.load(new FileInputStream(tksFile), trustPassphrase); TrustManagerFactory tmf TrustManagerFactory.getInstance("SunX509"); tmf.init(tks); c SSLContext.getInstance("tlsv1.2"); c.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); } catch (KeyStoreException e) { throw new RuntimeException(e); } catch (NoSuchAlgorithmException e) { throw new RuntimeException(e); } catch (CertificateException e) { throw new RuntimeException(e); } catch (UnrecoverableKeyException e) { throw new RuntimeException(e); } catch (KeyManagementException e) { throw new RuntimeException(e); } factory.setSaslConfig(DefaultSaslConfig.EXTERNAL); factory.useSslProtocol(c); // 设置Vhost,需要在控制台先创建 factory.setVirtualHost("/"); // 基于网络环境合理设置超时时间 factory.setConnectionTimeout(30 1000); factory.setHandshakeTimeout(30 1000); factory.setShutdownTimeout(0); // 创建一个连接 Connection connection factory.newConnection(); // 创建一个频道 Channel channel connection.createChannel(); // 指定一个队列 channel.queueDeclare(QUEUENAME, false, false, false, null); for (int i 0; i < 100; i++) { // 发送的消息 String message "Hello rabbitMQ!" + i; // 往队列中发送一条消息,使用默认的交换器 channel.basicPublish("", QUEUENAME, null, message.getBytes(StandardCharsets.UTF8)); System.out.println(" Sent message: '" + message + "'"); TimeUnit.MILLISECONDS.sleep(100); } //关闭频道和连接 channel.close(); connection.close(); } }
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        开发指南
        Java
      • 注册配置中心
        本章节介绍注册配置中心RCC子产品应用场景 分布式协同 提供分布式系统中服务注册发现及协同调度能力,确保系统状态一致性并完成预期的功能请求,包括状态管理,分布式锁,服务发现,监控检查等。助力用户轻松构建稳定、高可用的分布式服务。 分布式系统中一个绕不过去的问题是如何协同系统中各个分布的模块,确保系统状态的一致性并完成预期的功能,包括状态管理,分布式锁,服务发现等;将分布式系统这一核心必备能力抽象成为组件对外提供服务,使得开发者不用过多了解分布式细节,即可基于这列组件轻松构建稳定、高可用的分布式服务。 分布式配置管理 提供分布式应用配置集中管理能力,包括配置热生效、配置历史查看、配置数据回滚等,支持主流的Nacos 配置中心,支持原生数据无缝迁移,帮助您高效解决分布式环境下多应用的配置管理问题。 在分布式服务架构中,当系统从一个单体应用,被拆分成分布式系统上一个个服务节点后,配置文件也必须跟着迁移分割,这样配置就分散了,不仅如此,分散中还包含着冗余,而配置中心将配置从各应用中剥离出来,对配置进行统一管理,应用自身不需要自己去管理配置。
        来自:
        帮助文档
        微服务引擎
        产品简介
        应用场景
        注册配置中心
      • 删除消费组
        本章节介绍了如何删除分布式消息服务RocketMQ实例的消费组。 操作场景 消费组无需继续使用,需要清理时,参考本章节删除消费组。 操作步骤 1. 登录分布式消息服务RocketMQ控制台。 2. 单击RocketMQ实例的名称,进入实例详情页面。 3. 在左侧导航栏,单击“消费组管理”,进入“消费组管理”页面。 图1 删除消费组 4. 选择以下任意一种方法删除消费组。 在待删除的消费组所在行,单击“删除”。 单击待删除的消费组名称,进入消费组详情页面。在页面右上角,单击“删除”。 如需批量删除消费组,勾选待删除的消费组,单击“删除消费组”。 5. 单击“是”,确认删除消费组。
        来自:
        帮助文档
        分布式消息服务RocketMQ
        用户指南
        消费组管理
        删除消费组
      • 生产者实践
        本文主要介绍消息队列 Kafka 发布者的最佳实践,从而帮助您更好的使用该产品。 文中的最佳实践基于消息队列 Kafka 的 Java 客户端;对于其它语言的客户端,其基本概念与思想是通用的,但实现细节可能有差异,仅供参考。 Kafka 发送示例代码片段 Key 和 Value Kafka 0.10.0.0 的消息字段只有两个:Key 和 Value。Key 是消息的标识,Value 即消息内容。为了便于追踪,重要消息最好都设置一个唯一的 Key。通过 Key 追踪某消息,打印发送日志和消费日志,了解该消息的发送和消费情况。 失败重试 在分布式环境下,由于网络等原因,偶尔的发送失败是常见的。导致这种失败的原因有可能是消息已经发送成功,但是 Ack 失败,也有可能是确实没发送成功。 消息队列 Kafka 是 VIP 网络架构,会主动掐掉空闲连接(30 秒没活动),也就是说,不是一直活跃的客户端会经常收到 “connection rest by peer” 这样的错误,因此建议都考虑重试消息发送。 异步发送 发送接口是异步的;如果你想得到发送的结果,可以调用metadataFuture.get(timeout, TimeUnit.MILLISECONDS)。 线程安全 Producer 是线程安全的,且可以往任何 Topic 发送消息。通常情况下,一个应用对应一个 Producer 就足够了。 Acks Acks的说明如下: acks0,表示无需服务端的 Response,性能较高,丢数据风险较大; acks1,服务端主节点写成功即返回 Response,性能中等,丢数据风险中等,主节点宕机可能导致数据丢失; acksall,服务端主节点写成功,且备节点同步成功,才返回 Response,性能较差,数据较为安全,主节点和备节点都宕机才会导致数据丢失。 一般建议选择 acks1,重要的服务可以设置 acksall。
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        生产者实践
      • 修改Topic老化时间
        本文主要介绍 修改Topic老化时间。 老化时间即消息的最长保留时间,消费者必须在此时间结束前消费消息,否则消息将被删除。删除的消息,无法被消费。 创建Topic成功之后,您可以根据业务需要修改Topic的老化时间。修改老化时间,不会影响业务。Topic老化时间,默认为72小时。 修改老化时间可以通过以下方式实现: 在“Topic管理”中,修改老化时间。 在“配置参数”中,修改“log.retention.hours”参数值,具体步骤请参考修改配置参数。 说明 如果Topic已经设置了老化时间,此时“配置参数”中的log.retention.hours值将不对此Topic生效。仅在Topic中未设置老化时间时,“配置参数”中的log.retention.hours值才会对此Topic生效。例如:Topic01设置的老化时间为60小时,“配置参数”中的log.retention.hours值为72小时,此时Topic01实际的老化时间为60小时。 操作步骤 步骤 1 登录管理控制台。 步骤 2 在管理控制台右上角单击,选择区域。 说明 请选择Kafka实例所在的区域。 步骤 3 在管理控制台左上角单击,选择“企业中间件”“分布式消息服务”“Kafka专享版”,进入分布式消息服务Kafka专享版页面。 步骤 4 单击Kafka实例的名称,进入实例详情页面。 步骤 5 选择“Topic管理”页签,显示已创建的Topic详情。 步骤 6 通过以下任意一种方法,修改Topic老化时间。 勾选Topic名称左侧的方框,可选一个或多个,单击信息栏左上侧的“编辑Topic”。 在待修改老化时间的Topic所在行,单击“编辑”。 步骤 7 在“编辑Topic”对话框中,输入老化时间,单击“确定”。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        Topic管理
        修改Topic老化时间
      • 删除流控
        本文主要介绍删除流控。 操作场景 本章节指导您在无需流控时,删除流控。 操作步骤 步骤 1 登录管理控制台。 步骤 2 在管理控制台右上角单击,选择区域。 说明 请选择Kafka实例所在的区域。 步骤 3 在管理控制台左上角单击,选择“企业中间件”“分布式消息服务”“Kafka专享版”,进入分布式消息服务Kafka专享版页面。 步骤 4 单击Kafka实例的名称,进入实例详情页面。 步骤 5 在左侧导航栏单击“流控管理 > 流控列表”,进入流控列表页面。 步骤 6 在待删除的流控所在行,单击“删除”,弹出“删除流控”对话框。 步骤 7 单击“是”,跳转到“后台任务管理”页面,当流控任务的“状态”为“成功”时,表示成功删除流控。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        流控管理
        删除流控
      • 修改认证用户密码
        接口功能介绍 修改认证用户密码 接口约束 无 URI POST /v3/api/updateMqttUser 路径参数 无 Query参数 无 请求参数 请求头header参数 参数 是否必填 参数类型 说明 示例 下级对象 regionId 是 String 资源池id 请求体body参数 参数 是否必填 参数类型 说明 示例 下级对象 prodInstId 是 String 实例ID userName 是 String 认证用户 password 是 String 新认证用户密码,需按要求加密 响应参数 参数 参数类型 说明 示例 下级对象 message String 描述状态 statusCode String 800成功 其他失败 error String 错误码 returnObj Object 返回对象 枚举参数 无 请求示例 请求url 请求头header { "regionId":"bb9fdb42056f11eda1610242ac110002" } 请求体body { "prodInstId": "123", "userName": "mqtt1", "password": "ojrkopqwropqwkopmvpsdkfopakfop[a" } 响应示例 { "statusCode": 800 } 状态码 请参考 [状态码 错误码 请参考 错误码
        来自:
        帮助文档
        分布式消息服务MQTT
        API参考
        API
        2022-04-13
        4.0版本接口
        权限管理
        修改认证用户密码
      • 环境准备
        本文主要介绍分布式消息服务RabbitMQ所需的环境准备。 创建RabbitMQ实例前,您需要提前准备相关依赖资源,包括虚拟私有云(Virtual Private Cloud,以下简称VPC)、子网和安全组,并配置安全组策略。每个RabbitMQ实例都部署在某个VPC中,并绑定具体的子网和安全组,通过这样的方式为RabbitMQ提供一个隔离的、用户自主配置和管理的虚拟网络环境以及安全保护策略,提升实例的安全性。 如果用户已有VPC,可重复使用,不需要多次创建。 创建VPC 1、登录管理控制台。 2、在管理控制台右上角单击,选择区域。 说明 此处请选择与您的应用服务相同的区域。 3、在管理控制台左上角单击,选择“网络 > 虚拟私有云”。 4、单击“创建虚拟私有云”,进入“创建虚拟私有云”界面。 5、根据界面提示创建虚拟私有云。如无特殊需求,界面参数均可保持默认。关于创建VPC的详细信息可以参考《虚拟私有云用户指南》。 创建虚拟私有云时,会同时创建子网,若需要额外创建子网,请参考6。如果不需要额外创建子网,请执行7。 6、在左侧导航栏,单击“子网”,进入“子网”界面。单击“创建子网”。根据界面提示创建子网。如无特殊需求,界面参数均可保持默认。 关于创建子网的详细信息可以参考《虚拟私有云用户指南》。 7、在左侧导航栏,选择“访问控制 > 安全组”,进入“安全组”界面。根据界面提示创建安全组。如无特殊需求,界面参数均可保持默认。 关于创建安全组的详细信息可以参考《虚拟私有云用户指南》。 使用RabbitMQ实例前,添加表1所示安全组规则,其他规则请根据实际需要添加。 说明 创建安全组后,系统默认添加入方向“允许安全组内的弹性云主机彼此通信”规则和出方向“放通全部流量”规则,此时使用内网通过同一个VPC访问RabbitMQ实例,无需添加表1的规则。 表1 安全组规则 方向 协议 端口 源地址 说明 入方向 TCP 5672 0.0.0.0/0 访问RabbitMQ实例(关闭SSL) 入方向 TCP 5671 0.0.0.0/0 访问RabbitMQ实例(开启SSL) 入方向 TCP 15672 0.0.0.0/0 访问Web界面UI地址(关闭SSL加密) 入方向 TCP 15671 0.0.0.0/0 访问Web界面UI地址(开启SSL加密)
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        用户指南
        环境准备
      • 计费互转
        介绍分布式消息服务Kafka计费模式互转的功能操作内容。 场景描述 Kafka的按需转包周期的场景描述如下: 在使用Kafka时,可能会遇到需要设置按需转包周期的场景,例如: 消息积压处理:当Kafka中的消息积压较多时,可能会导致消息的消费速度跟不K上消息的生产速度,进而影响系统的性能和稳定性。为了解决这个问题,可以设置按需转包周期,即将一定数量的消息打包成一个批次进行消费,以提高消费的效率和吞吐量。 业务流量波动:在某些业务场景下,业务流量可能会出现波动,即某个时间段内的消息产生速度较快,而另一个时间段内的消息产生速度较慢。为了更好地适应业务流量的波动,可以设置按需转包周期,以根据实际的消息产生情况进行灵活的批量消费。 系统资源优化:当Kafka的消费者资源有限时,可以通过设置按需转包周期来优化系统的资源利用。通过将一定数量的消息打包成一个批次进行消费,可以减少消费者的竞争和上下文切换,提高系统的并发处理能力。 消息处理延迟优化:在某些场景下,对消息的实时性要求较低,可以通过设置按需转包周期来优化消息的处理延迟。将一定数量的消息打包成一个批次进行消费,可以减少消息的处理次数,从而降低消息的处理延迟。 需要注意的是,在设置按需转包周期时,应根据实际业务需求和系统情况进行调整。同时,应考虑消息的重要性、消费者的处理能力、系统的资源限制等因素,以确保系统的稳定性和性能。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        实例管理
        计费互转
      • 删除消息
        介绍分布式消息服务Kafka删除消息功能操作内容。 场景描述 在以下场景中,可以考虑删除Kafka中的消息: 错误消息:当Kafka中的消息存在错误或异常时,需要将这些消息删除。这可能是由于消息格式错误、消息内容不完整或者其他数据质量问题导致的。删除错误消息可以保持数据的一致性和准确性。 保留策略变更:Kafka中可以设置消息的保留时间或者大小,即消息在主题中的保留期限。当需要更改消息的保留策略时,可能需要删除旧的消息以应用新的策略。例如,如果要缩短消息的保留时间,则需要删除超过新保留期限的旧消息。 清理测试数据:在测试环境中,经常需要清理旧的测试数据,以确保环境的可用空间和性能。当测试数据不再需要时,可以删除相应的消息来释放资源。 故障恢复:当Kafka发生故障或者数据丢失时,可能需要删除一些消息以进行数据恢复。在这种情况下,需要根据故障的范围和影响选择删除相应的消息,以恢复数据的完整性。 需要注意的是,在删除消息之前,需要确保删除的消息不再需要或者已经备份。删除消息是不可逆的操作,一旦删除就无法恢复。因此,在执行删除操作之前,建议先进行备份或者确认消息不再需要。 操作步骤 (1)登录管理控制台。 (2)进入Kafka管理控制台。 (3)在实例列表页在操作列,目标实例行点击“管理”。 (4)点击“Topic管理”后进入主题管理页面。 (5)在Topic所在行,点击其右侧“更多”,在下拉框中单击“消息删除”,并选择确认。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        Topic管理
        删除消息
      • 删除消费组
        本文主要介绍删除消费组。 分布式消息服务Kafka支持通过以下两种方式删除消费组,您可以根据实际情况选择任意一种方式。 方法一:在管理控制台删除消费组 方法二:在Kafka客户端使用命令行工具删除消费组(确保Kafka实例版本与命令行工具版本相同) 前提条件 待删除消费组的状态为“EMPTY”。 方法一:在管理控制台删除消费组 步骤 1 登录管理控制台。 步骤 2 在管理控制台右上角单击,选择区域。 说明 请选择Kafka实例所在的区域。 步骤 3 在管理控制台左上角单击,选择“企业中间件”“分布式消息服务”“Kafka专享版”,进入分布式消息服务Kafka专享版页面。 步骤 4 单击Kafka实例的名称,进入实例详情页面。 步骤 5 在左侧导航栏选择“消费组管理”,进入消费组列表页面。 步骤 6 通过以下任意一种方法,删除消费组。 勾选消费组名称左侧的方框,可选一个或多个,单击信息栏左上侧的“删除消费组”。 在待删除消费组所在行,单击“删除”。 说明 仅在“消费组状态”为“EMPTY”时,支持删除。 消费组包含以下状态: DEAD:消费组内没有任何成员,且没有任何元数据。 EMPTY:消费组内没有任何成员,存在元数据。 PREPARINGREBALANCE:准备开启Rebalance。 COMPLETINGREBALANCE:所有成员加入消费组。 STABLE:消费组内成员可以正常消费。 步骤 7 弹出“删除消费组”对话框,单击“是”,完成消费组的删除。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        消费组管理
        删除消费组
      • RocketMQ触发器
        RocketMQ触发器可以订阅分布式消息服务RocketMQ并根据消息触发关联的函数,借此能力,使得函数可以消费指定topic的消息,执行自定义处理逻辑。 注意事项 RocketMQ触发器订阅的分布式消息服务RocketMQ实例必须和函数计算的函数实例在相同地域。 前提条件 创建函数。 开通分布式消息服务RocketMQ实例(RocketMQ引擎类型),详情请参考开通RocketMQ实例。 创建Topic和GroupID。 创建用户,且默认Topic权限设置为:PUBSUB,默认消费组权限为SUB。详情请参考创建用户。 操作步骤 1. 登录函数计算控制台,点击目标函数,进入函数详情。 2. 选择详情下顶部的配置选项卡。 3. 在配置 选项卡中,选择左边的触发器选项卡。 4. 点击创建触发器 ,在弹出的右抽屉中选择RocketMQ触发器,配置参数解释如下。 配置项 操作 示例 触发器类型 选择RocketMQ触发器。 RocketMQ触发器 名称 填写自定义的触发器名称。 rocketmqtrigger 版本或别名 默认值为LATEST,支持选择任意函数版本或函数别名。 LATEST RocketMQ 实例 选择已创建的RocketMQ实例。 Topic 选择已创建的RocketMQ实例的Topic。 Group ID 选择已创建的RocketMQ实例的Group ID。 消费位点 选择消息的消费位点,即触发器从RocketMQ实例开始拉取消息的位置。取值说明如下。 最新位点 :从最新位点开始消费。 最早位点 :从最早位点开始消费。 指定时间戳:从指定时间戳开始消费。 最新位点 调用方式 选择函数调用方式。 同步调用 :指触发器消费topic消息后投递到函数是同步调用,会等待函数响应后继续下一个消息投递。 异步调用:指触发器消费topic消息后投递到函数是异步调用,不会等待函数响应,可以快速消费事件。 同步调用 用户ID RocketMQ实例用户ID,需要在RocketMQ控制台创建。 密钥 RocketMQ实例用户密钥,需要在RocketMQ控制台创建。 触发器启用状态 创建触发器后是否立即启用。默认选择开启,即创建触发器后立即启用触发器。 推送配置 批量推送条数:批量推送的最大值,积压值达到后立刻推送,取值范围为 [1, 10000]。 批量推送间隔:批量推送的最大时间间隔,达到后立刻推送,单位秒,取值[0,15]。默认0无需等待,数据直接推送。 推送格式:函数收到的事件格式,详情请查阅触发器事件消息格式。 重试策略 消息推送函数失败后重试的策略,共两种: 指数退避:指数退避重试,重试5次,重试周期为2,4,8,16,32(秒)。 线性退避:线性退避重试,重试5次,重试周期为1,2,3,4,5(秒)。 容错策略 当重试次数耗尽后仍然失败时的处理方式: 允许容错:当异常发生并超过重试策略配置时直接丢弃。 禁止容错:当异常发生并超过重试策略配置时继续阻塞执行。 死信队列 当容错策略为:允许容错时,可以额外开启死信队列。当开启死信队列时且异常发生并超过重试策略配置时,消息会被投递到指定的消息队列里,当前只支持投递到kafka和rocketmq
        来自:
        帮助文档
        函数计算
        用户指南
        事件触发
        RocketMQ触发器
      • 变更实例规格
        介绍分布式消息服务Kafka扩容相关内容。 场景描述 当需要处理大量消息时,Kafka实例的扩容是一种常见的解决方案。扩容可以增加Kafka集群的吞吐量、存储能力和高可用性。分布式消息服务Kafka提供三类扩容方案,分别为节点、规格和磁盘扩容,更好满足用户不同场景下的扩容需求。 ● 代理数量扩容:指向Kafka集群中添加更多的节点以增加系统的吞吐量和可靠性。通过扩容,可以将消息的发送和消费负载分摊到更多的节点上,从而提高系统的并发处理能力。 ● 规格扩容:指通过增加Kafka的资源配置来提升系统的处理能力和性能。 ● 磁盘扩容:指增加磁盘的存储容量,以满足更多消息的存储需求。 变更实例规格的影响 变更配置类型 影响 磁盘扩容 磁盘扩容不会影响业务。 代理数量扩容 1.代理数量扩容不会影响原来的代理,业务也不受影响(如果实例已配置分区自动重平衡,会触发重平衡,客户端会触发重连)。 2.新创建的Topic才会分布在新代理上,原有Topic还分布在原有代理上。通过修改Kafka分区平衡,实现将原有Topic分区的副本迁移到新代理上。 规格扩容缩容 1.若Topic为单副本,扩容/缩容期间会出现无法对该Topic生产消息或消费消息,会造成业务中断。 2.若Topic为多副本,代理会滚动重启,代理滚动重启造成分区Leader切换,会发生秒级连接闪断,Leader切换时长一般为1分钟以内。建议您在业务低峰期扩容/缩容,并且再生产客户端配置重试机制,方法如下: (1).生产客户端为Kafka开源客户端时,检查是否配置retries和retry.backoff.ms参数。建议参数值分别配置为:retries10,retry.backoff.ms1000。 (2).生产客户端为Flink客户端时,检查是否配置重启策略,配置重启策略可以参考如下代码。 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.seconds(20)));
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        实例管理
        变更实例规格
      • 消费组详情
        介绍分布式消息服务Kafka消费组详情功能操作内容。 场景描述 当需要查询以下信息时,可通过消费组详情页面操作: 查看在线消费者列表及其订阅的主题、分区。 查看消费组订阅的主题的消息堆积详细情况。 Kafka消息堆积的场景包括以下几个: 消费者处理延迟:当消费者的处理能力不足或出现故障时,无法及时消费Kafka中的消息,导致消息堆积。这可能是由于消费者的处理逻辑复杂、处理速度慢,或者消费者的资源不足等原因引起的。 网络故障:当Kafka集群与消费者之间的网络出现故障或不稳定时,可能导致消息传输延迟或中断。这会导致消息在Kafka中堆积,等待网络恢复后才能被消费。 生产者速度超过消费者:当生产者产生消息的速度超过消费者的处理速度时,会导致消息在Kafka中堆积。这可能是由于生产者的速度过快、消费者处理能力不足或者消费者故障等原因引起的。 消费者组调整:当消费者组中的消费者发生变化,如新增或退出消费者,会触发Kafka的重平衡操作。在重平衡期间,消费者无法消费消息,导致消息堆积。这通常发生在消费者扩展或故障恢复时。 高峰期消息涌入:在某些特定的时间段或事件发生时,可能会引发大量的消息涌入Kafka,超过消费者的处理能力。这会导致消息在Kafka中堆积,直到消费者能够跟上消息的处理速度。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        消费组管理
        消费组详情
      • 代码示例
        本节介绍了RabbitMQ接入的代码示例。 安全接入点(PLAIN、AMQPLAIN授权机制) java import com.rabbitmq.client.; import java.io.IOException; public class RabbitmqAmqpDemo { public static void main(String[] args) throws Exception { String host "192.168.0.0"; //安全接入点ip Integer port 5672; //安全接入点port String username "xxx"; //集群管理用户列表的用户名 String password "xxx"; //集群管理用户列表的密码 String vhost "/"; String exchangeName "extest"; String queueName "qutest"; ConnectionFactory connectionFactory new ConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(vhost); Connection connection connectionFactory.newConnection(); Channel channel connection.createChannel(); channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, "test"); String message "Hello Aop"; for (int i 0; i < 10; i++) { channel.basicPublish(exchangeName, "test", null, message.getBytes()); System.out.println("消息发送成功"); } Channel consumeChannel connection.createChannel(); Consumer consumer new DefaultConsumer(consumeChannel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String messageGet new String(body, "UTF8"); if (messageGet.equals(message)) { System.out.println("消息消费成功"); } } }; consumeChannel.setDefaultConsumer(consumer); consumeChannel.basicConsume(queueName, false, consumer); Thread.sleep(10000); } } SSL接入点(EXTERNAL授权机制) java import com.rabbitmq.client.; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManagerFactory; import java.io.FileInputStream; import java.io.IOException; import java.security.KeyStore; public class RabbitmqExternalDemo { public static void main(String[] args) throws Exception { String host "192.168.0.0"; //SSL接入点ip int port 5671; //SSL接入点port //以下2个ssl文件可通过控制台获取安装包, 具体的获取方式可以查看2.2.1接入步骤的第二小节 String ksFile "D:tmpsslclientrabbitmqkey.p12"; String tksFile "D:tmpssltruststore"; String vhost "/"; String exchangeName "extest"; String queueName "qutest"; char[] keyPassphrase "W3zT98Zz9Io".toCharArray(); KeyStore ks KeyStore.getInstance("PKCS12"); ks.load(new FileInputStream(ksFile), keyPassphrase); KeyManagerFactory kmf KeyManagerFactory.getInstance("SunX509"); kmf.init(ks, keyPassphrase); char[] trustPassphrase null; trustPassphrase "W3zT98Zz9Io".toCharArray(); KeyStore tks KeyStore.getInstance("JKS"); tks.load(new FileInputStream(tksFile), trustPassphrase); TrustManagerFactory tmf TrustManagerFactory.getInstance("SunX509"); tmf.init(tks); SSLContext c SSLContext.getInstance("tlsv1.2"); c.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); ConnectionFactory connectionFactory new ConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setVirtualHost(vhost); connectionFactory.setSaslConfig(DefaultSaslConfig.EXTERNAL); connectionFactory.useSslProtocol(c); Connection connection connectionFactory.newConnection(); Channel channel connection.createChannel(); channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, "test"); String message "Hello Aop"; for (int i 0; i < 10; i++) { channel.basicPublish(exchangeName, "test", null, message.getBytes()); System.out.println("消息发送成功"); } Channel consumeChannel connection.createChannel(); Consumer consumer new DefaultConsumer(consumeChannel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String messageGet new String(body, "UTF8"); if (messageGet.equals(message)) { System.out.println("消息消费成功"); } } }; consumeChannel.setDefaultConsumer(consumer); consumeChannel.basicConsume(queueName, false, consumer); Thread.sleep(10000); } }
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        最佳实践
        RabbitMQ接入
        代码示例
      • 创建Topic
        本文主要介绍 创建Topic。 Topic,即消息主题。创建Kafka实例成功后,如果没有开启“Kafka自动创建Topic”,需要手动创建Topic,然后才能进行生产消息和消费消息。如果实例开启了“Kafka自动创建Topic”,则该操作为可选。 “Kafka自动创建Topic”表示在生产或消费一个未创建的Topic时,会自动创建此Topic,此Topic的默认参数值如下:分区数为3,副本数为3,老化时间为72小时,不开启同步复制和同步落盘。如果在“配置参数”中修改“log.retention.hours”、“default.replication.factor”或“num.partitions”的参数值,此后自动创建的Topic参数值为修改后的参数值。例如:“num.partitions”修改为“5”,自动创建的Topic参数值如下:分区数为5,副本数为3,老化时间为72小时,不开启同步复制和同步落盘。 Kafka实例对Topic的总分区数设置了上限, 当Topic的总分区数达到上限后,用户就无法继续创建Topic 。不同规格配置的Topic总分区数不同,具体请参考Kafka产品规格说明。 本文主要介绍手动创建Topic的操作,有以下几种方式,您可以根据实际情况选择任意一种方式: 方式1:在控制台创建 方式2:在Kafka Manager创建 方式3:在Kafka客户端上创建 说明 实例节点出现故障的情况下,单副本Topic查询消息时可能会报“内部服务错误”,因此不建议使用单副本Topic。 方式1:在控制台创建 步骤 1 登录管理控制台。 步骤 2 在管理控制台右上角单击,选择区域。 说明 请选择Kafka实例所在的区域。 步骤 3 在管理控制台左上角单击,选择“企业中间件”“分布式消息服务”“Kafka专享版”,进入分布式消息服务Kafka专享版页面。 步骤 4 单击Kafka实例的名称,进入实例详情页面。 步骤 5 选择“Topic管理”页签,单击“创建Topic”。 弹出“创建Topic”对话框。 图 创建Topic 步骤 6 填写Topic名称和配置信息。 表 Topic参数说明 参考 说明 Topic名称 系统为您自动生成了Topic名称,您可以根据需要修改。 创建Topic后不能修改名称。 分区数 您可以设置Topic的分区数,分区数越大消费的并发度越大。 该参数设置为1时,消费消息时会按照先入先出的顺序进行消费。 取值范围:1~200 默认值:3 副本数 您可以为每个Topic设置副本的数量,Kafka会自动在每个副本上备份数据,当其中一个Broker故障时数据依然是可用的,副本数越大可靠性越高。 该参数设置为1时,表示只有一份数据。 默认值:3 说明 实例节点出现故障的情况下,单副本Topic查询消息时可能会报“内部服务错误”,因此不建议使用单副本Topic。 老化时间(小时) 消息的最长保留时间,消费者必须在此时间结束前消费消息,否则消息将被删除。删除的消息,无法被消费。 取值范围:1~720 默认值:72 同步复制 指后端收到生产消息请求并复制给所有副本后,才返回客户端。 开启同步复制后,需要在客户端配置acksall或者1,否则无效。 当副本数为1时,不能选择同步复制功能。 同步落盘 同步落盘是指生产的每条消息都会立即写入磁盘。 开启:生产的每条消息都会立即写入磁盘,可靠性更高。 关闭:生产的消息存在内存中,不会立即写入磁盘。 消息时间戳类型 定义消息中的时间戳类型,取值如下: CreateTime:生产者创建消息的时间。 LogAppendTime:broker将消息写入日志的时间。 批处理消息最大值 Kafka允许的最大批处理大小,如果启用消息压缩,则表示压缩后的最大批处理大小。 如果增加“max.message.bytes”的值,且存在消费者版本早于0.10.2,此时消费者的“fetch size”值也必须增加,以便消费者可以获取增加后的批处理大小。 描述 Topic的描述信息。 步骤 7 配置完成后,单击“确定”,完成创建Topic。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        Topic管理
        创建Topic
      • 环境准备
        概述 在创建天翼云分布式消息服务RabbitMQ实例之前,您需要做一些准备工作。 首先,您需要设置一个虚拟私有云(VPC),这是一个隔离的网络环境,用于托管RabbitMQ实例。 接下来,您需要创建一个子网,它是VPC内部的一个子网络,用于划分不同的部分和区域。 最后,您需要配置一个安全组,用于控制入站和出站的流量规则,以保证RabbitMQ实例的安全性。 每个分布式消息服务RabbitMQ实例都会被部署在特定的VPC中,并与特定的子网和安全组相关联。这种方式可以让您自主配置和管理RabbitMQ实例的网络环境,并提供安全保护策略。如果您已经有了现成的VPC、子网和安全组,可以重复使用它们,无需重复创建。这样可以节省时间和资源,并确保一致性和可靠性。 VPC和子网 VPC和子网可重复使用,您也可以使用不同的VPC和子网来配置RabbitMQ实例,您可根据实际需要进行配置。在创建VPC和子网时应注意如下要求: VPC与使用的天翼云分布式消息服务RabbitMQ服务应在相同的区域。 如无特殊需求,创建VPC和子网的配置参数使用默认配置即可。 创建VPC和子网的操作请参考虚拟私有云创建VPC、子网搭建私有网络 若需要在已有VPC上创建和使用新的子网,请参考虚拟私有云子网管理创建子网 安全组 安全组可重复使用,您也可以根据实际情况使用不同的安全组,请根据实际需要进行配置。 创建安全组的操作指导,请参考虚拟私有云创建安全组 若需要为安全组添加规则,请参考虚拟私有云安全组添加安全组规则
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        快速入门
        环境准备
      • 创建数据订阅任务
        本文为您介绍创建数据订阅任务的操作场景、前提条件和操作步骤。 操作场景 场景一:数据实时分析 使用云监控数据订阅功能,将业务进行异步解耦,在不影响源库业务的情况下,实时同步监控数据(指标、事件)或告警数据到客户自有分析系统中,帮助企业用户进行实时数据分析。 场景二:数据归档存储 使用云监控数据订阅功能,您可将资源监控或告警数据的增量更新数据,实时地推送到归档数据库或数据仓库。 说明 数据订阅功能当前为受限开放,如有需求可以联系客户经理为您开放此功能。 资源池下单个用户最多可创建10个数据订阅任务。 前提条件 注册天翼云账号,并完成实名认证。具体操作,请参见天翼云账号注册流程。 注意 数据订阅功能数据类型支持指标数据、事件数据,订阅方式支持分布式消息服务及API方式。 指标数据订阅支持分布式消息服务(kafka)及remotewriteapi方式。 事件数据订阅支持分布式消息服务(kafka)及apipush方式。(事件订阅入口为:云监控服务>事件监控>事件订阅) 创建订阅任务需提前创建订阅渠道。 操作步骤 1. 登录控制中心。 2. 在控制中心页面左上角点击,选择区域,本文我们选择华东1。 3. 依次选择“管理与部署”,单击“云监控”,进入监控概览页面。 4. 单击左侧“数据订阅”菜单,进入数据订阅任务列表。 5. 单击“创建订阅任务”功能,进入数据订阅任务创建二级页面。 6. 创建订阅任务参数配置如下: 模块 参数 参数说明 配置示例 备注 选择订阅对象 服务维度 选择需要订阅数据的服务维度信息,支持多选 云主机云主机 选择订阅对象 监控对象类型 具体实例 选择订阅对象 选择对象 选择资源实例对象 具体实例 定义订阅方式 订阅通道 选择公网/内网 分布式消息服务 定义订阅方式 发送渠道 Kafka Kafka 定义订阅方式 订阅失败缓存时间 输入用户客户端地址 基础信息 订阅任务名称 填写自定义订阅任务名称 testtask 基础信息 描述 填写订阅任务描述信息 说明 关于告警数据订阅全部资源的场景,涉及两个周期: 订阅服务subscription同步订阅配置的周期,预计上线配置2分钟。 数据订阅配置全部资源,也需要周期同步全量的实例资源,预计线上配置3分钟。 因此,考虑极限情况,告警订阅服务在周期1 + 周期2之后产生的告警,订阅服务才能匹配消费到。
        来自:
        帮助文档
        云监控服务
        用户指南
        数据订阅
        创建数据订阅任务
      • Kafka相关概念
        本文主要介绍 Kafka相关概念。 云服务平台使用Kafka作为消息引擎,以下概念基于Kafka进行描述。 Topic 消息主题。消息的生产与消费,围绕消息主题进行生产、消费以及其他消息管理操作。 Topic也是消息队列的一种发布与订阅消息模型。生产者向消息主题发布消息,多个消费者订阅该消息主题的消息,生产者与消费者彼此并无直接关系。 生产者(Producer) 向Topic(消息主题)发布消息的一方。发布消息的最终目的在于将消息内容传递给其他系统/模块,使对方按照约定处理该消息。 消费者(Consumer) 从Topic(消息主题)订阅消息的一方。订阅消息最终目的在于处理消息内容,如日志集成场景中,监控告警平台(消费者)从主题订阅日志消息,识别出告警日志并发送告警消息/邮件。 代理(Broker) 即Kafka集群架构设计中的单个Kafka进程,一个Kafka进程对应一台服务器,因此手册中描述的代理,还包括对应的存储、带宽等服务器资源。 分区(Partition) 为了实现水平扩展与高可用,Kafka将Topic划分为多个分区,消息被分布式存储在分区中。 副本(Replica) 消息的备份存储。为了确保消息可靠,Kafka创建Topic时,每个分区会分别从代理中选择1个或多个,对消息进行冗余存储。 Topic的所有消息分布式存储在各个分区上,分区在每个副本存储一份全量数据,副本之间的消息数据保持同步,任何一个副本不可用,数据都不会丢失。 每个分区都随机挑选一个副本作为Leader,该分区所有消息的生产与消费都在Leader副本上完成,消息从Leader副本复制到其他副本(Follower)。 Kafka的主题和分区属于逻辑概念,副本与代理属于物理概念。下图通过消息的生产与消费流向,解释了Kafka的分区、代理与主题间的关系。 图 Kafka消息流
        来自:
        帮助文档
        分布式消息服务Kafka
        产品简介
        Kafka相关概念
      • 按需转包周期
        介绍 分布式消息服务RocketMQ的按需转包周期 本节适用于南京3、重庆2、晋中、上海7、北京5、内蒙6、石家庄20 节点。 场景描述 RocketMQ的按需转包周期的场景描述如下: 在使用RocketMQ时,可能会遇到需要设置按需转包周期的场景,例如: 消息积压处理:当RocketMQ中的消息积压较多时,可能会导致消息的消费速度跟不上消息的生产速度,进而影响系统的性能和稳定性。为了解决这个问题,可以设置按需转包周期,即将一定数量的消息打包成一个批次进行消费,以提高消费的效率和吞吐量。 业务流量波动:在某些业务场景下,业务流量可能会出现波动,即某个时间段内的消息产生速度较快,而另一个时间段内的消息产生速度较慢。为了更好地适应业务流量的波动,可以设置按需转包周期,以根据实际的消息产生情况进行灵活的批量消费。 系统资源优化:当RocketMQ的消费者资源有限时,可以通过设置按需转包周期来优化系统的资源利用。通过将一定数量的消息打包成一个批次进行消费,可以减少消费者的竞争和上下文切换,提高系统的并发处理能力。 消息处理延迟优化:在某些场景下,对消息的实时性要求较低,可以通过设置按需转包周期来优化消息的处理延迟。将一定数量的消息打包成一个批次进行消费,可以减少消息的处理次数,从而降低消息的处理延迟。 需要注意的是,在设置按需转包周期时,应根据实际业务需求和系统情况进行调整。同时,应考虑消息的重要性、消费者的处理能力、系统的资源限制等因素,以确保系统的稳定性和性能。
        来自:
        帮助文档
        分布式消息服务RocketMQ
        用户指南
        实例管理
        按需转包周期
      • 创建自定义集群
        软件配置 MRS集群软件配置 参数 参数说明 区域 选择区域。 不同区域的云服务产品之间内网互不相通。请就近选择靠近您业务的区域,可减少网络时延,提高访问速度。 集群名称 集群名称不允许重复。 只能由字母、数字、中划线和下划线组成,并且长度为1~64个字符。 默认名称为mrsxxxx,xxxx为字母和数字的四位随机组合数,系统自动组合。 集群版本 当前版本为MRS 3.1.0。 集群类型 提供几种集群类型: 分析集群:用来做离线数据分析,提供的是Hadoop体系的组件。 流式集群:用来做流处理任务,提供的是流式处理组件。 混合集群:既可以用来做离线数据分析,也可以用来做流处理任务,提供的是Hadoop体系的组件和流式处理组件。建议同时需要做离线数据分析和流处理任务时使用混合集群。 自定义:用户可按照业务需求调整集群服务的部署方式,具体请参见创建自定义拓扑集群。(目前仅MRS 3.x版本支持) 说明 MRS流式集群不支持“作业管理”和“文件管理”功能。如需在集群中安装全部组件,请选择“自定义”类型集群。 组件选择 MRS配套的组件如下: 分析集群组件 Presto:开源、分布式SQL查询引擎。 Hadoop:分布式系统基础架构。 Spark:内存分布式系统框架。(MRS 3.x版本不支持) Spark2x:Spark2x是一个对大规模数据处理的快速和通用引擎,基于开源Spark2.x版本开发。(仅MRS 3.x版本支持) Hive:建立在Hadoop上的数据仓库框架。 HBase:分布式列数据库。 Tez:提供有向无环图的分布式计算框架。 Hue:提供Hadoop UI能力,让用户通过浏览器分析处理Hadoop集群数据。 Loader:基于开源sqoop 1.99.7开发,专为Apache Hadoop和结构化数据库(如关系型数据库)设计的高效传输大量数据的工具。(MRS 3.x版本不支持) Hadoop为必选组件,且Spark与Hive组件需要配套使用。请根据业务选择搭配组件。 Flink:分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态计算。 Oozie:Hadoop作业调度系统。(仅MRS 3.x版本支持) HetuEngine:HetuEngine是一个同异构大数据集的分布式SQL查询引擎。(仅MRS 3.1.x LTS版本支持) Ranger:一个基于Hadoop平台监控和管理数据安全的框架。 Impala:一种处理大量数据的SQL查询引擎。 ClickHouse:ClickHouse是一个用于联机分析(OLAP)的列式数据库管理系统(DBMS)。CPU架构为鲲鹏计算的ClickHouse集群表引擎不支持使用HDFS和Kafka。 Kudu:一种列存储管理器。 流式集群组件 Kafka:提供分布式消息订阅的系统。 Flume:提供分布式、高可用、高可靠的海量日志采集、聚合和传输系统。 ZooKeeper:一个集中的服务,它用于维护配置信息、命名、提供分布式的同步和提供分组服务。(仅MRS 3.x版本支持) Ranger:一个基于Hadoop平台监控和管理数据安全的框架。(仅MRS 3.x版本支持) 元数据 是否使用外部数据源存储元数据。 本地元数据: 元数据存储在集群本地。 数据连接:使用外部数据源元数据,若集群异常或删除时将不影响元数据,适用于存储计算分离的场景。 支持Hive或Ranger组件的集群支持该功能。 组件名 当“元数据”选择“数据连接”时该参数有效。用于表示可以设置外部数据源的组件类型。 Hive Ranger 数据连接类型 当“元数据”选择“数据连接”时该参数有效。用于表示外部数据源的类型。 Hive组件支持的数据连接类型: − RDS服务MySQL数据库− 本地数据库 Ranger组件支持的数据连接类型: − RDS服务MySQL数据库− 本地数据库 数据连接实例 当“数据连接类型”选择“RDS服务MySQL数据库”时,该参数有效。用于表示MRS集群与RDS服务数据库连接的名称,该实例必选先创建才能在此处引用。可单击“创建数据连接”进行创建,具体请参考配置数据连接。
        来自:
        帮助文档
        翼MapReduce
        用户指南
        配置集群
        创建自定义集群
      • 导出Topic
        本文主要介绍 导出Topic。 本章节指导您在控制台导出Topic,支持批量导出。 前提条件 已创建Topic。 操作步骤 步骤 1 登录管理控制台。 步骤 2 在管理控制台右上角单击,选择区域。 说明 请选择Kafka实例所在的区域。 步骤 3 在管理控制台左上角单击,选择“企业中间件”“分布式消息服务”“Kafka专享版”,进入分布式消息服务Kafka专享版页面。 步骤 4 单击Kafka实例的名称,进入实例详情页面。 步骤 5 选择“Topic管理”页签,显示已创建的Topic详情。 步骤 6 在页面右上角单击,导出Topic列表到本地。 Topic列表中包含如下信息:Topic名称、分区数、副本数、老化时间、是否同步复制或同步落盘。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        Topic管理
        导出Topic
      • 计费模式
        本节介绍了分布式消息服务RabbitMQ产品计费模式的区别。 包周期(包年/包月)、按需2种计费模式供您灵活选择,使用越久越便宜。 按包周期实例计费 天翼云提供包月和包年的购买模式。这种购买方式相对于按需付费则能够提供更大的折扣,对于长期使用者,推荐该方式。包周期计费按照订单的购买周期来进行结算。 按需实例计费 这种购买方式比较灵活,可以即开即停,支持秒级计费。实例从“开通”开启计费到“删除”结束计费,按实际购买时长(精确到秒)计费。 下表列出两种模式的区别: 计费模式 包年/包月 按需计费 付费方式 预付费按照订单的购买周期结算。 后付费按照云服务器实际使用时长计费。 计费周期 按订单的购买周期计费。 按小时结算。 实例升级 支持扩容,工单施工完生效,但是施工过程中服务不可用;不支持缩容 支持扩容,工单施工完生效,但是施工过程中服务不可用;不支持缩容 更改计费模式 支持变更为按需资源 支持变更为包周期资源。 变更规格 支持变更实例规格。 支持变更实例规格。 适用场景 适用于可预估资源使用周期的场景,价格比按需计费模式更优惠。对于长期使用者,推荐该方式。 适用于消息资源需求波动的场景,可以随时开通,随时删除。 变更配置:天翼云分布式消息服务支持计费模式变更,更多操作内容请参考按需转包周期。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        计费说明
        计费模式
      • 1
      • ...
      • 12
      • 13
      • 14
      • 15
      • 16
      • ...
      • 531
      跳转至
      推荐热词
      天翼云运维管理审计系统天翼云云服务平台云服务备份云日志服务应用运维管理云手机云电脑天翼云云hbase数据库电信云大数据saas服务电信云大数据paas服务轻量型云主机天翼云客户服务电话应用编排服务天翼云云安全解决方案云服务总线CSB天翼云服务器配置天翼云联邦学习产品天翼云云安全天翼云企业上云解决方案天翼云产品天翼云视频云存储

      天翼云最新活动

      安全隔离版OpenClaw

      OpenClaw云服务器专属“龙虾“套餐低至1.5折起

      天翼云新春焕新季

      云主机开年特惠28.8元/年,0元秒杀等你来抢!

      云上钜惠

      爆款云主机全场特惠,2核4G只要1.8折起!

      中小企业服务商合作专区

      国家云助力中小企业腾飞,高额上云补贴重磅上线

      出海产品促销专区

      爆款云主机低至2折,高性价比,不限新老速来抢购!

      天翼云奖励推广计划

      加入成为云推官,推荐新用户注册下单得现金奖励

      产品推荐

      物理机 DPS

      GPU云主机

      镜像服务 IMS

      轻量型云主机

      弹性伸缩服务 AS

      训推服务

      公共算力服务

      模型推理服务

      人脸检测

      推荐文档

      概览

      错误日志

      术语解释

      常见问题

      • 7*24小时售后
      • 无忧退款
      • 免费备案
      • 专家服务
      售前咨询热线
      400-810-9889转1
      关注天翼云
      • 旗舰店
      • 天翼云APP
      • 天翼云微信公众号
      服务与支持
      • 备案中心
      • 售前咨询
      • 智能客服
      • 自助服务
      • 工单管理
      • 客户公告
      • 涉诈举报
      账户管理
      • 管理中心
      • 订单管理
      • 余额管理
      • 发票管理
      • 充值汇款
      • 续费管理
      快速入口
      • 天翼云旗舰店
      • 文档中心
      • 最新活动
      • 免费试用
      • 信任中心
      • 天翼云学堂
      云网生态
      • 甄选商城
      • 渠道合作
      • 云市场合作
      了解天翼云
      • 关于天翼云
      • 天翼云APP
      • 服务案例
      • 新闻资讯
      • 联系我们
      热门产品
      • 云电脑
      • 弹性云主机
      • 云电脑政企版
      • 天翼云手机
      • 云数据库
      • 对象存储
      • 云硬盘
      • Web应用防火墙
      • 服务器安全卫士
      • CDN加速
      热门推荐
      • 云服务备份
      • 边缘安全加速平台
      • 全站加速
      • 安全加速
      • 云服务器
      • 云主机
      • 智能边缘云
      • 应用编排服务
      • 微服务引擎
      • 共享流量包
      更多推荐
      • web应用防火墙
      • 密钥管理
      • 等保咨询
      • 安全专区
      • 应用运维管理
      • 云日志服务
      • 文档数据库服务
      • 云搜索服务
      • 数据湖探索
      • 数据仓库服务
      友情链接
      • 中国电信集团
      • 天翼云国际站
      • 189邮箱
      • 天翼企业云盘
      • 天翼云盘
      ©2026 天翼云科技有限公司版权所有 增值电信业务经营许可证A2.B1.B2-20090001
      公司地址:北京市东城区青龙胡同甲1号、3号2幢2层205-32室
      • 用户协议
      • 隐私政策
      • 个人信息保护
      • 法律声明
      备案 京公网安备11010802043424号 京ICP备 2021034386号