活动

天翼云最新优惠活动,涵盖免费试用,产品折扣等,助您降本增效!
热门活动
  • 安全隔离版OpenClaw NEW OpenClaw云服务器专属“龙虾“套餐低至1.5折起
  • 青云志云端助力计划 NEW 一站式科研助手,海外资源安全访问平台,助力青年翼展宏图,平步青云
  • 云上钜惠 爆款云主机全场特惠,2核4G只要1.8折起!
  • 中小企业服务商合作专区 国家云助力中小企业腾飞,高额上云补贴重磅上线
  • 出海产品促销专区 NEW 爆款云主机低至2折,高性价比,不限新老速来抢购!
  • 天翼云奖励推广计划 加入成为云推官,推荐新用户注册下单得现金奖励
免费活动
  • 免费试用中心 HOT 多款云产品免费试用,快来开启云上之旅
  • 天翼云用户体验官 NEW 您的洞察,重塑科技边界

息壤智算

领先开放的智算服务平台,提供算力、平台、数据、模型、应用“五位一体”智算服务体系,构建全流程的AI基础设施能力
AI Store
  • 算力市场
  • 模型市场
  • 应用市场
  • MCP市场
公共算力服务
  • 裸金属
  • 定制裸金属
训推服务
  • 模型开发
  • 训练任务
  • 服务部署
模型推理服务
  • 模型广场
  • 体验中心
  • 服务接入
应用托管
  • 应用实例
科研助手
  • 科研智能体
  • 科研服务
  • 开发机
  • 并行计算
大模型
  • DeepSeek-V3.1
  • DeepSeek-R1-0528
  • DeepSeek-V3-0324
  • Qwen3-235B-A22B
  • Qwen3-32B
智算一体机
  • 智算一体机
模型适配专家服务
  • 模型适配专家服务
算力服务商
  • 入驻算力服务商

应用商城

天翼云精选行业优秀合作伙伴及千余款商品,提供一站式云上应用服务
进入甄选商城进入云市场进入AI Store创新解决方案公有云生态专区智云上海应用生态专区
建站工具
  • 新域名服务
  • SSL证书
  • 翼建站
企业办公
  • 安全邮箱
  • WPS 365 天翼云版
  • 天翼企业云盘(标准服务版)
灾备迁移
  • 云管家2.0
  • 翼备份(SaaS版)

定价

协助您快速了解云产品计费模式、价格详情,轻松预估上云成本
价格计算器
  • 动态测算产品价格
定价策略
  • 快速了解计费模式

合作伙伴

天翼云携手合作伙伴,共创云上生态,合作共赢
天翼云生态合作中心
  • 天翼云生态合作中心
天翼云渠道合作伙伴
  • 天翼云代理渠道合作伙伴
天翼云服务合作伙伴
  • 天翼云集成商交付能力认证
天翼云应用合作伙伴
  • 天翼云云市场合作伙伴
  • 天翼云甄选商城合作伙伴
天翼云技术合作伙伴
  • 天翼云OpenAPI中心
天翼云培训认证
  • 天翼云学堂
  • 天翼云市场商学院
天翼云合作计划
  • 云汇计划
天翼信创云专区
  • 信创云专区
  • 适配互认证

开发者

开发者相关功能入口汇聚
技术社区
  • 专栏文章
  • 互动问答
  • 技术视频
资源与工具
  • OpenAPI中心
培训与认证
  • 天翼云学堂
  • 天翼云认证
开源社区
  • 魔乐社区
  • OpenTeleDB

支持与服务

为您提供全方位支持与服务,全流程技术保障,助您轻松上云,安全无忧
文档与工具
  • 文档中心
  • 新手上云
  • 自助服务
  • OpenAPI中心
定价
  • 价格计算器
  • 定价策略
基础服务
  • 售前咨询
  • 在线支持
  • 在线支持
  • 工单服务
  • 服务保障
  • 会员中心
增值服务
  • 红心服务
  • 首保服务
  • 客户支持计划
  • 专家技术服务
  • 备案管家
我要反馈
  • 建议与反馈
  • 用户体验官
信息公告
  • 客户公告

了解天翼云

天翼云秉承央企使命,致力于成为数字经济主力军,投身科技强国伟大事业,为用户提供安全、普惠云服务
品牌介绍
  • 关于天翼云
  • 智算云
  • 天翼云4.0
  • 新闻资讯
  • 天翼云APP
基础设施
  • 全球基础设施
  • 信任中心
最佳实践
  • 精选案例
  • 超级探访
  • 云杂志
  • 分析师和白皮书
  • 天翼云·创新直播间
市场活动
  • 2025智能云生态大会
  • 2024智算云生态大会
  • 2023云生态大会
  • 2022云生态大会
  • 天翼云中国行
