云主机开年特惠28.8元/年,0元秒杀等你来抢!
查看详情

活动

天翼云最新优惠活动,涵盖免费试用,产品折扣等,助您降本增效!
热门活动
  • 安全隔离版OpenClaw NEW OpenClaw云服务器专属“龙虾“套餐低至1.5折起
  • 天翼云新春焕新季 NEW 云主机开年特惠28.8元/年,0元秒杀等你来抢!
  • 云上钜惠 爆款云主机全场特惠,2核4G只要1.8折起!
  • 中小企业服务商合作专区 国家云助力中小企业腾飞,高额上云补贴重磅上线
  • 出海产品促销专区 NEW 爆款云主机低至2折,高性价比,不限新老速来抢购!
  • 天翼云奖励推广计划 加入成为云推官,推荐新用户注册下单得现金奖励
免费活动
  • 免费试用中心 HOT 多款云产品免费试用,快来开启云上之旅
  • 天翼云用户体验官 NEW 您的洞察,重塑科技边界

息壤智算

领先开放的智算服务平台,提供算力、平台、数据、模型、应用“五位一体”智算服务体系,构建全流程的AI基础设施能力
AI Store
  • 算力市场
  • 模型市场
  • 应用市场
  • MCP市场
公共算力服务
  • 裸金属
  • 定制裸金属
训推服务
  • 模型开发
  • 训练任务
  • 服务部署
模型推理服务
  • 模型广场
  • 体验中心
  • 服务接入
应用托管
  • 应用实例
科研助手
  • 科研智能体
  • 科研服务
  • 开发机
  • 并行计算
大模型
  • DeepSeek-V3.1
  • DeepSeek-R1-0528
  • DeepSeek-V3-0324
  • Qwen3-235B-A22B
  • Qwen3-32B
智算一体机
  • 智算一体机
模型适配专家服务
  • 模型适配专家服务
算力服务商
  • 入驻算力服务商

应用商城

天翼云精选行业优秀合作伙伴及千余款商品,提供一站式云上应用服务
进入甄选商城进入云市场进入AI Store创新解决方案公有云生态专区智云上海应用生态专区
建站工具
  • 新域名服务
  • SSL证书
  • 翼建站
企业办公
  • 安全邮箱
  • WPS 365 天翼云版
  • 天翼企业云盘(标准服务版)
灾备迁移
  • 云管家2.0
  • 翼备份(SaaS版)

定价

协助您快速了解云产品计费模式、价格详情,轻松预估上云成本
价格计算器
  • 动态测算产品价格
定价策略
  • 快速了解计费模式

合作伙伴

天翼云携手合作伙伴,共创云上生态,合作共赢
天翼云生态合作中心
  • 天翼云生态合作中心
天翼云渠道合作伙伴
  • 天翼云代理渠道合作伙伴
天翼云服务合作伙伴
  • 天翼云集成商交付能力认证
天翼云应用合作伙伴
  • 天翼云云市场合作伙伴
  • 天翼云甄选商城合作伙伴
天翼云技术合作伙伴
  • 天翼云OpenAPI中心
天翼云培训认证
  • 天翼云学堂
  • 天翼云市场商学院
天翼云合作计划
  • 云汇计划
天翼信创云专区
  • 信创云专区
  • 适配互认证

开发者

开发者相关功能入口汇聚
技术社区
  • 专栏文章
  • 互动问答
  • 技术视频
资源与工具
  • OpenAPI中心
培训与认证
  • 天翼云学堂
  • 天翼云认证
开源社区
  • 魔乐社区
  • OpenTeleDB

支持与服务

为您提供全方位支持与服务,全流程技术保障,助您轻松上云,安全无忧
文档与工具
  • 文档中心
  • 新手上云
  • 自助服务
  • OpenAPI中心
定价
  • 价格计算器
  • 定价策略
基础服务
  • 售前咨询
  • 在线支持
  • 在线支持
  • 工单服务
  • 服务保障
  • 会员中心
增值服务
  • 红心服务
  • 首保服务
  • 客户支持计划
  • 专家技术服务
  • 备案管家
我要反馈
  • 建议与反馈
  • 用户体验官
信息公告
  • 客户公告

了解天翼云

天翼云秉承央企使命,致力于成为数字经济主力军,投身科技强国伟大事业,为用户提供安全、普惠云服务
品牌介绍
  • 关于天翼云
  • 智算云
  • 天翼云4.0
  • 新闻资讯
  • 天翼云APP
基础设施
  • 全球基础设施
  • 信任中心
最佳实践
  • 精选案例
  • 超级探访
  • 云杂志
  • 分析师和白皮书
  • 天翼云·创新直播间
市场活动
  • 2025智能云生态大会
  • 2024智算云生态大会
  • 2023云生态大会
  • 2022云生态大会
  • 天翼云中国行
