云主机开年特惠28.8元/年,0元秒杀等你来抢!
查看详情

活动

天翼云最新优惠活动,涵盖免费试用,产品折扣等,助您降本增效!
热门活动
  • 安全隔离版OpenClaw NEW OpenClaw云服务器专属“龙虾“套餐低至1.5折起
  • 天翼云新春焕新季 NEW 云主机开年特惠28.8元/年,0元秒杀等你来抢!
  • 云上钜惠 爆款云主机全场特惠,2核4G只要1.8折起!
  • 中小企业服务商合作专区 国家云助力中小企业腾飞,高额上云补贴重磅上线
  • 出海产品促销专区 NEW 爆款云主机低至2折,高性价比,不限新老速来抢购!
  • 天翼云奖励推广计划 加入成为云推官,推荐新用户注册下单得现金奖励
免费活动
  • 免费试用中心 HOT 多款云产品免费试用,快来开启云上之旅
  • 天翼云用户体验官 NEW 您的洞察,重塑科技边界

息壤智算

领先开放的智算服务平台,提供算力、平台、数据、模型、应用“五位一体”智算服务体系,构建全流程的AI基础设施能力
AI Store
  • 算力市场
  • 模型市场
  • 应用市场
  • MCP市场
公共算力服务
  • 裸金属
  • 定制裸金属
训推服务
  • 模型开发
  • 训练任务
  • 服务部署
模型推理服务
  • 模型广场
  • 体验中心
  • 服务接入
应用托管
  • 应用实例
科研助手
  • 科研智能体
  • 科研服务
  • 开发机
  • 并行计算
大模型
  • DeepSeek-V3.1
  • DeepSeek-R1-0528
  • DeepSeek-V3-0324
  • Qwen3-235B-A22B
  • Qwen3-32B
智算一体机
  • 智算一体机
模型适配专家服务
  • 模型适配专家服务
算力服务商
  • 入驻算力服务商

应用商城

天翼云精选行业优秀合作伙伴及千余款商品,提供一站式云上应用服务
进入甄选商城进入云市场进入AI Store创新解决方案公有云生态专区智云上海应用生态专区
建站工具
  • 新域名服务
  • SSL证书
  • 翼建站
企业办公
  • 安全邮箱
  • WPS 365 天翼云版
  • 天翼企业云盘(标准服务版)
灾备迁移
  • 云管家2.0
  • 翼备份(SaaS版)

定价

协助您快速了解云产品计费模式、价格详情,轻松预估上云成本
价格计算器
  • 动态测算产品价格
定价策略
  • 快速了解计费模式

合作伙伴

天翼云携手合作伙伴,共创云上生态,合作共赢
天翼云生态合作中心
  • 天翼云生态合作中心
天翼云渠道合作伙伴
  • 天翼云代理渠道合作伙伴
天翼云服务合作伙伴
  • 天翼云集成商交付能力认证
天翼云应用合作伙伴
  • 天翼云云市场合作伙伴
  • 天翼云甄选商城合作伙伴
天翼云技术合作伙伴
  • 天翼云OpenAPI中心
天翼云培训认证
  • 天翼云学堂
  • 天翼云市场商学院
天翼云合作计划
  • 云汇计划
天翼信创云专区
  • 信创云专区
  • 适配互认证

开发者

开发者相关功能入口汇聚
技术社区
  • 专栏文章
  • 互动问答
  • 技术视频
资源与工具
  • OpenAPI中心
培训与认证
  • 天翼云学堂
  • 天翼云认证
开源社区
  • 魔乐社区
  • OpenTeleDB

支持与服务

为您提供全方位支持与服务,全流程技术保障,助您轻松上云,安全无忧
文档与工具
  • 文档中心
  • 新手上云
  • 自助服务
  • OpenAPI中心
定价
  • 价格计算器
  • 定价策略
基础服务
  • 售前咨询
  • 在线支持
  • 在线支持
  • 工单服务
  • 服务保障
  • 会员中心
增值服务
  • 红心服务
  • 首保服务
  • 客户支持计划
  • 专家技术服务
  • 备案管家
我要反馈
  • 建议与反馈
  • 用户体验官
信息公告
  • 客户公告

了解天翼云

天翼云秉承央企使命,致力于成为数字经济主力军,投身科技强国伟大事业,为用户提供安全、普惠云服务
品牌介绍
  • 关于天翼云
  • 智算云
  • 天翼云4.0
  • 新闻资讯
  • 天翼云APP
基础设施
  • 全球基础设施
  • 信任中心
最佳实践
  • 精选案例
  • 超级探访
  • 云杂志
  • 分析师和白皮书
  • 天翼云·创新直播间
市场活动
  • 2025智能云生态大会
  • 2024智算云生态大会
  • 2023云生态大会
  • 2022云生态大会
  • 天翼云中国行
