爆款云主机低至25.83元/年
查看详情

活动

天翼云最新优惠活动,涵盖免费试用,产品折扣等,助您降本增效!
热门活动
  • 云聚517 · 好价翼起拼 NEW 爆款云主机低至25.83元/年,参与拼团享更多优惠,拼成得额外优惠券
  • 安全隔离版OpenClaw NEW OpenClaw云服务器专属“龙虾“套餐低至1.5折起
  • 聚力AI赋能 天翼云大模型专项 大模型特惠专区·Token Plan 轻享包低至9.9元起
  • 青云志云端助力计划 NEW 一站式科研助手,海外资源安全访问平台,助力青年翼展宏图,平步青云
  • 企业出海解决方案 NEW 助力您的业务扬帆出海,通达全球!
  • 天翼云信创专区 NEW “一云多芯、一云多态”,国产化软件全面适配,国产操作系统及硬件芯片支持丰富
  • 中小企业服务商合作专区 国家云助力中小企业腾飞,高额上云补贴重磅上线
  • 云上钜惠 爆款云主机全场特惠,2核4G只要1.8折起!
  • 天翼云奖励推广计划 加入成为云推官,推荐新用户注册下单得现金奖励
免费活动
  • 免费试用中心 HOT 多款云产品免费试用,快来开启云上之旅
  • 天翼云用户体验官 NEW 您的洞察,重塑科技边界

息壤智算

领先开放的智算服务平台,提供算力、平台、数据、模型、应用“五位一体”智算服务体系,构建全流程的AI基础设施能力
AI Store
  • 算力市场
  • 模型市场
  • 应用市场
公共算力服务
  • 裸金属
  • 定制裸金属
训推服务
  • 模型开发
  • 训练任务
  • 服务部署
Token服务
  • 模型广场
  • 体验中心
  • 服务接入
应用托管
  • 应用实例
科研助手
  • 科研智能体
  • 科研服务
  • 开发机
  • 并行计算
大模型
  • DeepSeek-V4-Flash
  • GLM-5.1
  • Qwen3.5-122B-A10B
  • DeepSeek-V3.2(旗舰版)
  • GLM-5(正式版)
智算一体机
  • 智算一体机
智能体引擎
  • 智能体引擎
模型适配专家服务
  • 模型适配专家服务
算力服务商
  • 入驻算力服务商

应用商城

天翼云精选行业优秀合作伙伴及千余款商品,提供一站式云上应用服务
进入甄选商城进入云市场进入AI Store创新解决方案公有云生态专区智云上海应用生态专区
建站工具
  • 新域名服务
  • SSL证书
  • 翼建站
企业办公
  • 安全邮箱
  • WPS 365 天翼云版
  • 天翼企业云盘(标准服务版)
灾备迁移
  • 云管家2.0
  • 翼备份(SaaS版)

定价

协助您快速了解云产品计费模式、价格详情,轻松预估上云成本
价格计算器
  • 动态测算产品价格
定价策略
  • 快速了解计费模式

合作伙伴

天翼云携手合作伙伴,共创云上生态,合作共赢
天翼云生态合作中心
  • 天翼云生态合作中心
天翼云渠道合作伙伴
  • 天翼云代理渠道合作伙伴
天翼云服务合作伙伴
  • 天翼云集成商交付能力认证
天翼云应用合作伙伴
  • 天翼云云市场合作伙伴
  • 天翼云甄选商城合作伙伴
天翼云技术合作伙伴
  • 天翼云OpenAPI中心
天翼云培训认证
  • 天翼云学堂
  • 天翼云市场商学院
天翼云合作计划
  • 云汇计划
天翼信创云专区
  • 信创云专区
  • 适配互认证

开发者

开发者相关功能入口汇聚
技术社区
  • 专栏文章
  • 互动问答
  • 技术视频
资源与工具
  • OpenAPI中心
培训与认证
  • 天翼云学堂
  • 天翼云认证
开源社区
  • 魔乐社区
  • OpenTeleDB

支持与服务

为您提供全方位支持与服务,全流程技术保障,助您轻松上云,安全无忧
文档与工具
  • 文档中心
  • 新手上云
  • 自助服务
  • OpenAPI中心
定价
  • 价格计算器
  • 定价策略
基础服务
  • 售前咨询
  • 在线支持
  • 在线支持
  • 工单服务
  • 服务保障
  • 会员中心
增值服务
  • 红心服务
  • 首保服务
  • 客户支持计划
  • 专家技术服务
  • 备案管家
我要反馈
  • 建议与反馈
  • 用户体验官
信息公告
  • 客户公告

了解天翼云

天翼云秉承央企使命,致力于成为数字经济主力军,投身科技强国伟大事业,为用户提供安全、普惠云服务
品牌介绍
  • 关于天翼云
  • 智算云
  • 天翼云4.0
  • 新闻资讯
  • 天翼云APP
基础设施
  • 全球基础设施
  • 信任中心
最佳实践
  • 精选案例
  • 超级探访
  • 云杂志
  • 分析师和白皮书
  • 天翼云·创新直播间
市场活动
  • 2026智能云生态大会
  • 2025智能云生态大会
  • 2024智算云生态大会
  • 2023云生态大会
  • 2022云生态大会
  • 天翼云中国行
