活动

天翼云最新优惠活动,涵盖免费试用,产品折扣等,助您降本增效!
热门活动
  • 安全隔离版OpenClaw NEW OpenClaw云服务器专属“龙虾“套餐低至1.5折起
  • 聚力AI赋能 天翼云大模型专项 大模型特惠专区·Token Plan 轻享包低至9.9元起
  • 青云志云端助力计划 NEW 一站式科研助手,海外资源安全访问平台,助力青年翼展宏图,平步青云
  • 企业出海解决方案 NEW 助力您的业务扬帆出海,通达全球!
  • 天翼云信创专区 NEW “一云多芯、一云多态”,国产化软件全面适配,国产操作系统及硬件芯片支持丰富
  • 中小企业服务商合作专区 国家云助力中小企业腾飞,高额上云补贴重磅上线
  • 云上钜惠 爆款云主机全场特惠,2核4G只要1.8折起!
  • 天翼云奖励推广计划 加入成为云推官,推荐新用户注册下单得现金奖励
免费活动
  • 免费试用中心 HOT 多款云产品免费试用,快来开启云上之旅
  • 天翼云用户体验官 NEW 您的洞察,重塑科技边界

息壤智算

领先开放的智算服务平台,提供算力、平台、数据、模型、应用“五位一体”智算服务体系,构建全流程的AI基础设施能力
AI Store
  • 算力市场
  • 模型市场
  • 应用市场
公共算力服务
  • 裸金属
  • 定制裸金属
训推服务
  • 模型开发
  • 训练任务
  • 服务部署
Token服务
  • 模型广场
  • 体验中心
  • 服务接入
应用托管
  • 应用实例
科研助手
  • 科研智能体
  • 科研服务
  • 开发机
  • 并行计算
大模型
  • DeepSeek-V4-Flash
  • GLM-5.1
  • Qwen3.5-122B-A10B
  • DeepSeek-V3.2(旗舰版)
  • GLM-5(正式版)
智算一体机
  • 智算一体机
智能体引擎
  • 智能体引擎
智算安全专区
  • 大模型安全评测
  • 大模型安全护栏
模型适配专家服务
  • 模型适配专家服务
算力服务商
  • 入驻算力服务商

应用商城

天翼云精选行业优秀合作伙伴及千余款商品,提供一站式云上应用服务
进入甄选商城进入云市场进入AI Store创新解决方案公有云生态专区智云上海应用生态专区
建站工具
  • 新域名服务
  • SSL证书
  • 翼建站
企业办公
  • 安全邮箱
  • WPS 365 天翼云版
  • 天翼企业云盘(标准服务版)
灾备迁移
  • 云管家2.0
  • 翼备份(SaaS版)

定价

协助您快速了解云产品计费模式、价格详情,轻松预估上云成本
价格计算器
  • 动态测算产品价格
定价策略
  • 快速了解计费模式

合作伙伴

天翼云携手合作伙伴,共创云上生态,合作共赢
天翼云生态合作中心
  • 天翼云生态合作中心
天翼云渠道合作伙伴
  • 天翼云代理渠道合作伙伴
天翼云服务合作伙伴
  • 天翼云集成商交付能力认证
天翼云应用合作伙伴
  • 天翼云云市场合作伙伴
  • 天翼云甄选商城合作伙伴
天翼云技术合作伙伴
  • 天翼云OpenAPI中心
天翼云培训认证
  • 天翼云学堂
  • 天翼云市场商学院
天翼云合作计划
  • 云汇计划
天翼信创云专区
  • 信创云专区
  • 适配互认证

开发者

开发者相关功能入口汇聚
技术社区
  • 专栏文章
  • 互动问答
  • 技术视频
资源与工具
  • OpenAPI中心
培训与认证
  • 天翼云学堂
  • 天翼云认证
开源社区
  • 魔乐社区
  • OpenTeleDB

支持与服务

为您提供全方位支持与服务,全流程技术保障,助您轻松上云,安全无忧
文档与工具
  • 文档中心
  • 新手上云
  • 自助服务
  • OpenAPI中心
定价
  • 价格计算器
  • 定价策略
基础服务
  • 售前咨询
  • 在线支持
  • 在线支持
  • 工单服务
  • 服务保障
  • 会员中心
增值服务
  • 红心服务
  • 首保服务
  • 客户支持计划
  • 专家技术服务
  • 备案管家
我要反馈
  • 建议与反馈
  • 用户体验官
信息公告
  • 客户公告

了解天翼云

天翼云秉承央企使命,致力于成为数字经济主力军,投身科技强国伟大事业,为用户提供安全、普惠云服务
品牌介绍
  • 关于天翼云
  • 智算云
  • 天翼云4.0
  • 新闻资讯
  • 天翼云APP
基础设施
  • 全球基础设施
  • 信任中心
最佳实践
  • 精选案例
  • 超级探访
  • 云杂志
  • 分析师和白皮书
  • 天翼云·创新直播间
市场活动
  • 2026智能云生态大会
  • 2025智能云生态大会
  • 2024智算云生态大会
  • 2023云生态大会
  • 2022云生态大会
  • 天翼云中国行