天翼云
  • 活动
  • 息壤智算
  • 产品
  • 解决方案
  • 应用商城
  • 定价
  • 合作伙伴
  • 开发者
  • 支持与服务
  • 了解天翼云
      • 文档
      • 控制中心
      • 备案
      • 管理中心
      专属云分布式消息服务Kafka_相关内容
      • DMS for Kafka 消费者poll的优化
        本文主要介绍DMS for Kafka 消费者poll的优化。 场景介绍 在DMS提供的原生Kafka SDK中,消费者可以自定义拉取消息的时长,如果需要长时间的拉取消息,只需要把poll(long)方法的参数设置合适的值即可。但是这样的长连接可能会对客户端和服务端造成一定的压力,特别是分区数较多且每个消费者开启多个线程的情况下。 如图所示,Kafka队列含有多个分区,消费组中有多个消费者同时进行消费,每个线程均为长连接。当队列中消息较少或者没有时,连接不断开,所有消费者不间断地拉取消息,这样造成了一定的资源浪费。 图 Kafka消费者多线程消费模式 优化方案 在开了多个线程同时访问的情况下,如果队列里已经没有消息了,其实不需要所有的线程都在poll,只需要有一个线程poll各分区的消息就足够了,当在polling的线程发现队列中有消息,可以唤醒其他线程一起消费消息,以达到快速响应的目的。如图所示。 这种方案适用于对消费消息的实时性要求不高的应用场景。如果要求准实时消费消息,则建议保持所有消费者处于活跃状态。 图 优化后的多线程消费方案 说明 消费者(Consumer)和消息分区(Partition)并不强制数量相等,Kafka的poll(long)方法帮助实现获取消息、分区平衡、消费者与Kafka broker节点间的心跳检测等功能。 因此在对消费消息的实时性要求不高场景下,当消息数量不多的时候,可以选择让一部分消费者处于wait状态。
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        DMS for Kafka 消费者poll的优化
      • KafkaProducer监控
        指标类别 指标 指标名称 指标说明 单位 数据类型 默认聚合方式 topic (topic,kafka的topic监控数据。) id id clientid和ip信息 ENUM LAST topic (topic,kafka的topic监控数据。) topic topic kafka的topic名称 ENUM LAST topic (topic,kafka的topic监控数据。) byteRate 每秒发送字节 每秒发送字节 Byte INT AVG topic (topic,kafka的topic监控数据。) recordErrorRate 每秒错误数 每秒错误数 INT AVG topic (topic,kafka的topic监控数据。) recordRetryRate 每秒重试数 每秒重试数 INT AVG topic (topic,kafka的topic监控数据。) recordSendRate 每秒发送数 每秒发送数 INT AVG topic (topic,kafka的topic监控数据。) seqIds Producer生成序列号 Producer生成序列号 STRING LAST topic (topic,kafka的topic监控数据。) recordSendTotal 总发送次数 总发送次数 INT SUM topic (topic,kafka的topic监控数据。) byteTotal 总发送字节数 总发送字节数 INT SUM KafkaProducer汇总(total,KafkaProducer汇总信息统计。) recordSendTotal 总发送次数 总发送次数 INT SUM KafkaProducer汇总(total,KafkaProducer汇总信息统计。) byteTotal 总发送字节数 总发送字节数 INT SUM 异常 (exception,kafka发送异常信息。) causeType 异常发生类 异常发生类 ENUM LAST 异常 (exception,kafka发送异常信息。) exceptionType 异常类 异常类 ENUM LAST 异常 (exception,kafka发送异常信息。) count 数量 异常数量 INT SUM 异常 (exception,kafka发送异常信息。) message 异常消息 异常消息 STRING LAST 异常 (exception,kafka发送异常信息。) stackTrace 异常堆栈 异常堆栈 CLOB LAST 发送方法(doSendMethod,发送消息方法监控。) topic topic topic ENUM LAST 发送方法(doSendMethod,发送消息方法监控。) concurrentMax 最大并发 最大并发 INT MAX 发送方法(doSendMethod,发送消息方法监控。) errorCount 错误数 错误数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) invokeCount 调用次数 调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) maxTime 最慢时延 最慢时延 INT MAX 发送方法(doSendMethod,发送消息方法监控。) range1 0–10ms 时延在010ms范围调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) range2 10–100ms 时延在10–100ms范围调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) range3 100–500ms 时延在100–500ms范围调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) range4 500–1000ms 时延在500–1000ms范围调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) range5 1–10s 时延在1–10s范围调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) range6 10sn 时延在10s以上调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) totalTime 总时延 调用总耗时 INT SUM
        来自:
        帮助文档
        应用性能管理
        产品介绍
        指标总览
        消息队列
        KafkaProducer监控
      • 【优惠】正式开放2年7折,3年5折包年折扣
        面向长期稳定客户,我们特别推出了更加优惠的包年订阅选项,旨在通过深度折扣,帮助客户显著降低资源单位成本,优化整体支出。 自2024年11月9日起订购和续订分布式消息服务Kafka产品更长包周期即可享受2年7折,3年5折优惠。 注意 本次包年优惠适用于新资费产品范围,具体支持资源池请参阅
        来自:
        帮助文档
        分布式消息服务Kafka
        服务公告
        2024年
        【优惠】正式开放2年7折,3年5折包年折扣
      • 查看Topic日志
        查看Topic日志 1. 登录管理控制台。 2. 在管理控制台左上角单击,选择Kafka实例所在的区域。 3. 在管理控制台左上角单击,选择“应用服务 > 分布式消息服务Kafka”,进入Kafka总览页面。 4. 在左侧导航栏单击“Kafka实例”,进入Kafka实例列表页面。 5. 单击Kafka实例的名称,进入实例详情页面。 6. 在左侧导航栏选择“日志管理 > Topic日志”,进入“Topic日志”页面。 7. 在“日志查询”页签,查看Topic日志。 如果您需要搜索日志,请参考进入搜索LTS日志页面进行操作。 Topic日志示例如下: plaintext { "level": "INFO", "timestamp": "20241227 17:26:13,361", "message": { "topicPartition": "topic0", "targetState": "OnlinePartition", "leaderAndIsr": "LeaderAndIsr(leader1, leaderEpoch3, isrList(1, 0), leaderRecoveryStateRECOVERED, partitionEpoch3)", "partitionState": "OnlinePartition", "topic": "topic", "type": "ELECTLEADER" } } { "level": "INFO", "timestamp": "20241227 17:26:13,491", "message": { "leader": "1", "startOffset": "0", "topic": "topic", "type": "MAKELEADER", "topicPartition": "topic0", "epoch": "3" } } 参数说明如表1所示。 表1 Topic日志参数说明 参数名称 说明 level Topic日志的等级,只有“INFO”一种等级。 timestamp Topic分区选举Leader或确定Leader的时间。 topicPartition Topic分区。 targetState 目标状态,取值如下: NewPartition:表示分区处于新建状态。 OnlinePartition:表示分区处于正常工作状态。 OfflinePartition:表示分区处于下线状态。 NonExistentPartition:表示分区不存在或被删除。 leaderAndIsr leaderAndIsr请求的信息。 partitionState 分区状态,取值如下: NewPartition:表示分区处于新建状态。 OnlinePartition:表示分区处于正常工作状态。 OfflinePartition:表示分区处于下线状态。 NonExistentPartition:表示分区不存在或被删除。 topic Topic名称。 type Leader所处的阶段,取值如下: ELECTLEADER:选举Leader。 MAKELEADER:确定Leader。 leader Leader所在分区。 startOffset Leader在对应Epoch上写入第一条消息的Offset。 每个Epoch对应一个startOffset。 epoch Leader的选举次数,初始值为0。Leader每发生一次选举,Epoch值加一。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        Topic管理
        查看Topic日志
      • 【通知】产品订购、续订包周期2年和3年选项调整为白名单特性
        尊敬的天翼云客户,分布式消息服务Kafka自2025年12月27日起,订购和续订2年、3年选项默认不开放,调整为白名单特性。 调整时间 2025年12月27日 影响范围 所有区域 调整影响 新订购和续订的实例默认不开放2年、3年选项,您可以选择1年包年选项,如仍需要23年包周期选项,请联系技术支持开通后使用。 已购买2年、3年且还在服务期间的实例仍可继续正常使用不受影响。
        来自:
        帮助文档
        分布式消息服务Kafka
        服务公告
        2025年
        【通知】产品订购、续订包周期2年和3年选项调整为白名单特性
      • 【通知】通用型主机规格调整为白名单特性
        分布式消息服务Kafka主机类型通用型规格调整为白名单特性,更多了解请查看计费项产品规格说明。 调整时间 2024年6月25日 影响范围 所有区域 调整影响 新用户默认不开放主机类型通用型规格订购开通,如需要该特性,请联系技术支持开通后使用。 已购买主机类型通用型规格实例的用户,原实例仍可正常使用,续费、扩容等操作不受影响。
        来自:
        帮助文档
        分布式消息服务Kafka
        服务公告
        2024年
        【通知】通用型主机规格调整为白名单特性
      • 管理Smart Connect任务
        启动/暂停Smart Connect任务 暂停任务后,Kafka实例的数据将不会再同步到另一个Kafka实例或者其他云服务中。 1、登录管理控制台。 2、在管理控制台左上角单击,选择Kafka实例所在的区域。 3、在管理控制台左上角单击,选择“应用服务 > 分布式消息服务 Kafka”,进入分布式消息服务Kafka专享版页面。 4、单击Kafka实例名称,进入实例详情页面。 5、在左侧导航栏单击“Smart Connect”,进入Smart Connect任务列表页面。 6、执行以下操作,启动/暂停Smart Connect任务。 启动:在待启动的Smart Connect任务所在行,单击“启动”。 暂停:在待暂停的Smart Connect任务所在行,单击“暂停”,弹出“暂停任务”对话框,单击“确定”。 结束 重启Smart Connect任务 1、登录管理控制台。 2、在管理控制台左上角单击,选择Kafka实例所在的区域。 3、在管理控制台左上角单击,选择“应用服务 > 分布式消息服务 Kafka”,进入分布式消息服务Kafka专享版页面。 4、单击Kafka实例名称,进入实例详情页面。 5、在左侧导航栏单击“Smart Connect”,进入Smart Connect任务列表页面。 6、在待重启的Smart Connect任务所在行,单击“重启”,弹出“重启任务”对话框。 重启Smart Connect任务前,请注意以下两点: Smart Connect任务创建后,修改了源端或者目标端的参数,可能会导致重启失败。 重启Smart Connect任务会重置同步进度,并重新开始同步任务。 7、单击“确定”,完成Smart Connect任务的重启。 当页面左上方显示“成功重启任务xxx”时,表示成功重启Smart Connect任务。 结束
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        Kafka数据迁移
        使用Smart Connect迁移Kafka数据
        管理Smart Connect任务
      • 事件源参数
        resourceKey 是否必传 form value template instanceId 是 CONSTANT 分布式消息服务Kafka实例ID。 无 topic 是 CONSTANT 主题名称。 无 groupName 是 CONSTANT 消费组名。 无 initialOffset 是 CONSTANT 消息位点。 最新位点:latest 最早位点:earliest 无 dataFormat 否 CONSTANT 数据格式,不填时默认为JSON编码格式: JSON格式编码:JSON 文本格式编码:Text 二进制格式编码:Binary 无
        来自:
        帮助文档
        事件总线
        用户指南
        事件流
        事件源
        事件源参数
      • 网络带宽利用率监控告警配置指导
        本章介绍Kafka的网络带宽利用率监控告警配置指导。 使用场景建议 Kafka存在持续业务,避免出现客户端批量断链场景。 间断性业务场景不建议使用,会存在误告警。 新创建实例不建议设置告警。 告警通知设置,告警对象、告警组设置 1. 登录管理控制台。 2. 在管理控制台左上角单击,选择目标实例所在的区域。 3. 在管理控制台左上角单击,搜索CES服务进入“云监控服务”界面。 4. 选择“告警 >告警通知”,单击“通知对象 >创建通知对象”,填写要通知的对象以及相关联系方式,如果已存在则跳过该步骤,重复该步骤可创建多个通知对象。 5. 选择“通知组 >创建通知组”,把步骤4创建的告警对象都纳入当前组进行管理。 设置告警规则 1. 选择“告警 >告警规则 >创建告警规则”。 参数 说明 名称 自定义名称 描述 自定义描述 告警类型 指标 资源类型 分布式消息服务 维度 Kafka专享版 Broker节点 监控范围 指定资源 监控对象 选择指定kafka实例的所有broker,可选择多个kafka实例 触发规则 自定义创建 告警策略 若++网络带宽利用率++ 的++原始值连续3次<++xx则++每5分钟 告警一次++ ++根据实际情况设置xx的紧急、重要、次要等告警++ 发送通知 打开 通知方式 通知组 通知组 选择上一步创建的通知组 通知内容模板 都可以选择系统模版 生效时间 每日00:00 – 23:59 触发条件 出现告警恢复告警都选 2. 单击“立即创建”。
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        网络带宽利用率监控告警配置指导
      • 通知外送
        配置项 说明 名称 日志外送接口的名称。必须为中文字符、字母、数字、“”、“.”或“”,长度不超过 64 字符。 Kafka节点地址 Kafka服务器的IP(域名)及端口号。例如:192.168.0.1:9200。 Kafka主题 消息投放到Kafka服务器的主题。 Kafka分区 消息投放到的Kafka服务器的分区。Kafka服务器通过主题(topic)、分区(partition)和消费组(consumergroup)三个概念灵活适应各种消息场合,通过提升硬件资源利用率提高系统吞吐量。 以上Kafka相关配置与服务器端保持一致即可。 审计日志模板 设置发送审计日志的模板,具体字段请依据填写说明编辑。 操作日志模板 设置发送操作日志的模板,具体字段请依据填写说明编辑。 告警日志模板 设置发送告警日志的模板,具体字段请依据填写说明编辑。 流量控制日志模板 设置发送流量控制日志的模板,具体字段请依据填写说明编辑。
        来自:
        帮助文档
        数据安全专区
        用户指南
        API安全网关操作指导
        通知外送
      • 旧资费
        说明 分布式消息服务Kafka旧资费根据实例规格分为基础版和高级版,存储空间按照不同类型收费。 目前在 芜湖2、上海7、重庆2、乌鲁木齐27、石家庄20、内蒙6、北京5 资源池开放订购。 实例价格如下: 规格 标准资费(元/月) 标准资费(元/小时) 基础版(100 MB/s) 1300 2.71 高级版(300 MB/s ) 2100 4.38 存储空间价格如下: 存储类型 标准资费(元/G/月) 标准资费(元/G/小时) 普通 IO 0.45 0.0008 高 IO 0.6 0.0013 超高 IO 1.5 0.0032
        来自:
        帮助文档
        分布式消息服务Kafka
        计费说明
        产品资费
        旧资费
      • 创建应用用户
        本节主要介绍分布式消息服务Kafka如何创建应用用户 场景描述 用户:主要用于规定生产消费指定加密主题的策略而需要,例如规定用户A可生产消费加密Topic1,用户B可生产消费加密Topic2,用户C可生产消费加密Topic1、Topic2,则需要为这三个用户创建用户,并分配加密主题权限。 操作步骤 (1)登录管理控制台。 (2)进入Kafka管理控制台。 (3)在实例列表页在操作列,目标实例行点击“管理”。 (4)点击“用户管理”后进入用户管理页面,点击新建用户。 (5)在 新建用户的窗口中填入集群名、用户、密码、描述,然后保存。 批量创建用户 (1)登录管理控制台。 (2)进入Kafka管理控制台。 (3)在实例列表页在操作列,目标实例行点击“管理”。 (4)点击“用户管理”后进入应用用户管理页面。 (5)点击“批量创建用户”后,出现如下上传文件界面,文件格式件批量下载说明。 (6)点击“上传”完成批量创建。 下载批量创建用户模板 (1)登录管理控制台。 (2)进入Kafka管理控制台。 (3)在实例列表页在操作列,目标实例行点击“管理”。 (4)点击“用户管理”后进入用户管理页面。 (5)点击“下载模板”右侧下拉倒三角“下载模板”,内容如下图。 (6)参数说明。 参数 说明 username 用户名 password 密码 description 备注或描述
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        用户管理
        创建应用用户
      • Kafka客户端使用规范
        其他建议 连接数限制:3000 消息大小:不能超过10MB 使用saslssl协议访问Kafka:确保DNS具有反向解析能力,或者在hosts文件配置kafka所有节点ip和主机名映射,避免Kafka client做反向解析,阻塞连接建立。 磁盘容量申请超过业务量 副本数的2倍,即保留磁盘空闲50%左右。 业务进程JVM内存使用确保无频繁FGC,否则会阻塞消息的生产和消费。
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        Kafka客户端使用规范
      • 连接已开启SASL的Kafka专享实例
        创建实例时开启SASLSSL访问,则数据加密传输,安全性更高。 由于安全问题,支持的加密套件为TLSECDHEECDSAWITHAES128CBCSHA256,TLSECDHERSAWITHAES128CBCSHA256和TLSECDHERSAWITHAES128GCMSHA256。 本节介绍如何使用开源的Kafka客户端访问开启SASL的Kafka专享实例的方法。 说明: 使用SASL方式连接Kafka实例时,为了客户端能够快速解析实例的Broker,建议在客户端所在主机的“/etc/hosts”文件中配置host和IP的映射关系。 其中,IP地址必须为实例连接地址(Broker地址),host为每个实例主机的名称(您可以自定义主机的名称,但不能重复)。 例如: 10.154.48.120 server01 10.154.48.121 server02 10.154.48.122 server03 前提条件 已配置正确的安全组。 访问开启SASL的Kafka专享实例时,支持VPC内访问。实例需要配置正确的安全组规则,具体安全组配置要求,请参考表32。 已获取连接Kafka专享版实例的地址。 使用VPC内访问,实例端口为9093,实例连接地址获取如下图。 获取VPC内访问Kafka专享实例的连接地址(实例已开启SASL) Kafka专享实例已创建Topic,否则请提前创建Topic。 已下载client.truststore.jks证书。如果没有,在控制台单击Kafka实例名称,进入实例详情页面,在“基本信息 > 高级配置 > Kafka SASLSSL”所在行,单击 。下载压缩包后解压,获取压缩包中的客户端证书文件:client.truststore.jks。 已下载Kafka命令行工具1.1.0版本或者Kafka命令行工具2.3.0版本,确保Kafka实例版本与命令行工具版本相同。 已在Kafka命令行工具的使用环境中安装Java Development Kit 1.8.111或以上版本,并完成环境变量配置。 命令行模式连接实例 以下操作命令以Linux系统为例进行说明。 解压Kafka命令行工具。 进入文件压缩包所在目录,然后执行以下命令解压文件。 tar zxf [kafkatar] 其中,[kafkatar]表示命令行工具的压缩包名称。 例如: tar zxf kafka2.111.1.0.tgz 修改Kafka命令行工具配置文件。 在Kafka命令行工具的“/config”目录中找到“consumer.properties”和“producer.properties”文件,并分别在文件中增加如下内容。 sasl.jaas.configorg.apache.kafka.common.security.plain.PlainLoginModule required username"" password""; sasl.mechanismPLAIN security.protocolSASLSSL ssl.truststore.location/opt/kafka2.111.1.0/config/client.truststore.jks ssl.truststore.passworddms@kafka ssl.endpoint.identification.algorithm 参数说明: username和password为创建Kafka专享实例过程中开启SASLSSL时填入的用户名和密码。 ssl.trustore.location配置为client.truststore.jks证书的存放路径。注意,Windows系统下证书路径中也必须使用“/”,不能使用Windows系统中拷贝路径时的“”,否则客户端获取证书失败。 ssl.truststore.password为服务器证书密码,不可更改,需要保持为dms@kafka。 ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空。 进入Kafka命令行工具的“/bin”目录下。 注意,Windows系统下需要进入“/bin/windows”目录下。 执行如下命令进行生产消息。 ./kafkaconsoleproducer.sh brokerlist ${连接地址} topic ${Topic名称} producer.config ../config/producer.properties 参数说明如下: 连接地址:从前提条件获取的连接地址。 Topic名称:Kafka实例下创建的Topic名称。 如下示例,Kafka实例连接地址为“10.xxx.xxx.202:9093,10.xxx.xxx.197:9093,10.xxx.xxx.68:9093”。 执行完命令后,输入需要生产的消息内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。 [root@ecskafka bin]
        来自:
        帮助文档
        专属云分布式消息服务Kafka
        用户指南
        连接已开启SASL的Kafka专享实例
      • 触发器事件消息格式
        参数 类型 示例值 描述 id string eca534636baf4d568f86cbdb748208ed 事件ID。标识事件的唯一值。 source string ctyun.faas.trigger.kafka 事件源。Kafka触发器固定为ctyun.faas.trigger.kafka。 specversion string 1.0 CloudEvents协议版本。 type string kafka:topic:sendmessage 事件类型。 datacontenttype string application/json 参数data的内容形式。 subject string kafkatriggermqbjvsezbpdialtest:testforfaas 事件主体。格式为[SourceName]:[消息topic]。 time string 20250522T02:04:16Z 消息被触发的时间。 data object Kafka触发器独有消息格式,详细参见下文RawData描述。
        来自:
        帮助文档
        函数计算
        用户指南
        事件触发
        触发器事件消息格式
      • 购买类常见问题
        本节介绍分布式消息服务Kafka计费类常见问题 可以购买哪些版本? 不同资源池可购买的版本、规格不一致,具体请查看产品规格说明 (1)华东1、华北2、西南1、华南2、上海36、青岛20、长沙42、南昌5、武汉41、杭州7、西南2贵州、太原4、郑州5、西安7、呼和浩特3 可以选购Kafka引擎,选择主机类型、节点规格、节点数和存储空间。 (2)芜湖2、上海7、重庆2、乌鲁木齐27、石家庄20、内蒙6、北京5 可以选购高级版和基础版两个版本。 到期后如何续费? 在集群列表中点击“续费”,进入购买时长页面,购买成功后自动续费。 手动续订:对于包年/包月订购的分布式缓存服务,用户在资源到期前进行续费操作,可以延长原有资源到期时间,避免资源到期后冻结或超过保留期后被系统回收。详细操作请参考费用中心续订管理手动续订。 自动续订:自动续订仅针对采用包月、包年计费模式的资源,详细操作请参考费用中心续订管理自动续订。 产品订购时可选资源池节点不一致? 已上线资源池节点的剩余容量达到一定比例后,为确保老客户权益,将不再面向新客户开放,产品订购时的可选资源节点范围以实际为准。
        来自:
        帮助文档
        分布式消息服务Kafka
        常见问题
        计费与购买类
        购买类常见问题
      • Java客户端接入示例
        生产消息 import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.concurrent.Future; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.config.SslConfigs; public class KafkaProducerDemo { public static void main(String args[]) { //加载kafka.properties Properties kafkaProperties JavaKafkaConfigurer.getKafkaProperties(); Properties props new Properties(); //设置接入点,请通过控制台获取对应Topic的接入点 props.put(ProducerConfig.BOOTSTRAPSERVERSCONFIG, kafkaProperties.getProperty("bootstrap.servers")); //设置SSL根证书的路径,请记得将XXX修改为自己的路径 props.put(SslConfigs.SSLTRUSTSTORELOCATIONCONFIG, kafkaProperties.getProperty("ssl.truststore.location")); //根证书store的密码,保持不变 props.put(SslConfigs.SSLTRUSTSTOREPASSWORDCONFIG, "c24f5210"); //接入协议,目前支持使用SSL协议接入 props.put(CommonClientConfigs.SECURITYPROTOCOLCONFIG, "SSL"); //Kafka消息的序列化方式 props.put(ProducerConfig.KEYSERIALIZERCLASSCONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUESERIALIZERCLASSCONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //请求的最长等待时间 props.put(ProducerConfig.MAXBLOCKMSCONFIG, 30 1000); //设置客户端内部重试次数 props.put(ProducerConfig.RETRIESCONFIG, 5); //设置客户端内部重试间隔 props.put(ProducerConfig.RECONNECTBACKOFFMSCONFIG, 3000); //hostname校验改成空 props.put(SslConfigs.SSLENDPOINTIDENTIFICATIONALGORITHMCONFIG, ""); //构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可; //如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个 KafkaProducer producer new KafkaProducer (props); //构造一个Kafka消息 String topic kafkaProperties.getProperty("topic"); //消息所属的Topic,请在控制台申请之后,填写在这里 String value "this is the message's value"; //消息的内容 try { //批量获取 futures 可以加快速度, 但注意,批量不要太大 List > futures new ArrayList >(128); for (int i 0; i kafkaMessage new ProducerRecord (topic, value + ": " + i); Future metadataFuture producer.send(kafkaMessage); futures.add(metadataFuture); } producer.flush(); for (Future future: futures) { //同步获得Future对象的结果 try { RecordMetadata recordMetadata future.get(); System.out.println("Produce ok:" + recordMetadata.toString()); } catch (Throwable t) { t.printStackTrace(); } } } catch (Exception e) { System.out.println("error occurred"); e.printStackTrace(); } } }
        来自:
        帮助文档
        分布式消息服务Kafka
        开发指南
        Java
        Java客户端接入示例
      • Java开发环境搭建
        开发环境 Maven Apache Maven 2.5及以上版本,可至Maven官方网站下载。 JDK Java Development Kit 1.8及以上版本,可至Oracle官方网站下载。 安装后注意配置JAVA的环境变量。 IntelliJ IDEA 获取并安装IntelliJ IDEA,可至IntelliJ IDEA官方网站下载。 操作步骤 1. 下载Demo包kafkajavademo.zip 下载后解压,有如下文件: 表1 KAFKA Demo文件清单 文件名 路径 说明 ::: JavaKafkaConfigurer.java .srcmainjavajavaDemo 读取Kafka配置文件。 KafkaConsumerDemo.java .srcmainjavajavaDemo 消费消息。 KafkaMultiConsumerDemo.java .srcmainjavajavaDemo 批量消费消息。 KafkaProducerDemo.java .srcmainjavajavaDemo 生产消息。 kafka.properties .srcmainresources kafka配置参数 pom.xml . maven配置文件,包含Kafka客户端引用。 2. 打开IntelliJ IDEA,导入Demo。Demo是一个Maven构建的Java工程,因此需要配置JDK环境,以及IDEA的Maven插件。 3. 修改Kafka配置信息。 修改kafka.properties 修改demo文件中的属性变量
        来自:
        帮助文档
        分布式消息服务Kafka
        开发指南
        Java
        Java开发环境搭建
      • 连接未开启SASL的Kafka实例
        命令行模式连接实例 以下操作命令以Linux系统为例进行说明: 步骤 1 解压Kafka命令行工具。 进入文件压缩包所在目录,然后执行以下命令解压文件。 tar zxf [kafkatar] 其中, [kafkatar] 表示命令行工具的压缩包名称。 例如: tar zxf kafka2.122.7.2.tgz 步骤 2 进入Kafka命令行工具的“/bin”目录下。 注意,Windows系统下需要进入“/bin/windows”目录下。 步骤 3 执行如下命令进行生产消息。 ./kafkaconsoleproducer.sh brokerlist {连接地址} topic {Topic名称} 参数说明如下: 连接地址:从前提条件中获取的连接地址,如果是公网访问,请使用“公网连接地址”,如果是VPC内访问,请使用“内网连接地址”,请根据实际情况选择。 Topic名称:Kafka实例下创建的Topic名称。如果Kafka实例开启了自动创建Topic功能,此参数值可以填写已创建的Topic名称,也可以填写未创建的Topic名称。 本文以公网连接为例,获取的Kafka实例公网连接地址为“10.3.196.45:9094,10.78.42.127:9094,10.4.49.103:9094”。执行完命令后输入内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。 [root@ecskafka bin] ./kafkaconsoleproducer.sh brokerlist 10.3.196.45:9094,10.78.42.127:9094,10.4.49.103:9094 topic topicdemo >Hello >DMS >Kafka! >^C[root@ecskafka bin] 如需停止生产使用Ctrl+C命令退出。 步骤 4 执行如下命令消费消息。 ./kafkaconsoleconsumer.sh bootstrapserver {连接地址} topic {Topic名称} group ${消费组名称} frombeginning 参数说明如下: 连接地址:从前提条件中获取的连接地址,如果是公网访问,请使用“公网连接地址”,如果是VPC内访问,请使用“内网连接地址”,请根据实际情况选择。 Topic名称:Kafka实例下创建的Topic名称。 消费组名称:根据您的业务需求,设定消费组名称。 如果已经在配置文件中指定了消费组名称,请确保命令行中的消费组名称与配置文件中的相同,否则可能消费失败 。 消费组名称开头包含特殊字符,例如下划线“”、
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        连接Kafka实例
        连接未开启SASL的Kafka实例
      • SSL接入性能优化
        本文主要介绍消息队列 Kafka 通过SSL接入的最佳实践,从而帮助您更好的使用该产品。 文中的最佳实践基于消息队列 Kafka 的 Java 客户端;对于其它语言的客户端,其基本概念与思想是通用的,但实现细节可能有差异,仅供参考。 背景 云消息队列 Kafka 版实例如果选择SASLSSL接入时,可能会出现性能较差的情况。可以通过在客户端指定密码套件的方式,手动选择性能和安全性都较高的套件进行TLS通讯。 SSL握手时客户端发送Hello Client 并带上客户端支持的密码套件,服务端收到握手请求后,获取客户端带来的密码套件和服务端支持的密码套件取交集。密码套件加载顺序受客户端jdk版本影响,不同jdk版本,密码套件顺序不一样,可能会导致性能和安全性等无法保证。因此为了保证性能,SSL连接时客户端可以指定密码套件。 推荐的性能和安全性较高的密码套件 TLSECDHERSAWITHAES256GCMSHA384 TLSECDHERSAWITHAES128GCMSHA256 步骤 1.先按照SASLSSL协议接入Kafka,SASLSSL接入可参考文档 SASLSSL接入点接入 2.在此基础上增加 ssl.cipher.suites 配置 客户端关键参数 java Properties props new Properties(); props.put(ProducerConfig.BOOTSTRAPSERVERSCONFIG, BROKERADDR); props.put(ProducerConfig.VALUESERIALIZERCLASSCONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.KEYSERIALIZERCLASSCONFIG, StringSerializer.class.getName()); props.put(CommonClientConfigs.SECURITYPROTOCOLCONFIG, "SASLSSL"); props.put("sasl.mechanism", "SCRAMSHA512"); props.put(SaslConfigs.SASLJAASCONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username"testuser" password"f6eca4dbc78df78d63fba980e448185f";");//注:上面的密码f6eca4dbc78df78d63fba980e448185f,为用户管理里面创建用户时填入的密码进行md5的结果,md5取32位小写 props.put("ssl.truststore.location","/kafka/client.truststore.jks"); props.put("ssl.truststore.password","sJses2tin1@23"); props.put("ssl.endpoint.identification.algorithm",""); props.put("ssl.cipher.suites","TLSECDHERSAWITHAES256GCMSHA384,TLSECDHERSAWITHAES128GCMSHA256");//密码套件支持多个,用半角逗号分开
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        SSL接入性能优化
      • 查看示例代码
        本文主要介绍查看示例代码。 本章节指导您在控制台查看Java、Go和Python生产消费消息的示例代码。 操作步骤 步骤 1 登录管理控制台。 步骤 2 在管理控制台右上角单击,选择区域。 说明 请选择Kafka实例所在的区域。 步骤 3 在管理控制台左上角单击,选择“企业中间件”“分布式消息服务”“Kafka专享版”,进入分布式消息服务Kafka专享版页面。 步骤 4 单击Kafka实例的名称,进入实例详情页面。 步骤 5 选择“Topic管理”页签,显示已创建的Topic详情。 步骤 6 单击“查看示例代码”,弹出“生产消费示例代码”对话框。 查看Java、Go和Python生产消费消息的示例代码。示例代码区分是否开启SASLSSL认证,“接入类型”为“PLAINTEXT”时,表示未开启SASLSSL认证。“接入类型”为“SASLSSL”时,表示已开启SASLSSL认证。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        Topic管理
        查看示例代码
      • 删除Topic
        本节主要介绍分布式消息服务Kafka的删除Topic步骤 场景描述 在以下场景中,可以考虑删除Kafka主题: 主题不再使用:当一个主题不再被使用或者不再需要时,可以选择删除该主题。这可能是因为业务需求变化、数据不再有效或者主题被合并到其他主题中等原因。 数据保留策略变更:Kafka中可以设置数据保留策略,即数据在主题中的保留时间或者大小。当需要更改数据保留策略时,可能需要删除旧的主题并创建一个新的主题来应用新的策略。 清理测试数据:在测试环境中,经常需要清理旧的测试数据,以确保环境的可用空间和性能。当测试数据不再需要时,可以删除相应的主题来释放资源。 主题配置错误:在创建主题时,可能会出现配置错误或者误操作导致主题创建失败或者无法正常使用。在这种情况下,可以删除有问题的主题,并重新创建正确的主题配置。 需要注意的是,在删除主题之前,需要确保主题中的数据已经备份或者不再需要。删除主题将永久删除主题中的所有数据,并且无法恢复。因此,在删除主题之前,建议先进行备份或者确认数据不再需要。 操作步骤 (1)登录管理控制台。 (2)进入Kafka管理控制台。 (3)在实例列表页在操作列,目标实例行点击“管理”。 (4)点击“Topic管理”后进入Topic管理页面。 (5)在Topic所在行,点击其右侧的“更多按钮”,再单击“删除”,并选择确定。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        Topic管理
        删除Topic
      • 查看磁盘使用量
        本文主要介绍查看磁盘使用量。 本章节指导您在控制台查看每个代理上磁盘的使用量。 操作步骤 步骤 1 登录管理控制台。 步骤 2 在管理控制台右上角单击,选择区域。 说明 请选择Kafka实例所在的区域。 步骤 3 在管理控制台左上角单击,选择“企业中间件”“分布式消息服务”“Kafka专享版”,进入分布式消息服务Kafka专享版页面。 步骤 4 单击Kafka实例的名称,进入实例的“基本信息”页签。 步骤 5 在“磁盘存储统计”页签,查看磁盘的使用量。 图 查看磁盘的使用量 页面支持从以下三个方面对代理中的Topic磁盘使用量进行查询:使用量排名前xx 个、使用量达到xx GB和使用占比达 xx %。 在页面右上角,单击“查看监控数据”,跳转到云监控服务的监控指标页面。在监控指标页面,您可以查看Kafka实例的各项监控指标。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        实例管理
        查看磁盘使用量
      • 产品规格
        Kafka实例规格参考 kafka.2u4g.cluster,三个代理 Kafka客户端连接数在3000以内,消费组个数在60个以内,业务TPS为100000以内时推荐选用。 kafka.4u8g.cluster,三个代理 Kafka客户端连接数在10000以内,消费组个数在300个以内,业务TPS为300000以内时推荐选用。 kafka.8u16g.cluster,三个代理 Kafka客户端连接数在20000以内,消费组个数在600个以内,业务TPS为600000以内时推荐选用。 kafka.12u24g.cluster,三个代理 Kafka客户端连接数在20000以内,消费组个数在600个以内,业务TPS为900000以内时推荐选用。 kafka.16u32g.cluster,三个代理 Kafka客户端连接数在20000以内,消费组个数在600个以内,业务TPS为1200000以内时推荐选用。 Kafka实例的存储空间估算参考 Kafka实例支持多副本存储,存储空间包含所有副本存储空间总和,您在创建Kafka实例,选择初始存储空间时,建议根据业务消息体积预估以及副本数量选择合适的存储空间。 例如:业务消息体积预估100GB,则磁盘容量最少应为100GB副本数 + 预留磁盘大小100GB。 Kafka实例支持对存储进行扩容,根据业务增长,随时扩容,节约成本。 Kafka实例Topic数量计算 Kafka实例对Topic分区数之和设置了上限,当达到上限之后,用户无法继续创建Topic。 所以,Topic数量和实例分区数上限、每个Topic的分区数有关,其中,每个Topic分区数可在创建Topic时设置,如下图,实例分区数上限参考上表。 图Topic的分区数 kafka.2u4g.cluster 3 broker实例的分区数上限为750。 如果该实例下每个Topic的分区个数都为3,则Topic个数为750/3250个。 如果该实例下每个Topic的分区个数都为1,则Topic个数为750/1750个。
        来自:
        帮助文档
        分布式消息服务Kafka
        产品简介
        产品规格
      • 查看分区状态
        介绍分布式消息服务Kafka查看分区状态功能操作内容。 场景描述 Kafka查看分区状态的场景描述如下: 监控和故障排查:通过查看分区状态,可以了解每个分区的健康状况和数据处理情况。如果某个分区出现延迟或者数据丢失等问题,可以及时发现并进行故障排查和修复。 性能优化:通过查看分区状态,可以了解每个分区的负载情况和数据处理速度。如果某个分区负载过高或者处理速度较慢,可以采取相应的措施进行性能优化,如增加分区数量或者优化消费者的消费能力。 容量规划:通过查看分区状态,可以了解每个分区的数据大小和数据增长趋势。根据分区状态,可以进行容量规划,确保有足够的存储空间来存储数据,并预测未来的数据增长趋势。 数据迁移和重平衡:当需要进行数据迁移或者重平衡时,查看分区状态可以帮助确定合适的迁移方案和平衡策略。通过了解每个分区的状态和负载情况,可以更好地规划和执行数据迁移和重平衡操作。 容错和冗余管理:通过查看分区状态,可以了解每个分区的副本分布情况和数据冗余情况。如果某个分区的副本数量不足或者副本分布不均衡,可以采取相应的措施来提高数据的容错性和冗余能力。 总之,Kafka查看分区状态的场景包括监控和故障排查、性能优化、容量规划、数据迁移和重平衡,以及容错和冗余管理等,以确保系统的稳定性、性能和可靠性。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        Topic管理
        查看分区状态
      • 连接已开启SASL的Kafka实例
        命令行模式连接实例 以下操作命令以Linux系统为例进行说明。 步骤 1 在客户端所在主机的“/etc/hosts”文件中配置host和IP的映射关系,以便客户端能够快速解析实例的Broker。 其中,IP地址必须为实例连接地址(从前提条件获取的连接地址),host为每个实例主机的名称(主机的名称由您自行设置,但不能重复)。 例如: 10.154.48.120 server01 10.154.48.121 server02 10.154.48.122 server03 步骤 2 解压Kafka命令行工具。 进入文件压缩包所在目录,然后执行以下命令解压文件。 tar zxf [kafkatar] 其中, [kafkatar] 表示命令行工具的压缩包名称。 例如: tar zxf kafka2.122.7.2.tgz 步骤 3 根据SASL认证机制,修改Kafka命令行工具配置文件。 1、PLAIN机制: 在Kafka命令行工具的“/config”目录中找到“consumer.properties”和“producer.properties”文件,并分别在文件中增加如下内容。 sasl.jaas.configorg.apache.kafka.common.security.plain.PlainLoginModule required username"" password""; sasl.mechanismPLAINsecurity.protocolSASLSSL ssl.truststore.location{ssltruststorepath} ssl.truststore.passworddms@kafka ssl.endpoint.identification.algorithm 参数说明: username和password为创建Kafka实例过程中开启SASLSSL时填入的用户名和密码,或者创建SASLSSL用户时设置的用户名和密码。 ssl.truststore.location配置为client.jks证书的存放路径。注意,Windows系统下证书路径中也必须使用“/”,不能使用Windows系统中复制路径时的“”,否则客户端获取证书失败。 ssl.truststore.password为服务器证书密码,不可更改,需要保持为dms@kafka 。 ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭。这里需要 保持关闭状态,必须设置为空 。 2、SCRAMSHA512机制: 在Kafka命令行工具的“/config”目录中找到“consumer.properties”和“producer.properties”文件,并分别在文件中增加如下内容。 sasl.jaas.configorg.apache.kafka.common.security.scram.ScramLoginModule required username"" password""; sasl.mechanismSCRAMSHA512 security.protocolSASLSSL ssl.truststore.location{ssltruststorepath} ssl.truststore.passworddms@kafka ssl.endpoint.identification.algorithm 参数说明: username和password为创建Kafka实例过程中开启SASLSSL时填入的用户名和密码,或者创建SASLSSL用户时设置的用户名和密码。 ssl.truststore.location配置为client.jks证书的存放路径。注意,Windows系统下证书路径中也必须使用“/”,不能使用Windows系统中复制路径时的“”,否则客户端获取证书失败。 ssl.truststore.password为服务器证书密码,不可更改,需要保持为dms@kafka 。 ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭。这里需要 保持关闭状态,必须设置为空 。 步骤 4 进入Kafka命令行工具的“/bin”目录下。 注意,Windows系统下需要进入“/bin/windows”目录下。 步骤 5 执行如下命令进行生产消息。 ./kafkaconsoleproducer.sh brokerlist {连接地址} topic {Topic名称} producer.config ../config/producer.properties 参数说明如下: 连接地址:从前提条件获取的连接地址,如果是公网访问,请使用“公网连接地址”,如果是VPC内访问,请使用“内网连接地址”,请根据实际情况选择。 Topic名称:Kafka实例下创建的Topic名称。如果Kafka实例开启了自动创建Topic功能,此参数值可以填写已创建的Topic名称,也可以填写未创建的Topic名称。 本文以公网访问为例,Kafka实例连接地址为“10.3.196.45:9095,10.78.42.127:9095,10.4.49.103:9095”。 执行完命令后,输入需要生产的消息内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。 [root@ecskafka bin]
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        连接Kafka实例
        连接已开启SASL的Kafka实例
      • 重启Kafka Manager
        本文主要介绍 重启Kafka Manager。 操作场景 当Kafka Manager无法登录或者无法使用时,例如下图中的报错,可以通过重启Kafka Manager,使Kafka Manager恢复正常。 图 报错信息 说明 重启Kafka Manager不会影响业务。 操作步骤 步骤 1 登录管理控制台。 步骤 2 在管理控制台右上角单击,选择区域。 说明 请选择Kafka实例所在的区域。 步骤 3 在管理控制台左上角单击,选择“企业中间件”“分布式消息服务”“Kafka专享版”,进入分布式消息服务Kafka专享版页面。 步骤 4 通过以下任意一种方法,重启Kafka Manager。 在待重启Manager的Kafka实例所在行,单击“更多 > 重启Manager”。 单击Kafka实例名称,进入实例详情页面。单击右上角的“更多 > 重启Manager”。 步骤 5 单击“确定”。 您可以在实例的“后台任务管理”页面,查看当前任务的操作进度。任务状态为“成功”,表示重启成功。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        实例管理
        重启Kafka Manager
      • Go
        消费消息 发送以下命令消费消息。 go run modvendor consumer/consumer.go 消费消息示例代码如下: package main import ( "encoding/json" "fmt" "github.com/confluentinc/confluentkafkago/kafka" "os" "path/filepath" ) type KafkaConfig struct { Topic string json:"topic" Topic2 string json:"topic2" GroupId string json:"group.id" BootstrapServers string json:"bootstrap.servers" SecurityProtocol string json:"security.protocol" } // config should be a pointer to structure, if not, panic func loadJsonConfig() KafkaConfig { workPath, err : os.Getwd() if err ! nil { panic(err) } configPath : filepath.Join(workPath, "conf") fullPath : filepath.Join(configPath, "kafka.json") file, err : os.Open(fullPath); if (err ! nil) { msg : fmt.Sprintf("Can not load config at %s. Error: %v", fullPath, err) panic(msg) } defer file.Close() decoder : json.NewDecoder(file) var config &KafkaConfig{} err decoder.Decode(config); if (err ! nil) { msg : fmt.Sprintf("Decode json fail for config file at %s. Error: %v", fullPath, err) panic(msg) } json.Marshal(config) return config } func doInitConsumer(cfg KafkaConfig) kafka.Consumer { fmt.Print("init kafka consumer, it may take a few seconds to init the connectionn") //common arguments var kafkaconf &kafka.ConfigMap{ "api.version.request": "true", "auto.offset.reset": "latest", "heartbeat.interval.ms": 3000, "session.timeout.ms": 30000, "max.poll.interval.ms": 120000, "fetch.max.bytes": 1024000, "max.partition.fetch.bytes": 256000} kafkaconf.SetKey("bootstrap.servers", cfg.BootstrapServers); kafkaconf.SetKey("group.id", cfg.GroupId) switch cfg.SecurityProtocol { case "PLAINTEXT" : kafkaconf.SetKey("security.protocol", "plaintext"); case "SSL": kafkaconf.SetKey("security.protocol", "ssl"); kafkaconf.SetKey("ssl.ca.location", "/XXX/caRoot.pem") case "SASLSSL": kafkaconf.SetKey("security.protocol", "saslssl"); kafkaconf.SetKey("ssl.ca.location", "/XXX/caRoot.pem"); kafkaconf.SetKey("sasl.username", cfg.SaslUsername); kafkaconf.SetKey("sasl.password", cfg.SaslPassword); kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism) case "SASLPLAINTEXT": kafkaconf.SetKey("security.protocol", "saslplaintext"); kafkaconf.SetKey("sasl.username", cfg.SaslUsername); kafkaconf.SetKey("sasl.password", cfg.SaslPassword); kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism) default: panic(kafka.NewError(kafka.ErrUnknownProtocol, "unknown protocol", true)) } consumer, err : kafka.NewConsumer(kafkaconf) if err ! nil { panic(err) } fmt.Print("init kafka consumer successn") return consumer; } func main() { // Choose the correct protocol cfg : loadJsonConfig(); consumer : doInitConsumer(cfg) consumer.SubscribeTopics([]string{cfg.Topic, cfg.Topic2}, nil) for { msg, err : consumer.ReadMessage(1) if err nil { fmt.Printf("Message on %s: %sn", msg.TopicPartition, string(msg.Value)) } else { // The client will //automatically try to recover from all errors. fmt.Printf("Consumer error: %v (%v)n", err, msg) } } consumer.Close() }
        来自:
        帮助文档
        分布式消息服务Kafka
        开发指南
        Go
      • 自建Apache Kafka事件源
        事件示例 plaintext { "id": "601f1f756f8c4d97941e5c04f7add896", "specversion": "1.0", "source": "apache:kafka", "type": "kafka:Topic:SendMessage", "subject": "apache:kafka:bb9fdb42056xxxxxx0242ac110002:dab4124510dd4xxxxxxxxx5c6a6db69:topic:streamsource", "datacontenttype": "application/json", "time": "20251030T07:16:07.692257848Z", "data": { "topic": "streamsource", "partition": 0, "offset": 36, "value": "msg17" }, "ctyunaccountid": "dab4124510dd4xxxxxxxxx5c6a6db69", "ctyunregion": "bb9fdb42056xxxxxx0242ac110002" } data字段包含的参数解释如下表所示: 参数 类型 示例值 描述 topic String source1 Topic名称。 offset Integer 1 消费位点。 key String test 消息Key值。 value Object Hello,Kafka! 消息体,默认以JSON格式编码。
        来自:
        帮助文档
        事件总线
        用户指南
        事件流
        事件源
        自建Apache Kafka事件源
      • 将Oracle同步到Kafka
        参数 描述 Topic 选择目标端需要同步到的Topic。 投送到kafka的数据格式 选择Oracle投送到kafka的数据格式。 Avro:可以显示Avro二进制编码,高效获取数据。 Json:为Json消息格式。详细格式可参考Kafka消息格式。 同步对象 同步对象支持表级同步,您可以根据业务场景选择对应的数据进行同步。 选择对象的时候支持搜索,以便您快速选择需要的数据库对象。
        来自:
        帮助文档
        数据库复制
        用户指南
        实时同步
        自建到自建
        将Oracle同步到Kafka
      • 连接和查看Kafka Manager
        Kafka Manager是开源的Kafka集群管理工具,需要通过浏览器才能访问Kafka Manager的地址。在Kafka Manager页面,您可以查看Kafka集群的监控、节点等信息。 前提条件 已正确配置安全组。 登录Kafka Manager 创建一台与Kafka专享实例相同VPC和相同安全组的Windows服务器。 获取Kafka Manager地址。 在实例详情信息页面,获取Kafka Manager的地址。 在浏览器中输入Kafka Manager的地址,进入Kafka Manager登录页面。 输入创建实例时设置的Kafka Manager用户名和密码,即可管理Kafka集群。 查看Kafka Manager 在进入Kafka Manger集群管理页面后,您可以查看Kafka集群的监控、节点等信息。 集群信息页 单击Clusters中的集群列表,即可进入集群信息页。如图所示。 − 图中①区域表示功能导航栏 Cluster: 集群,统计集群列表和集群详情。 Brokers: 节点,统计当前集群中各节点的状态信息。 Topic: 队列,统计当前集群中的kafka队列。 Preferred Replica Election: 强制进行一次队列leader的最优选举(不建议用户操作)。 Reassign Partitions: 进行分区副本的重分配(不建议用户操作)。 Consumers: 统计集群中的消费组状态。 − 图中②区域表示集群信息统计,包含集群的Topic数和集群的节点数。 集群信息页 集群所有节点统计页 单击功能导航栏中的Brokers,即可进入节点统计页。如图54所示。 − 图中①区域节点列表,包含总的字节流入和字节流出。 − 图中②集群监控信息。 所有节点统计页 具体节点统计页 单击id列表中具体的Broker,即可查看对应节点的统计信息。如图55所示。 − 图中①区域表示对应节点总的统计信息,包括队列数、分区数、分区leader数、消息速率占比、写入字节占比以及流出字节占比。 − 图中②区域表示节点监控信息。 具体Broker信息 查看实例的Topic 在导航栏选择Topic,并在下拉列表中选择List。页面如图56所示,展示了队列列表以及分区数等。 列表中以“”开头的队列为内部队列,严禁操作,否则可能导致业务问题。 查看实例的Topic 队列详情页 单击具体的Topic名称,进入如图57所示页面。 − 图中①区域表示队列基本信息,包括副本数(Replication),分区数(Number of Partitions),消息数(Sum of partition offsets)等。 − 图中②区域表示节点与队列分区的对应关系。 − 图中③区域表示该队列的消费组列表。单击消费组名称可进入该消费组的详情页。 − 图中④区域表示队列的配置信息。详情参考kafka队列官方配置文档(
        来自:
        帮助文档
        专属云分布式消息服务Kafka
        用户指南
        连接和查看Kafka Manager
      • 删除用户
        介绍分布式消息服务Kafka删除用户功能操作内容。 场景描述 Kafka删除用户的场景描述如下: 用户离职或不再需要访问权限:当用户离开组织或不再需要访问Kafka集群时,可以删除其用户账户。这可以确保已离职的用户无法再访问和操作Kafka,提高安全性和合规性。 角色调整或合并:在某些情况下,可能需要对用户角色进行调整或合并。如果某个角色不再需要或与其他角色重复,可以删除相关的用户账户和角色,以简化管理和维护。 用户账户过期或失效:如果用户账户的有效期已过或由于某种原因失效,可以删除该用户账户。这有助于清理无效的用户账户,减少安全风险和资源浪费。 安全审计和合规要求:根据安全审计和合规要求,可能需要删除某些用户账户。例如,当用户账户存在安全风险或违反合规规定时,需要及时删除相关账户以保护系统安全和数据隐私。 数据隔离和资源管理:在多租户环境中,当某个租户不再需要使用Kafka集群时,可以删除其相关的用户账户。这有助于释放资源,提高资源利用率和性能。 操作步骤 (1)登录管理控制台。 (2)进入Kafka管理控制台。 (3)在实例列表页在操作列,目标实例行点击“管理”。 (4)点击“用户管理”后进入应用用户管理页面,点击指定用户其右侧的“删除”按钮。 注意事项 Kafka删除用户时需要注意以下事项: 确认用户身份:在删除用户之前,确保要删除的用户的身份是正确的。验证用户的身份可以防止误删除或删除错误的用户账户。 备份和迁移数据:如果用户账户中存在重要的数据或信息,建议在删除用户之前进行数据备份和迁移。这可以确保数据的安全性和完整性,并防止数据的丢失或泄露。 通知相关人员:在删除用户之前,及时通知相关的管理人员或团队成员。这有助于协调和沟通,确保删除用户的操作得到充分的授权和确认。 关注系统性能:删除用户时,要注意对Kafka集群的性能影响。大规模的用户删除操作可能会对系统造成一定的负载和延迟,因此在合适的时间窗口进行删除操作,以减少对业务的影响。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        用户管理
        删除用户
      • 1
      • ...
      • 5
      • 6
      • 7
      • 8
      • 9
      • ...
      • 644
      跳转至
      推荐热词
      天翼云运维管理审计系统天翼云云服务平台云服务备份云日志服务应用运维管理云手机云电脑天翼云云hbase数据库电信云大数据saas服务电信云大数据paas服务轻量型云主机天翼云客户服务电话应用编排服务天翼云云安全解决方案云服务总线CSB天翼云服务器配置天翼云联邦学习产品天翼云云安全天翼云企业上云解决方案天翼云产品天翼云视频云存储

      天翼云最新活动

      安全隔离版OpenClaw

      OpenClaw云服务器专属“龙虾“套餐低至1.5折起

      天翼云新春焕新季

      云主机开年特惠28.8元/年,0元秒杀等你来抢!

      云上钜惠

      爆款云主机全场特惠,2核4G只要1.8折起!

      中小企业服务商合作专区

      国家云助力中小企业腾飞,高额上云补贴重磅上线

      出海产品促销专区

      爆款云主机低至2折,高性价比,不限新老速来抢购!

      天翼云奖励推广计划

      加入成为云推官,推荐新用户注册下单得现金奖励

      产品推荐

      弹性云主机 ECS

      多活容灾服务

      GPU云主机

      轻量型云主机

      弹性高性能计算 E-HPC

      AI Store

      模型推理服务

      应用托管

      科研助手

      推荐文档

      产品类

      控制中心

      产品功能

      玩转天翼云①:如何获得管理员权限的方法

      入门教程⑤:策略路由

      产品功能

      开通备案

      • 7*24小时售后
      • 无忧退款
      • 免费备案
      • 专家服务
      售前咨询热线
      400-810-9889转1
      关注天翼云
      • 旗舰店
      • 天翼云APP
      • 天翼云微信公众号
      服务与支持
      • 备案中心
      • 售前咨询
      • 智能客服
      • 自助服务
      • 工单管理
      • 客户公告
      • 涉诈举报
      账户管理
      • 管理中心
      • 订单管理
      • 余额管理
      • 发票管理
      • 充值汇款
      • 续费管理
      快速入口
      • 天翼云旗舰店
      • 文档中心
      • 最新活动
      • 免费试用
      • 信任中心
      • 天翼云学堂
      云网生态
      • 甄选商城
      • 渠道合作
      • 云市场合作
      了解天翼云
      • 关于天翼云
      • 天翼云APP
      • 服务案例
      • 新闻资讯
      • 联系我们
      热门产品
      • 云电脑
      • 弹性云主机
      • 云电脑政企版
      • 天翼云手机
      • 云数据库
      • 对象存储
      • 云硬盘
      • Web应用防火墙
      • 服务器安全卫士
      • CDN加速
      热门推荐
      • 云服务备份
      • 边缘安全加速平台
      • 全站加速
      • 安全加速
      • 云服务器
      • 云主机
      • 智能边缘云
      • 应用编排服务
      • 微服务引擎
      • 共享流量包
      更多推荐
      • web应用防火墙
      • 密钥管理
      • 等保咨询
      • 安全专区
      • 应用运维管理
      • 云日志服务
      • 文档数据库服务
      • 云搜索服务
      • 数据湖探索
      • 数据仓库服务
      友情链接
      • 中国电信集团
      • 天翼云国际站
      • 189邮箱
      • 天翼企业云盘
      • 天翼云盘
      ©2026 天翼云科技有限公司版权所有 增值电信业务经营许可证A2.B1.B2-20090001
      公司地址:北京市东城区青龙胡同甲1号、3号2幢2层205-32室
      • 用户协议
      • 隐私政策
      • 个人信息保护
      • 法律声明
      备案 京公网安备11010802043424号 京ICP备 2021034386号