天翼云
  • 活动
  • 息壤智算
  • 产品
  • 解决方案
  • 应用商城
  • 定价
  • 合作伙伴
  • 开发者
  • 支持与服务
  • 了解天翼云
      • 文档
      • 控制中心
      • 备案
      • 管理中心
      分布式消息服务Kafka_相关内容
      • 产品规格
        本节介绍分布式消息服务Kafka的产品规格,以便您正确理解和使用。 Kafka实例规格 (1)以下规格适用于 华东1、华北2、西南1、华南2、上海36、青岛20、长沙42、南昌5、武汉41、杭州7、西南2贵州、太原4、郑州5、西安7、呼和浩特3 节点 注意 通用型规格已调整为白名单特性,如需了解该规格参数请联系技术支持。 单机版实例面向用户体验和业务测试场景,无法保证性能和高可用。如果需要在生产环境使用Kafka实例,建议购买集群版实例。 集群版 Intel计算增强型 实例规格 代理个数 单个代理TPS 单个代理分区上限 单个代理建议主题数 单个代理建议消费组数 单个代理客户端总连接数上限 存储空间范围 单个代理流量规格(MB/S) kafka.2u4g.cluster 350 30000 250 250 20 2000 300500000GB 100 kafka.4u8g.cluster 350 100000 500 500 100 4000 300500000GB 200 kafka.8u16g.cluster 350 150000 1000 1000 150 4000 300500000GB 375 kafka.12u24g.cluster 350 200000 1500 1500 200 4000 300500000GB 625 kafka.16u32g.cluster 350 250000 2000 2000 200 4000 300500000GB 750 kafka.24u48g.cluster 350 250000 2000 2000 200 4000 300500000GB 1125 kafka.32u64g.cluster 350 250000 2000 2000 200 4000 300500000GB 1500 kafka.48u96g.cluster 350 250000 2000 2000 200 4000 300500000GB 2250 kafka.64u128g.cluster 350 250000 2000 2000 200 4000 300500000GB 3000
        来自:
        帮助文档
        分布式消息服务Kafka
        产品简介
        产品规格
      • 约束与限制
        本文介绍分布式消息服务Kafka产品功能的约束和限制。 实例 表实例约束与限制 限制项 约束与限制 :: Kafka ZooKeeper Kafka集群依赖ZooKeeper进行管理,开放ZooKeeper可能引发误操作导致业务受损,当前仅Kafka内部使用,不对外提供服务。 版本 当前服务端版本为1.1.0、2.7和3.x。实例创建后,服务端版本不支持升级。 兼容0.10以上的客户端版本,推荐使用和服务端一致的版本。 登录Kafka节点所在机器 不能登录 存储空间 只支持扩大存储空间,不支持减小存储空间。 扩容存储空间有次数限制,最多扩容20次。 基准带宽/代理个数 只支持增加基准带宽/代理个数,不支持减小基准带宽/代理个数。 代理规格 支持扩容/缩容代理规格。 修改VPC/子网/可用区 实例创建后,不支持修改VPC/子网/可用区。 是否支持Kerberos认证 不支持 Topic 表 Topic约束与限制 限制项 约束与限制 Topic总分区数 Topic总分区数和实例规格有关,具体请参考产品规格。Kafka以分区为粒度管理消息,分区多导致生产、存储、消费都碎片化,影响性能稳定性。在使用过程中,当Topic的总分区数达到上限后,用户就无法继续创建Topic。 单个Topic分区数 按照开源Kafka现有逻辑,单个Topic分区数只支持增加,不支持减少。 Topic数量 Topic数量和Topic总分区数、每个Topic的分区数有关,具体请参考产品规格。 是否支持自动创建Topic 支持。开启自动创建Topic表示生产或消费一个未创建的Topic时,系统会自动创建此Topic,此Topic的默认参数值如下:分区数为3,副本数为3,老化时间为72小时,不开启同步复制和同步落盘。如果在“配置参数”中修改“log.retention.hours”、“default.replication.factor”或“num.partitions”的参数值,此后自动创建的Topic参数值为修改后的参数值。例如:“num.partitions”修改为“5”,自动创建的Topic参数值如下:分区数为5,副本数为3,老化时间为72小时,不开启同步复制和同步落盘。 同步复制 Topic副本数为1时,不能选择同步复制功能。 副本数 不建议使用单副本。实例节点出现故障的情况下,单副本Topic查询消息时可能会报“内部服务错误”,因此不建议使用单副本Topic。 老化时间 如果Topic已经设置了老化时间,此时“配置参数”中的log.retention.hours值将不对此Topic生效。仅在Topic中未设置老化时间时,“配置参数”中的log.retention.hours值才会对此Topic生效。例如:Topic01设置的老化时间为60小时,“配置参数”中的log.retention.hours值为72小时,此时Topic01实际的老化时间为60小时。 批量导入/导出Topic 支持批量导出,不支持批量导入。 Topic名称 Topic名称开头包含特殊字符,例如下划线“”、
        来自:
      • Kafka数据迁移概述
        迁移方案一:先迁生产,再迁消费 先将生产消息的业务迁移到新的Kafka,原Kafka不会有新的消息生产。待原有Kafka实例的消息全部消费完成后,再将消费消息业务迁移到新的Kafka,开始消费新Kafka实例的消息。 本方案为业界通用的迁移方案,操作步骤简单,迁移过程由业务侧自主控制,整个过程中消息不会存在乱序问题, 适用于对消息顺序有要求的场景 。但是该方案中需要等待消费者业务直至消费完毕,存在一个时间差的问题,部分数据可能存在较大的端到端时延。 1、将生产客户端的Kafka连接地址修改为新Kafka实例的连接地址。 2、重启生产业务,使得生产者将新的消息发送到新Kafka实例中。 3、观察各消费组在原Kafka的消费进度,直到原Kafka中数据都已经被消费完毕。 4、将消费客户端的Kafka连接地址修改为新Kafka实例的连接地址。 5、重启消费业务,使得消费者从新Kafka实例中消费消息。 6、观察消费者是否能正常从新Kafka实例中获取数据。 7、迁移结束。 结束 迁移方案二:同时消费,后迁生产 消费者业务启用多个消费客户端,分别向原Kafka和新Kafka实例消费消息,然后将生产业务切到新Kafka实例,这样能确保所有消息都被及时消费。 本方案中消费业务会在一段时间内同时消费原Kafka和新Kafka实例。由于在迁移生产业务之前,已经有消费业务运行在新Kafka实例上,因此不会存在端到端时延的问题。但在迁移生产的开始阶段,同时消费原Kafka与新Kafka实例,会导致部分消息之间的生产顺序无法保证,存在消息乱序的问题。此场景 适用于对端到端时延有要求,却对消息顺序不敏感的业务 。 1、启动新的消费客户端,配置Kafka连接地址为新Kafka实例的连接地址,消费新Kafka实例中的数据。 原有消费客户端需继续运行,消费业务同时消费原Kafka与新Kafka实例的消息。 2、修改生产客户端,Kafka连接地址改为新Kafka实例的连接地址。 3、重启生产客户端,将生产业务迁移到新Kafka实例中。 4、生产业务迁移后,观察连接新Kafka实例的消费业务是否正常。 5、等待原Kafka中数据消费完毕,关闭原有消费业务客户端。 6、迁移结束。 结束
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        Kafka数据迁移
        Kafka数据迁移概述
      • Kafka消费者poll的优化
        介绍Kafka消费者poll的优化。 优化背景 Kafka是一个高吞吐量、低延迟的分布式消息系统,常用于构建实时数据流处理和大规模数据集的消费。在Kafka中,消费者通过调用 poll()方法从Broker拉取消息进行消费。 优化Kafka消费者的 poll()方法可以带来以下几个方面的好处: 1. 提高消费者的吞吐量:通过调整 poll()方法的参数和优化策略,可以减少网络通信的次数和延迟,提高消费者的消息处理速度,从而提高整体的吞吐量。 2. 减少消费者的资源占用:消费者在调用 poll()方法时会占用一定的CPU、内存和网络资源。通过优化 poll()方法,可以减少消费者的资源占用,提高资源的利用率,从而节省成本和提高系统的可扩展性。 3. 提高消息的实时性:优化 poll()方法可以减少消息的处理延迟,使得消息能够更快地被消费和处理。这对于实时数据流处理和需要快速响应的应用场景非常重要。 4. 提高系统的稳定性:通过优化 poll()方法,可以减少消费者的阻塞时间和等待时间,减少消息堆积和延迟,从而提高系统的稳定性和可靠性。 总之,优化Kafka消费者的 poll()方法可以提高消费者的吞吐量、降低延迟、节省资源、提高实时性和增强系统的稳定性。这对于大规模数据处理和实时数据流应用非常重要,能够提升系统的性能和用户体验。 优化方案 优化Kafka消费者的 poll()方法可以通过以下几个方面来实现: 1. 批量拉取消息:通过调整 max.poll.records参数,一次性拉取更多的消息,减少网络通信的次数,提高消费者的吞吐量。需要根据实际场景和消费者的处理能力进行合理的调整。 2. 控制拉取间隔:通过调整 poll()方法的调用频率,控制消费者的拉取速度。拉取间隔过小会增加网络开销,间隔过大会导致消息堆积和延迟。根据实际场景和消费者的处理能力,找到合适的拉取间隔,平衡吞吐量和消息的实时性。 3. 并行处理:使用多线程或多进程方式并行处理拉取到的消息,提高消费者的并发处理能力,加快消息的处理速度。确保消息处理逻辑线程安全,避免并发访问问题。 4. 提前预取:通过设置 fetch.min.bytes参数,提前预取下一批消息,减少 poll()方法的等待时间。根据实际场景和消费者的处理能力,找到合适的预取大小,平衡吞吐量和内存开销。 5. 异步提交偏移量:将 enable.auto.commit参数设置为 false,手动异步提交偏移量,减少 poll()方法的阻塞时间。提高消费者的吞吐量和性能。 6. 使用消费者组:将多个消费者组绑定到同一个主题,实现消息的并行消费。每个消费者组可以独立地消费消息,提高整体的消费能力。 7. 合理配置消费者参数:根据实际需求和系统资源,合理配置消费者的参数,如 max.poll.interval.ms、session.timeout.ms等,以避免消费者在处理消息时出现超时或重平衡的情况。 需要根据具体的应用场景和需求,结合实际的性能测试和优化策略,选择合适的优化方案来提高Kafka消费者的 poll()方法的效率和性能。
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        Kafka消费者poll的优化
      • 基于事件流实现消息路由至函数计算
        步骤二:测试验证 1. 登录分布式消息服务Kafka控制台。 2. 在左侧导航栏,单击实例列表,选择事件流的源实例。 3. 在主题管理页面,选择源的目标主题,操作列点击更多 ,然后点击生产消息。 4. 在生产消息对话框输入想要发送的消息,然后点击发送消息。 5. 发送消息后,登录函数计算管理控制台。 6. 在函数页面,单击目标函数名称。 7. 在目标函数详情页面,单击监控页签,查看函数是否被触发以及调用时延,如图1所示。 图1 在函数计算管理控制台中查看函数监控指标
        来自:
        帮助文档
        事件总线
        最佳实践
        基于事件流实现消息路由
        基于事件流实现消息路由至函数计算
      • 不使用SASL证书连接
        生产消息 进入Kafka客户端文件的“/bin”目录下,执行如下命令进行生产消息: ./kafkaconsoleproducer.sh brokerlist ​{连接地址} topic 连接地址−−topic​{Topic名称} 参数说明如下: 连接地址:从前提条件中获取的连接地址。 Topic名称:Kafka实例下创建的Topic名称。 示例如下,“10.3.196.45:9094,10.78.42.127:9094,10.4.49.103:9094”为获取的Kafka实例公网连接地址。 执行完命令后,输入需要生产的消息内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。 [root@ecskafka bin] ./kafkaconsoleproducer.sh brokerlist 10.3.196.45:9094,10.78.42.127:9094,10.4.49.103:9094 topic topicdemoHello DMS Kafka! ^C[root@ecskafka bin] 如需停止生产使用Ctrl+C命令退出。 消费消息 执行如下命令进行消费消息: ./kafkaconsoleconsumer.sh bootstrapserver ​{连接地址} topic 连接地址−−topic​{Topic名称} group ${消费组名称} frombeginning 参数说明如下: 连接地址:从前提条件中获取的连接地址。 Topic名称:Kafka实例下创建的Topic名称。 消费组名称:根据您的业务需求,设定消费组名称。 如果已经在配置文件中指定了消费组名称,请确保命令行中的消费组名称与配置文件中的相同,否则可能消费失败。 消费组名称开头包含特殊字符,例如下划线“”、 号“ ”时,监控数据无法展示。 示例如下: [root@ecskafka bin] ./kafkaconsoleconsumer.sh bootstrapserver 10.3.196.45:9094,10.78.42.127:9094,10.4.49.103:9094 topic topicdemo group ordertest frombeginning Kafka! DMS Hello ^CProcessed a total of 3 messages [root@ecskafka bin]
        来自:
        帮助文档
        分布式消息服务Kafka
        快速入门
        步骤四:连接实例生产消费消息
        不使用SASL证书连接
      • 云审计服务支持的DMS for Kafka操作列表
        本文主要介绍云审计服务支持的DMS for Kafka操作列表。 通过云审计服务,您可以记录与分布式消息服务Kafka相关的操作事件,便于日后的查询、审计和回溯。 表 云审计服务支持的DMS for Kafka操作列表 操作名称 资源类型 事件名称 创建DMS实例订单成功 kafka createDMSInstanceOrderSuccess 创建DMS实例任务执行成功 kafka createDMSInstanceTaskSuccess 创建DMS实例订单失败 kafka createDMSInstanceOrderFailure 创建DMS实例任务执行失败 kafka createDMSInstanceTaskFailure 删除创建失败的DMS实例成功 kafka deleteDMSCreateFailureInstancesSuccess 删除创建失败的DMS实例失败 kafka deleteDMSCreateFailureInstancesFailure 删除DMS实例任务执行成功 kafka deleteDMSInstanceTaskSuccess 删除DMS实例任务执行失败 kafka deleteDMSInstanceTaskFailure 批量删除DMS实例任务 kafka batchDeleteDMSInstanceTask 提交批量删除DMS实例请求成功 kafka batchDeleteDMSInstanceSuccess 批量删除DMS实例任务执行成功 kafka batchDeleteDMSInstanceTaskSuccess 提交批量删除DMS实例请求失败 kafka batchDeleteDMSInstanceFailure 批量删除DMS实例任务执行失败 kafka batchDeleteDMSInstanceTaskFailure 提交修改DMS实例订单请求成功 kafka modifyDMSInstanceOrderSuccess 提交修改DMS实例订单请求失败 kafka modifyDMSInstanceOrderFailure 提交扩容实例请求成功 kafka extendDMSInstanceSuccess 扩容DMS实例任务执行成功 kafka extendDMSInstanceTaskSuccess 提交扩容实例请求失败 kafka extendDMSInstanceFailure 扩容DMS实例任务执行失败 kafka extendDMSInstanceTaskFailure 提交重置DMS实例密码请求成功 kafka resetDMSInstancePasswordSuccess 提交重置DMS实例密码请求失败 kafka resetDMSInstancePasswordFailure 提交重启DMS实例请求成功 kafka restartDMSInstanceSuccess 重启DMS实例任务执行成功 kafka restartDMSInstanceTaskSuccess 提交重启DMS实例请求失败 kafka restartDMSInstanceFailure 重启DMS实例任务执行失败 kafka restartDMSInstanceTaskFailure 提交批量重启DMS实例请求成功 kafka batchRestartDMSInstanceSuccess 批量重启DMS实例任务执行成功 kafka batchRestartDMSInstanceTaskSuccess 提交批量重启DMS实例请求失败 kafka batchRestartDMSInstanceFailure 批量重启DMS实例任务执行失败 kafka batchRestartDMSInstanceTaskFailure 提交修改DMS实例信息请求成功 kafka modifyDMSInstanceInfoSuccess 修改DMS实例信息任务执行成功 kafka modifyDMSInstanceInfoTaskSuccess 提交修改DMS实例信息请求失败 kafka modifyDMSInstanceInfoFailure 修改DMS实例信息任务执行失败 kafka modifyDMSInstanceInfoTaskFailure 删除后台任务成功 kafka deleteDMSBackendJobSuccess 删除后台任务失败 kafka deleteDMSBackendJobFailure 冻结DMS实例任务执行成功 kafka freezeDMSInstanceTaskSuccess 冻结DMS实例任务执行失败 kafka freezeDMSInstanceTaskFailure 解冻DMS实例任务执行成功 kafka unfreezeDMSInstanceTaskSuccess 解冻DMS实例任务执行失败 kafka unfreezeDMSInstanceTaskFailure Kafka专享实例创建Topic成功 kafka KafkacreatetopicSuccess Kafka专享实例创建Topic失败 kafka KafkacreatetopicFailure Kafka专享实例删除Topic成功 kafka KafkadeletetopicsSuccess Kafka专享实例删除Topic失败 kafka KafkadeletetopicsFailure 开启自动创建Topic成功 kafka enableautotopicSuccess 开启自动创建Topic失败 kafka enableautotopicFailure 重置消费组偏移量成功 kafka KafkaresetconsumeroffsetSuccess 重置消费组偏移量失败 kafka KafkaresetconsumeroffsetFailure 创建用户成功 kafka createUserSuccess 创建用户失败 kafka createUserFailure 删除用户成功 kafka deleteUserSuccess 删除用户失败 kafka deleteUserFailure 更新用户策略成功 kafka updateUserPoliciesTaskSuccess 更新用户策略失败 kafka updateUserPoliciesTaskFailure
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        云审计服务支持的关键操作
        云审计服务支持的DMS for Kafka操作列表
      • 弹性存储最佳实践
        步骤一:创建Kafka实例(3.6.x版本或以上) 需订购好3.6.x引擎版本或以上的分布式消息Kafka实例,具体操作可参考创建实例。 步骤二:开启实例弹性存储 (1)登录管理控制台。 (2)进入Kafka管理控制台。 (3)在实例列表页在操作列,目标实例行点击“管理”,进入实例详情。 (4)在实例详情页打开“弹性存储”开关按钮,选择用于存储远端数据的对象存储Bucket和填写对应具备读写权限的ak、sk。点击提交。 (5)等待集群开启变更完成,集群会发生轮训重启,等待重启完成即可。 步骤三:配置Topic的弹性存储属性与本地保留时长 (1)登录管理控制台。 (2)进入Kafka管理控制台。 (3)在实例列表页在操作列,目标实例行点击“管理”。 (4)点击“Topic管理”后,选择指定的Topic点击其右侧“编辑”按钮。 (5)打开“是否开启弹性存储”开关,点击“修改”保存。 计费说明 该功能本地存储使用订购实例时的磁盘存储,具体费用可参考“分布式消息Kafka资费”, 远端存储费用为对象存储费用。该功能无额外收取其他费用。
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        弹性存储最佳实践
      • 消息类问题
        消息超过老化时间,消息仍存在的原因 问题现象: 消息超过设置的老化时间(如果Topic已经设置了老化时间,此时“配置参数”中的log.retention.hours值将不对此Topic生效。仅在Topic中未设置老化时间时,“配置参数”中的log.retention.hours值才会对此Topic生效。),消息仍存在。 可能原因1: Topic的每个分区都是由多个大小相同的segment文件组成,每个segment文件的大小为500MB,当segment文件存储的消息大小到达500MB后,才会新建下一个segment文件。Kafka删除消息是删除segment文件,而不是删除一条消息。Kafka要求至少保留一个segment文件用来存储消息,如果正在使用的segment文件中包含超过老化时间的消息,由于此时segment文件不会被删除,所以超过老化时间的消息也不会被删除。 处理方法: 等待segment文件被使用完,或者删除超过老化时间的消息所在的Topic。 可能原因2: Topic中存在一条create time为未来时间的消息(例如当前时间为1月1日,create time设置成了2月1日),此消息在72小时后,并不会被老化,导致在此消息后创建的其他消息都不会被老化。 处理方法: 删除create time为未来时间的消息所在的Topic。 Kafka实例是否支持延迟消息? 不支持延迟消息。 如何查看堆积消息数? 通过以下任意一种方法,查看堆积消息数。 在Kafka控制台的“消费组管理”页面,单击待查看堆积消息的消费组名称,进入消费组详情页。在“消费进度”页签,查看消费组中每个Topic的总堆积数。具体步骤,请参考查询消费组信息。 在Kafka控制台的“监控”页面的“消费组”页签中,“消费组”选择待查看堆积消息数的消费组名称,“队列”选择“全部队列”,“消费组可消费消息数”表示此消费组中所有Topic的堆积消息数之和。查看监控数据的具体步骤,请参考查看监控数据。 在云监控页面的“消费组”页签中,“消费组”选择待查看堆积消息数的消费组名称,“队列”选择“全部队列”,“消费组可消费消息数”表示此消费组中所有Topic的堆积消息数之和。查看监控数据的具体步骤,请参考查看监控数据。 在Kafka客户端,在“/{命令行工具所在目录}/kafka{version}/bin/”目录下,通过 kafkaconsumergroups.sh bootstrapserver {kafka连接地址} describe group {消费组} 命令查看消费组中每个Topic的堆积消息数。“LAG”表示每个Topic的总堆积数。 图 查看每个Topic的总堆积数 说明 如果Kafka实例开启SASL认证,则以上命令还需要增加SASL认证的“consumer.properties”配置文件参数: commandconfig {SASL认证的consumer.properties配置文件} ,“consumer.properties”配置文件参考开启SASL认证的Kafka命令行连接说明。
        来自:
        帮助文档
        分布式消息服务Kafka
        常见问题
        消息类问题
      • Kafka业务迁移
        本节主要介绍Kafka业务迁移。 Kafka迁移指将生产与消费消息的客户端切换成连接新Kafka,部分还涉及将持久化的消息文件迁移到新的Kafka。主要涉及到以下2类场景: 业务上云且不希望业务有中断。 在上云过程中,连续性要求高的业务,需要平滑迁移,不能有长时间的中断。 在云上变更业务部署 单AZ部署的Kafka实例,不具备AZ之间的容灾能力。用户对可靠性要求提升后,需要迁移到多AZ部署的实例上。 迁移准备 1、配置网络环境 Kafka实例分VPC内以及公网地址两种网络连接方式。如果使用公网地址,则消息生成与消费客户端需要有公网访问权限,并配置如下安全组。 表 安全组规则 方向 协议 端口 源地址 说明 ::::: 入方向 TCP 9094 0.0.0.0/0 通过公网访问Kafka(关闭SSL加密)。 入方向 TCP 9095 0.0.0.0/0 通过公网访问Kafka(开启SSL加密)。 2、创建Kafka实例 Kafka的规格不能低于原业务使用的Kafka规格。具体请参考购买Kafka实例。 3、创建Topic 在新的Kafka实例上创建与原Kafka实例相同配置的Topic,包括Topic名称、副本数、分区数、消息老化时间,以及是否同步复制和落盘等。具体请参考创建Topic。
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        Kafka业务迁移
      • DMS for Kafka 消费者poll的优化
        本文主要介绍DMS for Kafka 消费者poll的优化。 场景介绍 在DMS提供的原生Kafka SDK中,消费者可以自定义拉取消息的时长,如果需要长时间的拉取消息,只需要把poll(long)方法的参数设置合适的值即可。但是这样的长连接可能会对客户端和服务端造成一定的压力,特别是分区数较多且每个消费者开启多个线程的情况下。 如图所示,Kafka队列含有多个分区,消费组中有多个消费者同时进行消费,每个线程均为长连接。当队列中消息较少或者没有时,连接不断开,所有消费者不间断地拉取消息,这样造成了一定的资源浪费。 图 Kafka消费者多线程消费模式 优化方案 在开了多个线程同时访问的情况下,如果队列里已经没有消息了,其实不需要所有的线程都在poll,只需要有一个线程poll各分区的消息就足够了,当在polling的线程发现队列中有消息,可以唤醒其他线程一起消费消息,以达到快速响应的目的。如图所示。 这种方案适用于对消费消息的实时性要求不高的应用场景。如果要求准实时消费消息,则建议保持所有消费者处于活跃状态。 图 优化后的多线程消费方案 说明 消费者(Consumer)和消息分区(Partition)并不强制数量相等,Kafka的poll(long)方法帮助实现获取消息、分区平衡、消费者与Kafka broker节点间的心跳检测等功能。 因此在对消费消息的实时性要求不高场景下,当消息数量不多的时候,可以选择让一部分消费者处于wait状态。
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        DMS for Kafka 消费者poll的优化
      • 运行Kafka作业
        本章节主要介绍如何运行Kafka作业。 用户可将自己开发的程序提交到MRS中,执行程序并获取结果。本章节教您在Kafka主题中产生和消费消息。 暂不支持通过界面提交Kafka作业,请通过后台功能来提交作业。 通过后台提交作业 先查询ZooKeeper和Kafka的实例地址,再运行Kafka作业。 查询实例地址(3.x版本) 1.登录MRS管理控制台。 2.选择“集群列表 > 现有集群”,选中一个运行中的集群并单击集群名称,进入集群信息页面。 3.请参考访问FusionInsight Manager(MRS 3.x及之后版本),跳转至FusionInsight Manager页面。然后选择“服务 > ZooKeeper > 实例”,查看ZooKeeper角色实例的IP地址。记录ZooKeeper角色实例中任意一个的IP地址即可。 4.选择“服务 > Kafka > 实例”,查看Kafka角色实例的IP地址。记录Kafka角色实例中任意一个的IP地址即可。 查询实例地址(3.x之前版本) a. 登录MRS管理控制台。 b. 选择“集群列表 > 现有集群”,选中一个运行中的集群并单击集群名称,进入集群信息页面。 c. 在MRS集群详情页面,选择“组件管理 > ZooKeeper > 实例”,查看ZooKeeper角色实例的IP地址。记录ZooKeeper角色实例中任意一个的IP地址即可。 d. 选择“组件管理 > Kafka > 实例”,查看Kafka角色实例的IP地址。记录Kafka角色实例中任意一个的IP地址即可。 运行Kafka作业 MRS 3.x及之后版本客户端默认安装路径为“/opt/Bigdata/client”,MRS 3.x之前版本为“/opt/client”。具体以实际为准。 1.在集群信息页面的“节点管理”页签中单击Master2节点名称,进入弹性云主机管理控制台。 2.单击页面右上角的“远程登录”。 3.根据界面提示,输入Master节点的用户名和密码,用户名、密码分别为root和创建集群时设置的密码。 4.执行如下命令初始化环境变量。 source /opt/Bigdata/client/bigdataenv 5.如果当前集群已开启Kerberos认证,执行以下命令认证当前用户。如果当前集群未开启Kerberos认证,则无需执行该步骤。 kinit MRS集群用户 例如, kinit admin 6.执行如下命令,创建kafka topic。 kafkatopics.sh create zookeeper ZooKeeper角色实例IP:2181/kafkapartitions 2 replicationfactor 2 topic 7.在topic test中产生消息。 首先执行命令kafkaconsoleproducer.sh brokerlist Kafka角色实例IP:9092 topic producer.config /opt/Bigdata/client/Kafka/kafka/config/producer.properties 。 然后输入指定的内容作为生产者产生的消息,输入完成后按回车发送消息。如果需要结束产生消息,使用“Ctrl + C”退出任务。 8.消费topic test中的消息。 kafkaconsoleconsumer.sh topic bootstrapserver Kafka角色实例IP:9092 consumer.config/opt/Bigdata/client/Kafka/kafka/config/consumer.properties 说明 如果集群开启Kerberos认证,则执行如上两个命令时请修改端口号9092为21007,详见
        来自:
        帮助文档
        翼MapReduce
        用户指南
        管理集群
        作业管理
        运行Kafka作业
      • 修改配置参数
        本文主要介绍 修改配置参数。 操作场景 分布式消息服务Kafka为实例、Topic、消费者提供了几个常用配置参数的默认值,您可以根据实际业务需求,在控制台自行修改参数值。其他在控制台未列出的配置参数,请参考Kafka配置进行修改。 1.1.0版本实例的参数都为静态参数,2.3.0/2.7版本实例的参数分为动态参数和静态参数: 动态参数:动态参数修改成功后,无需重启实例,立即生效。 静态参数:静态参数修改成功后,需要手动重启实例才能生效。 说明 部分老实例不支持修改配置参数,具体以控制台为准,此时请联系客服解决。 前提条件 Kafka实例的状态为“运行中”时,才能修改配置参数。 操作步骤 步骤 1 登录管理控制台。 步骤 2 在管理控制台右上角单击,选择区域。 说明 请选择Kafka实例所在的区域。 步骤 3 在管理控制台左上角单击,选择“企业中间件”“分布式消息服务”“Kafka专享版”,进入分布式消息服务Kafka专享版页面。 步骤 4 单击Kafka实例的名称,进入实例详情页面。 步骤 5 在“配置参数”页签,在待修改参数所在行,单击“编辑”,修改配置参数。1.1.0版本实例的参数说明如下表所示,2.3.0/2.7版本实例的参数说明如下表所示。 表动态参数说明(1.1.0版本) 参数 参数说明 参数范围 默认值 auto.create.groups.enable 是否开启自动创建消费组功能。 true/false true 表 静态参数说明(1.1.0版本) 参数 参数说明 参数范围 默认值 min.insync.replicas 当producer将acks设置为“all”(或“1”)时,此配置指定必须确认写入才能被认为成功的副本的最小数量。 1 ~ 3 1 message.max.bytes 单条消息的最大长度(单位:字节)。 0 ~ 10485760 10485760 unclean.leader.election.enable 指示是否启用不在ISR集合中的副本选为领导者作为最后的手段,即使这样做可能导致数据丢失。 true/false true connections.max.idle.ms 此参数用来指定在多少毫秒之后,关闭空闲的连接。 5000 ~ 600000 600000 log.retention.hours 日志文件最大保存时间。单位为小时。如果Topic已经设置了老化时间,则此参数对此Topic不生效。仅在Topic未设置老化时间时,此参数才对此Topic生效。 1 ~ 168 72 max.connections.per.ip 每个IP允许的最大连接数。超过此连接数的连接请求将被丢弃。 100 ~ 20000 1000 group.max.session.timeout.ms consumer允许的最大会话超时时间,单位为ms。超时时间越长,consumer就能在心跳探测周期内有更多时间处理消息,但也会使故障检测花费更长时间。 6000 ~ 1800000 1800000 default.replication.factor 自动创建Topic时的默认副本个数。 1 ~ 3 3 num.partitions 自动创建Topic时的默认分区数。 1 ~ 100 3 group.min.session.timeout.ms consumer允许的最小会话超时时间,单位为ms。超时时间越短,consumer的心跳探测越频繁,可以使故障检测更快,但会导致broker被抢占更多的资源。 6000 ~ 300000 6000 表动态参数说明(2.3.0/2.7版本) 参数 参数说明 参数范围 默认值 min.insync.replicas 当producer将acks设置为“all”(或“1”)时,此配置指定必须确认写入才能被认为成功的副本的最小数量。 1 ~ 3 1 message.max.bytes 单条消息的最大长度(单位:字节)。 0 ~ 10485760 10485760 auto.create.groups.enable 是否开启自动创建消费组功能。 true/false true max.connections.per.ip 每个IP允许的最大连接数。超过此连接数的连接请求将被丢弃。 100 ~ 20000 1000 unclean.leader.election.enable 指示是否启用不在ISR集合中的副本选为领导者作为最后的手段,即使这样做可能导致数据丢失。 true/false true 表静态参数说明(2.3.0/2.7版本) 参数 参数说明 参数范围 默认值 connections.max.idle.ms 此参数用来指定在多少毫秒之后,关闭空闲的连接。 5000 ~ 600000 600000 log.retention.hours 日志文件最大保存时间。单位为小时。如果Topic已经设置了老化时间,则此参数对此Topic不生效。仅在Topic未设置老化时间时,此参数才对此Topic生效。 1 ~ 168 72 group.max.session.timeout.ms consumer允许的最大会话超时时间,单位为ms。超时时间越长,consumer就能在心跳探测周期内有更多时间处理消息,但也会使故障检测花费更长时间。 6000 ~ 1800000 1800000 default.replication.factor 自动创建Topic时的默认副本个数。 1 ~ 3 3 num.partitions 自动创建Topic时的默认分区数。 1 ~ 100 3 group.min.session.timeout.ms consumer允许的最小会话超时时间,单位为ms。超时时间越短,consumer的心跳探测越频繁,可以使故障检测更快,但会导致broker被抢占更多的资源。 6000 ~ 300000 6000 说明 如果需要批量修改参数,单击“批量编辑”,可以一次性编辑多个动态参数或静态参数的运行值。 如果需要恢复默认值,在待修改参数后,单击“恢复默认”。 步骤 6 单击“保存”,完成参数的修改。 说明 动态参数修改成功后,无需重启实例,立即生效。 静态参数修改成功后,需要手动重启实例才能生效。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        修改配置参数
      • 分区动态处理
        操作场景 分布式消息服务 Kafka 版支持分区动态处理功能,当实例集群出现分区不平衡的情况时,可以手动进行分区均衡,重新分配分区在节点上的分布情况。也可以选择自动分区均衡 ,Kafka 会根据您设定的策略自动检查 Topic 的分区分布,自动分析选择业务低峰期发起分区均衡。 前提条件 自动分区均衡仅支持集群版实例。 自动分区均衡和Topic管理页面的分区平衡任务不能同时运行。 操作影响 分区均衡操作时可能引起流量抖动以及请求超时情况,建议在业务低峰期操作。 分区平衡后Topic的metadata会改变,如果生产者不支持重试机制,会有少量的请求失败,导致部分消息生产失败。 操作步骤 1. 进入Kafka实例管理控制台。 2. 在左侧菜单栏中选择智能运维中的弹性伸缩,进入弹性伸缩功能页面。 3. 在弹性伸缩页面的分区动态处理模块,可以选择开启或关闭自动化分区均衡。 4. 自动分区均衡支持两种策略,通过”配置“按钮可选择自定义时间或自动分析选择业务低峰期。 5. 单击”确定“,完成配置,再点击状态开关进行开启即可。 6. 可通过”查看“按钮或者直接进入智能运维中的后台任务管理功能页面查看自动化分区均衡调整记录详情。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        智能运维
        弹性伸缩
        分区动态处理
      • 支持的监控指标
        操作场景 天翼云分布式消息Kafka自集成了一整套监控方案,对Kafka实例的运行状态进行日常监控,可以通过管理控制台查看Kafka实例各项监控指标。各项监控指标可以分为实例监控、实例节点监控、主题监控、消费组监控和Connect监控,各项监控指标的具体细节如下表所示。 操作前提 已开通天翼云Kafka实例,且实例状态为“运行中” Kafka 实例是天翼云Ⅱ类资源池实例,目前Ⅱ类资源池包括:华东1、上海36、华北2、长沙42、武汉41、西安7、杭州7、青岛20、西南1、西南2、广州4、郑州5、华南2等 监控指标 监控项的数据聚合周期为1分钟,即1分钟计算一次,计算出来每秒字节数。您可以将该数据理解为一分钟内的平均值。 1. 实例监控 指标ID 指标名称 指标含义 取值范围 单位 currentbrokers 存活节点数 该指标用于统计Kafka实例中正常运行的实例节点数 0~50 Count currenttopics 主题数 该指标用于统计Kafka实例中已经创建的主题数量。 0~2000 Count currentpartitions 分区数 该指标用于统计Kafka实例中已经使用的分区数量。 0~2000 Count groupmsgs 堆积消息数 该指标用于统计Kafka实例中所有消费组中总堆积消息数。 >0 Count instancebytesinrate 生产流量 该指标用于统计Kafka实例中每秒生产的字节数。 >0 MB/s instancebytesoutrate 消费流量 该指标用于统计Kafka实例中每秒生产的字节数。 >0 MB/s instancemessagesinrate 消息生产速率 该指标用于统计实例每秒生产的消息数。 >0 Count/s instancemessagesoutrate 消息消费速率 该指标用于统计实例每秒消费的消息数。 注意:2025年1月及以后购买的实例,支持此监控项。 >0 Count/s instancerequestqueuesize 实例请求队列长度 该指标用于统计实例请求队列长度。 >0 Count instanceresponsequeuesize 实例响应队列长度 该指标用于统计实例响应队列长度。 >0 Count instanceconnectionusage 实例连接数使用率 该指标用于统计实例连接数使用率 注意:2025年6月及以后购买的实例,支持此监控项。 0~100 % instancetopicusage 实例用户主题数使用率 该指标用于统计实例租户主题使用率 注意:2025年6月及以后购买的实例,支持此监控项。 0~100 % instancepartitionusage 实例用户主题分区数使用率 该指标用于统计实例租户主题分区使用率 注意:2025年6月及以后购买的实例,支持此监控项。 0~100 % instancegroupusage 实例用户消费组数使用率 该指标用于统计实例租户消费组使用率 注意:2025年6月及以后购买的实例,支持此监控项。 0~100 % instanceproducelimit 实例生产限流次数 该指标用于统计实例生产限流次数 注意:2025年6月及以后购买的实例,支持此监控项。 >0 Count instanceconsumelimit 实例消费限流次数 该指标用于统计实例消费限流次数 注意:2025年6月及以后购买的实例,支持此监控项。 >0 Count
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        可观测
        监控信息
        支持的监控指标
      • 消费组列表
        介绍分布式消息服务Kafka消费组查看功能操作内容。 场景描述 在以下场景中,可以考虑查看Kafka的消费组列表: 监控消费组:通过查看消费组列表,可以监控和管理Kafka中的消费者。可以查看消费组的健康状态等信息,以确保消费者正常工作并及时发现潜在的问题。 动态调整消费者数量:通过查看消费组列表,可以了解当前的消费者数量和状态。根据实际需求,可以动态增加或减少消费者的数量,以适应不同的负载和流量变化。 操作步骤 (1)登录管理控制台。 (2)进入Kafka管理控制台。 (3)在实例列表页在操作列,目标实例行点击“管理” (4)点击“消费组管理”后出现如下图列表 (5)右上角输入消费组名称,可查询对应消费组 消费组状态说明见下表: 状态 说明 Empty 消费组没有分配到任何分区进行消费。 PreparingRebalance 消费组中有新的消费者加入或者有消费者离开,正在进行重新分配分区的准备工作。 CompletingRebalance 1. 重新分配分区的过程已经完成,消费组即将开始消费。 Stable 消费组中的消费者正常消费消息,并且消费进度与分区分配是一致的。 Dead 消费组已经停止工作,没有任何活跃的消费者。 PreparingSync 消费组正在准备同步消费进度,以确保消费者之间的一致性。 AwaitingSync 消费组中的消费者正在等待同步消费进度的完成。 Rebalancing 消费组正在进行重新分配分区的操作。 DeadAndEmpty 消费组已经停止工作,且没有分配到任何分区进行消费。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        消费组管理
        消费组列表
      • 使用SASL证书连接
        生产消息 进入Kafka客户端文件的“/bin”目录下,执行如下命令进行生产消息。 ./kafkaconsoleproducer.sh brokerlist ​{连接地址} topic 连接地址−−topic​{Topic名称} producer.config ../config/producer.properties 参数说明如下: 连接地址:从前提条件获取的连接地址。 Topic名称:Kafka实例下创建的Topic名称。 示例如下,“10.3.196.45:9095,10.78.42.127:9095,10.4.49.103:9095”为Kafka实例连接地址。 执行完命令后,输入需要生产的消息内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。 [root@ecskafka bin] ./kafkaconsoleproducer.sh brokerlist 10.3.196.45:9095,10.78.42.127:9095,10.4.49.103:9095 topic topicdemo producer.config ../config/producer.propertiesHello DMS Kafka! ^C[root@ecskafka bin] 如需停止生产使用Ctrl+C命令退出。 消费消息 执行如下命令消费消息。 ./kafkaconsoleconsumer.sh bootstrapserver ​{连接地址} topic 连接地址−−topic​{Topic名称} group ${消费组名称} frombeginning consumer.config ../config/consumer.properties 参数说明如下: 连接地址:从前提条件获取的连接地址。 Topic名称:Kafka实例下创建的Topic名称。 消费组名称:根据您的业务需求,设定消费组名称。 如果已经在配置文件中指定了消费组名称,请确保命令行中的消费组名称与配置文件中的相同,否则可能消费失败 。 消费组名称开头包含特殊字符,例如下划线“”、 号“ ”时,监控数据无法展示。 示例如下: [root@ecskafka bin]
        来自:
        帮助文档
        分布式消息服务Kafka
        快速入门
        步骤四:连接实例生产消费消息
        使用SASL证书连接
      • 操作类
        Kafka服务端支持版本是多少? Kafka 1.1.0和2.3.0版本。 创建的Kafka实例是集群模式么? 创建一个Kafka实例即为一个集群实例。 Kafka实例是否支持修改访问端口? Kafka实例的访问端口固定,不支持修改。 如果是访问未开启SASL的Kafka专享实例,访问端口为9092。 如果是访问开启SASL的Kafka专享实例,访问端口为9093。 在访问Kafka实例之前,需要确保安全组是否配置正确。 Kafka实例的SSL证书有效期多长? Kafka实例开启SASL时,需进行单向认证,证书有效期足够长(超过15年),客户端不需要关注证书过期风险。 如何将Kafka实例中的数据同步到另一个Kafka实例中? Kafka实例之间没有好的实时同步方案,如果需要做实例迁移,可以同时向两个实例生产消息,源实例中的消息可继续消费,待源实例的消息数据全部被消费完或老化后,业务可迁移到新的Kafka实例。 Kafka实例的SASLSSL开关如何修改? Kafka SASLSSL开关不支持创建实例后修改,在创建时,请慎重选择,如果创建后需要修改,需要重新创建实例。 购买实例时选择的单AZ,怎样可以扩展为多AZ? 已购买的实例无法扩展AZ,请重新购买多AZ的实例。 Kafka扩容会影响业务吗? Kafka扩容带宽/存储空间,都不会影响业务的使用。 Kafka实例创建后,能修改VPC和子网吗? 不能修改VPC和子网。 Kafka实例版本可以升级吗? Kafka实例创建成功后,实例版本不支持升级。您可以重新创建Kafka实例,实现升级Kafka实例的版本。
        来自:
        帮助文档
        专属云分布式消息服务Kafka
        常见问题
        操作类
      • 通知外送
        配置项 说明 名称 日志外送接口的名称。必须为中文字符、字母、数字、“”、“.”或“”,长度不超过 64 字符。 Kafka节点地址 Kafka服务器的IP(域名)及端口号。例如:192.168.0.1:9200。 Kafka主题 消息投放到Kafka服务器的主题。 Kafka分区 消息投放到的Kafka服务器的分区。Kafka服务器通过主题(topic)、分区(partition)和消费组(consumergroup)三个概念灵活适应各种消息场合,通过提升硬件资源利用率提高系统吞吐量。 以上Kafka相关配置与服务器端保持一致即可。 审计日志模板 设置发送审计日志的模板,具体字段请依据填写说明编辑。 操作日志模板 设置发送操作日志的模板,具体字段请依据填写说明编辑。 告警日志模板 设置发送告警日志的模板,具体字段请依据填写说明编辑。 流量控制日志模板 设置发送流量控制日志的模板,具体字段请依据填写说明编辑。
        来自:
        帮助文档
        数据安全专区
        用户指南
        API安全网关操作指导
        通知外送
      • 监控告警问题
        本文主要介绍监控告警问题。 云监控无法展示Kafka监控数据 Topic监控数据无法展示,可能原因如下: Topic名称开头包含特殊字符,例如下划线“”、号“”。 Kafka实例中没有创建Topic。 解决方法如下: 删除带特殊字符的Topic。 创建Topic。 消费组监控数据无法展示,可能原因如下: 消费组名称开头包含特殊字符,例如下划线“”、号“”。 此消费组从未有消费者连接。 Kafka监控显示消息堆积数跟实例里的消息数不一致? 问题现象 :监控显示消息堆积数为8.1亿+,Kafka控制台显示实例中6个Topic的消息数总和为1亿+,两者不一致。 问题结论 :两者统计方式不同,Kafka控制台显示的消息数为实例中未消费的消息个数,而监控显示的消息堆积数Topic中的消息积压数消费组数。 Kafka的消费组删除了,怎么监控页面还可以看到这个消费组? 监控数据是每分钟进行采集上报,上报的数据经过整理后才会显示在监控页面上,此过程大约需要几分钟到十几分钟,建议您在删除消费组后,过一段时间再去监控页面查看。
        来自:
        帮助文档
        分布式消息服务Kafka
        常见问题
        监控告警问题
      • 续费、到期与欠费
        本节主要介绍分布式消息服务Kafka的续费、到期与欠费说明 到期前续费 手动续订:对于包年/包月订购的分布式消息服务Kafka实例,用户在资源到期前进行续费操作,可以延长原有资源到期时间,避免资源到期后冻结或超过保留期后被系统回收。详细操作请参考费用中心续订管理手动续订。 自动续订:自动续订仅针对采用包月、包年计费模式的资源,详细操作请参考费用中心续订管理自动续订。 到期处理 到期后,分布式消息服务Kafka进入保留期,您将不能正常访问及使用天翼云分布式消息服务Kafka服务,已开通的实例资源将予以保留。 若您在到期后15天内续费,自资源续订解冻开始,计算新的服务有效期,按照新的服务有效期计算费用; 若到期15天后您仍未续费,Kafka实例资源将被释放。 欠费原因 在按需计费的模式下帐号的余额不足。 按需欠费资源冻结规则 欠费后,资源进入保留期,您将不能正常访问及使用分布式消息服务Kafka,已开通的实例资源将予以保留。 若您在保留期内充值,充值后系统会自动扣减欠费金额。 若保留期到期您仍未充值,Kafka实例资源将被释放。
        来自:
        帮助文档
        分布式消息服务Kafka
        计费说明
        续费、到期与欠费
      • KafkaConsumer监控
        指标类别 指标 指标名称 指标说明 单位 数据类型 默认聚合方式 主题 (topic,kafka的topic监控数据。) id id clientid和ip信息 ENUM LAST 主题 (topic,kafka的topic监控数据。) topic topic kafka的topic名称 ENUM LAST 主题 (topic,kafka的topic监控数据。) bytesConsumedRate 每秒消费字节 每秒消费字节 Byte INT AVG 主题 (topic,kafka的topic监控数据。) fetchSizeAvg 请求获取平均字节 请求获取平均字节 Byte INT AVG 主题 (topic,kafka的topic监控数据。) fetchSizeMax 请求获取最大字节 请求获取最大字节 Byte INT MAX 主题 (topic,kafka的topic监控数据。) recordsConsumedRate 每秒消费消息数 每秒消费消息数 INT AVG 主题 (topic,kafka的topic监控数据。) recordsPerRequestAvg 单次请求平均消息数 单次请求平均消息数 INT AVG 主题 (topic,kafka的topic监控数据。) seqIds Producer生成序列号 Producer生成序列号 STRING LAST 主题 (topic,kafka的topic监控数据。) recordConsumedTotal 总消费次数 总消费次数 INT SUM 主题 (topic,kafka的topic监控数据。) bytesConsumedTotal 总消费字节数 总消费字节数 INT SUM fetch (fetch,kafka的fetch监控数据) id id clientid和ip信息 ENUM LAST fetch (fetch,kafka的fetch监控数据) bytesConsumedRate 每秒消费字节 每秒消费字节 Byte INT AVG fetch (fetch,kafka的fetch监控数据) fetchLatencyAvg 请求平均时延 请求平均时延 ms INT AVG fetch (fetch,kafka的fetch监控数据) fetchLatencyMax 请求最大时延 请求最大时延 ms INT MAX fetch (fetch,kafka的fetch监控数据) fetchRate 每秒请求数 每秒请求数 INT AVG fetch (fetch,kafka的fetch监控数据) fetchSizeAvg 请求获取平均字节 请求获取平均字节 Byte INT AVG fetch (fetch,kafka的fetch监控数据) fetchSizeMax 请求获取最大字节 请求获取最大字节 Byte INT MAX fetch (fetch,kafka的fetch监控数据) recordsConsumedRate 每秒消费消息数 每秒消费消息数 INT AVG fetch (fetch,kafka的fetch监控数据) recordsLagMax 最大堆积消息数 最大堆积消息数 INT MAX fetch (fetch,kafka的fetch监控数据) recordsPerRequestAvg 单次请求平均消息数 单次请求平均消息数 INT AVG fetch (fetch,kafka的fetch监控数据) seqIds Producer生成序列号 Producer生成序列号 STRING LAST fetch (fetch,kafka的fetch监控数据) recordConsumedTotal 总消费次数 总消费次数 INT SUM fetch (fetch,kafka的fetch监控数据) bytesConsumedTotal 总消费字节数 总消费字节数 INT SUM partition (partition,kafka的partition监控数据。) id id clientid和ip信息 ENUM LAST partition (partition,kafka的partition监控数据。) partition partition kafka的partition名称 ENUM LAST partition (partition,kafka的partition监控数据。) recordsLag 堆积消息数 堆积消息数 INT LAST partition (partition,kafka的partition监控数据。) recordsLagAvg 平均堆积消息数 平均堆积消息数 INT AVG partition (partition,kafka的partition监控数据。) recordsLagMax 最大堆积消息数 最大堆积消息数 INT MAX partition (partition,kafka的partition监控数据。) seqIds Producer生成序列号 Producer生成序列号 STRING LAST kafka消费方法监控 (consumer,kafka消费方法监控。) method method 消费方法 ENUM LAST kafka消费方法监控 (consumer,kafka消费方法监控。) concurrentMax 最大并发 最大并发 INT MAX kafka消费方法监控 (consumer,kafka消费方法监控。) errorCount 错误数 错误数 INT SUM kafka消费方法监控 (consumer,kafka消费方法监控。) invokeCount 调用次数 调用次数 INT SUM kafka消费方法监控 (consumer,kafka消费方法监控。) lastError 错误信息 发生错误时产生的错误信息 STRING LAST kafka消费方法监控 (consumer,kafka消费方法监控。) maxTime 最大响应时间 采集周期内最大响应时间 INT MAX kafka消费方法监控 (consumer,kafka消费方法监控。) range1 010ms 响应时间在010ms范围调用次数 INT SUM kafka消费方法监控 (consumer,kafka消费方法监控。) range2 10100ms 响应时间在10100ms范围调用次数 INT SUM kafka消费方法监控 (consumer,kafka消费方法监控。) range3 100500ms 响应时间在100500ms范围调用次数 INT SUM kafka消费方法监控 (consumer,kafka消费方法监控。) range4 5001000ms 响应时间在5001000ms范围调用次数 INT SUM kafka消费方法监控 (consumer,kafka消费方法监控。) range5 110s 响应时间在110s范围调用次数 INT SUM kafka消费方法监控 (consumer,kafka消费方法监控。) range6 10s以上 响应时间在10s以上调用次数 INT SUM kafka消费方法监控 (consumer,kafka消费方法监控。) totalTime 总响应时间 总响应时间 INT SUM KafkaConsumer汇总(total,KafkaConsumer汇总信息统计。) recordConsumedTotal 总消费次数 总消费次数 INT SUM KafkaConsumer汇总(total,KafkaConsumer汇总信息统计。) bytesConsumedTotal 总消费字节数 总消费字节数 INT SUM KafkaConsumer汇总(total,KafkaConsumer汇总信息统计。) recordsLag 总堆积消息数 总堆积消息数 INT LAST 异常 (exception,kafka消费异常信息。) causeType 异常发生类 异常发生类 ENUM LAST 异常 (exception,kafka消费异常信息。) exceptionType 异常类 异常类 ENUM LAST 异常 (exception,kafka消费异常信息。) count 数量 异常数量 INT SUM 异常 (exception,kafka消费异常信息。) message 异常消息 异常消息 STRING LAST 异常 (exception,kafka消费异常信息。) stackTrace 异常堆栈 异常堆栈 CLOB LAST
        来自:
        帮助文档
        应用性能管理
        产品介绍
        指标总览
        消息队列
        KafkaConsumer监控
      • SDK下载及使用说明
        该页面主要介绍分布式消息服务Kafka版的SDK,您可以根据实际业务需求进行集成。 各语言SDK下载及使用 支持语言 SDK附件 Java ctyunctgkafkajavasdkv1.0.1.zip Go ctyunctgkafkagosdkv1.0.1.zip 说明 由于运行环境的多样性,部分接口在某些特定配置下可能存在适配差异,我们的技术团队将持续优化兼容性。如有问题,可提单咨询。
        来自:
        帮助文档
        分布式消息服务Kafka
        API参考
        SDK下载及使用说明
      • 产品规格
        专属云Kafka的产品服务与公有云Kafka的产品规格保持一致,由两种服务资源组成,队列规格、队列存储。 其中队列计算规格按消息队列基准带宽分为4种分别是:100MB/s、300MB/s、600MB/s、1200MB/s; 存储类型分2种,分别如下:高I/O、超高I/O。
        来自:
        帮助文档
        专属云分布式消息服务Kafka
        计费说明
        产品规格
      • 步骤二:创建Kafka实例
        本文主要介绍 步骤二:创建Kafka实例。 前提条件 在创建Kafka实例前,需要保证存在可使用的虚拟私有云。创建方法,请参考《虚拟私有云 用户指南》的“创建虚拟私有云和子网”。 如果您已有虚拟私有云,可重复使用,不需要多次创建。 操作步骤 步骤 1 登录分布式消息服务Kafka控制台,单击页面右上方的“购买Kafka实例”。 步骤 2 选择计费模式。 步骤 3 在“区域”下拉列表中,选择靠近您应用程序的区域,可降低网络延时、提高访问速度。 步骤 4 在“项目”下拉列表中,选择项目。 步骤 5 在“可用区”区域,根据实际情况选择1个或者3个及以上可用区。 步骤 6 设置“实例名称”和“企业项目”。 步骤 7 设置实例信息。 1. 版本:Kafka的版本号,支持1.1.0、2.7和3.x,根据实际情况选择,推荐使用3.x。 Kafka实例创建后,版本号不支持修改 。 2. 鲲鹏实例,“创建鲲鹏架构实例”选框,默认不勾选是X86架构的实例,勾选之后是ARM架构的鲲鹏实例。 3. 在“代理规格”中,请根据业务需求选择相应的代理规格。在“代理数量”中,选择代理个数。 单个代理最大分区数代理个数实例分区数上限。当所有Topic的总分区数大于实例分区数上限时,创建Topic失败。 4. 在“存储空间”区域,您根据实际需要选择存储Kafka数据的磁盘类型和总磁盘大小。 Kafka实例创建后,磁盘类型不支持修改 。 存储空间包含所有副本存储空间总和,建议根据业务消息体积以及副本数量选择存储空间大小。假设业务存储数据保留天数内磁盘大小为100GB,则磁盘容量最少为100GB副本数 + 预留磁盘大小100GB。 创建实例时会进行磁盘格式化,磁盘格式化会导致实际可用磁盘为总磁盘的93%~95%。 5. 在“容量阈值策略”区域,设置磁盘使用达到容量阈值后的消息处理策略,容量阈值为95%。 自动删除:可以正常生产和消费消息,但是会删除最早的10%的消息,以保证磁盘容量充足。该场景优先保障业务不中断,数据存在丢失的风险。 生产受限:无法继续生产消息,但可以继续消费消息。该场景适用于对数据不能丢的业务场景,但是会导致生产业务失败。 图 创建Kafka实例 步骤 8 设置实例网络环境信息。 1. 在“虚拟私有云”下拉列表,选择已经创建好的虚拟私有云和子网。 说明 虚拟私有云和子网在Kafka实例创建完成后,不支持修改。 子网开启IPv6后,Kafka实例支持IPv6功能。Kafka实例开启IPv6后,客户端可以使用IPv6地址连接实例。 子网开启IPv6后,页面才展示此参数。开启IPv6后,客户端可以使用IPv6地址连接实例。 开启IPv6的实例不支持动态开启和关闭SASLSSL功能。 实例创建成功后,不支持修改IPv6开关。 3. 在“安全组”下拉列表,可以选择已经创建好的安全组。 安全组是一组对Kafka实例的访问规则的集合。您可以单击右侧的“管理安全组”,跳转到网络控制台的“安全组”页面,查看或创建安全组。 步骤 9 设置登录Kafka Manager的用户名和密码。创建实例后,Kafka Manager用户名无法修改。 Kafka Manager是开源的Kafka集群管理工具,实例创建成功后,实例详情页面会展示Kafka Manager登录地址,您可登录Kafka Manager页面,查看Kafka集群的监控、代理等信息。 步骤 10 设置实例购买时长。 当选择了“包年/包月”付费模式时,页面才显示“购买时长”参数,您需要根据业务需要选择。 步骤 11 单击“更多配置”,设置更多相关信息。 1. 设置“公网访问”。 “公网访问”默认为关闭状态,根据业务需求选择是否开启。开启公网访问后,还需要为每个代理设置对应的IPv4弹性IP地址。 图 设置公网访问开关 2. 设置“Kafka SASLSSL”。 客户端连接Kafka实例时,是否开启SSL认证。开启Kafka SASLSSL,则数据加密传输,安全性更高。 “Kafka SASLSSL”默认为关闭状态,您可以选择是否开启。 Kafka实例创建后,Kafka SASLSSL开关不支持修改 ,请慎重选择。如果创建后需要修改,需要重新创建实例。 开启Kafka SASLSSL后,您可以选择是否开启“SASL PLAIN 机制”。未开启“SASL PLAIN 机制”时,使用SCRAMSHA512机制传输数据,开启“SASL PLAIN 机制”后,同时支持SCRAMSHA512机制和PLAIN机制,根据实际情况选择其中任意一种配置连接。Kafka实例创建后,SASL PLAIN机制开关不支持修改。 什么是SCRAMSHA512机制和PLAIN机制? SCRAMSHA512机制:采用哈希算法对用户名与密码生成凭证,进行身份校验的安全认证机制,比PLAIN机制安全性更高。 PLAIN机制:一种简单的用户名密码校验机制。 开启Kafka SASLSSL后,您需要设置连接Kafka实例的用户名和密码。 3. 设置“Kafka自动创建Topic”。 “Kafka自动创建Topic”默认为关闭状态,您可以选择是否开启。 开启“Kafka自动创建Topic”表示生产或消费一个未创建的Topic时,系统会自动创建此Topic,此Topic的默认参数值如下:分区数为3,副本数为3,老化时间为72小时,不开启同步复制和同步落盘。 如果在“配置参数”中修改“log.retention.hours”、“default.replication.factor”或“num.partitions”的参数值,此后自动创建的Topic参数值为修改后的参数值。例如:“num.partitions”修改为“5”,自动创建的Topic参数值如下:分区数为5,副本数为3,老化时间为72小时,不开启同步复制和同步落盘。 4. 设置“标签”。 标签用于标识云资源,当您拥有相同类型的许多云资源时,可以使用标签按各种维度(例如用途、所有者或环境)对云资源进行分类。 如果您已经预定义了标签,在“标签键”和“标签值”中选择已经定义的标签键值对。另外,您可以单击“查看预定义标签”,跳转到标签管理服务页面,查看已经预定义的标签,或者创建新的标签。 您也可以直接在“标签键”和“标签值”中设置标签。 当前每个Kafka实例最多支持设置20个不同标签,标签的命名规格,请参考管理实例标签。 5. 设置实例的描述信息。 步骤 12 填写完上述信息后,单击“立即购买”,进入规格确认页面。 步骤 13 确认实例信息无误后,提交请求。 步骤 14 单击“返回Kafka专享版”,查看Kafka实例是否创建成功。 创建实例大约需要3到15分钟,此时实例的“状态”为“创建中”。 当实例的“状态”变为“运行中”时,说明实例创建成功。 如果创建实例失败,在信息栏的“创建失败任务”中查看创建失败的实例。请删除创建失败的实例,然后重新创建。如果重新创建仍然失败,请联系客服。 说明 创建失败的实例,不会占用其他资源。
        来自:
        帮助文档
        分布式消息服务Kafka
        快速入门
        步骤二:创建Kafka实例
      • KafkaProducer
        介绍APM监控详情里消息监控中的KafkaProducer相关指标的名称、含义等信息。 KafkaProducer监控指标说明表 指标类别 指标 指标说明 数据类型 异常 exceptionstacktrace 异常产生的堆栈信息 String 异常 exceptiontype 异常类型 String 异常 exceptioncount 错误数 Int Topic (Topic,Kafka的Topic监控数据) id Clientid和IP信息 Enum Topic (Topic,Kafka的Topic监控数据) topic Kafka的Topic名称 Enum Topic (Topic,Kafka的Topic监控数据) kafkaproducerbyterate 每秒发送字节 Int Topic (Topic,Kafka的Topic监控数据) kafkaproducerrecorderrorrate 每秒错误数 Int Topic (Topic,Kafka的Topic监控数据) kafkaproducerrecordretryrate 每秒重试数 Int Topic (Topic,Kafka的Topic监控数据) kafkaproducerrecordsendrate 每秒发送数 Int Topic (Topic,Kafka的Topic监控数据) kafkaproducerrecordsendtotal 总发送次数 Int Topic (Topic,Kafka的Topic监控数据) kafkaproducerbytetotal 总发送字节数 Int 汇总(Total,KafkaProducer汇总信息统计) kafkaproducerrecordsendtotal 总发送次数 Int 汇总(Total,KafkaProducer汇总信息统计) kafkaproducerbytetotal 总发送字节数 Int 发送方法(DoSendMethod,发送消息方法监控) topic Kafka的Topic名称 Enum 发送方法(DoSendMethod,发送消息方法监控) errorCount 错误数 Int 发送方法(DoSendMethod,发送消息方法监控) invokeCount 调用次数 Int 发送方法(DoSendMethod,发送消息方法监控) maxTime 最慢时延 Int 发送方法(DoSendMethod,发送消息方法监控) ms0To10Count 010ms次数 Int 发送方法(DoSendMethod,发送消息方法监控) ms10To100Count 10100ms次数 Int 发送方法(DoSendMethod,发送消息方法监控) ms100To500Count 100500ms次数 Int 发送方法(DoSendMethod,发送消息方法监控) ms500To1000Count 5001000ms次数 Int 发送方法(DoSendMethod,发送消息方法监控) ms1000To10000Count 110s次数 Int 发送方法(DoSendMethod,发送消息方法监控) msMorethan10000Count 10s以上次数 Int 发送方法(DoSendMethod,发送消息方法监控) totalTime 调用总耗时 Int
        来自:
        帮助文档
        应用性能监控 APM
        产品介绍
        指标总览
        消息监控
        KafkaProducer
      • Python
        环境安装 1. 安装Python。(Python版本为2.7或3.X。) 2. 安装依赖库。(使用公网连接需要安装confluentkafka 1.9.2及以下版本的依赖库) pip install confluentkafka1.9.2 3. 下载Demo包kafkaconfluentpythondemo.zip。 配置修改 1. 如果是ssl连接,需要在控制台下载证书。并且解压压缩包得到ssl.client.truststore.jks,执行以下命令生成caRoot.pem文件。 keytool importkeystore srckeystore ssl.client.truststore.jks destkeystore caRoot.p12 deststoretype pkcs12 openssl pkcs12 in caRoot.p12 out caRoot.pem 2. 修改setting.py文件。(calocation仅在ssl连接时需要配置) kafkasetting { 'bootstrapservers': 'XXX', 'topicname': 'XXX', 'groupname': 'XXX' } 生产消息 发送以下命令发送消息。 python kafkaproducer.py 生产消息示例代码如下: from confluentkafka import Producer import setting conf setting.kafkasetting """初始化一个 Producer 对象""" p Producer({'bootstrap.servers': conf['bootstrapservers']}) def deliveryreport(err, msg): """ Called once for each message produced to indicate delivery result. Triggered by poll() or flush(). """ if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) """异步发送消息""" p.produce(conf['topicname'], "Hello".encode('utf8'), callbackdeliveryreport) p.poll(0) """在程序结束时,调用flush""" p.flush() 消费消息 发送以下命令消费消息。 python kafkaconsumer.py 消费消息示例代码如下: from confluentkafka import Consumer, KafkaError import setting conf setting.kafkasetting c Consumer({ 'bootstrap.servers': conf['bootstrapservers'], 'group.id': conf['groupname'], 'auto.offset.reset': 'latest' }) c.subscribe([conf['topicname']]) while True: msg c.poll(1.0) if msg is None: continue if msg.error(): if msg.error().code() KafkaError.PARTITIONEOF: continue else: print("Consumer error: {}".format(msg.error())) continue print('Received message: {}'.format(msg.value().decode('utf8'))) c.close()
        来自:
        帮助文档
        分布式消息服务Kafka
        开发指南
        Python
      • Kafka性能白皮书
        步骤四:购买客户端服务器 购买3台ECS服务器(资源池、可用区、虚拟私有云、子网、安全组与Kafka实例保持一致,带宽要大于等于Kafka实例带宽) 购买完成后需要进行如下操作: 安装JDK yum install y java1.8.0openjdkdevel.x8664 下载Kafka命令行工具并解压 tar zxvf kafka2.132.8.2.tar.gz 步骤五:测试命令 ./kafkaproducerperftest.sh producerprops bootstrap.servers${连接地址} acks1 batch.size16384 topic ${Topic名称} numrecords 5000000 recordsize 1024 throughput 1 producer.config ../config/producer.properties bootstrap.servers:购买Kafka实例后,获取的Kafka实例的地址。 acks:消息主从同步策略,acks1表示异步复制消息,acks1表示同步复制消息。 batch.size:每次批量发送消息的大小(单位为字节)。 topic:创建Topic中设置的Topic名称。 numrecords:总共需要发送的消息数。 recordsize:每条消息的大小。 throughput:每秒发送的消息数,1表示不作限制。 测试结果 测试场景一(实例是否开启SASL):相同的Topic(30分区,3副本,异步复制) 实例规格 磁盘类型 节点数量 TPS(使用SASL) TPS(不使用SASL) 计算增强型4核8GB 超高IO 3 170000 500000 计算增强型8核16GB 超高IO 3 200000 730000 计算增强型16核32GB 超高IO 3 360000 886000 测试场景二(同步/异步复制):相同的实例(超高I/O、3个节点、不使用SASL) 实例规格 分区数 副本数 TPS(同步复制) TPS(异步复制) 计算增强型4核8GB 30 3 238000 500000 计算增强型8核16GB 30 3 315000 730000 计算增强型16核32GB 30 3 375000 886000 测试场景三(不同磁盘类型):相同的Topic(30分区,3副本,异步复制) 实例规格 是否使用SASL 节点数量 TPS(高IO) TPS(超高IO) 计算增强型4核8GB 不使用 3 135000 500000 计算增强型8核16GB 不使用 3 240000 730000 计算增强型16核32GB 不使用 3 280000 886000 测试场景四(不同分区数):相同的实例(超高I/O、3个节点、不使用SASL) 实例规格 是否同步复制 副本数 TPS(3分区) TPS(12分区) TPS(100分区) 计算增强型4核8GB 否 3 330000 280000 260000 计算增强型8核16GB 否 3 480000 410000 340000 计算增强型16核32GB 否 3 534000 744000 630000
        来自:
        帮助文档
        分布式消息服务Kafka
        性能白皮书
        Kafka性能白皮书
      • KafkaProducer监控
        指标类别 指标 指标名称 指标说明 单位 数据类型 默认聚合方式 topic (topic,kafka的topic监控数据。) id id clientid和ip信息 ENUM LAST topic (topic,kafka的topic监控数据。) topic topic kafka的topic名称 ENUM LAST topic (topic,kafka的topic监控数据。) byteRate 每秒发送字节 每秒发送字节 Byte INT AVG topic (topic,kafka的topic监控数据。) recordErrorRate 每秒错误数 每秒错误数 INT AVG topic (topic,kafka的topic监控数据。) recordRetryRate 每秒重试数 每秒重试数 INT AVG topic (topic,kafka的topic监控数据。) recordSendRate 每秒发送数 每秒发送数 INT AVG topic (topic,kafka的topic监控数据。) seqIds Producer生成序列号 Producer生成序列号 STRING LAST topic (topic,kafka的topic监控数据。) recordSendTotal 总发送次数 总发送次数 INT SUM topic (topic,kafka的topic监控数据。) byteTotal 总发送字节数 总发送字节数 INT SUM KafkaProducer汇总(total,KafkaProducer汇总信息统计。) recordSendTotal 总发送次数 总发送次数 INT SUM KafkaProducer汇总(total,KafkaProducer汇总信息统计。) byteTotal 总发送字节数 总发送字节数 INT SUM 异常 (exception,kafka发送异常信息。) causeType 异常发生类 异常发生类 ENUM LAST 异常 (exception,kafka发送异常信息。) exceptionType 异常类 异常类 ENUM LAST 异常 (exception,kafka发送异常信息。) count 数量 异常数量 INT SUM 异常 (exception,kafka发送异常信息。) message 异常消息 异常消息 STRING LAST 异常 (exception,kafka发送异常信息。) stackTrace 异常堆栈 异常堆栈 CLOB LAST 发送方法(doSendMethod,发送消息方法监控。) topic topic topic ENUM LAST 发送方法(doSendMethod,发送消息方法监控。) concurrentMax 最大并发 最大并发 INT MAX 发送方法(doSendMethod,发送消息方法监控。) errorCount 错误数 错误数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) invokeCount 调用次数 调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) maxTime 最慢时延 最慢时延 INT MAX 发送方法(doSendMethod,发送消息方法监控。) range1 0–10ms 时延在010ms范围调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) range2 10–100ms 时延在10–100ms范围调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) range3 100–500ms 时延在100–500ms范围调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) range4 500–1000ms 时延在500–1000ms范围调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) range5 1–10s 时延在1–10s范围调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) range6 10sn 时延在10s以上调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) totalTime 总时延 调用总耗时 INT SUM
        来自:
        帮助文档
        应用性能管理
        产品介绍
        指标总览
        消息队列
        KafkaProducer监控
      • 使用限制
        介绍分布式消息服务Kafka产品功能的使用限制。 限制项 使用说明 实例 开通后,不支持修改VPC/子网/可用区,不支持变更地域属性 扩缩容 当前只支持节点、规格、磁盘扩容,不支持缩容 版本 当前服务端版本为2.132.8.2。实例创建后,服务端版本不支持升级,不支持定制版本 弹性ip 支持对每个节点绑定弹性ip,可以在公网访问topic,弹性ip只能通过SASL连接访问 topic创建 默认不支持自动创建,需要在页面上创建,总数不超过100个 topic分区数 只能增加不能减少,总分区数不能超过2000个,分区数过多会导致磁盘碎片化,影响性能 私有主题 支持私有主题 消息大小 默认1MB,可自定义配置 批量导入/导出 支持topic、消费组、连接用户的批量导入 私有消费组 支持私有消费组,需要在页面上创建、订阅对应的私有主题,并对连接用户授权
        来自:
        帮助文档
        分布式消息服务Kafka
        产品简介
        使用限制
      • 服务等级目标(SLO)
        产品名称 可用性SLO 弹性云主机 99.995% 物理机 99% 弹性伸缩服务 99.95% 云硬盘 99.999% 弹性文件服务 99.9% 对象存储(经典I型) 99.99% 对象存储(原生版)I型 99.95% 媒体存储 99.99% 弹性IP 99% 弹性负载均衡 99.95% VPN连接 99.95% 分布式缓存Redis 99.95% 分布式消息RocketMQ 99.95% 分布式消息RabbitMQ 99.95% 分布式消息Kafka 99.95% CDN加速 99.90%
        来自:
        帮助文档
        服务等级目标
        天翼云服务等级目标
        服务等级目标(SLO)
      • DMS for Kafka自定义策略
        本文主要介绍 DMS for Kafka自定义策略。 如果系统预置的DMS for Kafka权限,不满足您的授权要求,可以创建自定义策略。 目前云服务平台支持以下两种方式创建自定义策略: 可视化视图创建自定义策略:无需了解策略语法,按可视化视图导航栏选择云服务、操作、资源、条件等策略内容,可自动生成策略。 JSON视图创建自定义策略:可以在选择策略模板后,根据具体需求编辑策略内容;也可以直接在编辑框内编写JSON格式的策略内容。 具体创建步骤请参见:《统一身份认证服务用户指南》的“创建自定义策略”章节。本章为您介绍常用的DMS for Kafka自定义策略样例。 说明 DMS for Kafka的权限与策略基于分布式消息服务DMS,因此在IAM服务中为DMS for Kafka分配用户与权限时,请选择并使用“DMS”的权限与策略。 由于缓存的存在,对用户、用户组以及企业项目授予OBS相关的细粒度策略后,大概需要等待5分钟细粒度策略才能生效。 DMS自定义策略样例 示例1:授权用户删除实例和重启实例 { "Version": "1.1", "Statement": [ { "Effect": "Allow", "Action": [ "dms:instance:modifyStatus", "dms:instance:delete" ] } ] } 示例2:拒绝用户删除实例 拒绝策略需要同时配合其他策略使用,否则没有实际作用。用户被授予的策略中,一个授权项的作用如果同时存在Allow和Deny,则遵循Deny优先。 如果您给用户授予DMS FullAccess的系统策略,但不希望用户拥有DMS FullAccess中定义的删除实例权限,您可以创建一条拒绝删除实例的自定义策略,然后同时将DMS FullAccess和拒绝策略授予用户,根据Deny优先原则,则用户可以对DMS for Kafka执行除了删除实例外的所有操作。拒绝策略示例如下: { "Version": "1.1", "Statement": [ { "Effect": "Deny", "Action": [ "dms:instance:delete" ] } ] }
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        权限管理
        DMS for Kafka自定义策略
      • 1
      • ...
      • 3
      • 4
      • 5
      • 6
      • 7
      • ...
      • 531
      跳转至
      推荐热词
      天翼云运维管理审计系统天翼云云服务平台云服务备份云日志服务应用运维管理云手机云电脑天翼云云hbase数据库电信云大数据saas服务电信云大数据paas服务轻量型云主机天翼云客户服务电话应用编排服务天翼云云安全解决方案云服务总线CSB天翼云服务器配置天翼云联邦学习产品天翼云云安全天翼云企业上云解决方案天翼云产品天翼云视频云存储

      天翼云最新活动

      安全隔离版OpenClaw

      OpenClaw云服务器专属“龙虾“套餐低至1.5折起

      青云志云端助力计划

      一站式科研助手,海外资源安全访问平台,助力青年翼展宏图,平步青云

      云上钜惠

      爆款云主机全场特惠,2核4G只要1.8折起!

      中小企业服务商合作专区

      国家云助力中小企业腾飞,高额上云补贴重磅上线

      出海产品促销专区

      爆款云主机低至2折,高性价比,不限新老速来抢购!

      天翼云奖励推广计划

      加入成为云推官,推荐新用户注册下单得现金奖励

      产品推荐

      弹性云主机 ECS

      多活容灾服务

      轻量型云主机

      AI Store

      公共算力服务

      模型推理服务

      一站式智算服务平台

      智算一体机

      人脸检测

      推荐文档

      产品规格

      退订

      管理只读实例

      操作类

      文档下载

      • 7*24小时售后
      • 无忧退款
      • 免费备案
      • 专家服务
      售前咨询热线
      400-810-9889转1
      关注天翼云
      • 旗舰店
      • 天翼云APP
      • 天翼云微信公众号
      服务与支持
      • 备案中心
      • 售前咨询
      • 智能客服
      • 自助服务
      • 工单管理
      • 客户公告
      • 涉诈举报
      账户管理
      • 管理中心
      • 订单管理
      • 余额管理
      • 发票管理
      • 充值汇款
      • 续费管理
      快速入口
      • 天翼云旗舰店
      • 文档中心
      • 最新活动
      • 免费试用
      • 信任中心
      • 天翼云学堂
      云网生态
      • 甄选商城
      • 渠道合作
      • 云市场合作
      了解天翼云
      • 关于天翼云
      • 天翼云APP
      • 服务案例
      • 新闻资讯
      • 联系我们
      热门产品
      • 云电脑
      • 弹性云主机
      • 云电脑政企版
      • 天翼云手机
      • 云数据库
      • 对象存储
      • 云硬盘
      • Web应用防火墙
      • 服务器安全卫士
      • CDN加速
      热门推荐
      • 云服务备份
      • 边缘安全加速平台
      • 全站加速
      • 安全加速
      • 云服务器
      • 云主机
      • 智能边缘云
      • 应用编排服务
      • 微服务引擎
      • 共享流量包
      更多推荐
      • web应用防火墙
      • 密钥管理
      • 等保咨询
      • 安全专区
      • 应用运维管理
      • 云日志服务
      • 文档数据库服务
      • 云搜索服务
      • 数据湖探索
      • 数据仓库服务
      友情链接
      • 中国电信集团
      • 天翼云国际站
      • 189邮箱
      • 天翼企业云盘
      • 天翼云盘
      ©2026 天翼云科技有限公司版权所有 增值电信业务经营许可证A2.B1.B2-20090001
      公司地址:北京市东城区青龙胡同甲1号、3号2幢2层205-32室
      • 用户协议
      • 隐私政策
      • 个人信息保护
      • 法律声明
      备案 京公网安备11010802043424号 京ICP备 2021034386号