天翼云
  • 活动
  • 息壤智算
  • 产品
  • 解决方案
  • 应用商城
  • 定价
  • 合作伙伴
  • 开发者
  • 支持与服务
  • 了解天翼云
      • 文档
      • 控制中心
      • 备案
      • 管理中心
      专属云分布式消息服务Kafka_相关内容
      • 消费失败
        Kafka 是按分区一条一条消息顺序向前推进消费的,如果消费端拿到某条消息后执行消费逻辑失败,比如应用服务器出现了脏数据,导致某条消息处理失败,等待人工干预,那么有以下两种处理方式: 失败后一直尝试再次执行消费逻辑。这种方式有可能造成消费线程阻塞在当前消息,无法向前推进,造成消息堆积。 由于 Kafka 自身没有处理失败消息的设计,实践中通常会打印失败的消息、或者存储到某个服务(比如创建一个 Topic 专门用来放失败的消息),然后定时 check 失败消息的情况,分析失败原因,根据情况处理。
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        消费者实践
        消费失败
      • 消息类问题
        消息超过老化时间,消息仍存在的原因 问题现象: 消息超过设置的老化时间(如果Topic已经设置了老化时间,此时“配置参数”中的log.retention.hours值将不对此Topic生效。仅在Topic中未设置老化时间时,“配置参数”中的log.retention.hours值才会对此Topic生效。),消息仍存在。 可能原因1: Topic的每个分区都是由多个大小相同的segment文件组成,每个segment文件的大小为500MB,当segment文件存储的消息大小到达500MB后,才会新建下一个segment文件。Kafka删除消息是删除segment文件,而不是删除一条消息。Kafka要求至少保留一个segment文件用来存储消息,如果正在使用的segment文件中包含超过老化时间的消息,由于此时segment文件不会被删除,所以超过老化时间的消息也不会被删除。 处理方法: 等待segment文件被使用完,或者删除超过老化时间的消息所在的Topic。 可能原因2: Topic中存在一条create time为未来时间的消息(例如当前时间为1月1日,create time设置成了2月1日),此消息在72小时后,并不会被老化,导致在此消息后创建的其他消息都不会被老化。 处理方法: 删除create time为未来时间的消息所在的Topic。 Kafka实例是否支持延迟消息? 不支持延迟消息。 如何查看堆积消息数? 通过以下任意一种方法,查看堆积消息数。 在Kafka控制台的“消费组管理”页面,单击待查看堆积消息的消费组名称,进入消费组详情页。在“消费进度”页签,查看消费组中每个Topic的总堆积数。具体步骤,请参考查询消费组信息。 在Kafka控制台的“监控”页面的“消费组”页签中,“消费组”选择待查看堆积消息数的消费组名称,“队列”选择“全部队列”,“消费组可消费消息数”表示此消费组中所有Topic的堆积消息数之和。查看监控数据的具体步骤,请参考查看监控数据。 在云监控页面的“消费组”页签中,“消费组”选择待查看堆积消息数的消费组名称,“队列”选择“全部队列”,“消费组可消费消息数”表示此消费组中所有Topic的堆积消息数之和。查看监控数据的具体步骤,请参考查看监控数据。 在Kafka客户端,在“/{命令行工具所在目录}/kafka{version}/bin/”目录下,通过 kafkaconsumergroups.sh bootstrapserver {kafka连接地址} describe group {消费组} 命令查看消费组中每个Topic的堆积消息数。“LAG”表示每个Topic的总堆积数。 图 查看每个Topic的总堆积数 说明 如果Kafka实例开启SASL认证,则以上命令还需要增加SASL认证的“consumer.properties”配置文件参数: commandconfig {SASL认证的consumer.properties配置文件} ,“consumer.properties”配置文件参考开启SASL认证的Kafka命令行连接说明。
        来自:
        帮助文档
        分布式消息服务Kafka
        常见问题
        消息类问题
      • SDK下载及使用说明
        该页面主要介绍分布式消息服务Kafka版的SDK,您可以根据实际业务需求进行集成。 各语言SDK下载及使用 支持语言 SDK附件 Java ctyunctgkafkajavasdkv1.0.1.zip Go ctyunctgkafkagosdkv1.0.1.zip 说明 由于运行环境的多样性,部分接口在某些特定配置下可能存在适配差异,我们的技术团队将持续优化兼容性。如有问题,可提单咨询。
        来自:
        帮助文档
        分布式消息服务Kafka
        API参考
        SDK下载及使用说明
      • 不使用SASL证书连接
        生产消息 进入Kafka客户端文件的“/bin”目录下,执行如下命令进行生产消息: ./kafkaconsoleproducer.sh brokerlist ​{连接地址} topic 连接地址−−topic​{Topic名称} 参数说明如下: 连接地址:从前提条件中获取的连接地址。 Topic名称:Kafka实例下创建的Topic名称。 示例如下,“10.3.196.45:9094,10.78.42.127:9094,10.4.49.103:9094”为获取的Kafka实例公网连接地址。 执行完命令后,输入需要生产的消息内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。 [root@ecskafka bin] ./kafkaconsoleproducer.sh brokerlist 10.3.196.45:9094,10.78.42.127:9094,10.4.49.103:9094 topic topicdemoHello DMS Kafka! ^C[root@ecskafka bin] 如需停止生产使用Ctrl+C命令退出。 消费消息 执行如下命令进行消费消息: ./kafkaconsoleconsumer.sh bootstrapserver ​{连接地址} topic 连接地址−−topic​{Topic名称} group ${消费组名称} frombeginning 参数说明如下: 连接地址:从前提条件中获取的连接地址。 Topic名称:Kafka实例下创建的Topic名称。 消费组名称:根据您的业务需求,设定消费组名称。 如果已经在配置文件中指定了消费组名称,请确保命令行中的消费组名称与配置文件中的相同,否则可能消费失败。 消费组名称开头包含特殊字符,例如下划线“”、 号“ ”时,监控数据无法展示。 示例如下: [root@ecskafka bin] ./kafkaconsoleconsumer.sh bootstrapserver 10.3.196.45:9094,10.78.42.127:9094,10.4.49.103:9094 topic topicdemo group ordertest frombeginning Kafka! DMS Hello ^CProcessed a total of 3 messages [root@ecskafka bin]
        来自:
        帮助文档
        分布式消息服务Kafka
        快速入门
        步骤四:连接实例生产消费消息
        不使用SASL证书连接
      • 使用限制
        介绍分布式消息服务Kafka产品功能的使用限制。 限制项 使用说明 实例 开通后,不支持修改VPC/子网/可用区,不支持变更地域属性 扩缩容 当前只支持节点、规格、磁盘扩容,不支持缩容 版本 当前服务端版本为2.132.8.2。实例创建后,服务端版本不支持升级,不支持定制版本 弹性ip 支持对每个节点绑定弹性ip,可以在公网访问topic,弹性ip只能通过SASL连接访问 topic创建 默认不支持自动创建,需要在页面上创建,总数不超过100个 topic分区数 只能增加不能减少,总分区数不能超过2000个,分区数过多会导致磁盘碎片化,影响性能 私有主题 支持私有主题 消息大小 默认1MB,可自定义配置 批量导入/导出 支持topic、消费组、连接用户的批量导入 私有消费组 支持私有消费组,需要在页面上创建、订阅对应的私有主题,并对连接用户授权
        来自:
        帮助文档
        分布式消息服务Kafka
        产品简介
        使用限制
      • 操作类
        Kafka服务端支持版本是多少? Kafka 1.1.0和2.3.0版本。 创建的Kafka实例是集群模式么? 创建一个Kafka实例即为一个集群实例。 Kafka实例是否支持修改访问端口? Kafka实例的访问端口固定,不支持修改。 如果是访问未开启SASL的Kafka专享实例,访问端口为9092。 如果是访问开启SASL的Kafka专享实例,访问端口为9093。 在访问Kafka实例之前,需要确保安全组是否配置正确。 Kafka实例的SSL证书有效期多长? Kafka实例开启SASL时,需进行单向认证,证书有效期足够长(超过15年),客户端不需要关注证书过期风险。 如何将Kafka实例中的数据同步到另一个Kafka实例中? Kafka实例之间没有好的实时同步方案,如果需要做实例迁移,可以同时向两个实例生产消息,源实例中的消息可继续消费,待源实例的消息数据全部被消费完或老化后,业务可迁移到新的Kafka实例。 Kafka实例的SASLSSL开关如何修改? Kafka SASLSSL开关不支持创建实例后修改,在创建时,请慎重选择,如果创建后需要修改,需要重新创建实例。 购买实例时选择的单AZ,怎样可以扩展为多AZ? 已购买的实例无法扩展AZ,请重新购买多AZ的实例。 Kafka扩容会影响业务吗? Kafka扩容带宽/存储空间,都不会影响业务的使用。 Kafka实例创建后,能修改VPC和子网吗? 不能修改VPC和子网。 Kafka实例版本可以升级吗? Kafka实例创建成功后,实例版本不支持升级。您可以重新创建Kafka实例,实现升级Kafka实例的版本。
        来自:
        帮助文档
        专属云分布式消息服务Kafka
        常见问题
        操作类
      • 查看Topic日志
        查看Topic日志 1. 登录管理控制台。 2. 在管理控制台左上角单击,选择Kafka实例所在的区域。 3. 在管理控制台左上角单击,选择“应用服务 > 分布式消息服务Kafka”,进入Kafka总览页面。 4. 在左侧导航栏单击“Kafka实例”,进入Kafka实例列表页面。 5. 单击Kafka实例的名称,进入实例详情页面。 6. 在左侧导航栏选择“日志管理 > Topic日志”,进入“Topic日志”页面。 7. 在“日志查询”页签,查看Topic日志。 如果您需要搜索日志,请参考进入搜索LTS日志页面进行操作。 Topic日志示例如下: plaintext { "level": "INFO", "timestamp": "20241227 17:26:13,361", "message": { "topicPartition": "topic0", "targetState": "OnlinePartition", "leaderAndIsr": "LeaderAndIsr(leader1, leaderEpoch3, isrList(1, 0), leaderRecoveryStateRECOVERED, partitionEpoch3)", "partitionState": "OnlinePartition", "topic": "topic", "type": "ELECTLEADER" } } { "level": "INFO", "timestamp": "20241227 17:26:13,491", "message": { "leader": "1", "startOffset": "0", "topic": "topic", "type": "MAKELEADER", "topicPartition": "topic0", "epoch": "3" } } 参数说明如表1所示。 表1 Topic日志参数说明 参数名称 说明 level Topic日志的等级,只有“INFO”一种等级。 timestamp Topic分区选举Leader或确定Leader的时间。 topicPartition Topic分区。 targetState 目标状态,取值如下: NewPartition:表示分区处于新建状态。 OnlinePartition:表示分区处于正常工作状态。 OfflinePartition:表示分区处于下线状态。 NonExistentPartition:表示分区不存在或被删除。 leaderAndIsr leaderAndIsr请求的信息。 partitionState 分区状态,取值如下: NewPartition:表示分区处于新建状态。 OnlinePartition:表示分区处于正常工作状态。 OfflinePartition:表示分区处于下线状态。 NonExistentPartition:表示分区不存在或被删除。 topic Topic名称。 type Leader所处的阶段,取值如下: ELECTLEADER:选举Leader。 MAKELEADER:确定Leader。 leader Leader所在分区。 startOffset Leader在对应Epoch上写入第一条消息的Offset。 每个Epoch对应一个startOffset。 epoch Leader的选举次数,初始值为0。Leader每发生一次选举,Epoch值加一。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        Topic管理
        查看Topic日志
      • 【优惠】正式开放2年7折,3年5折包年折扣
        面向长期稳定客户,我们特别推出了更加优惠的包年订阅选项,旨在通过深度折扣,帮助客户显著降低资源单位成本,优化整体支出。 自2024年11月9日起订购和续订分布式消息服务Kafka产品更长包周期即可享受2年7折,3年5折优惠。 注意 本次包年优惠适用于新资费产品范围,具体支持资源池请参阅
        来自:
        帮助文档
        分布式消息服务Kafka
        服务公告
        2024年
        【优惠】正式开放2年7折,3年5折包年折扣
      • KafkaConsumer监控
        指标类别 指标 指标名称 指标说明 单位 数据类型 默认聚合方式 主题 (topic,kafka的topic监控数据。) id id clientid和ip信息 ENUM LAST 主题 (topic,kafka的topic监控数据。) topic topic kafka的topic名称 ENUM LAST 主题 (topic,kafka的topic监控数据。) bytesConsumedRate 每秒消费字节 每秒消费字节 Byte INT AVG 主题 (topic,kafka的topic监控数据。) fetchSizeAvg 请求获取平均字节 请求获取平均字节 Byte INT AVG 主题 (topic,kafka的topic监控数据。) fetchSizeMax 请求获取最大字节 请求获取最大字节 Byte INT MAX 主题 (topic,kafka的topic监控数据。) recordsConsumedRate 每秒消费消息数 每秒消费消息数 INT AVG 主题 (topic,kafka的topic监控数据。) recordsPerRequestAvg 单次请求平均消息数 单次请求平均消息数 INT AVG 主题 (topic,kafka的topic监控数据。) seqIds Producer生成序列号 Producer生成序列号 STRING LAST 主题 (topic,kafka的topic监控数据。) recordConsumedTotal 总消费次数 总消费次数 INT SUM 主题 (topic,kafka的topic监控数据。) bytesConsumedTotal 总消费字节数 总消费字节数 INT SUM fetch (fetch,kafka的fetch监控数据) id id clientid和ip信息 ENUM LAST fetch (fetch,kafka的fetch监控数据) bytesConsumedRate 每秒消费字节 每秒消费字节 Byte INT AVG fetch (fetch,kafka的fetch监控数据) fetchLatencyAvg 请求平均时延 请求平均时延 ms INT AVG fetch (fetch,kafka的fetch监控数据) fetchLatencyMax 请求最大时延 请求最大时延 ms INT MAX fetch (fetch,kafka的fetch监控数据) fetchRate 每秒请求数 每秒请求数 INT AVG fetch (fetch,kafka的fetch监控数据) fetchSizeAvg 请求获取平均字节 请求获取平均字节 Byte INT AVG fetch (fetch,kafka的fetch监控数据) fetchSizeMax 请求获取最大字节 请求获取最大字节 Byte INT MAX fetch (fetch,kafka的fetch监控数据) recordsConsumedRate 每秒消费消息数 每秒消费消息数 INT AVG fetch (fetch,kafka的fetch监控数据) recordsLagMax 最大堆积消息数 最大堆积消息数 INT MAX fetch (fetch,kafka的fetch监控数据) recordsPerRequestAvg 单次请求平均消息数 单次请求平均消息数 INT AVG fetch (fetch,kafka的fetch监控数据) seqIds Producer生成序列号 Producer生成序列号 STRING LAST fetch (fetch,kafka的fetch监控数据) recordConsumedTotal 总消费次数 总消费次数 INT SUM fetch (fetch,kafka的fetch监控数据) bytesConsumedTotal 总消费字节数 总消费字节数 INT SUM partition (partition,kafka的partition监控数据。) id id clientid和ip信息 ENUM LAST partition (partition,kafka的partition监控数据。) partition partition kafka的partition名称 ENUM LAST partition (partition,kafka的partition监控数据。) recordsLag 堆积消息数 堆积消息数 INT LAST partition (partition,kafka的partition监控数据。) recordsLagAvg 平均堆积消息数 平均堆积消息数 INT AVG partition (partition,kafka的partition监控数据。) recordsLagMax 最大堆积消息数 最大堆积消息数 INT MAX partition (partition,kafka的partition监控数据。) seqIds Producer生成序列号 Producer生成序列号 STRING LAST kafka消费方法监控 (consumer,kafka消费方法监控。) method method 消费方法 ENUM LAST kafka消费方法监控 (consumer,kafka消费方法监控。) concurrentMax 最大并发 最大并发 INT MAX kafka消费方法监控 (consumer,kafka消费方法监控。) errorCount 错误数 错误数 INT SUM kafka消费方法监控 (consumer,kafka消费方法监控。) invokeCount 调用次数 调用次数 INT SUM kafka消费方法监控 (consumer,kafka消费方法监控。) lastError 错误信息 发生错误时产生的错误信息 STRING LAST kafka消费方法监控 (consumer,kafka消费方法监控。) maxTime 最大响应时间 采集周期内最大响应时间 INT MAX kafka消费方法监控 (consumer,kafka消费方法监控。) range1 010ms 响应时间在010ms范围调用次数 INT SUM kafka消费方法监控 (consumer,kafka消费方法监控。) range2 10100ms 响应时间在10100ms范围调用次数 INT SUM kafka消费方法监控 (consumer,kafka消费方法监控。) range3 100500ms 响应时间在100500ms范围调用次数 INT SUM kafka消费方法监控 (consumer,kafka消费方法监控。) range4 5001000ms 响应时间在5001000ms范围调用次数 INT SUM kafka消费方法监控 (consumer,kafka消费方法监控。) range5 110s 响应时间在110s范围调用次数 INT SUM kafka消费方法监控 (consumer,kafka消费方法监控。) range6 10s以上 响应时间在10s以上调用次数 INT SUM kafka消费方法监控 (consumer,kafka消费方法监控。) totalTime 总响应时间 总响应时间 INT SUM KafkaConsumer汇总(total,KafkaConsumer汇总信息统计。) recordConsumedTotal 总消费次数 总消费次数 INT SUM KafkaConsumer汇总(total,KafkaConsumer汇总信息统计。) bytesConsumedTotal 总消费字节数 总消费字节数 INT SUM KafkaConsumer汇总(total,KafkaConsumer汇总信息统计。) recordsLag 总堆积消息数 总堆积消息数 INT LAST 异常 (exception,kafka消费异常信息。) causeType 异常发生类 异常发生类 ENUM LAST 异常 (exception,kafka消费异常信息。) exceptionType 异常类 异常类 ENUM LAST 异常 (exception,kafka消费异常信息。) count 数量 异常数量 INT SUM 异常 (exception,kafka消费异常信息。) message 异常消息 异常消息 STRING LAST 异常 (exception,kafka消费异常信息。) stackTrace 异常堆栈 异常堆栈 CLOB LAST
        来自:
        帮助文档
        应用性能管理
        产品介绍
        指标总览
        消息队列
        KafkaConsumer监控
      • 使用SASL证书连接
        生产消息 进入Kafka客户端文件的“/bin”目录下,执行如下命令进行生产消息。 ./kafkaconsoleproducer.sh brokerlist ​{连接地址} topic 连接地址−−topic​{Topic名称} producer.config ../config/producer.properties 参数说明如下: 连接地址:从前提条件获取的连接地址。 Topic名称:Kafka实例下创建的Topic名称。 示例如下,“10.3.196.45:9095,10.78.42.127:9095,10.4.49.103:9095”为Kafka实例连接地址。 执行完命令后,输入需要生产的消息内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。 [root@ecskafka bin] ./kafkaconsoleproducer.sh brokerlist 10.3.196.45:9095,10.78.42.127:9095,10.4.49.103:9095 topic topicdemo producer.config ../config/producer.propertiesHello DMS Kafka! ^C[root@ecskafka bin] 如需停止生产使用Ctrl+C命令退出。 消费消息 执行如下命令消费消息。 ./kafkaconsoleconsumer.sh bootstrapserver ​{连接地址} topic 连接地址−−topic​{Topic名称} group ${消费组名称} frombeginning consumer.config ../config/consumer.properties 参数说明如下: 连接地址:从前提条件获取的连接地址。 Topic名称:Kafka实例下创建的Topic名称。 消费组名称:根据您的业务需求,设定消费组名称。 如果已经在配置文件中指定了消费组名称,请确保命令行中的消费组名称与配置文件中的相同,否则可能消费失败 。 消费组名称开头包含特殊字符,例如下划线“”、 号“ ”时,监控数据无法展示。 示例如下: [root@ecskafka bin]
        来自:
        帮助文档
        分布式消息服务Kafka
        快速入门
        步骤四:连接实例生产消费消息
        使用SASL证书连接
      • DMS for Kafka 消费者poll的优化
        本文主要介绍DMS for Kafka 消费者poll的优化。 场景介绍 在DMS提供的原生Kafka SDK中,消费者可以自定义拉取消息的时长,如果需要长时间的拉取消息,只需要把poll(long)方法的参数设置合适的值即可。但是这样的长连接可能会对客户端和服务端造成一定的压力,特别是分区数较多且每个消费者开启多个线程的情况下。 如图所示,Kafka队列含有多个分区,消费组中有多个消费者同时进行消费,每个线程均为长连接。当队列中消息较少或者没有时,连接不断开,所有消费者不间断地拉取消息,这样造成了一定的资源浪费。 图 Kafka消费者多线程消费模式 优化方案 在开了多个线程同时访问的情况下,如果队列里已经没有消息了,其实不需要所有的线程都在poll,只需要有一个线程poll各分区的消息就足够了,当在polling的线程发现队列中有消息,可以唤醒其他线程一起消费消息,以达到快速响应的目的。如图所示。 这种方案适用于对消费消息的实时性要求不高的应用场景。如果要求准实时消费消息,则建议保持所有消费者处于活跃状态。 图 优化后的多线程消费方案 说明 消费者(Consumer)和消息分区(Partition)并不强制数量相等,Kafka的poll(long)方法帮助实现获取消息、分区平衡、消费者与Kafka broker节点间的心跳检测等功能。 因此在对消费消息的实时性要求不高场景下,当消息数量不多的时候,可以选择让一部分消费者处于wait状态。
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        DMS for Kafka 消费者poll的优化
      • Python
        环境安装 1. 安装Python。(Python版本为2.7或3.X。) 2. 安装依赖库。(使用公网连接需要安装confluentkafka 1.9.2及以下版本的依赖库) pip install confluentkafka1.9.2 3. 下载Demo包kafkaconfluentpythondemo.zip。 配置修改 1. 如果是ssl连接,需要在控制台下载证书。并且解压压缩包得到ssl.client.truststore.jks,执行以下命令生成caRoot.pem文件。 keytool importkeystore srckeystore ssl.client.truststore.jks destkeystore caRoot.p12 deststoretype pkcs12 openssl pkcs12 in caRoot.p12 out caRoot.pem 2. 修改setting.py文件。(calocation仅在ssl连接时需要配置) kafkasetting { 'bootstrapservers': 'XXX', 'topicname': 'XXX', 'groupname': 'XXX' } 生产消息 发送以下命令发送消息。 python kafkaproducer.py 生产消息示例代码如下: from confluentkafka import Producer import setting conf setting.kafkasetting """初始化一个 Producer 对象""" p Producer({'bootstrap.servers': conf['bootstrapservers']}) def deliveryreport(err, msg): """ Called once for each message produced to indicate delivery result. Triggered by poll() or flush(). """ if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) """异步发送消息""" p.produce(conf['topicname'], "Hello".encode('utf8'), callbackdeliveryreport) p.poll(0) """在程序结束时,调用flush""" p.flush() 消费消息 发送以下命令消费消息。 python kafkaconsumer.py 消费消息示例代码如下: from confluentkafka import Consumer, KafkaError import setting conf setting.kafkasetting c Consumer({ 'bootstrap.servers': conf['bootstrapservers'], 'group.id': conf['groupname'], 'auto.offset.reset': 'latest' }) c.subscribe([conf['topicname']]) while True: msg c.poll(1.0) if msg is None: continue if msg.error(): if msg.error().code() KafkaError.PARTITIONEOF: continue else: print("Consumer error: {}".format(msg.error())) continue print('Received message: {}'.format(msg.value().decode('utf8'))) c.close()
        来自:
        帮助文档
        分布式消息服务Kafka
        开发指南
        Python
      • 网络带宽利用率监控告警配置指导
        本章介绍Kafka的网络带宽利用率监控告警配置指导。 使用场景建议 Kafka存在持续业务,避免出现客户端批量断链场景。 间断性业务场景不建议使用,会存在误告警。 新创建实例不建议设置告警。 告警通知设置,告警对象、告警组设置 1. 登录管理控制台。 2. 在管理控制台左上角单击,选择目标实例所在的区域。 3. 在管理控制台左上角单击,搜索CES服务进入“云监控服务”界面。 4. 选择“告警 >告警通知”,单击“通知对象 >创建通知对象”,填写要通知的对象以及相关联系方式,如果已存在则跳过该步骤,重复该步骤可创建多个通知对象。 5. 选择“通知组 >创建通知组”,把步骤4创建的告警对象都纳入当前组进行管理。 设置告警规则 1. 选择“告警 >告警规则 >创建告警规则”。 参数 说明 名称 自定义名称 描述 自定义描述 告警类型 指标 资源类型 分布式消息服务 维度 Kafka专享版 Broker节点 监控范围 指定资源 监控对象 选择指定kafka实例的所有broker,可选择多个kafka实例 触发规则 自定义创建 告警策略 若++网络带宽利用率++ 的++原始值连续3次<++xx则++每5分钟 告警一次++ ++根据实际情况设置xx的紧急、重要、次要等告警++ 发送通知 打开 通知方式 通知组 通知组 选择上一步创建的通知组 通知内容模板 都可以选择系统模版 生效时间 每日00:00 – 23:59 触发条件 出现告警恢复告警都选 2. 单击“立即创建”。
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        网络带宽利用率监控告警配置指导
      • 服务内联委托管理
        可信云服务可以通过IAM委托的方式访问其他云服务的资源。可信实体为天翼云服务的IAM委托,包括普通云服务委托和云服务关联委托。本文介绍事件总线EventBridge的服务内联委托。 什么是服务内联委托 在某些场景下,事件总线EventBridge为了完成自身的某个功能,需要获取其他云服务的访问权限,因此,事件总线EventBridge创建了与云服务内联委托,即服务内联委托CtyunAssumeRoleForEventBridge。 使用事件总线EventBridge,系统提供的服务内联委托及其包含的系统权限策略如下: 服务内联委托:CtyunAssumeRoleForEventBridge 系统权限策略:CtyunAssumePolicyForEventBridge CtyunAssumeRoleForEventBridge 服务内联委托CtyunAssumeRoleForEventBridge具有获取访函数列表、函数详情以及调用函数的权限;具有对分布式消息服务Kafka、分布式消息服务RocketMQ、分布式消息服务MQTT与分布式消息服务RabbitMQ的管理员权限;具有专有网络VPC、VPCE的管理员权限。 服务内联委托CtyunAssumeRoleForEventBridge被授予权限策略CtyunAssumePolicyForEventBridge,该权限策略的内容如下: plaintext { "Version": "1.1", "Statement": [ { "Action": [ "cf:inst:InvokeFunction", "cf:inst:GetFunction", "cf:inst:ListFunctions", "KAFKA::", "MQ2::", "mqtt::", "AMQP::", "vpce::", "vpc::" ], "Resource": [ "" ], "Effect": "Allow" } ] } 以下是使用事件总线EventBridge时,需要使用服务内联委托的场景: 建立函数计算规则时,需要委托事件总线EventBridge具有获取访函数列表、函数详情以及调用函数的权限。 建立消息中间件事件源与事件目标时,需要委托事件总线EventBridge具有对分布式消息服务Kafka、分布式消息服务RocketMQ、分布式消息服务MQTT与分布式消息服务RabbitMQ的管理员权限。 建立网络端点时,需要委托事件总线EventBridge具有专有网络VPC、VPCE的管理员权限。
        来自:
        帮助文档
        事件总线
        产品简介
        服务内联委托管理
      • 【通知】产品订购、续订包周期2年和3年选项调整为白名单特性
        尊敬的天翼云客户,分布式消息服务Kafka自2025年12月27日起,订购和续订2年、3年选项默认不开放,调整为白名单特性。 调整时间 2025年12月27日 影响范围 所有区域 调整影响 新订购和续订的实例默认不开放2年、3年选项,您可以选择1年包年选项,如仍需要23年包周期选项,请联系技术支持开通后使用。 已购买2年、3年且还在服务期间的实例仍可继续正常使用不受影响。
        来自:
        帮助文档
        分布式消息服务Kafka
        服务公告
        2025年
        【通知】产品订购、续订包周期2年和3年选项调整为白名单特性
      • 管理Smart Connect任务
        启动/暂停Smart Connect任务 暂停任务后,Kafka实例的数据将不会再同步到另一个Kafka实例或者其他云服务中。 1、登录管理控制台。 2、在管理控制台左上角单击,选择Kafka实例所在的区域。 3、在管理控制台左上角单击,选择“应用服务 > 分布式消息服务 Kafka”,进入分布式消息服务Kafka专享版页面。 4、单击Kafka实例名称,进入实例详情页面。 5、在左侧导航栏单击“Smart Connect”,进入Smart Connect任务列表页面。 6、执行以下操作,启动/暂停Smart Connect任务。 启动:在待启动的Smart Connect任务所在行,单击“启动”。 暂停:在待暂停的Smart Connect任务所在行,单击“暂停”,弹出“暂停任务”对话框,单击“确定”。 结束 重启Smart Connect任务 1、登录管理控制台。 2、在管理控制台左上角单击,选择Kafka实例所在的区域。 3、在管理控制台左上角单击,选择“应用服务 > 分布式消息服务 Kafka”,进入分布式消息服务Kafka专享版页面。 4、单击Kafka实例名称,进入实例详情页面。 5、在左侧导航栏单击“Smart Connect”,进入Smart Connect任务列表页面。 6、在待重启的Smart Connect任务所在行,单击“重启”,弹出“重启任务”对话框。 重启Smart Connect任务前,请注意以下两点: Smart Connect任务创建后,修改了源端或者目标端的参数,可能会导致重启失败。 重启Smart Connect任务会重置同步进度,并重新开始同步任务。 7、单击“确定”,完成Smart Connect任务的重启。 当页面左上方显示“成功重启任务xxx”时,表示成功重启Smart Connect任务。 结束
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        Kafka数据迁移
        使用Smart Connect迁移Kafka数据
        管理Smart Connect任务
      • 【通知】通用型主机规格调整为白名单特性
        分布式消息服务Kafka主机类型通用型规格调整为白名单特性,更多了解请查看计费项产品规格说明。 调整时间 2024年6月25日 影响范围 所有区域 调整影响 新用户默认不开放主机类型通用型规格订购开通,如需要该特性,请联系技术支持开通后使用。 已购买主机类型通用型规格实例的用户,原实例仍可正常使用,续费、扩容等操作不受影响。
        来自:
        帮助文档
        分布式消息服务Kafka
        服务公告
        2024年
        【通知】通用型主机规格调整为白名单特性
      • 事件源参数
        resourceKey 是否必传 form value template instanceId 是 CONSTANT 分布式消息服务Kafka实例ID。 无 topic 是 CONSTANT 主题名称。 无 groupName 是 CONSTANT 消费组名。 无 initialOffset 是 CONSTANT 消息位点。 最新位点:latest 最早位点:earliest 无 dataFormat 否 CONSTANT 数据格式,不填时默认为JSON编码格式: JSON格式编码:JSON 文本格式编码:Text 二进制格式编码:Binary 无
        来自:
        帮助文档
        事件总线
        用户指南
        事件流
        事件源
        事件源参数
      • 旧资费
        说明 分布式消息服务Kafka旧资费根据实例规格分为基础版和高级版,存储空间按照不同类型收费。 目前在 芜湖2、上海7、重庆2、乌鲁木齐27、石家庄20、内蒙6、北京5 资源池开放订购。 实例价格如下: 规格 标准资费(元/月) 标准资费(元/小时) 基础版(100 MB/s) 1300 2.71 高级版(300 MB/s ) 2100 4.38 存储空间价格如下: 存储类型 标准资费(元/G/月) 标准资费(元/G/小时) 普通 IO 0.45 0.0008 高 IO 0.6 0.0013 超高 IO 1.5 0.0032
        来自:
        帮助文档
        分布式消息服务Kafka
        计费说明
        产品资费
        旧资费
      • KafkaProducer监控
        指标类别 指标 指标名称 指标说明 单位 数据类型 默认聚合方式 topic (topic,kafka的topic监控数据。) id id clientid和ip信息 ENUM LAST topic (topic,kafka的topic监控数据。) topic topic kafka的topic名称 ENUM LAST topic (topic,kafka的topic监控数据。) byteRate 每秒发送字节 每秒发送字节 Byte INT AVG topic (topic,kafka的topic监控数据。) recordErrorRate 每秒错误数 每秒错误数 INT AVG topic (topic,kafka的topic监控数据。) recordRetryRate 每秒重试数 每秒重试数 INT AVG topic (topic,kafka的topic监控数据。) recordSendRate 每秒发送数 每秒发送数 INT AVG topic (topic,kafka的topic监控数据。) seqIds Producer生成序列号 Producer生成序列号 STRING LAST topic (topic,kafka的topic监控数据。) recordSendTotal 总发送次数 总发送次数 INT SUM topic (topic,kafka的topic监控数据。) byteTotal 总发送字节数 总发送字节数 INT SUM KafkaProducer汇总(total,KafkaProducer汇总信息统计。) recordSendTotal 总发送次数 总发送次数 INT SUM KafkaProducer汇总(total,KafkaProducer汇总信息统计。) byteTotal 总发送字节数 总发送字节数 INT SUM 异常 (exception,kafka发送异常信息。) causeType 异常发生类 异常发生类 ENUM LAST 异常 (exception,kafka发送异常信息。) exceptionType 异常类 异常类 ENUM LAST 异常 (exception,kafka发送异常信息。) count 数量 异常数量 INT SUM 异常 (exception,kafka发送异常信息。) message 异常消息 异常消息 STRING LAST 异常 (exception,kafka发送异常信息。) stackTrace 异常堆栈 异常堆栈 CLOB LAST 发送方法(doSendMethod,发送消息方法监控。) topic topic topic ENUM LAST 发送方法(doSendMethod,发送消息方法监控。) concurrentMax 最大并发 最大并发 INT MAX 发送方法(doSendMethod,发送消息方法监控。) errorCount 错误数 错误数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) invokeCount 调用次数 调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) maxTime 最慢时延 最慢时延 INT MAX 发送方法(doSendMethod,发送消息方法监控。) range1 0–10ms 时延在010ms范围调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) range2 10–100ms 时延在10–100ms范围调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) range3 100–500ms 时延在100–500ms范围调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) range4 500–1000ms 时延在500–1000ms范围调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) range5 1–10s 时延在1–10s范围调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) range6 10sn 时延在10s以上调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) totalTime 总时延 调用总耗时 INT SUM
        来自:
        帮助文档
        应用性能管理
        产品介绍
        指标总览
        消息队列
        KafkaProducer监控
      • 创建应用用户
        本节主要介绍分布式消息服务Kafka如何创建应用用户 场景描述 用户:主要用于规定生产消费指定加密主题的策略而需要,例如规定用户A可生产消费加密Topic1,用户B可生产消费加密Topic2,用户C可生产消费加密Topic1、Topic2,则需要为这三个用户创建用户,并分配加密主题权限。 操作步骤 (1)登录管理控制台。 (2)进入Kafka管理控制台。 (3)在实例列表页在操作列,目标实例行点击“管理”。 (4)点击“用户管理”后进入用户管理页面,点击新建用户。 (5)在 新建用户的窗口中填入集群名、用户、密码、描述,然后保存。 批量创建用户 (1)登录管理控制台。 (2)进入Kafka管理控制台。 (3)在实例列表页在操作列,目标实例行点击“管理”。 (4)点击“用户管理”后进入应用用户管理页面。 (5)点击“批量创建用户”后,出现如下上传文件界面,文件格式件批量下载说明。 (6)点击“上传”完成批量创建。 下载批量创建用户模板 (1)登录管理控制台。 (2)进入Kafka管理控制台。 (3)在实例列表页在操作列,目标实例行点击“管理”。 (4)点击“用户管理”后进入用户管理页面。 (5)点击“下载模板”右侧下拉倒三角“下载模板”,内容如下图。 (6)参数说明。 参数 说明 username 用户名 password 密码 description 备注或描述
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        用户管理
        创建应用用户
      • 通知外送
        配置项 说明 名称 日志外送接口的名称。必须为中文字符、字母、数字、“”、“.”或“”,长度不超过 64 字符。 Kafka节点地址 Kafka服务器的IP(域名)及端口号。例如:192.168.0.1:9200。 Kafka主题 消息投放到Kafka服务器的主题。 Kafka分区 消息投放到的Kafka服务器的分区。Kafka服务器通过主题(topic)、分区(partition)和消费组(consumergroup)三个概念灵活适应各种消息场合,通过提升硬件资源利用率提高系统吞吐量。 以上Kafka相关配置与服务器端保持一致即可。 审计日志模板 设置发送审计日志的模板,具体字段请依据填写说明编辑。 操作日志模板 设置发送操作日志的模板,具体字段请依据填写说明编辑。 告警日志模板 设置发送告警日志的模板,具体字段请依据填写说明编辑。 流量控制日志模板 设置发送流量控制日志的模板,具体字段请依据填写说明编辑。
        来自:
        帮助文档
        数据安全专区
        用户指南
        API安全网关操作指导
        通知外送
      • 购买类常见问题
        本节介绍分布式消息服务Kafka计费类常见问题 可以购买哪些版本? 不同资源池可购买的版本、规格不一致,具体请查看产品规格说明 (1)华东1、华北2、西南1、华南2、上海36、青岛20、长沙42、南昌5、武汉41、杭州7、西南2贵州、太原4、郑州5、西安7、呼和浩特3 可以选购Kafka引擎,选择主机类型、节点规格、节点数和存储空间。 (2)芜湖2、上海7、重庆2、乌鲁木齐27、石家庄20、内蒙6、北京5 可以选购高级版和基础版两个版本。 到期后如何续费? 在集群列表中点击“续费”,进入购买时长页面,购买成功后自动续费。 手动续订:对于包年/包月订购的分布式缓存服务,用户在资源到期前进行续费操作,可以延长原有资源到期时间,避免资源到期后冻结或超过保留期后被系统回收。详细操作请参考费用中心续订管理手动续订。 自动续订:自动续订仅针对采用包月、包年计费模式的资源,详细操作请参考费用中心续订管理自动续订。 产品订购时可选资源池节点不一致? 已上线资源池节点的剩余容量达到一定比例后,为确保老客户权益,将不再面向新客户开放,产品订购时的可选资源节点范围以实际为准。
        来自:
        帮助文档
        分布式消息服务Kafka
        常见问题
        计费与购买类
        购买类常见问题
      • 连接已开启SASL的Kafka专享实例
        创建实例时开启SASLSSL访问,则数据加密传输,安全性更高。 由于安全问题,支持的加密套件为TLSECDHEECDSAWITHAES128CBCSHA256,TLSECDHERSAWITHAES128CBCSHA256和TLSECDHERSAWITHAES128GCMSHA256。 本节介绍如何使用开源的Kafka客户端访问开启SASL的Kafka专享实例的方法。 说明: 使用SASL方式连接Kafka实例时,为了客户端能够快速解析实例的Broker,建议在客户端所在主机的“/etc/hosts”文件中配置host和IP的映射关系。 其中,IP地址必须为实例连接地址(Broker地址),host为每个实例主机的名称(您可以自定义主机的名称,但不能重复)。 例如: 10.154.48.120 server01 10.154.48.121 server02 10.154.48.122 server03 前提条件 已配置正确的安全组。 访问开启SASL的Kafka专享实例时,支持VPC内访问。实例需要配置正确的安全组规则,具体安全组配置要求,请参考表32。 已获取连接Kafka专享版实例的地址。 使用VPC内访问,实例端口为9093,实例连接地址获取如下图。 获取VPC内访问Kafka专享实例的连接地址(实例已开启SASL) Kafka专享实例已创建Topic,否则请提前创建Topic。 已下载client.truststore.jks证书。如果没有,在控制台单击Kafka实例名称,进入实例详情页面,在“基本信息 > 高级配置 > Kafka SASLSSL”所在行,单击 。下载压缩包后解压,获取压缩包中的客户端证书文件:client.truststore.jks。 已下载Kafka命令行工具1.1.0版本或者Kafka命令行工具2.3.0版本,确保Kafka实例版本与命令行工具版本相同。 已在Kafka命令行工具的使用环境中安装Java Development Kit 1.8.111或以上版本,并完成环境变量配置。 命令行模式连接实例 以下操作命令以Linux系统为例进行说明。 解压Kafka命令行工具。 进入文件压缩包所在目录,然后执行以下命令解压文件。 tar zxf [kafkatar] 其中,[kafkatar]表示命令行工具的压缩包名称。 例如: tar zxf kafka2.111.1.0.tgz 修改Kafka命令行工具配置文件。 在Kafka命令行工具的“/config”目录中找到“consumer.properties”和“producer.properties”文件,并分别在文件中增加如下内容。 sasl.jaas.configorg.apache.kafka.common.security.plain.PlainLoginModule required username"" password""; sasl.mechanismPLAIN security.protocolSASLSSL ssl.truststore.location/opt/kafka2.111.1.0/config/client.truststore.jks ssl.truststore.passworddms@kafka ssl.endpoint.identification.algorithm 参数说明: username和password为创建Kafka专享实例过程中开启SASLSSL时填入的用户名和密码。 ssl.trustore.location配置为client.truststore.jks证书的存放路径。注意,Windows系统下证书路径中也必须使用“/”,不能使用Windows系统中拷贝路径时的“”,否则客户端获取证书失败。 ssl.truststore.password为服务器证书密码,不可更改,需要保持为dms@kafka。 ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空。 进入Kafka命令行工具的“/bin”目录下。 注意,Windows系统下需要进入“/bin/windows”目录下。 执行如下命令进行生产消息。 ./kafkaconsoleproducer.sh brokerlist ${连接地址} topic ${Topic名称} producer.config ../config/producer.properties 参数说明如下: 连接地址:从前提条件获取的连接地址。 Topic名称:Kafka实例下创建的Topic名称。 如下示例,Kafka实例连接地址为“10.xxx.xxx.202:9093,10.xxx.xxx.197:9093,10.xxx.xxx.68:9093”。 执行完命令后,输入需要生产的消息内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。 [root@ecskafka bin]
        来自:
        帮助文档
        专属云分布式消息服务Kafka
        用户指南
        连接已开启SASL的Kafka专享实例
      • SSL接入性能优化
        本文主要介绍消息队列 Kafka 通过SSL接入的最佳实践,从而帮助您更好的使用该产品。 文中的最佳实践基于消息队列 Kafka 的 Java 客户端;对于其它语言的客户端,其基本概念与思想是通用的,但实现细节可能有差异,仅供参考。 背景 云消息队列 Kafka 版实例如果选择SASLSSL接入时,可能会出现性能较差的情况。可以通过在客户端指定密码套件的方式,手动选择性能和安全性都较高的套件进行TLS通讯。 SSL握手时客户端发送Hello Client 并带上客户端支持的密码套件,服务端收到握手请求后,获取客户端带来的密码套件和服务端支持的密码套件取交集。密码套件加载顺序受客户端jdk版本影响,不同jdk版本,密码套件顺序不一样,可能会导致性能和安全性等无法保证。因此为了保证性能,SSL连接时客户端可以指定密码套件。 推荐的性能和安全性较高的密码套件 TLSECDHERSAWITHAES256GCMSHA384 TLSECDHERSAWITHAES128GCMSHA256 步骤 1.先按照SASLSSL协议接入Kafka,SASLSSL接入可参考文档 SASLSSL接入点接入 2.在此基础上增加 ssl.cipher.suites 配置 客户端关键参数 java Properties props new Properties(); props.put(ProducerConfig.BOOTSTRAPSERVERSCONFIG, BROKERADDR); props.put(ProducerConfig.VALUESERIALIZERCLASSCONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.KEYSERIALIZERCLASSCONFIG, StringSerializer.class.getName()); props.put(CommonClientConfigs.SECURITYPROTOCOLCONFIG, "SASLSSL"); props.put("sasl.mechanism", "SCRAMSHA512"); props.put(SaslConfigs.SASLJAASCONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username"testuser" password"f6eca4dbc78df78d63fba980e448185f";");//注:上面的密码f6eca4dbc78df78d63fba980e448185f,为用户管理里面创建用户时填入的密码进行md5的结果,md5取32位小写 props.put("ssl.truststore.location","/kafka/client.truststore.jks"); props.put("ssl.truststore.password","sJses2tin1@23"); props.put("ssl.endpoint.identification.algorithm",""); props.put("ssl.cipher.suites","TLSECDHERSAWITHAES256GCMSHA384,TLSECDHERSAWITHAES128GCMSHA256");//密码套件支持多个,用半角逗号分开
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        SSL接入性能优化
      • Java客户端接入示例
        生产消息 import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.concurrent.Future; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.config.SslConfigs; public class KafkaProducerDemo { public static void main(String args[]) { //加载kafka.properties Properties kafkaProperties JavaKafkaConfigurer.getKafkaProperties(); Properties props new Properties(); //设置接入点,请通过控制台获取对应Topic的接入点 props.put(ProducerConfig.BOOTSTRAPSERVERSCONFIG, kafkaProperties.getProperty("bootstrap.servers")); //设置SSL根证书的路径,请记得将XXX修改为自己的路径 props.put(SslConfigs.SSLTRUSTSTORELOCATIONCONFIG, kafkaProperties.getProperty("ssl.truststore.location")); //根证书store的密码,保持不变 props.put(SslConfigs.SSLTRUSTSTOREPASSWORDCONFIG, "c24f5210"); //接入协议,目前支持使用SSL协议接入 props.put(CommonClientConfigs.SECURITYPROTOCOLCONFIG, "SSL"); //Kafka消息的序列化方式 props.put(ProducerConfig.KEYSERIALIZERCLASSCONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUESERIALIZERCLASSCONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //请求的最长等待时间 props.put(ProducerConfig.MAXBLOCKMSCONFIG, 30 1000); //设置客户端内部重试次数 props.put(ProducerConfig.RETRIESCONFIG, 5); //设置客户端内部重试间隔 props.put(ProducerConfig.RECONNECTBACKOFFMSCONFIG, 3000); //hostname校验改成空 props.put(SslConfigs.SSLENDPOINTIDENTIFICATIONALGORITHMCONFIG, ""); //构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可; //如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个 KafkaProducer producer new KafkaProducer (props); //构造一个Kafka消息 String topic kafkaProperties.getProperty("topic"); //消息所属的Topic,请在控制台申请之后,填写在这里 String value "this is the message's value"; //消息的内容 try { //批量获取 futures 可以加快速度, 但注意,批量不要太大 List > futures new ArrayList >(128); for (int i 0; i kafkaMessage new ProducerRecord (topic, value + ": " + i); Future metadataFuture producer.send(kafkaMessage); futures.add(metadataFuture); } producer.flush(); for (Future future: futures) { //同步获得Future对象的结果 try { RecordMetadata recordMetadata future.get(); System.out.println("Produce ok:" + recordMetadata.toString()); } catch (Throwable t) { t.printStackTrace(); } } } catch (Exception e) { System.out.println("error occurred"); e.printStackTrace(); } } }
        来自:
        帮助文档
        分布式消息服务Kafka
        开发指南
        Java
        Java客户端接入示例
      • Kafka客户端使用规范
        其他建议 连接数限制:3000 消息大小:不能超过10MB 使用saslssl协议访问Kafka:确保DNS具有反向解析能力,或者在hosts文件配置kafka所有节点ip和主机名映射,避免Kafka client做反向解析,阻塞连接建立。 磁盘容量申请超过业务量 副本数的2倍,即保留磁盘空闲50%左右。 业务进程JVM内存使用确保无频繁FGC,否则会阻塞消息的生产和消费。
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        Kafka客户端使用规范
      • 连接未开启SASL的Kafka实例
        命令行模式连接实例 以下操作命令以Linux系统为例进行说明: 步骤 1 解压Kafka命令行工具。 进入文件压缩包所在目录,然后执行以下命令解压文件。 tar zxf [kafkatar] 其中, [kafkatar] 表示命令行工具的压缩包名称。 例如: tar zxf kafka2.122.7.2.tgz 步骤 2 进入Kafka命令行工具的“/bin”目录下。 注意,Windows系统下需要进入“/bin/windows”目录下。 步骤 3 执行如下命令进行生产消息。 ./kafkaconsoleproducer.sh brokerlist {连接地址} topic {Topic名称} 参数说明如下: 连接地址:从前提条件中获取的连接地址,如果是公网访问,请使用“公网连接地址”,如果是VPC内访问,请使用“内网连接地址”,请根据实际情况选择。 Topic名称:Kafka实例下创建的Topic名称。如果Kafka实例开启了自动创建Topic功能,此参数值可以填写已创建的Topic名称,也可以填写未创建的Topic名称。 本文以公网连接为例,获取的Kafka实例公网连接地址为“10.3.196.45:9094,10.78.42.127:9094,10.4.49.103:9094”。执行完命令后输入内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。 [root@ecskafka bin] ./kafkaconsoleproducer.sh brokerlist 10.3.196.45:9094,10.78.42.127:9094,10.4.49.103:9094 topic topicdemo >Hello >DMS >Kafka! >^C[root@ecskafka bin] 如需停止生产使用Ctrl+C命令退出。 步骤 4 执行如下命令消费消息。 ./kafkaconsoleconsumer.sh bootstrapserver {连接地址} topic {Topic名称} group ${消费组名称} frombeginning 参数说明如下: 连接地址:从前提条件中获取的连接地址,如果是公网访问,请使用“公网连接地址”,如果是VPC内访问,请使用“内网连接地址”,请根据实际情况选择。 Topic名称:Kafka实例下创建的Topic名称。 消费组名称:根据您的业务需求,设定消费组名称。 如果已经在配置文件中指定了消费组名称,请确保命令行中的消费组名称与配置文件中的相同,否则可能消费失败 。 消费组名称开头包含特殊字符,例如下划线“”、
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        连接Kafka实例
        连接未开启SASL的Kafka实例
      • Java开发环境搭建
        开发环境 Maven Apache Maven 2.5及以上版本,可至Maven官方网站下载。 JDK Java Development Kit 1.8及以上版本,可至Oracle官方网站下载。 安装后注意配置JAVA的环境变量。 IntelliJ IDEA 获取并安装IntelliJ IDEA,可至IntelliJ IDEA官方网站下载。 操作步骤 1. 下载Demo包kafkajavademo.zip 下载后解压,有如下文件: 表1 KAFKA Demo文件清单 文件名 路径 说明 ::: JavaKafkaConfigurer.java .srcmainjavajavaDemo 读取Kafka配置文件。 KafkaConsumerDemo.java .srcmainjavajavaDemo 消费消息。 KafkaMultiConsumerDemo.java .srcmainjavajavaDemo 批量消费消息。 KafkaProducerDemo.java .srcmainjavajavaDemo 生产消息。 kafka.properties .srcmainresources kafka配置参数 pom.xml . maven配置文件,包含Kafka客户端引用。 2. 打开IntelliJ IDEA,导入Demo。Demo是一个Maven构建的Java工程,因此需要配置JDK环境,以及IDEA的Maven插件。 3. 修改Kafka配置信息。 修改kafka.properties 修改demo文件中的属性变量
        来自:
        帮助文档
        分布式消息服务Kafka
        开发指南
        Java
        Java开发环境搭建
      • 批量修改Topic配置
        修改单个Topic配置 1. 登录Kafka控制台。 2. 在管理控制台左上角单击,选择Kafka实例所在的区域。 3. 单击Kafka实例的名称,进入实例详情页面。 4. 在左侧导航栏选择“实例管理 > Topic管理”,进入Topic列表页面。 5. 在待修改配置的Topic所在行,单击“编辑”。 6. 在“编辑Topic”对话框中,修改Topic的配置,单击“确定”。 设置成功后,在Topic列表页面,查看修改后的配置。 批量修改Topic配置 1. 登录Kafka控制台。 2. 在管理控制台左上角单击,选择Kafka实例所在的区域。 3. 单击Kafka实例的名称,进入实例详情页面。 4. 在左侧导航栏选择“实例管理 > Topic管理”,进入Topic列表页面。 5. 勾选待修改配置的Topic,单击信息栏左上侧的“批量编辑Topic”,弹出“批量编辑Topic”对话框。 6. 在“批量操作”区域,勾选如表2所示配置。在“批量数据预览”区域,查看修改前和修改后的配置信息。确认无误后,单击“确定”。 图1批量修改Topic配置 表2 Topic配置参数 参数名称 操作说明 同步复制 勾选“同步复制”,并开启或关闭同步复制功能。 同步落盘 勾选“同步落盘”,并开启或关闭同步落盘功能。 消息时间戳类型 勾选“消息时间戳类型”,并在下拉框中选择“CreateTime”/“LogAppendTime”。 批处理消息最大值(字节) 勾选“批处理消息最大值(字节)”,并输入批处理消息最大值。 设置成功后,在Topic列表页面,查看修改后的配置。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        Topic管理
        批量修改Topic配置
      • 删除Topic
        本节主要介绍分布式消息服务Kafka的删除Topic步骤 场景描述 在以下场景中,可以考虑删除Kafka主题: 主题不再使用:当一个主题不再被使用或者不再需要时,可以选择删除该主题。这可能是因为业务需求变化、数据不再有效或者主题被合并到其他主题中等原因。 数据保留策略变更:Kafka中可以设置数据保留策略,即数据在主题中的保留时间或者大小。当需要更改数据保留策略时,可能需要删除旧的主题并创建一个新的主题来应用新的策略。 清理测试数据:在测试环境中,经常需要清理旧的测试数据,以确保环境的可用空间和性能。当测试数据不再需要时,可以删除相应的主题来释放资源。 主题配置错误:在创建主题时,可能会出现配置错误或者误操作导致主题创建失败或者无法正常使用。在这种情况下,可以删除有问题的主题,并重新创建正确的主题配置。 需要注意的是,在删除主题之前,需要确保主题中的数据已经备份或者不再需要。删除主题将永久删除主题中的所有数据,并且无法恢复。因此,在删除主题之前,建议先进行备份或者确认数据不再需要。 操作步骤 (1)登录管理控制台。 (2)进入Kafka管理控制台。 (3)在实例列表页在操作列,目标实例行点击“管理”。 (4)点击“Topic管理”后进入Topic管理页面。 (5)在Topic所在行,点击其右侧的“更多按钮”,再单击“删除”,并选择确定。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        Topic管理
        删除Topic
      • 查看分区状态
        介绍分布式消息服务Kafka查看分区状态功能操作内容。 场景描述 Kafka查看分区状态的场景描述如下: 监控和故障排查:通过查看分区状态,可以了解每个分区的健康状况和数据处理情况。如果某个分区出现延迟或者数据丢失等问题,可以及时发现并进行故障排查和修复。 性能优化:通过查看分区状态,可以了解每个分区的负载情况和数据处理速度。如果某个分区负载过高或者处理速度较慢,可以采取相应的措施进行性能优化,如增加分区数量或者优化消费者的消费能力。 容量规划:通过查看分区状态,可以了解每个分区的数据大小和数据增长趋势。根据分区状态,可以进行容量规划,确保有足够的存储空间来存储数据,并预测未来的数据增长趋势。 数据迁移和重平衡:当需要进行数据迁移或者重平衡时,查看分区状态可以帮助确定合适的迁移方案和平衡策略。通过了解每个分区的状态和负载情况,可以更好地规划和执行数据迁移和重平衡操作。 容错和冗余管理:通过查看分区状态,可以了解每个分区的副本分布情况和数据冗余情况。如果某个分区的副本数量不足或者副本分布不均衡,可以采取相应的措施来提高数据的容错性和冗余能力。 总之,Kafka查看分区状态的场景包括监控和故障排查、性能优化、容量规划、数据迁移和重平衡,以及容错和冗余管理等,以确保系统的稳定性、性能和可靠性。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        Topic管理
        查看分区状态
      • 1
      • ...
      • 5
      • 6
      • 7
      • 8
      • 9
      • ...
      • 691
      跳转至
      推荐热词
      天翼云运维管理审计系统天翼云云服务平台云服务备份云日志服务应用运维管理云手机云电脑天翼云云hbase数据库电信云大数据saas服务电信云大数据paas服务轻量型云主机天翼云客户服务电话应用编排服务天翼云云安全解决方案云服务总线CSB天翼云服务器配置天翼云联邦学习产品天翼云云安全天翼云企业上云解决方案天翼云产品天翼云视频云存储

      天翼云最新活动

      云聚517 · 好价翼起拼

      爆款云主机低至25.83元/年,参与拼团享更多优惠,拼成得额外优惠券

      安全隔离版OpenClaw

      OpenClaw云服务器专属“龙虾“套餐低至1.5折起

      聚力AI赋能 天翼云大模型专项

      大模型特惠专区·Token Plan 轻享包低至9.9元起

      青云志云端助力计划

      一站式科研助手,海外资源安全访问平台,助力青年翼展宏图,平步青云

      企业出海解决方案

      助力您的业务扬帆出海,通达全球!

      天翼云信创专区

      “一云多芯、一云多态”,国产化软件全面适配,国产操作系统及硬件芯片支持丰富

      中小企业服务商合作专区

      国家云助力中小企业腾飞,高额上云补贴重磅上线

      云上钜惠

      爆款云主机全场特惠,2核4G只要1.8折起!

      产品推荐

      物理机 DPS

      多活容灾服务 MDR

      弹性伸缩服务 AS

      弹性高性能计算 E-HPC

      训推服务

      Token服务

      应用托管

      科研助手

      一站式智算服务平台

      推荐文档

      操作类

      云课堂 第十七课:天翼云在线语音合成AI云服务测试Demo

      CDN快速入门

      导入数据

      • 7*24小时售后
      • 无忧退款
      • 免费备案
      • 专家服务
      售前咨询热线
      400-810-9889转1
      关注天翼云
      • 旗舰店
      • 天翼云APP
      • 天翼云微信公众号
      服务与支持
      • 备案中心
      • 售前咨询
      • 智能客服
      • 自助服务
      • 工单管理
      • 客户公告
      • 涉诈举报
      账户管理
      • 管理中心
      • 订单管理
      • 余额管理
      • 发票管理
      • 充值汇款
      • 续费管理
      快速入口
      • 天翼云旗舰店
      • 文档中心
      • 最新活动
      • 免费试用
      • 信任中心
      • 天翼云学堂
      云网生态
      • 甄选商城
      • 渠道合作
      • 云市场合作
      了解天翼云
      • 关于天翼云
      • 天翼云APP
      • 服务案例
      • 新闻资讯
      • 联系我们
      热门产品
      • 云电脑
      • 弹性云主机
      • 云电脑政企版
      • 天翼云手机
      • 云数据库
      • 对象存储
      • 云硬盘
      • Web应用防火墙
      • 息壤智算平台
      • CDN加速
      热门推荐
      • 云服务备份
      • 边缘安全加速平台
      • 全站加速
      • 安全加速
      • 云服务器
      • 云主机
      • 智能边缘云
      • 应用编排服务
      • 微服务引擎
      • 共享流量包
      更多推荐
      • web应用防火墙
      • 密钥管理
      • 等保咨询
      • 安全专区
      • 应用运维管理
      • 云日志服务
      • 文档数据库服务
      • 云搜索服务
      • 数据湖探索
      • 数据仓库服务
      友情链接
      • 中国电信集团
      • 天翼云国际站
      • 189邮箱
      • 天翼企业云盘
      • 天翼云盘
      ©2026 天翼云科技有限公司版权所有 增值电信业务经营许可证A2.B1.B2-20090001
      公司地址:北京市东城区青龙胡同甲1号、3号2幢2层205-32室
      • 用户协议
      • 隐私政策
      • 个人信息保护
      • 法律声明
      备案 京公网安备11010802043424号 京ICP备 2021034386号