天翼云
  • 活动
  • 息壤智算
  • 产品
  • 解决方案
  • 应用商城
  • 定价
  • 合作伙伴
  • 开发者
  • 支持与服务
  • 了解天翼云
      • 文档
      • 控制中心
      • 备案
      • 管理中心
      消息队列Kafka版_相关内容
      • 创建的Kafka实例是集群模式么?
        本节介绍分布式消息服务Kafka集群模式 是的,Kafka实例通常是以集群模式创建的。Kafka集群由多个Kafka节点组成,每个节点负责存储和处理消息。集群模式可以提供高可用性、容错性和扩展性。 Kafka集群中的每个节点都可以承担多个分区的领导者或副本角色,从而实现数据的冗余和故障恢复。通过复制机制,Kafka可以提供高可用性和容错性,即使在节点故障的情况下也能保证数据的可靠性和可用性。 注意 Kafka集群的规模和配置取决于应用的需求和可用的资源。可以根据实际情况来决定集群中节点的数量、分区的数量以及数据的复制策略,以满足性能、可用性和扩展性的要求。
        来自:
        帮助文档
        分布式消息服务Kafka
        常见问题
        实例问题
        创建的Kafka实例是集群模式么?
      • 配置DMS Kafka连接
        本章节主要介绍配置DMS Kafka连接。 连接DMS的Kafka队列时,相关参数详见下表:DMS Kafka连接参数 参数名 说明 取值样例 名称 连接的名称,根据连接的数据源类型,用户可自定义便于记忆、区分的连接名。 dmslink 服务类型 选择DMS Kafka版本,目前只有专享版。 专享版 Kafka Broker Kafka专享版实例的地址,格式为host:port。 Kafka SASLSSL 选择是否打开客户端连接Kafka专享版实例时SSL认证的开关。 开启Kafka SASLSSL,则数据加密传输,安全性更高,但性能会下降。 是 用户名 开启Kafka SASLSSL时显示该参数,表示连接DMS Kafka的用户名。 密码 开启Kafka SASLSSL时显示该参数,表示连接DMS Kafka的密码。
        来自:
        帮助文档
        数据治理中心 DataArts Studio
        用户指南
        数据集成
        管理连接
        配置DMS Kafka连接
      • 变更实例规格
        介绍分布式消息服务Kafka扩容相关内容。 场景描述 当需要处理大量消息时,Kafka实例的扩容是一种常见的解决方案。扩容可以增加Kafka集群的吞吐量、存储能力和高可用性。分布式消息服务Kafka提供三类扩容方案,分别为节点、规格和磁盘扩容,更好满足用户不同场景下的扩容需求。 ● 代理数量扩容:指向Kafka集群中添加更多的节点以增加系统的吞吐量和可靠性。通过扩容,可以将消息的发送和消费负载分摊到更多的节点上,从而提高系统的并发处理能力。 ● 规格扩容:指通过增加Kafka的资源配置来提升系统的处理能力和性能。 ● 磁盘扩容:指增加磁盘的存储容量,以满足更多消息的存储需求。 变更实例规格的影响 变更配置类型 影响 磁盘扩容 磁盘扩容不会影响业务。 代理数量扩容 1.代理数量扩容不会影响原来的代理,业务也不受影响(如果实例已配置分区自动重平衡,会触发重平衡,客户端会触发重连)。 2.新创建的Topic才会分布在新代理上,原有Topic还分布在原有代理上。通过修改Kafka分区平衡,实现将原有Topic分区的副本迁移到新代理上。 规格扩容缩容 1.若Topic为单副本,扩容/缩容期间会出现无法对该Topic生产消息或消费消息,会造成业务中断。 2.若Topic为多副本,代理会滚动重启,代理滚动重启造成分区Leader切换,会发生秒级连接闪断,Leader切换时长一般为1分钟以内。建议您在业务低峰期扩容/缩容,并且再生产客户端配置重试机制,方法如下: (1).生产客户端为Kafka开源客户端时,检查是否配置retries和retry.backoff.ms参数。建议参数值分别配置为:retries10,retry.backoff.ms1000。 (2).生产客户端为Flink客户端时,检查是否配置重启策略,配置重启策略可以参考如下代码。 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.seconds(20)));
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        实例管理
        变更实例规格
      • 创建Topic
        参数 参数说明 Topic名称 Topic名称,英文字母、数字、下划线开头,且只能由英文字母、数字、中划线、下划线组成,长度为364个字符。创建Topic后不能修改名称。 分区数 您可以设置Topic的分区数,分区数越大消费的并发度越大。该参数设置为1时,消费消息时会按照先入先出的顺序进行消费。取值范围:1100,默认值:6 副本数 每个Topic设置副本的数量,Kafka会自动在每个副本上备份数据,当其中一个Broker节点故障时数据依然是可用的,副本数越大可靠性越高。默认值:3 分区容量 每个分区的数据量的最大值,超过这个值后前面生产的消息将会被删除,保证了数据不会无限上涨挤爆磁盘。 是否同步刷盘 同步刷盘即确保消息被写入磁盘才会被认定为生产成功,该参数可提高可靠性,但是会影响性能。 消息保留时长 当消息生存时间超过该时长后,将会被清理,可用于控制存储成本。 最小同步副本数 该参数使得消息必须写入设定值个数的副本后,才能被认定生产成功,该参数可提高可靠性,但是过大会影响性能,且必须不大于副本数。 批处理消息最大值 每个批次中最大允许的消息大小,这影响了每次请求中能包含的消息总量和大小。 消息时间戳类型 CreateTime: 这是消息被生产者发送到Kafka时的时间戳,它表示消息创建的实际时间;LogAppendTime: 这是消息被Kafka日志接收并写入到日志文件时的时间戳,它表示消息写入 Kafka 的实际时间。 描述 topic的描述字段,可用作标记和说明。 标签 标签用于从不同维度对资源分类管理。 预设ACL策略 勾选提前设置好的 ACL 策略,具体 ACL 策略设置可参考用户管理页面的ACL策略管理。 消息清除策略 delete:超过消息保留时长后,消息将被删除。compact:超过消息保留时长后,消息将会被压缩而不是直接删除。 允许unclean副本选举 开启该选项后,不在ISR的副本也将可以参与leader选举。 分片滚动时间 当一个日志段的最老消息的创建时间与当前时间的差值达到该配置的值时,Kafka 会创建一个新的日志段。 分片大小 当一个日志段的大小达到该配置的值时,Kafka 会创建一个新的日志段。该配置和分片滚动时间配置可以同时使用,具体看哪个条件先行触发。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        Topic管理
        创建Topic
      • 连接未开启SASL的Kafka实例
        命令行模式连接实例 以下操作命令以Linux系统为例进行说明: 步骤 1 解压Kafka命令行工具。 进入文件压缩包所在目录,然后执行以下命令解压文件。 tar zxf [kafkatar] 其中, [kafkatar] 表示命令行工具的压缩包名称。 例如: tar zxf kafka2.122.7.2.tgz 步骤 2 进入Kafka命令行工具的“/bin”目录下。 注意,Windows系统下需要进入“/bin/windows”目录下。 步骤 3 执行如下命令进行生产消息。 ./kafkaconsoleproducer.sh brokerlist {连接地址} topic {Topic名称} 参数说明如下: 连接地址:从前提条件中获取的连接地址,如果是公网访问,请使用“公网连接地址”,如果是VPC内访问,请使用“内网连接地址”,请根据实际情况选择。 Topic名称:Kafka实例下创建的Topic名称。如果Kafka实例开启了自动创建Topic功能,此参数值可以填写已创建的Topic名称,也可以填写未创建的Topic名称。 本文以公网连接为例,获取的Kafka实例公网连接地址为“10.3.196.45:9094,10.78.42.127:9094,10.4.49.103:9094”。执行完命令后输入内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。 [root@ecskafka bin] ./kafkaconsoleproducer.sh brokerlist 10.3.196.45:9094,10.78.42.127:9094,10.4.49.103:9094 topic topicdemo >Hello >DMS >Kafka! >^C[root@ecskafka bin] 如需停止生产使用Ctrl+C命令退出。 步骤 4 执行如下命令消费消息。 ./kafkaconsoleconsumer.sh bootstrapserver {连接地址} topic {Topic名称} group ${消费组名称} frombeginning 参数说明如下: 连接地址:从前提条件中获取的连接地址,如果是公网访问,请使用“公网连接地址”,如果是VPC内访问,请使用“内网连接地址”,请根据实际情况选择。 Topic名称:Kafka实例下创建的Topic名称。 消费组名称:根据您的业务需求,设定消费组名称。 如果已经在配置文件中指定了消费组名称,请确保命令行中的消费组名称与配置文件中的相同,否则可能消费失败 。 消费组名称开头包含特殊字符,例如下划线“”、
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        连接Kafka实例
        连接未开启SASL的Kafka实例
      • 事件规则参数
        分布式消息服务Kafka 当事件目标选择分布式消息服务Kafka时,事件目标的type值为kafka,eventTargets中的resourceKey字段中含义如下表所示。 resourceKey 是否必传 form value template instanceId 是 CONSTANT 分布式消息服务Kafka实例Id。 说明 实例Id可在分布式消息服务Kafka管理控制台实例详情页获取。 无 topic 是 CONSTANT Kafka主题。 无 value 是 CONSTANT ORIGINAL JSONPATH TEMPLATE 消息体。 如果form选择TEMPLATE,则在此处配置模板。详见事件内容转换。 key 是 CONSTANT EMPTY JSONPATH TEMPLATE 消息键值。 如果form选择TEMPLATE,则在此处配置模板。详见事件内容转换。 请求示例: plaintext { "eventBusName": "test", "eventRuleName": "test", "desc": "", "filterPattern": "{}", "eventTargets": [ { "type": "kafka", "eventTargetName": "1gQfuPljs9MJsWM1eHYch", "params": [ { "resourceKey": "instanceId", "value": "356b3496b87xxxxxxxxfe7deef7fe35", "form": "CONSTANT" }, { "resourceKey": "topic", "value": "test2", "form": "CONSTANT" }, { "resourceKey": "value", "value": "", "form": "ORIGINAL" }, { "resourceKey": "key", "value": "", "form": "EMPTY" } ] } ] } 分布式消息服务RocketMQ 当事件目标选择分布式消息服务RocketMQ时,事件目标的type值为rocketmq,eventTargets中的resourceKey字段中含义如下表所示。 resourceKey 是否必传 form value template instanceId 是 CONSTANT 分布式消息服务RocketMQ实例ID。 说明 实例Id可在分布式消息服务RocketMQ管理控制台实例详情页获取。 无 topic 是 CONSTANT RocketMQ主题。 无 body 是 CONSTANT ORIGINAL JSONPATH TEMPLATE 消息体。 如果form选择TEMPLATE,则在此处配置模板。详见事件内容转换。 keys 是 CONSTANT EMPTY JSONPATH TEMPLATE 消息索引。 如果form选择TEMPLATE,则在此处配置模板。详见事件内容转换。 properties 是 CONSTANT EMPTY JSONPATH TEMPLATE 消息自定义属性。 如果form选择TEMPLATE,则在此处配置模板。详见事件内容转换。 tags 是 CONSTANT EMPTY JSONPATH TEMPLATE 消息标签。 如果form选择TEMPLATE,则在此处配置模板。详见事件内容转换。 请求示例: plaintext { "eventBusName": "test", "eventRuleName": "test", "desc": "", "filterPattern": "{}", "eventTargets": [ { "type": "rocketmq", "eventTargetName": "lMVBoqi55L3Q6QaCKFWYA", "params": [ { "resourceKey": "instanceId", "value": "89ceb110331e4c968499744c2ccbdbcc", "form": "CONSTANT" }, { "resourceKey": "topic", "value": "TopicA", "form": "CONSTANT" }, { "resourceKey": "body", "value": "", "form": "ORIGINAL" }, { "resourceKey": "properties", "value": "", "form": "EMPTY" }, { "resourceKey": "keys", "value": "", "form": "EMPTY" }, { "resourceKey": "tags", "value": "", "form": "EMPTY" } ] } ] }
        来自:
        帮助文档
        事件总线
        用户指南
        事件总线
        事件规则
        事件规则参数
      • 使用Kafka触发器
        本章介绍函数工作流如何使用Kafka触发器。 使用Kafka触发器后,FunctionGraph会定期轮询Kafka实例指定Topic下的新消息,FunctionGraph将轮询得到的消息作为参数传递来调用函数。 前提条件 进行操作之前,需要做好以下准备。 已经创建函数。 创建Kafka触发器,必须开启函数工作流VPC访问,请参见配置网络。 已经创建Kafka实例。 在Kafka实例下创建主题。 创建Kafka触发器 1、登录函数工作流控制台,在左侧的导航栏选择“函数 > 函数列表”。 2、选择待配置的函数,单击进入函数详情页。 3、选择“设置 > 触发器”,单击“创建触发器”,弹出“创建触发器”对话框。 4、设置以下信息。 触发器类型:选择“分布式消息服务(Kafka)”。 实例:选择已创建专享版Kafka实例。 主题:选择专享版Kafka实例的Topic。 批处理大小:每次从Topic消费的消息数量。 用户名:Kafka实例开启SSL时需要填写。连接Kafka专享版实例的用户名。 密码:Kafka实例开启SSL时需要填写。连接Kafka专享版实例的密码。 5、单击“确定”,完成kafka触发器的创建。 说明 开启函数流VPC访问后,需要在Kafka服务安全组配置对应子网的权限。如何开启VPC访问请参见 Kafka触发器当前支持选择多个Topic主题,从而避免Topic过多导致创建的触发器数量被限制。 配置Kafka事件触发函数。 1、返回函数工作流控制台,在左侧的导航栏选择“函数 > 函数列表”。 2、选择待配置的函数,单击进入函数详情页。 3、在函数详情页,选择函数版本。 4、在“代码”页签下,单击“测试”,弹出“配置测试事件”对话框。 5、填写如下表示测试信息后,单击“保存”。 测试信息 参数 说明 配置测试事件 可创建新的测试事件也可编辑已有的测试事件。选择默认值:“创建新的测试事件”。 事件模板 选择"kafkaeventtemplate"模板,使用系统内置Kafka事件模板。 事件名称 事件名称必须以大写或小写字母开头,支持字母(大写或小写),数字和下划线“”(或中划线“”),并以字母或数字结尾,长度为125个字符,例如kafka123test。 测试事件 自动加载系统内置kafka事件模板,本例不做修改。 6、单击“测试”,可以得到函数运行结果,函数会返回输入kafka消息数据。
        来自:
        帮助文档
        函数工作流
        用户指南
        配置触发器
        使用Kafka触发器
      • 消息收发
        本文为您分布式消息服务MQTT消息收发内容。 消息数据存储 终端消息数据按父topic存储至kafka队列,需先在控制台创建父topic;对未创建父topic的消息可正常收发,但不会存储至kafka队列。 Kafka存储内容格式: { "clientId": 设备clientId, "topic": 主题, "payload": 消息内容, "ts": 发送的时间戳 } 会话机制 终端 clean sessiontrue,断线后会话信息清除,再次上线后之前所有的订阅关系以及离线消息丢失。 clean sessionfalse断开连接的情况下,MQTT Broker也会为断连客户端保存一个会话,默认2小时,超期未重连订购关系清除;对于clean sessionfalse的客户端断线重连后可接收Qos>0的离线消息。对于客户端因网络等各种原因断线,需要加上重连和订购关系重新订购机制。 离线消息 对于clean sessionfalse的客户端,在未超出会话失效期,断线重连后可接收Qos>0的离线消息。 系统主题 系统主题 说明 mq2mqtt 用于云端服务向终端发送消息。发往该主题消息会转发至MQTT Broker实现云端与移动端互通 mqttdeviceconnect 设备上线主题,内容 {"clientid":客户端ID,"ts":上线时间戳 } mqttdevicedisconnect 设备下线主题,内容 {"clientid":客户端ID,"ts":下线时间戳 } SDK支持 分布式消息服务MQTT支持标准的MQTT协议,理论上适配所有的MQTT客户端SDK。 推荐对应的第三方 SDK 如下表: 语言/平台 推荐的第三方SDK Java Eclipse Paho SDK iOS MQTTClientFramework Android Eclipse Paho SDK JavaScript Eclipse Paho JavaScript Python Eclipse Paho Python SDK C Eclipse Paho C SDK C
        来自:
        帮助文档
        分布式消息服务MQTT
        快速入门
        消息收发
      • 重启实例
        本文主要介绍重启实例。 操作场景 分布式消息服务Kafka管理控制台支持重启运行中的Kafka实例,且可实现批量重启Kafka实例。 说明 在Kafka实例重启过程中,客户端的生产与消费消息等请求会被拒绝。 前提条件 只有当Kafka实例处于“运行中”或“故障”状态,才能执行重启操作。 操作步骤 步骤 1 登录管理控制台。 步骤 2 在管理控制台右上角单击,选择区域。 说明 请选择Kafka实例所在的区域。 步骤 3 在管理控制台左上角单击,选择“企业中间件”“分布式消息服务”“Kafka专享版”,进入分布式消息服务Kafka专享版页面。 步骤 4 通过以下任意一种方法,重启Kafka实例。 勾选Kafka实例名称左侧的方框,可选一个或多个,单击信息栏左上侧的“重启”。 在待重启Kafka实例所在行,单击“重启”。 单击Kafka实例名称,进入实例详情页面。单击右上角的“重启”。 步骤 5 在“重启实例”对话框中,单击“是”,完成重启Kafka实例。 重启Kafka实例大约需要3到15分钟。Kafka实例重启成功后,实例状态切换为“运行中”。 说明 重启Kafka实例只会重启实例进程,不会重启实例所在虚拟机。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        实例管理
        重启实例
      • Java客户端接入示例
        生产消息 import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.concurrent.Future; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.config.SslConfigs; public class KafkaProducerDemo { public static void main(String args[]) { //加载kafka.properties Properties kafkaProperties JavaKafkaConfigurer.getKafkaProperties(); Properties props new Properties(); //设置接入点,请通过控制台获取对应Topic的接入点 props.put(ProducerConfig.BOOTSTRAPSERVERSCONFIG, kafkaProperties.getProperty("bootstrap.servers")); //设置SSL根证书的路径,请记得将XXX修改为自己的路径 props.put(SslConfigs.SSLTRUSTSTORELOCATIONCONFIG, kafkaProperties.getProperty("ssl.truststore.location")); //根证书store的密码,保持不变 props.put(SslConfigs.SSLTRUSTSTOREPASSWORDCONFIG, "c24f5210"); //接入协议,目前支持使用SSL协议接入 props.put(CommonClientConfigs.SECURITYPROTOCOLCONFIG, "SSL"); //Kafka消息的序列化方式 props.put(ProducerConfig.KEYSERIALIZERCLASSCONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUESERIALIZERCLASSCONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //请求的最长等待时间 props.put(ProducerConfig.MAXBLOCKMSCONFIG, 30 1000); //设置客户端内部重试次数 props.put(ProducerConfig.RETRIESCONFIG, 5); //设置客户端内部重试间隔 props.put(ProducerConfig.RECONNECTBACKOFFMSCONFIG, 3000); //hostname校验改成空 props.put(SslConfigs.SSLENDPOINTIDENTIFICATIONALGORITHMCONFIG, ""); //构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可; //如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个 KafkaProducer producer new KafkaProducer (props); //构造一个Kafka消息 String topic kafkaProperties.getProperty("topic"); //消息所属的Topic,请在控制台申请之后,填写在这里 String value "this is the message's value"; //消息的内容 try { //批量获取 futures 可以加快速度, 但注意,批量不要太大 List > futures new ArrayList >(128); for (int i 0; i kafkaMessage new ProducerRecord (topic, value + ": " + i); Future metadataFuture producer.send(kafkaMessage); futures.add(metadataFuture); } producer.flush(); for (Future future: futures) { //同步获得Future对象的结果 try { RecordMetadata recordMetadata future.get(); System.out.println("Produce ok:" + recordMetadata.toString()); } catch (Throwable t) { t.printStackTrace(); } } } catch (Exception e) { System.out.println("error occurred"); e.printStackTrace(); } } }
        来自:
        帮助文档
        分布式消息服务Kafka
        开发指南
        Java
        Java客户端接入示例
      • 为什么Group不存在但能消费消息?
        本节介绍Group不存在但能消费消息原因 我在分布式消息服务Kafka控制台上,未查看到对应的Group,但此Group下却有消费线程在消费消息。 可能原因 如果客户端使用assign方式消费消息,那么即使不创建Group,也可能消费消息。 如果客户端使用subscribe方式消费消息,删除Group后,消费线程未停止或者未发生Rebalance,那么消费线程还可以继续正常消费。 解决方案 如果客户端使用assign方式消费消息,请提前在分布式消息服务Kafka控制台创建Group。 请尽量复用Group,避免创建过多的Group而影响集群的稳定性。 在删除Group前,请确保已停止该Group下的所有消费线程。
        来自:
        帮助文档
        分布式消息服务Kafka
        常见问题
        操作类
        为什么Group不存在但能消费消息?
      • 工作流调度简介
        触发器名称 调用方式 详细介绍 定时触发器 同步调用 按照指定的时间间隔触发工作流执行。 HTTP触发器 同步调用 当通过HTTP请求触发工作流时,工作流会根据请求参数执行。 Kafka触发器 同步/异步调用 当消息队列中出现新消息时,触发工作流执行。 RocketMQ触发器 同步/异步调用 当消息队列中出现新消息时,触发工作流执行。 云原生网关触发器 同步调用 当通过云原生网关触发工作流时,工作流会根据请求参数执行。
        来自:
        帮助文档
        函数计算
        用户指南
        云工作流
        控制台操作
        工作流调度
        工作流调度简介
      • Logstash对接Kafka
        介绍Logstash对接Kafka具体内容。 应用场景 通过Logstash对接Kafka,可以实现以下功能: 1. 数据收集:Logstash可以从Kafka主题中消费数据,将数据从Kafka集群中获取到Logstash中进行处理和转发。这样可以方便地将分布式系统、应用程序、传感器数据等各种数据源的数据集中收集起来。 2. 数据处理和转换:Logstash提供了丰富的过滤器插件,可以对从Kafka中消费的数据进行各种处理和转换操作。例如,可以进行数据清洗、解析、分割、合并、字段映射等操作,以满足不同数据源和目标的数据格式要求。 3. 数据传输和转发:Logstash可以将处理后的数据发送到不同的目标位置,如Elasticsearch、MySQL、文件系统、消息队列等。通过配置适当的输出插件,可以将数据传输到目标系统,以便后续的数据分析、存储、可视化等操作。 4. 实时数据处理:Logstash与Kafka结合使用,可以实现实时的数据处理和传输。Kafka作为高吞吐量的消息队列,可以确保数据的高效传输和缓冲。而Logstash作为数据处理引擎,可以对从Kafka中消费的数据进行实时处理,满足实时数据分析和监控的需求。 5. 分布式部署和负载均衡:Logstash支持分布式部署,可以通过配置多个Logstash节点来实现高可用性和负载均衡。多个Logstash节点可以同时从Kafka主题中消费数据,并进行并行处理和转发,以提高整体系统的性能和吞吐量。 总之,通过Logstash对接Kafka,可以实现灵活、可扩展和高效的数据处理和传输。Logstash提供了丰富的插件和配置选项,可以根据实际需求进行定制化的数据处理流程。同时,Logstash还具有良好的可扩展性和可靠性,适用于各种规模和类型的数据处理场景。
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        Logstash对接Kafka
      • 自建Apache Kafka事件源
        事件示例 plaintext { "id": "601f1f756f8c4d97941e5c04f7add896", "specversion": "1.0", "source": "apache:kafka", "type": "kafka:Topic:SendMessage", "subject": "apache:kafka:bb9fdb42056xxxxxx0242ac110002:dab4124510dd4xxxxxxxxx5c6a6db69:topic:streamsource", "datacontenttype": "application/json", "time": "20251030T07:16:07.692257848Z", "data": { "topic": "streamsource", "partition": 0, "offset": 36, "value": "msg17" }, "ctyunaccountid": "dab4124510dd4xxxxxxxxx5c6a6db69", "ctyunregion": "bb9fdb42056xxxxxx0242ac110002" } data字段包含的参数解释如下表所示: 参数 类型 示例值 描述 topic String source1 Topic名称。 offset Integer 1 消费位点。 key String test 消息Key值。 value Object Hello,Kafka! 消息体,默认以JSON格式编码。
        来自:
        帮助文档
        事件总线
        用户指南
        事件流
        事件源
        自建Apache Kafka事件源
      • 删除消息
        本文主要介绍如何在控制台删除消息。 操作场景 说明 消息删除后无法恢复,请谨慎操作。 前提条件 删除消息前,请先在客户端中设置“auto.offset.reset”参数。“auto.offset.reset”用来指定当Kafka中没有初始偏移量或者当前偏移量不存在(例如当前偏移量已被删除)时,消费者的消费策略。取值如下: latest:偏移量自动被重置到最晚偏移量。 earliest:偏移量自动被重置到最早偏移量。 none:向消费者抛出异常。 操作步骤 1. 登录管理控制台。 2. 在管理控制台右上角单击,选择区域。 说明 请选择Kafka实例所在的区域。 3. 在管理控制台左上角单击,选择“企业中间件”“分布式消息服务”“Kafka专享版”,进入分布式消息服务Kafka专享版页面。 4. 单击Kafka实例名称,进入实例详情页面。 5. 在左侧导航栏选择“Topic管理”,进入Topic列表页面。 6. 在待删除消息的Topic所在行,单击“消息删除”,弹出“消息删除”对话框。 7. 设置消息删除参数,如下表所示。 图消息删除 表 消息删除参数说明 参数 说明 分区 选择消息所在的分区编号。 偏移量 输入偏移量,此偏移量之前的数据将被删除。 说明 如果“偏移量”设置为“1”,表示删除分区中所有的消息。 如果您输入的偏移量不在指定分区的最早偏移量和最晚偏移量之间,消息将不会被删除。 如果需要删除多个分区的消息,单击“添加分区”,设置需要删除消息的分区和偏移量。每次最多可选择10个分区。 8. 单击“确定”,弹出“清理结果”对话框,单击“确定”,完成消息的删除。 图 清理结果
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        消息管理
        删除消息
      • 使用限制
        本文主要介绍应用性能监控的各语言使用限制。 自研JAVA探针使用限制 类型 名称 版本 工具 JDK jdk8、jdk11、jdk17 通讯协议 httpclient apache httpclient2.0+、apache asynchttpclient1.9+ Java框架 spring 3.1.x~5.0.x Java框架 springboot 1.2.x~1.5.x、2.0.4~2.0.9 Java框架 Dubbo 2.7+ Java框架 gRPC 1.6+ 数据库 MySQL mysqlconnectorjava 5.1.X 数据库 Oracle ojdbc5、ojdbc6、ojdbc14 数据库 druid 1.0 数据库 c3p0 0.9.2+ 数据库 dbcp2 2.0+ 数据库 JDBC java8+ web服务器 Tomcat 7.0.x, 8.5.x, 9.0.x, 10.0.x 消息队列 RabbitMQ springrabbit1.0+、amqpclient 2.7+ 消息队列 Kafka kafkaproducer 0.11+、kakfaconsumer 0.11+、kafkastream 0.11+ NoSQL Redis jedis 1.4+,lettuce 4.0+ NoSQL Mongodb 3.1+ NoSQL ElasticSearch 5.0+ Rest Client Common HTTP java http client java11+、HttpURLConnection java8+ Go使用限制 Opentelemetry支持的框架列表 OpenTelemetry提供了若干半自动埋点插件,支持为常见的框架自动创建Span。
        来自:
        帮助文档
        应用性能监控 APM
        产品介绍
        使用限制
      • 如何判断和处理消息堆积?
        本节介绍Kafka 判断和处理消息堆积 判断消息堆积是否属于正常情况 登录“分布式消息服务Kafka”控制台,在“消费组管理”页面,找到目标消费组,进入“消息堆积”页面。 (1)堆积量保持在一个稳定的数值之间波动,没有持续扩大。说明客户端一直在拉取最新消息,没有消息堆积,属于正常情况。 (2)堆积量逐步扩大,并且当前位点一直不变。客户端的消费线程因为某些原因卡住,没有继续消费,也没有继续向服务端提交位点,属于异常情况,即消息的确堆积了。 (3)堆积量逐步扩大,同时当前位点在前进。说明客户端还在消费中,但是消息的消费速度慢于消息的发送速度。消息堆积大多是消费速度过慢或者消费线程阻塞造成的,建议不要在消费逻辑中有太多耗时的操作。 消息堆积的处理方式 经过上述判断,确认消息的确存在堆积情况时,建议打印消息的消费耗时,或者根据堆栈信息查看线程执行情况,适当调整以加快消息的消费速度,避免出现消息堆积。
        来自:
        帮助文档
        分布式消息服务Kafka
        常见问题
        操作类
        如何判断和处理消息堆积?
      • 查看示例代码
        本文主要介绍查看示例代码。 本章节指导您在控制台查看Java、Go和Python生产消费消息的示例代码。 操作步骤 步骤 1 登录管理控制台。 步骤 2 在管理控制台右上角单击,选择区域。 说明 请选择Kafka实例所在的区域。 步骤 3 在管理控制台左上角单击,选择“企业中间件”“分布式消息服务”“Kafka专享版”,进入分布式消息服务Kafka专享版页面。 步骤 4 单击Kafka实例的名称,进入实例详情页面。 步骤 5 选择“Topic管理”页签,显示已创建的Topic详情。 步骤 6 单击“查看示例代码”,弹出“生产消费示例代码”对话框。 查看Java、Go和Python生产消费消息的示例代码。示例代码区分是否开启SASLSSL认证,“接入类型”为“PLAINTEXT”时,表示未开启SASLSSL认证。“接入类型”为“SASLSSL”时,表示已开启SASLSSL认证。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        Topic管理
        查看示例代码
      • 数据订阅
        参数 说明 示例 规则名称 订阅规则名称 输入:aomkafkatest。 订阅内容 支持“指标”和“告警”。 选择:指标。 订阅目标类型 选择“自定义Kafka”或“分布式消息服务DMS”。 分布式消息服务DMS 实例 选择DMS实例,如没有DMS实例,请单击“创建DMS实例”,创建DMS实例。 kafkaaom7160
        来自:
        帮助文档
        应用运维管理
        用户指南
        配置管理
        数据订阅
      • 重启Kafka Manager
        本文主要介绍 重启Kafka Manager。 操作场景 当Kafka Manager无法登录或者无法使用时,例如下图中的报错,可以通过重启Kafka Manager,使Kafka Manager恢复正常。 图 报错信息 说明 重启Kafka Manager不会影响业务。 操作步骤 步骤 1 登录管理控制台。 步骤 2 在管理控制台右上角单击,选择区域。 说明 请选择Kafka实例所在的区域。 步骤 3 在管理控制台左上角单击,选择“企业中间件”“分布式消息服务”“Kafka专享版”,进入分布式消息服务Kafka专享版页面。 步骤 4 通过以下任意一种方法,重启Kafka Manager。 在待重启Manager的Kafka实例所在行,单击“更多 > 重启Manager”。 单击Kafka实例名称,进入实例详情页面。单击右上角的“更多 > 重启Manager”。 步骤 5 单击“确定”。 您可以在实例的“后台任务管理”页面,查看当前任务的操作进度。任务状态为“成功”,表示重启成功。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        实例管理
        重启Kafka Manager
      • 必须配置的监控告警
        本文主要介绍必须配置的监控告警。 本章节主要介绍部分监控指标的告警策略,以及配置操作。在实际业务中,建议按照以下告警策略,配置监控指标的告警规则。 表 Kafka实例配置告警的指标 指标ID 指标名称 告警策略 指标说明 告警处理建议 brokerdiskusage 磁盘容量使用率 告警阈值:原始值>80%连续触发次数:1告警级别:紧急 该指标为从Kafka节点虚拟机层面采集的磁盘容量使用率。 出现该告警时,需要修改实例存储空间 。具体操作,请参考变更实例规格。 brokercpucoreload CPU核均负载 告警阈值:原始值>2连续触发次数:3告警级别:重要 该指标为从Kafka节点虚拟机层面采集的CPU每个核的平均负载。 出现该告警时,先检查该监控是否长期处于接近或超过告警阈值状态,如果是,需要修改实例基准带宽/代理个数,即扩节点。具体操作,请参考。 brokermemoryusage 内存使用率 告警阈值:原始值>90%连续触发次数:3告警级别:紧急 该指标为Kafka节点虚拟机层面采集的内存使用率。 出现该告警时,需要修改实例基准带宽/代理个数,即扩节点。具体操作,请参考。 currentpartitions 分区数 告警阈值:原始值>分区数上限的90%,不同实例规格分区数上限不同,具体参考产品规格。连续触发次数:1告警级别:重要 该指标用于统计Kafka实例中已经使用的分区数量。 出现该告警时,如果业务后续还需要新增Topic,则需要修改实例基准带宽/代理个数或将业务拆分至多个实例。修改实例基准带宽/代理个数的具体操作,请参考。 brokercpuusage CPU使用率 告警阈值:原始值>90%连续触发次数:3告警级别:重要 统计Kafka节点虚拟机的CPU使用率。 出现该告警时,先检查该监控是否长期处于接近或超过告警阈值状态,如果是,需要修改实例基准带宽/代理个数,即扩节点。具体操作,请参考。 groupmsgs 堆积消息数 告警阈值:原始值>积压上限的90%,积压上限由您根据业务实际情况设定连续触发次数:1告警级别:重要 该指标用于统计Kafka实例中所有消费组中总堆积消息数。 出现该告警时,首先排查是否有闲置消费组,如果有,则删除。其次,可以考虑加快消费速度,例如增加组内消费者数量等。 topicmessagesremained 队列可消费消息数 告警阈值:原始值>积压上限的90%,积压上限由您根据业务实际情况设定连续触发次数:1告警级别:重要 该指标用于统计消费组指定队列可以消费的消息个数。 出现该告警时,首先排查消费者代码逻辑是否有误,例如消费者出现了异常不再消费等。其次,可以考虑加快消息的消费,例如增加队列消费者,并确保分区数大于或等于消费者数。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        监控
        必须配置的监控告警
      • SDK概述
        本文介绍了分布式消息服务Kafka版提供的SDK语言版本。 SDK列表 下表提供了分布式消息服务Kafka版支持的SDK列表。 编程语言 参考文档 :: Java 开发指南Java Python 开发指南Python Go 开发指南Go
        来自:
        帮助文档
        分布式消息服务Kafka
        SDK参考
        SDK概述
      • Kafka Connect接入Kafka
        本节主要介绍Kafka Connect接入Kafka。 最佳实践概述 场景描述 Kafka Connect可以将完整的数据库注入到Kafka的Topic中,或者将服务器的系统监控指标注入到Kafka,然后像正常的Kafka流处理机制一样进行数据流处理。而导出工作则是将数据从Kafka Topic中导出到其它数据存储系统、查询系统或者离线分析系统等。 如果您想要在流数据平台场景下,使用Kafka Connect快速实现数据从Kafka和其它系统之间进行导入导出,可以参考本实践方案。 技术架构图 暂无。 方案优势 用户可按业务需要,参考Kafka Connect数据导入导出,实现流数据的消息消费。 前提条件 需已购买Kafka实例、创建Topic,并且已成功消费消息。 资源规划 本实践方案内容仅涉及Kafka专享版实例和Flink的安装配置。 分布式消息服务 Figure 1 分布式消息服务 资源类型 配置项 配置明细 说明 :::: 企业中间件 DMS Kafka专享实例 需已购买kafka专享实例,创建好Topic,并成功消费消息。 Kafka Connect流数据平台 Figure 2 Kafka Connect流数据平台 资源类型 配置项 配置明细 说明 :::: 应用软件 Flink框架 Flink 1.14 使用开源Apache Flink 方案正文 Kafka Connect 目前支持两种执行模式:standalone 和 distributed。 standalone模式 通过以下命令以 standalone 模式启动 connect: 接入Kafka专享版与接入开源 Apache Kafka 没有区别,仅需要修改 bootstrap.servers 为申请实例时分配的 IP。 distributed模式 通过以下命令以 distributed 模式启动 connect: 该模式下,Kafka Connect 会将 offsets、configs 和 task status 信息存储在 Kafka Topic 中,存储的Topic 在 connectdistributed 中的以下字段配置: 这三个 Topic 需要手动创建,才能保证创建的属性符合 connect 的要求。 config.storage.topic 需要保证只有一个 partition,多副本且为 compact 模式。 offset.storage.topic 需要有多个 partition,多副本且为 compact 模式。 status.storage.topic 需要有多个 partition,多副本且为 compact 模式。 配置 bootstrap.servers 为申请实例是分配的 IP。 配置 group.id,用于标识 connect 集群,需要与消费者分组区分开来。
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        Kafka Connect接入Kafka
      • 应用场景
        本节主要介绍分布式消息服务Kafka常用的应用场景 分布式消息服务Kafka主要适用于以下几种场景: 日志收集 构建应用系统和分析系统的桥梁,并将它们之间的关联解耦。 支持实时在线分析系统和类似于 Hadoop 的离线分析系统。 Kafka本身的性能是非常高效的,同时Kafka的特性决定它非常适合作为"日志收集中心",这是因为Kafka在采集日志的时候业务是无感知的,其能够兼容自己的上游,能够直接地通过配置加密消息。当日志数据发送到Kafka集群里面,其实对于业务而言是完全无侵入的。同时其在下游又能够直接地对接Hadoop/ODPS等离线仓库存储和Strom/Spark等实现实时在线分析。在这样的情况之下,使用Kafka,只需要用户去关注整个流程里面的业务逻辑,而无需做更多的开发就能够实现统计、分析以及报表。 流计算处理 构建应用系统和分析系统的桥梁,并将它们之间的关联解耦。 通过支持流计算引擎,可对接开源 Storm/Samza/Spark 流计算引擎。 Kafka能够做到流计算处理,比如股市走向分析、气象数据测控、网站用户行为分析等领域,由于在这些领域中数据产生快、实时性强、数据量大,所以很难统一采集并入库存储后再做处理,这便导致传统的数据处理架构不能满足需求。而Kafka Stream以及Storm/Samza/Spark等流计算引擎的出现,可以根据业务需求对数据进行计算分析,最终把结果保存或者分发给需要的组件。
        来自:
        帮助文档
        分布式消息服务Kafka
        产品简介
        应用场景
      • Kafka磁盘选择超高IO还是高IO?
        本节介绍分布式消息服务Kafka磁盘类型 选择Kafka磁盘类型主要取决于应用的需求和预算。通常,对于Kafka来说,高IO磁盘是更常见的选择,因为它提供了更好的性能和吞吐量。 高IO磁盘具有更快的随机读写速度和更低的访问延迟。这对于Kafka来说非常重要,因为Kafka的性能和吞吐量受限于磁盘的读写速度。高IO磁盘适用于需要处理大量读写操作的场景,例如高频率的消息传递和数据写入。它可以提供更快的消息处理和更低的延迟,从而提高系统的响应性能。 超高IO磁盘也是一个可选的方案,它提供了更高的随机读写速度和更低的延迟,相对于传统的高IO磁盘来说性能更好。超高IO磁盘适用于对性能要求非常高的场景,例如大规模的实时数据处理和高吞吐量的消息队列。它可以提供更高的IOPS(每秒输入/输出操作数)和更低的延迟,从而支持更快的数据处理和更高的系统吞吐量。 综上所述,对于Kafka来说,一般情况下选择高IO磁盘是比较常见的选择,因为它提供了良好的性能和吞吐量。但如果预算允许,超高IO磁盘可以提供更高的性能和吞吐量,适用于对性能要求非常高的场景。
        来自:
        帮助文档
        分布式消息服务Kafka
        常见问题
        计费与购买类
        Kafka磁盘选择超高IO还是高IO?
      • 步骤五:配置必须的监控告警
        本文主要介绍 步骤五:配置必须的监控告警。 本章节主要介绍部分监控指标的告警策略,以及配置操作。在实际业务中,建议按照以下告警策略,配置监控指标的告警规则。 是否接近性能上限 ,表示当前资源支撑的性能为告警策略中设置的告警阈值,如果继续上升,业务可能出现问题。 表 Kafka实例配置告警的指标 指标ID 指标名称 告警策略 指标说明 告警处理建议 brokerdiskusage 磁盘容量使用率 告警阈值:原始值>80%连续触发次数:1 告警级别:紧急 该指标为从Kafka节点虚拟机层面采集的磁盘容量使用率。 出现该告警时,需要修改实例存储空间 。具体操作,请参考变更实例规格。 brokercpucoreload CPU核均负载 告警阈值:原始值>2连续触发次数:3 告警级别:重要 该指标为从Kafka节点虚拟机层面采集的CPU每个核的平均负载。 出现该告警时,先检查该监控是否长期处于接近或超过告警阈值状态,如果是,需要修改实例基准带宽/代理个数 ,即扩节点。具体操作,请参考。 brokermemoryusage 内存使用率 告警阈值:原始值>90%连续触发次数:3 告警级别:紧急 该指标为Kafka节点虚拟机层面采集的内存使用率。 出现该告警时,需要修改实例基准带宽/代理个数 ,即扩节点。具体操作,请参考。 currentpartitions 分区数 告警阈值:原始值>分区数上限的90%,不同实例规格分区数上限不同,具体参考产品规格。 连续触发次数:1 告警级别:重要 该指标用于统计Kafka实例中已经使用的分区数量。 出现该告警时,如果业务后续还需要新增Topic,则需要修改实例基准带宽/代理个数或将业务拆分至多个实例。修改实例基准带宽/代理个数的具体操作,请参考。 brokercpuusage CPU使用率 告警阈值:原始值>90%连续触发次数:3 告警级别:重要 统计Kafka节点虚拟机的CPU使用率。 出现该告警时,先检查该监控是否长期处于接近或超过告警阈值状态,如果是,需要修改实例基准带宽/代理个数 ,即扩节点。具体操作,请参考。 groupmsgs 堆积消息数 告警阈值:原始值>积压上限的90%,积压上限由您根据业务实际情况设定连续触发次数:1 告警级别:重要 该指标用于统计Kafka实例中所有消费组中总堆积消息数。 出现该告警时,首先排查是否有闲置消费组,如果有,则删除。其次,可以考虑加快消费速度,例如增加组内消费者数量等。 topicmessagesremained 队列可消费消息数 告警阈值:原始值>积压上限的90%,积压上限由您根据业务实际情况设定连续触发次数:1 告警级别:重要 该指标用于统计消费组指定队列可以消费的消息个数。 出现该告警时,首先排查消费者代码逻辑是否有误,例如消费者出现了异常不再消费等。其次,可以考虑加快消息的消费,例如增加队列消费者,并确保分区数大于或等于消费者数。
        来自:
        帮助文档
        分布式消息服务Kafka
        快速入门
        步骤五:配置必须的监控告警
      • Go
        消费消息 发送以下命令消费消息。 go run modvendor consumer/consumer.go 消费消息示例代码如下: package main import ( "encoding/json" "fmt" "github.com/confluentinc/confluentkafkago/kafka" "os" "path/filepath" ) type KafkaConfig struct { Topic string json:"topic" Topic2 string json:"topic2" GroupId string json:"group.id" BootstrapServers string json:"bootstrap.servers" SecurityProtocol string json:"security.protocol" } // config should be a pointer to structure, if not, panic func loadJsonConfig() KafkaConfig { workPath, err : os.Getwd() if err ! nil { panic(err) } configPath : filepath.Join(workPath, "conf") fullPath : filepath.Join(configPath, "kafka.json") file, err : os.Open(fullPath); if (err ! nil) { msg : fmt.Sprintf("Can not load config at %s. Error: %v", fullPath, err) panic(msg) } defer file.Close() decoder : json.NewDecoder(file) var config &KafkaConfig{} err decoder.Decode(config); if (err ! nil) { msg : fmt.Sprintf("Decode json fail for config file at %s. Error: %v", fullPath, err) panic(msg) } json.Marshal(config) return config } func doInitConsumer(cfg KafkaConfig) kafka.Consumer { fmt.Print("init kafka consumer, it may take a few seconds to init the connectionn") //common arguments var kafkaconf &kafka.ConfigMap{ "api.version.request": "true", "auto.offset.reset": "latest", "heartbeat.interval.ms": 3000, "session.timeout.ms": 30000, "max.poll.interval.ms": 120000, "fetch.max.bytes": 1024000, "max.partition.fetch.bytes": 256000} kafkaconf.SetKey("bootstrap.servers", cfg.BootstrapServers); kafkaconf.SetKey("group.id", cfg.GroupId) switch cfg.SecurityProtocol { case "PLAINTEXT" : kafkaconf.SetKey("security.protocol", "plaintext"); case "SSL": kafkaconf.SetKey("security.protocol", "ssl"); kafkaconf.SetKey("ssl.ca.location", "/XXX/caRoot.pem") case "SASLSSL": kafkaconf.SetKey("security.protocol", "saslssl"); kafkaconf.SetKey("ssl.ca.location", "/XXX/caRoot.pem"); kafkaconf.SetKey("sasl.username", cfg.SaslUsername); kafkaconf.SetKey("sasl.password", cfg.SaslPassword); kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism) case "SASLPLAINTEXT": kafkaconf.SetKey("security.protocol", "saslplaintext"); kafkaconf.SetKey("sasl.username", cfg.SaslUsername); kafkaconf.SetKey("sasl.password", cfg.SaslPassword); kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism) default: panic(kafka.NewError(kafka.ErrUnknownProtocol, "unknown protocol", true)) } consumer, err : kafka.NewConsumer(kafkaconf) if err ! nil { panic(err) } fmt.Print("init kafka consumer successn") return consumer; } func main() { // Choose the correct protocol cfg : loadJsonConfig(); consumer : doInitConsumer(cfg) consumer.SubscribeTopics([]string{cfg.Topic, cfg.Topic2}, nil) for { msg, err : consumer.ReadMessage(1) if err nil { fmt.Printf("Message on %s: %sn", msg.TopicPartition, string(msg.Value)) } else { // The client will //automatically try to recover from all errors. fmt.Printf("Consumer error: %v (%v)n", err, msg) } } consumer.Close() }
        来自:
        帮助文档
        分布式消息服务Kafka
        开发指南
        Go
      • 查询消息
        本文主要介绍查询消息。 操作场景 您可以查看指定Topic不同分区的偏移量、消息大小、创建时间以及消息正文。 操作步骤 步骤 1 登录管理控制台。 步骤 2 在管理控制台右上角单击,选择区域。 说明 请选择Kafka实例所在的区域。 步骤 3 在管理控制台左上角单击,选择“企业中间件”“分布式消息服务”“Kafka专享版”,进入分布式消息服务Kafka专享版页面。 步骤 4 单击Kafka实例名称,进入实例详情页面。 步骤 5 选择“消息查询”页签,在消息页签页面,设置查询的Topic名称、分区以及查询方式。 如果未设置具体分区,查询结果显示Topic所有分区的消息。 查询方式支持以下两种方式: 按创建时间查询:即按生产该消息的时间。 按偏移量查询:即记录消息的位置。 说明 当Topic中的数据量比较大时,单副本Topic查询消息可能会报“内部服务错误”,建议根据数据量适当减小查询时间范围。 步骤 6 单击“搜索”,查询消息。 查询结果如下: 图 查询Topic消息 消息的参数说明如下: Topic名称:消息所在的Topic名称。 分区:消息所在的分区。 偏移量:消息在分区中的位置。 消息大小:消息存入磁盘的大小,单位为B。 创建时间:消息的创建时间。创建时间由生产客户端在生产消息时通过CreateTime指定的,如果生产消息时没有设置此参数,创建时间会默认为1970。 步骤 7 单击“查看消息正文”,弹出“查看消息正文”对话框,查看消息的内容,包括Topic名称、分区、偏移量、创建时间和消息正文。 步骤 8 (可选)如果需要恢复默认设置,单击“重置”。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        消息管理
        查询消息
      • 删除实例
        本文主要介绍删除实例。 操作场景 分布式消息服务Kafka管理控制台支持删除Kafka实例,且可实现批量删除Kafka实例、一键式删除创建失败的Kafka实例、单个删除创建失败的实例记录。 说明 Kafka实例删除后,实例中原有的数据将被删除,且没有备份,请谨慎操作。 前提条件 Kafka实例状态为运行中、故障、已冻结的按需付费实例才能执行删除操作。 包年/包月类型的Kafka实例,不支持进行删除和批量删除操作。若不再使用,可单击“操作”栏下的“更多 > 退订”进行退订。 删除Kafka实例 步骤 1 登录管理控制台。 步骤 2 在管理控制台右上角单击,选择区域。 说明 请选择Kafka实例所在的区域。 步骤 3 在管理控制台左上角单击,选择“企业中间件”“分布式消息服务”“Kafka专享版”,进入分布式消息服务Kafka专享版页面。 步骤 4 通过以下任意一种方法,删除Kafka实例。 勾选Kafka实例名称左侧的方框,可选一个或多个,单击信息栏左上侧的“更多 > 删除”。 在待删除Kafka实例所在行,单击“更多 > 删除”。 单击Kafka实例名称,进入实例详情页面。单击右上角的“更多 > 删除”。 说明 Kafka实例状态为创建中、启动中、变更中、变更失败、重启中时不允许执行删除操作。 步骤 5 在“删除实例”对话框中,单击“是”,完成删除Kafka实例。 删除Kafka实例大约需要1到60秒。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        实例管理
        删除实例
      • 创建SASL_SSL用户
        本文主要介绍 创建SASLSSL用户。 分布式消息服务Kafka支持对Topic进行ACL(Access Control List)权限管理,您可以在Topic中为不同的SASLSSL用户设置不同的权限,以达到SASLSSL用户之间的权限隔离。 本章节主要介绍Kafka实例开启SASLSSL后,如何创建SASLSSL用户。创建用户后,对Topic进行SASLSSL用户授权的操作,请参见设置Topic权限。 一个Kafka实例最多创建20个用户。 前提条件 创建Kafka实例时,已开启SASLSSL功能。 操作步骤 步骤 1 登录管理控制台。 步骤 2 在管理控制台右上角单击,选择区域。 说明 请选择Kafka实例所在的区域。 步骤 3 在管理控制台左上角单击,选择“企业中间件”“分布式消息服务”“Kafka专享版”,进入分布式消息服务Kafka专享版页面。 步骤 4 单击Kafka实例名称,进入实例详情页面。 步骤 5 在“用户管理”页签,单击“创建用户”。 步骤 6 在弹出的“创建用户”对话框中,设置用户名和密码,单击“确定”,完成用户的创建。 SASLSSL用户创建成功后,参考设置Topic权限为SASLSSL用户授权。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        SASL_SSL用户管理
        创建SASL_SSL用户
      • 管理Smart Connect任务
        启动/暂停Smart Connect任务 暂停任务后,Kafka实例的数据将不会再同步到另一个Kafka实例或者其他云服务中。 1、登录管理控制台。 2、在管理控制台左上角单击,选择Kafka实例所在的区域。 3、在管理控制台左上角单击,选择“应用服务 > 分布式消息服务 Kafka”,进入分布式消息服务Kafka专享版页面。 4、单击Kafka实例名称,进入实例详情页面。 5、在左侧导航栏单击“Smart Connect”,进入Smart Connect任务列表页面。 6、执行以下操作,启动/暂停Smart Connect任务。 启动:在待启动的Smart Connect任务所在行,单击“启动”。 暂停:在待暂停的Smart Connect任务所在行,单击“暂停”,弹出“暂停任务”对话框,单击“确定”。 结束 重启Smart Connect任务 1、登录管理控制台。 2、在管理控制台左上角单击,选择Kafka实例所在的区域。 3、在管理控制台左上角单击,选择“应用服务 > 分布式消息服务 Kafka”,进入分布式消息服务Kafka专享版页面。 4、单击Kafka实例名称,进入实例详情页面。 5、在左侧导航栏单击“Smart Connect”,进入Smart Connect任务列表页面。 6、在待重启的Smart Connect任务所在行,单击“重启”,弹出“重启任务”对话框。 重启Smart Connect任务前,请注意以下两点: Smart Connect任务创建后,修改了源端或者目标端的参数,可能会导致重启失败。 重启Smart Connect任务会重置同步进度,并重新开始同步任务。 7、单击“确定”,完成Smart Connect任务的重启。 当页面左上方显示“成功重启任务xxx”时,表示成功重启Smart Connect任务。 结束
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        Kafka数据迁移
        使用Smart Connect迁移Kafka数据
        管理Smart Connect任务
      • 设置Topic权限
        (可选)删除Topic权限 步骤 1 登录管理控制台。 步骤 2 在管理控制台右上角单击,选择区域。 说明 请选择Kafka实例所在的区域。 步骤 3 在管理控制台左上角单击,选择“企业中间件”“分布式消息服务”“Kafka专享版”,进入分布式消息服务Kafka专享版页面。 步骤 4 单击Kafka实例名称,进入实例详情页面。 步骤 5 在导航栏单击“Topic管理”,进入Topic列表页面。 步骤 6 在待删除用户权限的Topic所在行,单击“设置用户权限”,弹出“设置用户权限”对话框。 步骤 7 在“已选择”区域,在待删除权限的SASLSSL用户所在行,单击“删除”,然后单击“确定”。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        Topic管理
        设置Topic权限
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • ...
      • 244
      跳转至
      推荐热词
      天翼云运维管理审计系统天翼云云服务平台云服务备份云日志服务应用运维管理云手机云电脑天翼云云hbase数据库电信云大数据saas服务电信云大数据paas服务轻量型云主机天翼云客户服务电话应用编排服务天翼云云安全解决方案云服务总线CSB天翼云服务器配置天翼云联邦学习产品天翼云云安全天翼云企业上云解决方案天翼云产品天翼云视频云存储

      天翼云最新活动

      安全隔离版OpenClaw

      OpenClaw云服务器专属“龙虾“套餐低至1.5折起

      天翼云新春焕新季

      云主机开年特惠28.8元/年,0元秒杀等你来抢!

      云上钜惠

      爆款云主机全场特惠,2核4G只要1.8折起!

      中小企业服务商合作专区

      国家云助力中小企业腾飞,高额上云补贴重磅上线

      出海产品促销专区

      爆款云主机低至2折,高性价比,不限新老速来抢购!

      天翼云奖励推广计划

      加入成为云推官,推荐新用户注册下单得现金奖励

      产品推荐

      多活容灾服务

      轻量型云主机

      训推服务

      模型推理服务

      一站式智算服务平台

      知识库问答

      人脸检测

      人脸实名认证

      人脸比对

      推荐文档

      免费注册

      系统必备组件安装

      运维说明

      常见问题

      • 7*24小时售后
      • 无忧退款
      • 免费备案
      • 专家服务
      售前咨询热线
      400-810-9889转1
      关注天翼云
      • 旗舰店
      • 天翼云APP
      • 天翼云微信公众号
      服务与支持
      • 备案中心
      • 售前咨询
      • 智能客服
      • 自助服务
      • 工单管理
      • 客户公告
      • 涉诈举报
      账户管理
      • 管理中心
      • 订单管理
      • 余额管理
      • 发票管理
      • 充值汇款
      • 续费管理
      快速入口
      • 天翼云旗舰店
      • 文档中心
      • 最新活动
      • 免费试用
      • 信任中心
      • 天翼云学堂
      云网生态
      • 甄选商城
      • 渠道合作
      • 云市场合作
      了解天翼云
      • 关于天翼云
      • 天翼云APP
      • 服务案例
      • 新闻资讯
      • 联系我们
      热门产品
      • 云电脑
      • 弹性云主机
      • 云电脑政企版
      • 天翼云手机
      • 云数据库
      • 对象存储
      • 云硬盘
      • Web应用防火墙
      • 服务器安全卫士
      • CDN加速
      热门推荐
      • 云服务备份
      • 边缘安全加速平台
      • 全站加速
      • 安全加速
      • 云服务器
      • 云主机
      • 智能边缘云
      • 应用编排服务
      • 微服务引擎
      • 共享流量包
      更多推荐
      • web应用防火墙
      • 密钥管理
      • 等保咨询
      • 安全专区
      • 应用运维管理
      • 云日志服务
      • 文档数据库服务
      • 云搜索服务
      • 数据湖探索
      • 数据仓库服务
      友情链接
      • 中国电信集团
      • 天翼云国际站
      • 189邮箱
      • 天翼企业云盘
      • 天翼云盘
      ©2026 天翼云科技有限公司版权所有 增值电信业务经营许可证A2.B1.B2-20090001
      公司地址:北京市东城区青龙胡同甲1号、3号2幢2层205-32室
      • 用户协议
      • 隐私政策
      • 个人信息保护
      • 法律声明
      备案 京公网安备11010802043424号 京ICP备 2021034386号