天翼云
  • 活动
  • 息壤智算
  • 产品
  • 解决方案
  • 应用商城
  • 定价
  • 合作伙伴
  • 开发者
  • 支持与服务
  • 了解天翼云
      • 文档
      • 控制中心
      • 备案
      • 管理中心
      消息队列Kafka版_相关内容
      • 日志处理工具Logstash接入Kafka
        本文主要介绍日志处理工具Logstash接入Kafka。 最佳实践概述 场景描述 Kafka与Logstash互联,可实现将Kafka作为inputs把消息传递给Logstash进行处理、或者是作为outputs将Kafka用作消息目的地。 技术架构图 暂无。 方案优势 可以异步处理数据:防止突发流量。 解耦:当发生异常的时候不会影响上游工作。 前提条件 需已购买Kafka实例、创建Topic,并且已成功消费消息。 资源规划 本实践方案内容仅涉及Kafka专享版实例和Logstash。 分布式消息服务 Figure 分布式消息服务 资源类型 配置项 配置明细 说明 :::: 企业中间件 DMS Kafka专享实例 需已购买kafka专享实例,创建好Topic,并成功消费消息。 Logstash下载与安装 下载并安装Logstash,验证运行成功。 方案正文 配置Kafka 1、登录分布式消息服务Kafka版控制台,按需要单击“实例名称”,进入实例基本信息页面。 图 Kafka控制台实例列表 2、在“连接地址”模块,获取Kafka连接IP地址。 图 连接IP地址与端口 3、创建Logstashtest的Topic。 图 创建Topic
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        日志处理工具Logstash接入Kafka
      • 连接未开启SASL的Kafka专享实例
        目前提供Kafka专享版实例的服务,Kafka专享版实例采用物理隔离的方式部署,租户独占Kafka实例。创建Kafka专享版实例之后,使用开源Kafka客户端向Kafka专享版实例生产消息和消费消息。 本章节介绍如何使用开源的Kafka客户端访问未开启SASL的Kafka专享实例方法。 多语言客户端使用请参考Kafka官网: 说明:Kafka服务器允许客户端单IP连接的个数为200个,如果超过了,会出现连接失败问题。 前提条件 已配置正确的安全组。 访问未开启SASL的Kafka专享实例时,支持VPC内访问。实例需要配置正确的安全组规则,具体安全组配置要求,请参考下表。 已获取连接Kafka专享版实例的地址。 使用VPC内访问,实例端口为9092,实例连接地址获取如下图。 获取VPC内访问Kafka专享实例的连接地址(实例未开启SASL) Kafka专享实例已创建Topic,否则请提前创建Topic。 已下载Kafka命令行工具1.1.0版本或者Kafka命令行工具2.3.0版本,确保Kafka实例版本与命令行工具版本相同。 已在Kafka命令行工具的使用环境中安装Java Development Kit 1.8.111或以上版本,并完成环境变量配置。 命令行模式连接实例 以下操作命令以Linux系统为例进行说明: 解压Kafka命令行工具。 进入文件压缩包所在目录,然后执行以下命令解压文件。 tar zxf [kafkatar] 其中,[kafkatar]表示命令行工具的压缩包名称。 例如: tar zxf kafka2.111.1.0.tgz 进入Kafka命令行工具的“/bin”目录下。 注意,Windows系统下需要进入“/bin/windows”目录下。 执行如下命令进行生产消息。 ./kafkaconsoleproducer.sh brokerlist ${连接地址} topic ${Topic名称} 参数说明如下: 连接地址:从前提条件中获取的连接地址。 Topic名称:Kafka实例下创建的Topic名称。 以获取的Kafka实例连接地址为“10.3.196.45:9092,10.78.42.127:9092,10.4.49.103:9092”为例。执行完命令后输入内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。 [root@ecskafka bin]
        来自:
        帮助文档
        专属云分布式消息服务Kafka
        用户指南
        连接未开启SASL的Kafka专享实例
      • 分布式消息服务Kafka事件目标
        参数 说明 示例 实例 选择分布式消息服务Kafka实例。 instancexxx Topic 选择分布式消息服务Kafka实例的Topic。 topic1 消息体 选择消息体(Body)的内容,更多信息请参考 完整事件 消息键值 选择消息键值(Key)的内容,更多信息请参考 空
        来自:
        帮助文档
        事件总线
        用户指南
        事件总线
        事件规则
        事件目标
        目标服务类型
        分布式消息服务Kafka事件目标
      • 使用限制
        本文主要介绍应用性能监控的各语言使用限制。 自研JAVA探针使用限制 类型 名称 版本 工具 JDK jdk8、jdk11、jdk17 通讯协议 httpclient apache httpclient2.0+、apache asynchttpclient1.9+ Java框架 spring 3.1.x~5.0.x Java框架 springboot 1.2.x~1.5.x、2.0.4~2.0.9 Java框架 Dubbo 2.7+ Java框架 gRPC 1.6+ 数据库 MySQL mysqlconnectorjava 5.1.X 数据库 Oracle ojdbc5、ojdbc6、ojdbc14 数据库 druid 1.0 数据库 c3p0 0.9.2+ 数据库 dbcp2 2.0+ 数据库 JDBC java8+ web服务器 Tomcat 7.0.x, 8.5.x, 9.0.x, 10.0.x 消息队列 RabbitMQ springrabbit1.0+、amqpclient 2.7+ 消息队列 Kafka kafkaproducer 0.11+、kafkaconsumer 0.11+、kafkastream 0.11+ NoSQL Redis jedis 1.4+,lettuce 4.0+ NoSQL Mongodb 3.1+ NoSQL ElasticSearch 5.0+ Rest Client Common HTTP java http client java11+、HttpURLConnection java8+ Go使用限制 Opentelemetry支持的框架列表 OpenTelemetry提供了若干半自动埋点插件,支持为常见的框架自动创建Span。
        来自:
        帮助文档
        应用性能监控 APM
        产品介绍
        使用限制
      • 与其他服务的关系
        专属云(计算独享型) 该服务为专属云Kafka开通前必选服务,在开通了专属云计算独享后,才能在控制台中节点选择此专属云标识的环境,再进入控制台的分布式消息服务,来操作专属云Kafka的开通。 专属云(存储独享型) 该服务为专属云Kafka开通时的可选服务,在进如专属云Kafka的开通过程中,队列实例规格必须在专属云计算独享型,而队列存储的部分支持选择公有云的云盘承载,也可选择已经购买的专属云存储独享型,可根据需求灵活选择。
        来自:
        帮助文档
        专属云分布式消息服务Kafka
        产品简介
        与其他服务的关系
      • 配置DMS Kafka连接
        本章节主要介绍配置DMS Kafka连接。 连接DMS的Kafka队列时,相关参数详见下表:DMS Kafka连接参数 参数名 说明 取值样例 名称 连接的名称,根据连接的数据源类型,用户可自定义便于记忆、区分的连接名。 dmslink 服务类型 选择DMS Kafka版本,目前只有专享版。 专享版 Kafka Broker Kafka专享版实例的地址,格式为host:port。 Kafka SASLSSL 选择是否打开客户端连接Kafka专享版实例时SSL认证的开关。 开启Kafka SASLSSL,则数据加密传输,安全性更高,但性能会下降。 是 用户名 开启Kafka SASLSSL时显示该参数,表示连接DMS Kafka的用户名。 密码 开启Kafka SASLSSL时显示该参数,表示连接DMS Kafka的密码。
        来自:
        帮助文档
        数据治理中心 DataArts Studio
        用户指南
        数据集成
        管理连接
        配置DMS Kafka连接
      • 生产消息
        介绍分布式消息服务Kafka生产拨测功能操作内容。 场景描述 通过生产消息,确保Kafka集群能够稳定接收并传递生产者发送的消息,同时消费者能够成功订阅并处理这些消息,以此确认集群的基本功能和网络连接均处于正常状态。 操作步骤 (1)登录管理控制台。 (2)进入Kafka管理控制台。 (3)在实例列表页在操作列,目标实例行点击“管理”。 (4)点击“Topic管理”后进入主题管理页面。 (5)选择需要生产消息的Topic,点击其右侧“更多”,在下拉框中点击“生产消息”按钮。 (6)在弹窗中,输入消息内容、消息条数等信息,点击确认即可发送。
        来自:
      • 入门指引
        本章节将为您介绍分布式消息服务Kafka入门的基本流程,主要包括环境准备、购买实例、创建Topic以及编译运行Demo Java工程,帮助您快速上手Kafka。 操作流程 操作流程如下: 1. 环境准备 创建Kafka实例先要准备好虚拟私有云、子网和安全组,可选弹性公网IP。 2. 创建实例 在订购分布式消息Kafka填写和确认实例名称、计费模式等信息,确认费用后点击下一步,等待开通流程结果通知成功后完成创建实例。 3. 创建Topic 开通实例后,在控制台相关页面按照指引创建主题,用于发送消息。点击“生产消息”来测试Topic是否正常。 4. 编译运行Demo Java工程 以上工作完成后,在客户端应用编译工程进行生产消费,包括引入依赖,生产消息和消费消息。
        来自:
        帮助文档
        分布式消息服务Kafka
        快速入门
        入门指引
      • 流数据实时处理框架Flink接入Kafka
        本文主要介绍流数据实时处理框架Flink接入Kafka 。 最佳实践概述 场景描述 如果您想要在流数据实时处理场景下,使用Kafka作为流数据的源或者使用Kafka作为流数据的目的,需要将Kafka与Flink对接,可以参考本实践方案。 技术架构图 暂无。 方案优势 用户可按业务需要,参考设置Kafka与Flink框架对接,实现流数据的消息消费。 前提条件 需已购买Kafka实例、创建Topic,并且已成功消费消息。 资源规划 本实践方案内容仅涉及Kafka专享版实例和Flink的安装配置。 分布式消息服务 Figure 分布式消息服务 资源类型 配置项 配置明细 说明 :::: 企业中间件 DMS Kafka专享实例 需已购买kafka专享实例,创建好Topic,并成功消费消息。 Flink流数据处理框架 Figure Flink框架 资源类型 配置项 配置明细 说明 :::: 应用软件 Flink框架 Flink 1.14 使用开源Apache Flink 方案正文 获取Kafka访问地址 1、登录分布式消息服务Kafka版控制台,按需要单击“实例名称”,进入实例基本信息页面。 图 Kafka控制台实例列表 2、在“连接地址”模块,获取Kafka连接IP地址。 图 连接IP地址与端口
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        流数据实时处理框架Flink接入Kafka
      • Logstash对接Kafka
        介绍Logstash对接Kafka具体内容。 应用场景 通过Logstash对接Kafka,可以实现以下功能: 1. 数据收集:Logstash可以从Kafka主题中消费数据,将数据从Kafka集群中获取到Logstash中进行处理和转发。这样可以方便地将分布式系统、应用程序、传感器数据等各种数据源的数据集中收集起来。 2. 数据处理和转换:Logstash提供了丰富的过滤器插件,可以对从Kafka中消费的数据进行各种处理和转换操作。例如,可以进行数据清洗、解析、分割、合并、字段映射等操作,以满足不同数据源和目标的数据格式要求。 3. 数据传输和转发:Logstash可以将处理后的数据发送到不同的目标位置,如Elasticsearch、MySQL、文件系统、消息队列等。通过配置适当的输出插件,可以将数据传输到目标系统,以便后续的数据分析、存储、可视化等操作。 4. 实时数据处理:Logstash与Kafka结合使用,可以实现实时的数据处理和传输。Kafka作为高吞吐量的消息队列,可以确保数据的高效传输和缓冲。而Logstash作为数据处理引擎,可以对从Kafka中消费的数据进行实时处理,满足实时数据分析和监控的需求。 5. 分布式部署和负载均衡:Logstash支持分布式部署,可以通过配置多个Logstash节点来实现高可用性和负载均衡。多个Logstash节点可以同时从Kafka主题中消费数据,并进行并行处理和转发,以提高整体系统的性能和吞吐量。 总之,通过Logstash对接Kafka,可以实现灵活、可扩展和高效的数据处理和传输。Logstash提供了丰富的插件和配置选项,可以根据实际需求进行定制化的数据处理流程。同时,Logstash还具有良好的可扩展性和可靠性,适用于各种规模和类型的数据处理场景。
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        Logstash对接Kafka
      • 创建的Kafka实例是集群模式么?
        本节介绍分布式消息服务Kafka集群模式 是的,Kafka实例通常是以集群模式创建的。Kafka集群由多个Kafka节点组成,每个节点负责存储和处理消息。集群模式可以提供高可用性、容错性和扩展性。 Kafka集群中的每个节点都可以承担多个分区的领导者或副本角色,从而实现数据的冗余和故障恢复。通过复制机制,Kafka可以提供高可用性和容错性,即使在节点故障的情况下也能保证数据的可靠性和可用性。 注意 Kafka集群的规模和配置取决于应用的需求和可用的资源。可以根据实际情况来决定集群中节点的数量、分区的数量以及数据的复制策略,以满足性能、可用性和扩展性的要求。
        来自:
        帮助文档
        分布式消息服务Kafka
        常见问题
        实例问题
        创建的Kafka实例是集群模式么?
      • 变更实例规格
        介绍分布式消息服务Kafka扩容相关内容。 场景描述 当需要处理大量消息时,Kafka实例的扩容是一种常见的解决方案。扩容可以增加Kafka集群的吞吐量、存储能力和高可用性。分布式消息服务Kafka提供三类扩容方案,分别为节点、规格和磁盘扩容,更好满足用户不同场景下的扩容需求。 ● 代理数量扩容:指向Kafka集群中添加更多的节点以增加系统的吞吐量和可靠性。通过扩容,可以将消息的发送和消费负载分摊到更多的节点上,从而提高系统的并发处理能力。 ● 规格扩容:指通过增加Kafka的资源配置来提升系统的处理能力和性能。 ● 磁盘扩容:指增加磁盘的存储容量,以满足更多消息的存储需求。 变更实例规格的影响 变更配置类型 影响 磁盘扩容 磁盘扩容不会影响业务。 代理数量扩容 1.代理数量扩容不会影响原来的代理,业务也不受影响(如果实例已配置分区自动重平衡,会触发重平衡,客户端会触发重连)。 2.新创建的Topic才会分布在新代理上,原有Topic还分布在原有代理上。通过修改Kafka分区平衡,实现将原有Topic分区的副本迁移到新代理上。 规格扩容缩容 1.若Topic为单副本,扩容/缩容期间会出现无法对该Topic生产消息或消费消息,会造成业务中断。 2.若Topic为多副本,代理会滚动重启,代理滚动重启造成分区Leader切换,会发生秒级连接闪断,Leader切换时长一般为1分钟以内。建议您在业务低峰期扩容/缩容,并且再生产客户端配置重试机制,方法如下: (1).生产客户端为Kafka开源客户端时,检查是否配置retries和retry.backoff.ms参数。建议参数值分别配置为:retries10,retry.backoff.ms1000。 (2).生产客户端为Flink客户端时,检查是否配置重启策略,配置重启策略可以参考如下代码。 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.seconds(20)));
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        实例管理
        变更实例规格
      • 使用Kafka触发器
        本章介绍函数工作流如何使用Kafka触发器。 使用Kafka触发器后,FunctionGraph会定期轮询Kafka实例指定Topic下的新消息,FunctionGraph将轮询得到的消息作为参数传递来调用函数。 前提条件 进行操作之前,需要做好以下准备。 已经创建函数。 创建Kafka触发器,必须开启函数工作流VPC访问,请参见配置网络。 已经创建Kafka实例。 在Kafka实例下创建主题。 创建Kafka触发器 1、登录函数工作流控制台,在左侧的导航栏选择“函数 > 函数列表”。 2、选择待配置的函数,单击进入函数详情页。 3、选择“设置 > 触发器”,单击“创建触发器”,弹出“创建触发器”对话框。 4、设置以下信息。 触发器类型:选择“分布式消息服务(Kafka)”。 实例:选择已创建专享版Kafka实例。 主题:选择专享版Kafka实例的Topic。 批处理大小:每次从Topic消费的消息数量。 用户名:Kafka实例开启SSL时需要填写。连接Kafka专享版实例的用户名。 密码:Kafka实例开启SSL时需要填写。连接Kafka专享版实例的密码。 5、单击“确定”,完成kafka触发器的创建。 说明 开启函数流VPC访问后,需要在Kafka服务安全组配置对应子网的权限。如何开启VPC访问请参见 Kafka触发器当前支持选择多个Topic主题,从而避免Topic过多导致创建的触发器数量被限制。 配置Kafka事件触发函数。 1、返回函数工作流控制台,在左侧的导航栏选择“函数 > 函数列表”。 2、选择待配置的函数,单击进入函数详情页。 3、在函数详情页,选择函数版本。 4、在“代码”页签下,单击“测试”,弹出“配置测试事件”对话框。 5、填写如下表示测试信息后,单击“保存”。 测试信息 参数 说明 配置测试事件 可创建新的测试事件也可编辑已有的测试事件。选择默认值:“创建新的测试事件”。 事件模板 选择"kafkaeventtemplate"模板,使用系统内置Kafka事件模板。 事件名称 事件名称必须以大写或小写字母开头,支持字母(大写或小写),数字和下划线“”(或中划线“”),并以字母或数字结尾,长度为125个字符,例如kafka123test。 测试事件 自动加载系统内置kafka事件模板,本例不做修改。 6、单击“测试”,可以得到函数运行结果,函数会返回输入kafka消息数据。
        来自:
        帮助文档
        函数工作流
        用户指南
        配置触发器
        使用Kafka触发器
      • 连接未开启SASL的Kafka实例
        命令行模式连接实例 以下操作命令以Linux系统为例进行说明: 步骤 1 解压Kafka命令行工具。 进入文件压缩包所在目录,然后执行以下命令解压文件。 tar zxf [kafkatar] 其中, [kafkatar] 表示命令行工具的压缩包名称。 例如: tar zxf kafka2.122.7.2.tgz 步骤 2 进入Kafka命令行工具的“/bin”目录下。 注意,Windows系统下需要进入“/bin/windows”目录下。 步骤 3 执行如下命令进行生产消息。 ./kafkaconsoleproducer.sh brokerlist {连接地址} topic {Topic名称} 参数说明如下: 连接地址:从前提条件中获取的连接地址,如果是公网访问,请使用“公网连接地址”,如果是VPC内访问,请使用“内网连接地址”,请根据实际情况选择。 Topic名称:Kafka实例下创建的Topic名称。如果Kafka实例开启了自动创建Topic功能,此参数值可以填写已创建的Topic名称,也可以填写未创建的Topic名称。 本文以公网连接为例,获取的Kafka实例公网连接地址为“10.3.196.45:9094,10.78.42.127:9094,10.4.49.103:9094”。执行完命令后输入内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。 [root@ecskafka bin] ./kafkaconsoleproducer.sh brokerlist 10.3.196.45:9094,10.78.42.127:9094,10.4.49.103:9094 topic topicdemo >Hello >DMS >Kafka! >^C[root@ecskafka bin] 如需停止生产使用Ctrl+C命令退出。 步骤 4 执行如下命令消费消息。 ./kafkaconsoleconsumer.sh bootstrapserver {连接地址} topic {Topic名称} group ${消费组名称} frombeginning 参数说明如下: 连接地址:从前提条件中获取的连接地址,如果是公网访问,请使用“公网连接地址”,如果是VPC内访问,请使用“内网连接地址”,请根据实际情况选择。 Topic名称:Kafka实例下创建的Topic名称。 消费组名称:根据您的业务需求,设定消费组名称。 如果已经在配置文件中指定了消费组名称,请确保命令行中的消费组名称与配置文件中的相同,否则可能消费失败 。 消费组名称开头包含特殊字符,例如下划线“”、
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        连接Kafka实例
        连接未开启SASL的Kafka实例
      • 创建Topic
        参数 参数说明 Topic名称 Topic名称,英文字母、数字、下划线开头,且只能由英文字母、数字、中划线、下划线组成,长度为364个字符。创建Topic后不能修改名称。 分区数 您可以设置Topic的分区数,分区数越大消费的并发度越大。该参数设置为1时,消费消息时会按照先入先出的顺序进行消费。取值范围:1100,默认值:6 副本数 每个Topic设置副本的数量,Kafka会自动在每个副本上备份数据,当其中一个Broker节点故障时数据依然是可用的,副本数越大可靠性越高。默认值:3 分区容量 每个分区的数据量的最大值,超过这个值后前面生产的消息将会被删除,保证了数据不会无限上涨挤爆磁盘。 是否同步刷盘 同步刷盘即确保消息被写入磁盘才会被认定为生产成功,该参数可提高可靠性,但是会影响性能。 消息保留时长 当消息生存时间超过该时长后,将会被清理,可用于控制存储成本。 最小同步副本数 该参数使得消息必须写入设定值个数的副本后,才能被认定生产成功,该参数可提高可靠性,但是过大会影响性能,且必须不大于副本数。 批处理消息最大值 每个批次中最大允许的消息大小,这影响了每次请求中能包含的消息总量和大小。 消息时间戳类型 CreateTime: 这是消息被生产者发送到Kafka时的时间戳,它表示消息创建的实际时间;LogAppendTime: 这是消息被Kafka日志接收并写入到日志文件时的时间戳,它表示消息写入 Kafka 的实际时间。 描述 topic的描述字段,可用作标记和说明。 标签 标签用于从不同维度对资源分类管理。 预设ACL策略 勾选提前设置好的 ACL 策略,具体 ACL 策略设置可参考用户管理页面的ACL策略管理。 消息清除策略 delete:超过消息保留时长后,消息将被删除。compact:超过消息保留时长后,消息将会被压缩而不是直接删除。 允许unclean副本选举 开启该选项后,不在ISR的副本也将可以参与leader选举。 分片滚动时间 当一个日志段的最老消息的创建时间与当前时间的差值达到该配置的值时,Kafka 会创建一个新的日志段。 分片大小 当一个日志段的大小达到该配置的值时,Kafka 会创建一个新的日志段。该配置和分片滚动时间配置可以同时使用,具体看哪个条件先行触发。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        Topic管理
        创建Topic
      • 重启实例
        本文主要介绍重启实例。 操作场景 分布式消息服务Kafka管理控制台支持重启运行中的Kafka实例,且可实现批量重启Kafka实例。 说明 在Kafka实例重启过程中,客户端的生产与消费消息等请求会被拒绝。 前提条件 只有当Kafka实例处于“运行中”或“故障”状态,才能执行重启操作。 操作步骤 步骤 1 登录管理控制台。 步骤 2 在管理控制台右上角单击,选择区域。 说明 请选择Kafka实例所在的区域。 步骤 3 在管理控制台左上角单击,选择“企业中间件”“分布式消息服务”“Kafka专享版”,进入分布式消息服务Kafka专享版页面。 步骤 4 通过以下任意一种方法,重启Kafka实例。 勾选Kafka实例名称左侧的方框,可选一个或多个,单击信息栏左上侧的“重启”。 在待重启Kafka实例所在行,单击“重启”。 单击Kafka实例名称,进入实例详情页面。单击右上角的“重启”。 步骤 5 在“重启实例”对话框中,单击“是”,完成重启Kafka实例。 重启Kafka实例大约需要3到15分钟。Kafka实例重启成功后,实例状态切换为“运行中”。 说明 重启Kafka实例只会重启实例进程,不会重启实例所在虚拟机。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        实例管理
        重启实例
      • 事件规则参数
        分布式消息服务Kafka 当事件目标选择分布式消息服务Kafka时,事件目标的type值为kafka,eventTargets中的resourceKey字段中含义如下表所示。 resourceKey 是否必传 form value template instanceId 是 CONSTANT 分布式消息服务Kafka实例Id。 说明 实例Id可在分布式消息服务Kafka管理控制台实例详情页获取。 无 topic 是 CONSTANT Kafka主题。 无 value 是 CONSTANT ORIGINAL JSONPATH TEMPLATE 消息体。 如果form选择TEMPLATE,则在此处配置模板。详见事件内容转换。 key 是 CONSTANT EMPTY JSONPATH TEMPLATE 消息键值。 如果form选择TEMPLATE,则在此处配置模板。详见事件内容转换。 请求示例: plaintext { "eventBusName": "test", "eventRuleName": "test", "desc": "", "filterPattern": "{}", "eventTargets": [ { "type": "kafka", "eventTargetName": "1gQfuPljs9MJsWM1eHYch", "params": [ { "resourceKey": "instanceId", "value": "356b3496b87xxxxxxxxfe7deef7fe35", "form": "CONSTANT" }, { "resourceKey": "topic", "value": "test2", "form": "CONSTANT" }, { "resourceKey": "value", "value": "", "form": "ORIGINAL" }, { "resourceKey": "key", "value": "", "form": "EMPTY" } ] } ] } 分布式消息服务RocketMQ 当事件目标选择分布式消息服务RocketMQ时,事件目标的type值为rocketmq,eventTargets中的resourceKey字段中含义如下表所示。 resourceKey 是否必传 form value template instanceId 是 CONSTANT 分布式消息服务RocketMQ实例ID。 说明 实例Id可在分布式消息服务RocketMQ管理控制台实例详情页获取。 无 topic 是 CONSTANT RocketMQ主题。 无 body 是 CONSTANT ORIGINAL JSONPATH TEMPLATE 消息体。 如果form选择TEMPLATE,则在此处配置模板。详见事件内容转换。 keys 是 CONSTANT EMPTY JSONPATH TEMPLATE 消息索引。 如果form选择TEMPLATE,则在此处配置模板。详见事件内容转换。 properties 是 CONSTANT EMPTY JSONPATH TEMPLATE 消息自定义属性。 如果form选择TEMPLATE,则在此处配置模板。详见事件内容转换。 tags 是 CONSTANT EMPTY JSONPATH TEMPLATE 消息标签。 如果form选择TEMPLATE,则在此处配置模板。详见事件内容转换。 请求示例: plaintext { "eventBusName": "test", "eventRuleName": "test", "desc": "", "filterPattern": "{}", "eventTargets": [ { "type": "rocketmq", "eventTargetName": "lMVBoqi55L3Q6QaCKFWYA", "params": [ { "resourceKey": "instanceId", "value": "89ceb110331e4c968499744c2ccbdbcc", "form": "CONSTANT" }, { "resourceKey": "topic", "value": "TopicA", "form": "CONSTANT" }, { "resourceKey": "body", "value": "", "form": "ORIGINAL" }, { "resourceKey": "properties", "value": "", "form": "EMPTY" }, { "resourceKey": "keys", "value": "", "form": "EMPTY" }, { "resourceKey": "tags", "value": "", "form": "EMPTY" } ] } ] }
        来自:
        帮助文档
        事件总线
        用户指南
        事件总线
        事件规则
        事件规则参数
      • Java客户端接入示例
        生产消息 import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.concurrent.Future; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.config.SslConfigs; public class KafkaProducerDemo { public static void main(String args[]) { //加载kafka.properties Properties kafkaProperties JavaKafkaConfigurer.getKafkaProperties(); Properties props new Properties(); //设置接入点,请通过控制台获取对应Topic的接入点 props.put(ProducerConfig.BOOTSTRAPSERVERSCONFIG, kafkaProperties.getProperty("bootstrap.servers")); //设置SSL根证书的路径,请记得将XXX修改为自己的路径 props.put(SslConfigs.SSLTRUSTSTORELOCATIONCONFIG, kafkaProperties.getProperty("ssl.truststore.location")); //根证书store的密码,保持不变 props.put(SslConfigs.SSLTRUSTSTOREPASSWORDCONFIG, "c24f5210"); //接入协议,目前支持使用SSL协议接入 props.put(CommonClientConfigs.SECURITYPROTOCOLCONFIG, "SSL"); //Kafka消息的序列化方式 props.put(ProducerConfig.KEYSERIALIZERCLASSCONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUESERIALIZERCLASSCONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //请求的最长等待时间 props.put(ProducerConfig.MAXBLOCKMSCONFIG, 30 1000); //设置客户端内部重试次数 props.put(ProducerConfig.RETRIESCONFIG, 5); //设置客户端内部重试间隔 props.put(ProducerConfig.RECONNECTBACKOFFMSCONFIG, 3000); //hostname校验改成空 props.put(SslConfigs.SSLENDPOINTIDENTIFICATIONALGORITHMCONFIG, ""); //构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可; //如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个 KafkaProducer producer new KafkaProducer (props); //构造一个Kafka消息 String topic kafkaProperties.getProperty("topic"); //消息所属的Topic,请在控制台申请之后,填写在这里 String value "this is the message's value"; //消息的内容 try { //批量获取 futures 可以加快速度, 但注意,批量不要太大 List > futures new ArrayList >(128); for (int i 0; i kafkaMessage new ProducerRecord (topic, value + ": " + i); Future metadataFuture producer.send(kafkaMessage); futures.add(metadataFuture); } producer.flush(); for (Future future: futures) { //同步获得Future对象的结果 try { RecordMetadata recordMetadata future.get(); System.out.println("Produce ok:" + recordMetadata.toString()); } catch (Throwable t) { t.printStackTrace(); } } } catch (Exception e) { System.out.println("error occurred"); e.printStackTrace(); } } }
        来自:
        帮助文档
        分布式消息服务Kafka
        开发指南
        Java
        Java客户端接入示例
      • 为什么Group不存在但能消费消息?
        本节介绍Group不存在但能消费消息原因 我在分布式消息服务Kafka控制台上,未查看到对应的Group,但此Group下却有消费线程在消费消息。 可能原因 如果客户端使用assign方式消费消息,那么即使不创建Group,也可能消费消息。 如果客户端使用subscribe方式消费消息,删除Group后,消费线程未停止或者未发生Rebalance,那么消费线程还可以继续正常消费。 解决方案 如果客户端使用assign方式消费消息,请提前在分布式消息服务Kafka控制台创建Group。 请尽量复用Group,避免创建过多的Group而影响集群的稳定性。 在删除Group前,请确保已停止该Group下的所有消费线程。
        来自:
        帮助文档
        分布式消息服务Kafka
        常见问题
        操作类
        为什么Group不存在但能消费消息?
      • 删除消息
        本文主要介绍如何在控制台删除消息。 操作场景 说明 消息删除后无法恢复,请谨慎操作。 前提条件 删除消息前,请先在客户端中设置“auto.offset.reset”参数。“auto.offset.reset”用来指定当Kafka中没有初始偏移量或者当前偏移量不存在(例如当前偏移量已被删除)时,消费者的消费策略。取值如下: latest:偏移量自动被重置到最晚偏移量。 earliest:偏移量自动被重置到最早偏移量。 none:向消费者抛出异常。 操作步骤 1. 登录管理控制台。 2. 在管理控制台右上角单击,选择区域。 说明 请选择Kafka实例所在的区域。 3. 在管理控制台左上角单击,选择“企业中间件”“分布式消息服务”“Kafka专享版”,进入分布式消息服务Kafka专享版页面。 4. 单击Kafka实例名称,进入实例详情页面。 5. 在左侧导航栏选择“Topic管理”,进入Topic列表页面。 6. 在待删除消息的Topic所在行,单击“消息删除”,弹出“消息删除”对话框。 7. 设置消息删除参数,如下表所示。 图消息删除 表 消息删除参数说明 参数 说明 分区 选择消息所在的分区编号。 偏移量 输入偏移量,此偏移量之前的数据将被删除。 说明 如果“偏移量”设置为“1”,表示删除分区中所有的消息。 如果您输入的偏移量不在指定分区的最早偏移量和最晚偏移量之间,消息将不会被删除。 如果需要删除多个分区的消息,单击“添加分区”,设置需要删除消息的分区和偏移量。每次最多可选择10个分区。 8. 单击“确定”,弹出“清理结果”对话框,单击“确定”,完成消息的删除。 图 清理结果
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        消息管理
        删除消息
      • 工作流调度简介
        触发器名称 调用方式 详细介绍 定时触发器 同步调用 按照指定的时间间隔触发工作流执行。 HTTP触发器 同步调用 当通过HTTP请求触发工作流时,工作流会根据请求参数执行。 Kafka触发器 同步/异步调用 当消息队列中出现新消息时,触发工作流执行。 RocketMQ触发器 同步/异步调用 当消息队列中出现新消息时,触发工作流执行。 云原生网关触发器 同步调用 当通过云原生网关触发工作流时,工作流会根据请求参数执行。
        来自:
        帮助文档
        函数计算
        用户指南
        云工作流
        控制台操作
        工作流调度
        工作流调度简介
      • 自建Apache Kafka事件源
        事件示例 plaintext { "id": "601f1f756f8c4d97941e5c04f7add896", "specversion": "1.0", "source": "apache:kafka", "type": "kafka:Topic:SendMessage", "subject": "apache:kafka:bb9fdb42056xxxxxx0242ac110002:dab4124510dd4xxxxxxxxx5c6a6db69:topic:streamsource", "datacontenttype": "application/json", "time": "20251030T07:16:07.692257848Z", "data": { "topic": "streamsource", "partition": 0, "offset": 36, "value": "msg17" }, "ctyunaccountid": "dab4124510dd4xxxxxxxxx5c6a6db69", "ctyunregion": "bb9fdb42056xxxxxx0242ac110002" } data字段包含的参数解释如下表所示: 参数 类型 示例值 描述 topic String source1 Topic名称。 offset Integer 1 消费位点。 key String test 消息Key值。 value Object Hello,Kafka! 消息体,默认以JSON格式编码。
        来自:
        帮助文档
        事件总线
        用户指南
        事件流
        事件源
        自建Apache Kafka事件源
      • Kafka磁盘选择超高IO还是高IO?
        本节介绍分布式消息服务Kafka磁盘类型 选择Kafka磁盘类型主要取决于应用的需求和预算。通常,对于Kafka来说,高IO磁盘是更常见的选择,因为它提供了更好的性能和吞吐量。 高IO磁盘具有更快的随机读写速度和更低的访问延迟。这对于Kafka来说非常重要,因为Kafka的性能和吞吐量受限于磁盘的读写速度。高IO磁盘适用于需要处理大量读写操作的场景,例如高频率的消息传递和数据写入。它可以提供更快的消息处理和更低的延迟,从而提高系统的响应性能。 超高IO磁盘也是一个可选的方案,它提供了更高的随机读写速度和更低的延迟,相对于传统的高IO磁盘来说性能更好。超高IO磁盘适用于对性能要求非常高的场景,例如大规模的实时数据处理和高吞吐量的消息队列。它可以提供更高的IOPS(每秒输入/输出操作数)和更低的延迟,从而支持更快的数据处理和更高的系统吞吐量。 综上所述,对于Kafka来说,一般情况下选择高IO磁盘是比较常见的选择,因为它提供了良好的性能和吞吐量。但如果预算允许,超高IO磁盘可以提供更高的性能和吞吐量,适用于对性能要求非常高的场景。
        来自:
        帮助文档
        分布式消息服务Kafka
        常见问题
        计费与购买类
        Kafka磁盘选择超高IO还是高IO?
      • 查看示例代码
        本文主要介绍查看示例代码。 本章节指导您在控制台查看Java、Go和Python生产消费消息的示例代码。 操作步骤 步骤 1 登录管理控制台。 步骤 2 在管理控制台右上角单击,选择区域。 说明 请选择Kafka实例所在的区域。 步骤 3 在管理控制台左上角单击,选择“企业中间件”“分布式消息服务”“Kafka专享版”,进入分布式消息服务Kafka专享版页面。 步骤 4 单击Kafka实例的名称,进入实例详情页面。 步骤 5 选择“Topic管理”页签,显示已创建的Topic详情。 步骤 6 单击“查看示例代码”,弹出“生产消费示例代码”对话框。 查看Java、Go和Python生产消费消息的示例代码。示例代码区分是否开启SASLSSL认证,“接入类型”为“PLAINTEXT”时,表示未开启SASLSSL认证。“接入类型”为“SASLSSL”时,表示已开启SASLSSL认证。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        Topic管理
        查看示例代码
      • 如何判断和处理消息堆积?
        本节介绍Kafka 判断和处理消息堆积 判断消息堆积是否属于正常情况 登录“分布式消息服务Kafka”控制台,在“消费组管理”页面,找到目标消费组,进入“消息堆积”页面。 (1)堆积量保持在一个稳定的数值之间波动,没有持续扩大。说明客户端一直在拉取最新消息,没有消息堆积,属于正常情况。 (2)堆积量逐步扩大,并且当前位点一直不变。客户端的消费线程因为某些原因卡住,没有继续消费,也没有继续向服务端提交位点,属于异常情况,即消息的确堆积了。 (3)堆积量逐步扩大,同时当前位点在前进。说明客户端还在消费中,但是消息的消费速度慢于消息的发送速度。消息堆积大多是消费速度过慢或者消费线程阻塞造成的,建议不要在消费逻辑中有太多耗时的操作。 消息堆积的处理方式 经过上述判断,确认消息的确存在堆积情况时,建议打印消息的消费耗时,或者根据堆栈信息查看线程执行情况,适当调整以加快消息的消费速度,避免出现消息堆积。
        来自:
        帮助文档
        分布式消息服务Kafka
        常见问题
        操作类
        如何判断和处理消息堆积?
      • 必须配置的监控告警
        本文主要介绍必须配置的监控告警。 本章节主要介绍部分监控指标的告警策略,以及配置操作。在实际业务中,建议按照以下告警策略,配置监控指标的告警规则。 表 Kafka实例配置告警的指标 指标ID 指标名称 告警策略 指标说明 告警处理建议 brokerdiskusage 磁盘容量使用率 告警阈值:原始值>80%连续触发次数:1告警级别:紧急 该指标为从Kafka节点虚拟机层面采集的磁盘容量使用率。 出现该告警时,需要修改实例存储空间 。具体操作,请参考变更实例规格。 brokercpucoreload CPU核均负载 告警阈值:原始值>2连续触发次数:3告警级别:重要 该指标为从Kafka节点虚拟机层面采集的CPU每个核的平均负载。 出现该告警时,先检查该监控是否长期处于接近或超过告警阈值状态,如果是,需要修改实例基准带宽/代理个数,即扩节点。具体操作,请参考。 brokermemoryusage 内存使用率 告警阈值:原始值>90%连续触发次数:3告警级别:紧急 该指标为Kafka节点虚拟机层面采集的内存使用率。 出现该告警时,需要修改实例基准带宽/代理个数,即扩节点。具体操作,请参考。 currentpartitions 分区数 告警阈值:原始值>分区数上限的90%,不同实例规格分区数上限不同,具体参考产品规格。连续触发次数:1告警级别:重要 该指标用于统计Kafka实例中已经使用的分区数量。 出现该告警时,如果业务后续还需要新增Topic,则需要修改实例基准带宽/代理个数或将业务拆分至多个实例。修改实例基准带宽/代理个数的具体操作,请参考。 brokercpuusage CPU使用率 告警阈值:原始值>90%连续触发次数:3告警级别:重要 统计Kafka节点虚拟机的CPU使用率。 出现该告警时,先检查该监控是否长期处于接近或超过告警阈值状态,如果是,需要修改实例基准带宽/代理个数,即扩节点。具体操作,请参考。 groupmsgs 堆积消息数 告警阈值:原始值>积压上限的90%,积压上限由您根据业务实际情况设定连续触发次数:1告警级别:重要 该指标用于统计Kafka实例中所有消费组中总堆积消息数。 出现该告警时,首先排查是否有闲置消费组,如果有,则删除。其次,可以考虑加快消费速度,例如增加组内消费者数量等。 topicmessagesremained 队列可消费消息数 告警阈值:原始值>积压上限的90%,积压上限由您根据业务实际情况设定连续触发次数:1告警级别:重要 该指标用于统计消费组指定队列可以消费的消息个数。 出现该告警时,首先排查消费者代码逻辑是否有误,例如消费者出现了异常不再消费等。其次,可以考虑加快消息的消费,例如增加队列消费者,并确保分区数大于或等于消费者数。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        监控
        必须配置的监控告警
      • 重启Kafka Manager
        本文主要介绍 重启Kafka Manager。 操作场景 当Kafka Manager无法登录或者无法使用时,例如下图中的报错,可以通过重启Kafka Manager,使Kafka Manager恢复正常。 图 报错信息 说明 重启Kafka Manager不会影响业务。 操作步骤 步骤 1 登录管理控制台。 步骤 2 在管理控制台右上角单击,选择区域。 说明 请选择Kafka实例所在的区域。 步骤 3 在管理控制台左上角单击,选择“企业中间件”“分布式消息服务”“Kafka专享版”,进入分布式消息服务Kafka专享版页面。 步骤 4 通过以下任意一种方法,重启Kafka Manager。 在待重启Manager的Kafka实例所在行,单击“更多 > 重启Manager”。 单击Kafka实例名称,进入实例详情页面。单击右上角的“更多 > 重启Manager”。 步骤 5 单击“确定”。 您可以在实例的“后台任务管理”页面,查看当前任务的操作进度。任务状态为“成功”,表示重启成功。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        实例管理
        重启Kafka Manager
      • 步骤五:配置必须的监控告警
        指标ID 指标名称 告警策略 指标说明 告警处理建议 brokerdiskusage 磁盘容量使用率 告警阈值:原始值>80%连续触发次数:1 告警级别:紧急 该指标为从Kafka节点虚拟机层面采集的磁盘容量使用率。 出现该告警时,需要修改实例 存储空间 。具体操作,请参考 变更实例规格 。 brokercpucoreload CPU核均负载 告警阈值:原始值>2连续触发次数:3 告警级别:重要 该指标为从Kafka节点虚拟机层面采集的CPU每个核的平均负载。 出现该告警时,先检查该监控是否长期处于接近或超过告警阈值状态,如果是,需要修改实例 基准带宽/代理个数 ,即扩节点。具体操作,请参考。 brokermemoryusage 内存使用率 告警阈值:原始值>90%连续触发次数:3 告警级别:紧急 该指标为Kafka节点虚拟机层面采集的内存使用率。 出现该告警时,需要修改实例 基准带宽/代理个数 ,即扩节点。具体操作,请参考。 currentpartitions 分区数 告警阈值:原始值>分区数上限的90%,不同实例规格分区数上限不同,具体参考 该指标用于统计Kafka实例中已经使用的分区数量。 出现该告警时,如果业务后续还需要新增Topic,则需要修改实例 基准带宽/代理个数 或将业务拆分至多个实例。修改实例基准带宽/代理个数的具体操作,请参考。 brokercpuusage CPU使用率 告警阈值:原始值>90%连续触发次数:3 告警级别:重要 统计Kafka节点虚拟机的CPU使用率。 出现该告警时,先检查该监控是否长期处于接近或超过告警阈值状态,如果是,需要修改实例 基准带宽/代理个数 ,即扩节点。具体操作,请参考。 groupmsgs 堆积消息数 告警阈值:原始值>积压上限的90%,积压上限由您根据业务实际情况设定连续触发次数:1 告警级别:重要 该指标用于统计Kafka实例中所有消费组中总堆积消息数。 出现该告警时,首先排查是否有闲置消费组,如果有,则删除。其次,可以考虑加快消费速度,例如增加组内消费者数量等。 topicmessagesremained 队列可消费消息数 告警阈值:原始值>积压上限的90%,积压上限由您根据业务实际情况设定连续触发次数:1 告警级别:重要 该指标用于统计消费组指定队列可以消费的消息个数。 出现该告警时,首先排查消费者代码逻辑是否有误,例如消费者出现了异常不再消费等。其次,可以考虑加快消息的消费,例如增加队列消费者,并确保分区数大于或等于消费者数。
        来自:
        帮助文档
        分布式消息服务Kafka
        快速入门
        步骤五:配置必须的监控告警
      • Kafka Connect接入Kafka
        本节主要介绍Kafka Connect接入Kafka。 最佳实践概述 场景描述 Kafka Connect可以将完整的数据库注入到Kafka的Topic中,或者将服务器的系统监控指标注入到Kafka,然后像正常的Kafka流处理机制一样进行数据流处理。而导出工作则是将数据从Kafka Topic中导出到其它数据存储系统、查询系统或者离线分析系统等。 如果您想要在流数据平台场景下,使用Kafka Connect快速实现数据从Kafka和其它系统之间进行导入导出,可以参考本实践方案。 技术架构图 暂无。 方案优势 用户可按业务需要,参考Kafka Connect数据导入导出,实现流数据的消息消费。 前提条件 需已购买Kafka实例、创建Topic,并且已成功消费消息。 资源规划 本实践方案内容仅涉及Kafka专享版实例和Flink的安装配置。 分布式消息服务 Figure 1 分布式消息服务 资源类型 配置项 配置明细 说明 :::: 企业中间件 DMS Kafka专享实例 需已购买kafka专享实例,创建好Topic,并成功消费消息。 Kafka Connect流数据平台 Figure 2 Kafka Connect流数据平台 资源类型 配置项 配置明细 说明 :::: 应用软件 Flink框架 Flink 1.14 使用开源Apache Flink 方案正文 Kafka Connect 目前支持两种执行模式:standalone 和 distributed。 standalone模式 通过以下命令以 standalone 模式启动 connect: 接入Kafka专享版与接入开源 Apache Kafka 没有区别,仅需要修改 bootstrap.servers 为申请实例时分配的 IP。 distributed模式 通过以下命令以 distributed 模式启动 connect: 该模式下,Kafka Connect 会将 offsets、configs 和 task status 信息存储在 Kafka Topic 中,存储的Topic 在 connectdistributed 中的以下字段配置: 这三个 Topic 需要手动创建,才能保证创建的属性符合 connect 的要求。 config.storage.topic 需要保证只有一个 partition,多副本且为 compact 模式。 offset.storage.topic 需要有多个 partition,多副本且为 compact 模式。 status.storage.topic 需要有多个 partition,多副本且为 compact 模式。 配置 bootstrap.servers 为申请实例是分配的 IP。 配置 group.id,用于标识 connect 集群,需要与消费者分组区分开来。
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        Kafka Connect接入Kafka
      • Go
        消费消息 发送以下命令消费消息。 go run modvendor consumer/consumer.go 消费消息示例代码如下: package main import ( "encoding/json" "fmt" "github.com/confluentinc/confluentkafkago/kafka" "os" "path/filepath" ) type KafkaConfig struct { Topic string json:"topic" Topic2 string json:"topic2" GroupId string json:"group.id" BootstrapServers string json:"bootstrap.servers" SecurityProtocol string json:"security.protocol" } // config should be a pointer to structure, if not, panic func loadJsonConfig() KafkaConfig { workPath, err : os.Getwd() if err ! nil { panic(err) } configPath : filepath.Join(workPath, "conf") fullPath : filepath.Join(configPath, "kafka.json") file, err : os.Open(fullPath); if (err ! nil) { msg : fmt.Sprintf("Can not load config at %s. Error: %v", fullPath, err) panic(msg) } defer file.Close() decoder : json.NewDecoder(file) var config &KafkaConfig{} err decoder.Decode(config); if (err ! nil) { msg : fmt.Sprintf("Decode json fail for config file at %s. Error: %v", fullPath, err) panic(msg) } json.Marshal(config) return config } func doInitConsumer(cfg KafkaConfig) kafka.Consumer { fmt.Print("init kafka consumer, it may take a few seconds to init the connectionn") //common arguments var kafkaconf &kafka.ConfigMap{ "api.version.request": "true", "auto.offset.reset": "latest", "heartbeat.interval.ms": 3000, "session.timeout.ms": 30000, "max.poll.interval.ms": 120000, "fetch.max.bytes": 1024000, "max.partition.fetch.bytes": 256000} kafkaconf.SetKey("bootstrap.servers", cfg.BootstrapServers); kafkaconf.SetKey("group.id", cfg.GroupId) switch cfg.SecurityProtocol { case "PLAINTEXT" : kafkaconf.SetKey("security.protocol", "plaintext"); case "SSL": kafkaconf.SetKey("security.protocol", "ssl"); kafkaconf.SetKey("ssl.ca.location", "/XXX/caRoot.pem") case "SASLSSL": kafkaconf.SetKey("security.protocol", "saslssl"); kafkaconf.SetKey("ssl.ca.location", "/XXX/caRoot.pem"); kafkaconf.SetKey("sasl.username", cfg.SaslUsername); kafkaconf.SetKey("sasl.password", cfg.SaslPassword); kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism) case "SASLPLAINTEXT": kafkaconf.SetKey("security.protocol", "saslplaintext"); kafkaconf.SetKey("sasl.username", cfg.SaslUsername); kafkaconf.SetKey("sasl.password", cfg.SaslPassword); kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism) default: panic(kafka.NewError(kafka.ErrUnknownProtocol, "unknown protocol", true)) } consumer, err : kafka.NewConsumer(kafkaconf) if err ! nil { panic(err) } fmt.Print("init kafka consumer successn") return consumer; } func main() { // Choose the correct protocol cfg : loadJsonConfig(); consumer : doInitConsumer(cfg) consumer.SubscribeTopics([]string{cfg.Topic, cfg.Topic2}, nil) for { msg, err : consumer.ReadMessage(1) if err nil { fmt.Printf("Message on %s: %sn", msg.TopicPartition, string(msg.Value)) } else { // The client will //automatically try to recover from all errors. fmt.Printf("Consumer error: %v (%v)n", err, msg) } } consumer.Close() }
        来自:
        帮助文档
        分布式消息服务Kafka
        开发指南
        Go
      • 查询消息
        本文主要介绍查询消息。 操作场景 您可以查看指定Topic不同分区的偏移量、消息大小、创建时间以及消息正文。 操作步骤 步骤 1 登录管理控制台。 步骤 2 在管理控制台右上角单击,选择区域。 说明 请选择Kafka实例所在的区域。 步骤 3 在管理控制台左上角单击,选择“企业中间件”“分布式消息服务”“Kafka专享版”,进入分布式消息服务Kafka专享版页面。 步骤 4 单击Kafka实例名称,进入实例详情页面。 步骤 5 选择“消息查询”页签,在消息页签页面,设置查询的Topic名称、分区以及查询方式。 如果未设置具体分区,查询结果显示Topic所有分区的消息。 查询方式支持以下两种方式: 按创建时间查询:即按生产该消息的时间。 按偏移量查询:即记录消息的位置。 说明 当Topic中的数据量比较大时,单副本Topic查询消息可能会报“内部服务错误”,建议根据数据量适当减小查询时间范围。 步骤 6 单击“搜索”,查询消息。 查询结果如下: 图 查询Topic消息 消息的参数说明如下: Topic名称:消息所在的Topic名称。 分区:消息所在的分区。 偏移量:消息在分区中的位置。 消息大小:消息存入磁盘的大小,单位为B。 创建时间:消息的创建时间。创建时间由生产客户端在生产消息时通过CreateTime指定的,如果生产消息时没有设置此参数,创建时间会默认为1970。 步骤 7 单击“查看消息正文”,弹出“查看消息正文”对话框,查看消息的内容,包括Topic名称、分区、偏移量、创建时间和消息正文。 步骤 8 (可选)如果需要恢复默认设置,单击“重置”。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        消息管理
        查询消息
      • 数据订阅
        参数 说明 示例 规则名称 订阅规则名称 输入:aomkafkatest。 订阅内容 支持“指标”和“告警”。 选择:指标。 订阅目标类型 选择“自定义Kafka”或“分布式消息服务DMS”。 分布式消息服务DMS 实例 选择DMS实例,如没有DMS实例,请单击“创建DMS实例”,创建DMS实例。 kafkaaom7160
        来自:
        帮助文档
        应用运维管理
        用户指南
        配置管理
        数据订阅
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • ...
      • 262
      跳转至
      推荐热词
      天翼云运维管理审计系统天翼云云服务平台云服务备份云日志服务应用运维管理云手机云电脑天翼云云hbase数据库电信云大数据saas服务电信云大数据paas服务轻量型云主机天翼云客户服务电话应用编排服务天翼云云安全解决方案云服务总线CSB天翼云服务器配置天翼云联邦学习产品天翼云云安全天翼云企业上云解决方案天翼云产品天翼云视频云存储

      天翼云最新活动

      安全隔离版OpenClaw

      OpenClaw云服务器专属“龙虾“套餐低至1.5折起

      聚力AI赋能 天翼云大模型专项

      大模型特惠专区·Token Plan 轻享包低至9.9元起

      青云志云端助力计划

      一站式科研助手,海外资源安全访问平台,助力青年翼展宏图,平步青云

      企业出海解决方案

      助力您的业务扬帆出海,通达全球!

      天翼云信创专区

      “一云多芯、一云多态”,国产化软件全面适配,国产操作系统及硬件芯片支持丰富

      中小企业服务商合作专区

      国家云助力中小企业腾飞,高额上云补贴重磅上线

      云上钜惠

      爆款云主机全场特惠,2核4G只要1.8折起!

      天翼云奖励推广计划

      加入成为云推官,推荐新用户注册下单得现金奖励

      产品推荐

      物理机 DPS

      多活容灾服务 MDR

      GPU云主机

      天翼云CTyunOS系统

      AI Store

      Token服务

      应用托管

      一站式智算服务平台

      人脸检测

      推荐文档

      产品功能

      实名认证流程?

      • 7*24小时售后
      • 无忧退款
      • 免费备案
      • 专家服务
      售前咨询热线
      400-810-9889转1
      关注天翼云
      • 旗舰店
      • 天翼云APP
      • 天翼云微信公众号
      服务与支持
      • 备案中心
      • 售前咨询
      • 智能客服
      • 自助服务
      • 工单管理
      • 客户公告
      • 涉诈举报
      账户管理
      • 管理中心
      • 订单管理
      • 余额管理
      • 发票管理
      • 充值汇款
      • 续费管理
      快速入口
      • 天翼云旗舰店
      • 文档中心
      • 最新活动
      • 免费试用
      • 信任中心
      • 天翼云学堂
      云网生态
      • 甄选商城
      • 渠道合作
      • 云市场合作
      了解天翼云
      • 关于天翼云
      • 天翼云APP
      • 服务案例
      • 新闻资讯
      • 联系我们
      热门产品
      • 云电脑
      • 弹性云主机
      • 云电脑政企版
      • 天翼云手机
      • 云数据库
      • 对象存储
      • 云硬盘
      • Web应用防火墙
      • 息壤智算平台
      • CDN加速
      热门推荐
      • 云服务备份
      • 边缘安全加速平台
      • 全站加速
      • 安全加速
      • 云服务器
      • 云主机
      • 智能边缘云
      • 应用编排服务
      • 微服务引擎
      • 共享流量包
      更多推荐
      • web应用防火墙
      • 密钥管理
      • 等保咨询
      • 安全专区
      • 应用运维管理
      • 云日志服务
      • 文档数据库服务
      • 云搜索服务
      • 数据湖探索
      • 数据仓库服务
      友情链接
      • 中国电信集团
      • 天翼云国际站
      • 189邮箱
      • 天翼企业云盘
      • 天翼云盘
      ©2026 天翼云科技有限公司版权所有 增值电信业务经营许可证A2.B1.B2-20090001
      公司地址:北京市东城区青龙胡同甲1号、3号2幢2层205-32室
      • 用户协议
      • 隐私政策
      • 个人信息保护
      • 法律声明
      备案 京公网安备11010802043424号 京ICP备 2021034386号