云主机开年特惠28.8元/年,0元秒杀等你来抢!
查看详情

活动

天翼云最新优惠活动,涵盖免费试用,产品折扣等,助您降本增效!
热门活动
  • 天翼云新春焕新季 NEW 云主机开年特惠28.8元/年,0元秒杀等你来抢!
  • 云上钜惠 爆款云主机全场特惠,2核4G只要1.8折起!
  • 中小企业服务商合作专区 国家云助力中小企业腾飞,高额上云补贴重磅上线
  • 出海产品促销专区 NEW 爆款云主机低至2折,高性价比,不限新老速来抢购!
  • 天翼云奖励推广计划 加入成为云推官,推荐新用户注册下单得现金奖励
免费活动
  • 免费试用中心 HOT 多款云产品免费试用,快来开启云上之旅
  • 天翼云用户体验官 NEW 您的洞察,重塑科技边界

息壤智算

领先开放的智算服务平台,提供算力、平台、数据、模型、应用“五位一体”智算服务体系,构建全流程的AI基础设施能力
AI Store
  • 算力市场
  • 模型市场
  • 应用市场
  • MCP市场
公共算力服务
  • 裸金属
  • 定制裸金属
训推服务
  • 模型开发
  • 训练任务
  • 服务部署
模型推理服务
  • 模型广场
  • 体验中心
  • 服务接入
应用托管
  • 应用实例
科研助手
  • 科研智能体
  • 科研服务
  • 开发机
  • 并行计算
大模型
  • DeepSeek-V3.1
  • DeepSeek-R1-0528
  • DeepSeek-V3-0324
  • Qwen3-235B-A22B
  • Qwen3-32B
智算一体机
  • 智算一体机
模型适配专家服务
  • 模型适配专家服务
算力服务商
  • 入驻算力服务商

应用商城

天翼云精选行业优秀合作伙伴及千余款商品,提供一站式云上应用服务
进入甄选商城进入云市场进入AI Store创新解决方案公有云生态专区智云上海应用生态专区
建站工具
  • 新域名服务
  • SSL证书
  • 翼建站
企业办公
  • 安全邮箱
  • WPS 365 天翼云版
  • 天翼企业云盘(标准服务版)
灾备迁移
  • 云管家2.0
  • 翼备份(SaaS版)

定价

协助您快速了解云产品计费模式、价格详情,轻松预估上云成本
价格计算器
  • 动态测算产品价格
定价策略
  • 快速了解计费模式

合作伙伴

天翼云携手合作伙伴,共创云上生态,合作共赢
天翼云生态合作中心
  • 天翼云生态合作中心
天翼云渠道合作伙伴
  • 天翼云代理渠道合作伙伴
天翼云服务合作伙伴
  • 天翼云集成商交付能力认证
天翼云应用合作伙伴
  • 天翼云云市场合作伙伴
  • 天翼云甄选商城合作伙伴
天翼云技术合作伙伴
  • 天翼云OpenAPI中心
天翼云培训认证
  • 天翼云学堂
  • 天翼云市场商学院
天翼云合作计划
  • 云汇计划
天翼信创云专区
  • 信创云专区
  • 适配互认证

开发者

开发者相关功能入口汇聚
技术社区
  • 专栏文章
  • 互动问答
  • 技术视频
资源与工具
  • OpenAPI中心
培训与认证
  • 天翼云学堂
  • 天翼云认证
开源社区
  • 魔乐社区
  • OpenTeleDB

支持与服务

为您提供全方位支持与服务,全流程技术保障,助您轻松上云,安全无忧
文档与工具
  • 文档中心
  • 新手上云
  • 自助服务
  • OpenAPI中心
定价
  • 价格计算器
  • 定价策略
基础服务
  • 售前咨询
  • 在线支持
  • 在线支持
  • 工单服务
  • 服务保障
  • 会员中心
增值服务
  • 红心服务
  • 首保服务
  • 客户支持计划
  • 专家技术服务
  • 备案管家
我要反馈
  • 建议与反馈
  • 用户体验官
信息公告
  • 客户公告

了解天翼云

天翼云秉承央企使命,致力于成为数字经济主力军,投身科技强国伟大事业,为用户提供安全、普惠云服务
品牌介绍
  • 关于天翼云
  • 智算云
  • 天翼云4.0
  • 新闻资讯
  • 天翼云APP
基础设施
  • 全球基础设施
  • 信任中心
最佳实践
  • 精选案例
  • 超级探访
  • 云杂志
  • 分析师和白皮书
  • 天翼云·创新直播间
市场活动
  • 2025智能云生态大会
  • 2024智算云生态大会
  • 2023云生态大会
  • 2022云生态大会
  • 天翼云中国行
天翼云
  • 活动
  • 息壤智算
  • 产品
  • 解决方案
  • 应用商城
  • 定价
  • 合作伙伴
  • 开发者
  • 支持与服务
  • 了解天翼云
      • 文档
      • 控制中心
      • 备案
      • 管理中心
      消息队列Kafka版_相关内容
      • Topic和分区问题
        Kafka支持减少分区数吗? Kafka不支持减少分区数,您可以通过删除原先的Topic,然后创建新Topic,重新设置分区数。 Kafka实例创建Topic失败 可能原因:已创建的Topic,分区数之和达到实例规格的分区数上限。不同规格实例配置的分区数上限不同,具体请参考产品规格。 解决方案:对Kafka实例扩容,或者删除不需要的Topic。 Kafka实例支持批量导入Topic功能么?或者是自动生成Topic功能? 支持自动生成Topic功能,但不支持Topic批量导入功能,仅支持批量导出Topic功能。 通过以下任意一种方法,开启自动生成Topic功能: 创建实例时,开启Kafka自动创建Topic。 创建实例后,在实例详情页开启Kafka自动创建Topic。 为什么删除Topic不生效?删除后该Topic仍然存在 可能原因:您开启了自动创建Topic功能,且有消费者正在连接该Topic。所以,如果没有停止您的业务,删除了Topic后,还会有消息生产行为,并自动创建Topic。 解决办法:需要关闭自动创建Topic功能,才可以正常删除Topic。 Kafka实例是否支持查看单个Topic占用磁盘空间? 支持。通过以下任意一种方法,查看单个Topic占用磁盘空间大小。 在Kafka实例名称后,单击,跳转到云监控页面。在“队列”页签中,“队列”选择待查看磁盘空间大小的Topic名称,“监控类型”选择“基本监控”,查看“队列数据容量”,该指标表示该队列当前的消息数据大小。 单击Kafka实例名称,进入实例详情页。在左侧导航栏单击“监控”,进入监控页面。在“主题”页签中,“主题”选择待查看磁盘空间大小的Topic名称,“监控类型”选择“基本监控”,查看“队列数据容量”,该指标表示该队列当前的消息数据大小。
        来自:
        帮助文档
        分布式消息服务Kafka
        常见问题
        Topic和分区问题
      • 产品规格
        本节介绍分布式消息服务Kafka的产品规格,以便您正确理解和使用。 Kafka实例规格 (1)以下规格适用于 华东1、华北2、西南1、华南2、上海36、青岛20、长沙42、南昌5、武汉41、杭州7、西南2贵州、太原4、郑州5、西安7、呼和浩特3 节点 注意 通用型规格已调整为白名单特性,如需了解该规格参数请联系技术支持。 单机版实例面向用户体验和业务测试场景,无法保证性能和高可用。如果需要在生产环境使用Kafka实例,建议购买集群版实例。 集群版 Intel计算增强型 实例规格 代理个数 单个代理TPS 单个代理分区上限 单个代理建议主题数 单个代理建议消费组数 单个代理客户端总连接数上限 存储空间范围 单个代理流量规格(MB/S) kafka.2u4g.cluster 350 30000 250 250 20 2000 300500000GB 100 kafka.4u8g.cluster 350 100000 500 500 100 4000 300500000GB 200 kafka.8u16g.cluster 350 150000 1000 1000 150 4000 300500000GB 375 kafka.12u24g.cluster 350 200000 1500 1500 200 4000 300500000GB 625 kafka.16u32g.cluster 350 250000 2000 2000 200 4000 300500000GB 750 kafka.24u48g.cluster 350 250000 2000 2000 200 4000 300500000GB 1125 kafka.32u64g.cluster 350 250000 2000 2000 200 4000 300500000GB 1500 kafka.48u96g.cluster 350 250000 2000 2000 200 4000 300500000GB 2250 kafka.64u128g.cluster 350 250000 2000 2000 200 4000 300500000GB 3000
        来自:
        帮助文档
        分布式消息服务Kafka
        产品简介
        产品规格
      • 分布式消息产品选型
        介绍分布式消息产品选型对比项。 特性 Kafka RabbitMQ RocketMQ 功能 支持功能较少,不支持延迟发送,消息重试等功能 功能丰富,支持多个队列种类(优先级队列、延迟队列、死信队列镜像队列等),提供丰富的策略分配 功能完善,支持事务消息、定时消息、事务消息等 单机吞吐量 十万级 万级 几万级 稳定性 队列/分区多时性能不稳定 消息堆积时,性能不稳定 队列较多、消息堆积时性能保持稳定 可用性 非常高(分布式)具有主备故障自动切换 较高,基于主从架构实现高可用性 非常高(分布式)具有主备故障自动切换 选型建议 性能要求高,数据量大,适合产生大量数据的互联网服务的数据收集业务,如日志采集处理、需对接大数据应用等,kafka是首选 数据量少,吞吐量需求不大;数据可靠性要求较高,对功能丰富性极高 可靠性要求很高且性能要求较高的场景以及业务削峰场景,如电商、订单处理等
        来自:
        帮助文档
        分布式消息服务RocketMQ
        产品简介
        分布式消息产品选型
      • KafkaConsumer监控
        指标类别 指标 指标名称 指标说明 单位 数据类型 默认聚合方式 主题 (topic,kafka的topic监控数据。) id id clientid和ip信息 ENUM LAST 主题 (topic,kafka的topic监控数据。) topic topic kafka的topic名称 ENUM LAST 主题 (topic,kafka的topic监控数据。) bytesConsumedRate 每秒消费字节 每秒消费字节 Byte INT AVG 主题 (topic,kafka的topic监控数据。) fetchSizeAvg 请求获取平均字节 请求获取平均字节 Byte INT AVG 主题 (topic,kafka的topic监控数据。) fetchSizeMax 请求获取最大字节 请求获取最大字节 Byte INT MAX 主题 (topic,kafka的topic监控数据。) recordsConsumedRate 每秒消费消息数 每秒消费消息数 INT AVG 主题 (topic,kafka的topic监控数据。) recordsPerRequestAvg 单次请求平均消息数 单次请求平均消息数 INT AVG 主题 (topic,kafka的topic监控数据。) seqIds Producer生成序列号 Producer生成序列号 STRING LAST 主题 (topic,kafka的topic监控数据。) recordConsumedTotal 总消费次数 总消费次数 INT SUM 主题 (topic,kafka的topic监控数据。) bytesConsumedTotal 总消费字节数 总消费字节数 INT SUM fetch (fetch,kafka的fetch监控数据) id id clientid和ip信息 ENUM LAST fetch (fetch,kafka的fetch监控数据) bytesConsumedRate 每秒消费字节 每秒消费字节 Byte INT AVG fetch (fetch,kafka的fetch监控数据) fetchLatencyAvg 请求平均时延 请求平均时延 ms INT AVG fetch (fetch,kafka的fetch监控数据) fetchLatencyMax 请求最大时延 请求最大时延 ms INT MAX fetch (fetch,kafka的fetch监控数据) fetchRate 每秒请求数 每秒请求数 INT AVG fetch (fetch,kafka的fetch监控数据) fetchSizeAvg 请求获取平均字节 请求获取平均字节 Byte INT AVG fetch (fetch,kafka的fetch监控数据) fetchSizeMax 请求获取最大字节 请求获取最大字节 Byte INT MAX fetch (fetch,kafka的fetch监控数据) recordsConsumedRate 每秒消费消息数 每秒消费消息数 INT AVG fetch (fetch,kafka的fetch监控数据) recordsLagMax 最大堆积消息数 最大堆积消息数 INT MAX fetch (fetch,kafka的fetch监控数据) recordsPerRequestAvg 单次请求平均消息数 单次请求平均消息数 INT AVG fetch (fetch,kafka的fetch监控数据) seqIds Producer生成序列号 Producer生成序列号 STRING LAST fetch (fetch,kafka的fetch监控数据) recordConsumedTotal 总消费次数 总消费次数 INT SUM fetch (fetch,kafka的fetch监控数据) bytesConsumedTotal 总消费字节数 总消费字节数 INT SUM partition (partition,kafka的partition监控数据。) id id clientid和ip信息 ENUM LAST partition (partition,kafka的partition监控数据。) partition partition kafka的partition名称 ENUM LAST partition (partition,kafka的partition监控数据。) recordsLag 堆积消息数 堆积消息数 INT LAST partition (partition,kafka的partition监控数据。) recordsLagAvg 平均堆积消息数 平均堆积消息数 INT AVG partition (partition,kafka的partition监控数据。) recordsLagMax 最大堆积消息数 最大堆积消息数 INT MAX partition (partition,kafka的partition监控数据。) seqIds Producer生成序列号 Producer生成序列号 STRING LAST kafka消费方法监控 (consumer,kafka消费方法监控。) method method 消费方法 ENUM LAST kafka消费方法监控 (consumer,kafka消费方法监控。) concurrentMax 最大并发 最大并发 INT MAX kafka消费方法监控 (consumer,kafka消费方法监控。) errorCount 错误数 错误数 INT SUM kafka消费方法监控 (consumer,kafka消费方法监控。) invokeCount 调用次数 调用次数 INT SUM kafka消费方法监控 (consumer,kafka消费方法监控。) lastError 错误信息 发生错误时产生的错误信息 STRING LAST kafka消费方法监控 (consumer,kafka消费方法监控。) maxTime 最大响应时间 采集周期内最大响应时间 INT MAX kafka消费方法监控 (consumer,kafka消费方法监控。) range1 010ms 响应时间在010ms范围调用次数 INT SUM kafka消费方法监控 (consumer,kafka消费方法监控。) range2 10100ms 响应时间在10100ms范围调用次数 INT SUM kafka消费方法监控 (consumer,kafka消费方法监控。) range3 100500ms 响应时间在100500ms范围调用次数 INT SUM kafka消费方法监控 (consumer,kafka消费方法监控。) range4 5001000ms 响应时间在5001000ms范围调用次数 INT SUM kafka消费方法监控 (consumer,kafka消费方法监控。) range5 110s 响应时间在110s范围调用次数 INT SUM kafka消费方法监控 (consumer,kafka消费方法监控。) range6 10s以上 响应时间在10s以上调用次数 INT SUM kafka消费方法监控 (consumer,kafka消费方法监控。) totalTime 总响应时间 总响应时间 INT SUM KafkaConsumer汇总(total,KafkaConsumer汇总信息统计。) recordConsumedTotal 总消费次数 总消费次数 INT SUM KafkaConsumer汇总(total,KafkaConsumer汇总信息统计。) bytesConsumedTotal 总消费字节数 总消费字节数 INT SUM KafkaConsumer汇总(total,KafkaConsumer汇总信息统计。) recordsLag 总堆积消息数 总堆积消息数 INT LAST 异常 (exception,kafka消费异常信息。) causeType 异常发生类 异常发生类 ENUM LAST 异常 (exception,kafka消费异常信息。) exceptionType 异常类 异常类 ENUM LAST 异常 (exception,kafka消费异常信息。) count 数量 异常数量 INT SUM 异常 (exception,kafka消费异常信息。) message 异常消息 异常消息 STRING LAST 异常 (exception,kafka消费异常信息。) stackTrace 异常堆栈 异常堆栈 CLOB LAST
        来自:
        帮助文档
        应用性能管理
        产品介绍
        指标总览
        消息队列
        KafkaConsumer监控
      • SSL接入性能优化
        本文主要介绍消息队列 Kafka 通过SSL接入的最佳实践,从而帮助您更好的使用该产品。 文中的最佳实践基于消息队列 Kafka 的 Java 客户端;对于其它语言的客户端,其基本概念与思想是通用的,但实现细节可能有差异,仅供参考。 背景 云消息队列 Kafka 版实例如果选择SASLSSL接入时,可能会出现性能较差的情况。可以通过在客户端指定密码套件的方式,手动选择性能和安全性都较高的套件进行TLS通讯。 SSL握手时客户端发送Hello Client 并带上客户端支持的密码套件,服务端收到握手请求后,获取客户端带来的密码套件和服务端支持的密码套件取交集。密码套件加载顺序受客户端jdk版本影响,不同jdk版本,密码套件顺序不一样,可能会导致性能和安全性等无法保证。因此为了保证性能,SSL连接时客户端可以指定密码套件。 推荐的性能和安全性较高的密码套件 TLSECDHERSAWITHAES256GCMSHA384 TLSECDHERSAWITHAES128GCMSHA256 步骤 1.先按照SASLSSL协议接入Kafka,SASLSSL接入可参考文档 SASLSSL接入点接入 2.在此基础上增加 ssl.cipher.suites 配置 客户端关键参数 java Properties props new Properties(); props.put(ProducerConfig.BOOTSTRAPSERVERSCONFIG, BROKERADDR); props.put(ProducerConfig.VALUESERIALIZERCLASSCONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.KEYSERIALIZERCLASSCONFIG, StringSerializer.class.getName()); props.put(CommonClientConfigs.SECURITYPROTOCOLCONFIG, "SASLSSL"); props.put("sasl.mechanism", "SCRAMSHA512"); props.put(SaslConfigs.SASLJAASCONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username"testuser" password"f6eca4dbc78df78d63fba980e448185f";");//注:上面的密码f6eca4dbc78df78d63fba980e448185f,为用户管理里面创建用户时填入的密码进行md5的结果,md5取32位小写 props.put("ssl.truststore.location","/kafka/client.truststore.jks"); props.put("ssl.truststore.password","sJses2tin1@23"); props.put("ssl.endpoint.identification.algorithm",""); props.put("ssl.cipher.suites","TLSECDHERSAWITHAES256GCMSHA384,TLSECDHERSAWITHAES128GCMSHA256");//密码套件支持多个,用半角逗号分开
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        SSL接入性能优化
      • 基于事件流实现Kafka消息路由
        本文介绍如何应用事件总线EventBridge的事件流功能实现分布式消息服务Kafka的消息路由。 前提条件 开通事件总线EventBridge并授权。 开通分布式消息服务Kafka并创建最少两个主题。 背景信息 事件流作为更轻量、实时端到端的流式事件通道,提供轻量级的流式数据的过滤和转换的能力,在不同的数据仓库之间、数据处理程序之间、数据分析和处理系统之间进行数据同步。源端分布式消息服务Kafka生产的消息可以通过事件流这个通道被路由到目标端的分布式消息服务Kafka。 步骤一:创建事件流 1. 登录事件总线EventBridge控制台。 2. 在左侧导航栏,单击事件流。 3. 在事件流页面,单击创建事件流。 4. 在创建事件流面板,设置任务名称和描述,配置以下参数,然后单击保存。 a.在Source(源)配置向导,选择数据提供方为分布式消息服务Kafka ,设置以下参数,然后单击下一步。 参数 说明 示例 Kafka实例 选择Kafka实例。 instancexxx Kafka Topic 选择Kafka topic。 topicxxx Group 选择消费组。 快速创建 消费位点 选择消费位点。 最新位点 b.在Filtering(过滤)配置向导,设置事件过滤规则,单击下一步。 c.在Sink(目标)配置向导,选择服务类型为分布式消息服务Kafka,配置参数,单击保存,如图1所示。 参数 说明 示例 Kafka实例 选择Kafka实例。 instancexxx Kafka Topic 选择Kafka topic。 topicxxx 消息体(value) 提取事件中的数据。 $.data.value 消息键值(key) 提前事件中的key到目标。 $.data.key 图1 创建事件流时目标类型选择分布式消息服务Kafka 5. 创建事件流后,会有30秒~60秒的延迟时间,您可以在事件流页面的状态栏查看启动进度。
        来自:
        帮助文档
        事件总线
        最佳实践
        基于事件流实现消息路由
        基于事件流实现Kafka消息路由
      • KafkaProducer监控
        指标类别 指标 指标名称 指标说明 单位 数据类型 默认聚合方式 topic (topic,kafka的topic监控数据。) id id clientid和ip信息 ENUM LAST topic (topic,kafka的topic监控数据。) topic topic kafka的topic名称 ENUM LAST topic (topic,kafka的topic监控数据。) byteRate 每秒发送字节 每秒发送字节 Byte INT AVG topic (topic,kafka的topic监控数据。) recordErrorRate 每秒错误数 每秒错误数 INT AVG topic (topic,kafka的topic监控数据。) recordRetryRate 每秒重试数 每秒重试数 INT AVG topic (topic,kafka的topic监控数据。) recordSendRate 每秒发送数 每秒发送数 INT AVG topic (topic,kafka的topic监控数据。) seqIds Producer生成序列号 Producer生成序列号 STRING LAST topic (topic,kafka的topic监控数据。) recordSendTotal 总发送次数 总发送次数 INT SUM topic (topic,kafka的topic监控数据。) byteTotal 总发送字节数 总发送字节数 INT SUM KafkaProducer汇总(total,KafkaProducer汇总信息统计。) recordSendTotal 总发送次数 总发送次数 INT SUM KafkaProducer汇总(total,KafkaProducer汇总信息统计。) byteTotal 总发送字节数 总发送字节数 INT SUM 异常 (exception,kafka发送异常信息。) causeType 异常发生类 异常发生类 ENUM LAST 异常 (exception,kafka发送异常信息。) exceptionType 异常类 异常类 ENUM LAST 异常 (exception,kafka发送异常信息。) count 数量 异常数量 INT SUM 异常 (exception,kafka发送异常信息。) message 异常消息 异常消息 STRING LAST 异常 (exception,kafka发送异常信息。) stackTrace 异常堆栈 异常堆栈 CLOB LAST 发送方法(doSendMethod,发送消息方法监控。) topic topic topic ENUM LAST 发送方法(doSendMethod,发送消息方法监控。) concurrentMax 最大并发 最大并发 INT MAX 发送方法(doSendMethod,发送消息方法监控。) errorCount 错误数 错误数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) invokeCount 调用次数 调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) maxTime 最慢时延 最慢时延 INT MAX 发送方法(doSendMethod,发送消息方法监控。) range1 0–10ms 时延在010ms范围调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) range2 10–100ms 时延在10–100ms范围调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) range3 100–500ms 时延在100–500ms范围调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) range4 500–1000ms 时延在500–1000ms范围调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) range5 1–10s 时延在1–10s范围调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) range6 10sn 时延在10s以上调用次数 INT SUM 发送方法(doSendMethod,发送消息方法监控。) totalTime 总时延 调用总耗时 INT SUM
        来自:
        帮助文档
        应用性能管理
        产品介绍
        指标总览
        消息队列
        KafkaProducer监控
      • Kafka数据迁移概述
        迁移方案一:先迁生产,再迁消费 先将生产消息的业务迁移到新的Kafka,原Kafka不会有新的消息生产。待原有Kafka实例的消息全部消费完成后,再将消费消息业务迁移到新的Kafka,开始消费新Kafka实例的消息。 本方案为业界通用的迁移方案,操作步骤简单,迁移过程由业务侧自主控制,整个过程中消息不会存在乱序问题, 适用于对消息顺序有要求的场景 。但是该方案中需要等待消费者业务直至消费完毕,存在一个时间差的问题,部分数据可能存在较大的端到端时延。 1、将生产客户端的Kafka连接地址修改为新Kafka实例的连接地址。 2、重启生产业务,使得生产者将新的消息发送到新Kafka实例中。 3、观察各消费组在原Kafka的消费进度,直到原Kafka中数据都已经被消费完毕。 4、将消费客户端的Kafka连接地址修改为新Kafka实例的连接地址。 5、重启消费业务,使得消费者从新Kafka实例中消费消息。 6、观察消费者是否能正常从新Kafka实例中获取数据。 7、迁移结束。 结束 迁移方案二:同时消费,后迁生产 消费者业务启用多个消费客户端,分别向原Kafka和新Kafka实例消费消息,然后将生产业务切到新Kafka实例,这样能确保所有消息都被及时消费。 本方案中消费业务会在一段时间内同时消费原Kafka和新Kafka实例。由于在迁移生产业务之前,已经有消费业务运行在新Kafka实例上,因此不会存在端到端时延的问题。但在迁移生产的开始阶段,同时消费原Kafka与新Kafka实例,会导致部分消息之间的生产顺序无法保证,存在消息乱序的问题。此场景 适用于对端到端时延有要求,却对消息顺序不敏感的业务 。 1、启动新的消费客户端,配置Kafka连接地址为新Kafka实例的连接地址,消费新Kafka实例中的数据。 原有消费客户端需继续运行,消费业务同时消费原Kafka与新Kafka实例的消息。 2、修改生产客户端,Kafka连接地址改为新Kafka实例的连接地址。 3、重启生产客户端,将生产业务迁移到新Kafka实例中。 4、生产业务迁移后,观察连接新Kafka实例的消费业务是否正常。 5、等待原Kafka中数据消费完毕,关闭原有消费业务客户端。 6、迁移结束。 结束
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        Kafka数据迁移
        Kafka数据迁移概述
      • 如何提高消息处理效率
        重视消息生产与消费的确认过程 消息生产(发送) Kafka非常重视消息生产确认过程,它提供了可靠的消息传递保证。下面是Kafka在消息生产确认方面的一些关键特性和机制: 同步发送和异步发送:Kafka提供了同步发送和异步发送两种方式。在同步发送中,生产者会等待服务器确认消息已成功写入到所有副本中,然后才会返回确认。这种方式可以确保消息的可靠性,但会影响吞吐量。而在异步发送中,生产者会立即返回确认,不等待服务器的响应。这种方式可以提高吞吐量,但消息的可靠性可能会有所降低。 消息复制机制:Kafka使用多个副本来保证消息的可靠性。在消息发送过程中,生产者将消息写入到主副本,并将消息复制到其他副本。只有当所有副本都成功写入消息后,生产者才会返回确认。这样可以确保即使主副本发生故障,仍然可以从其他副本中读取到消息。 ISR机制:Kafka使用ISR(InSync Replicas)机制来保证消息的可靠性。ISR是指与主副本保持同步的副本集合。只有ISR中的副本成功写入消息后,生产者才会返回确认。如果某个副本与主副本的同步延迟超过一定阈值,那么它将被移出ISR,不再参与消息的确认过程,直到与主副本同步。 消息持久化:Kafka将消息持久化到磁盘,以确保即使发生故障,消息也不会丢失。消息被写入到日志文件中,并通过索引来提供高效的读取和检索。 可配置的确认级别:Kafka提供了可配置的消息确认级别。确认级别可以设置为0、1或all。在确认级别为0时,生产者不会等待服务器的确认,直接返回确认。在确认级别为1时,生产者会等待主副本的确认。在确认级别为all时,生产者会等待所有副本的确认。确认级别的选择可以根据应用的需求和性能要求进行调整。 总之,Kafka通过同步发送、消息复制、ISR机制、消息持久化和可配置的确认级别等机制,重视消息生产确认过程,以确保消息的可靠性和一致性。这些机制使得Kafka成为一个可靠的分布式消息系统。
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        如何提高消息处理效率
      • 消费组详情
        介绍分布式消息服务Kafka消费组详情功能操作内容。 场景描述 当需要查询以下信息时,可通过消费组详情页面操作: 查看在线消费者列表及其订阅的主题、分区。 查看消费组订阅的主题的消息堆积详细情况。 Kafka消息堆积的场景包括以下几个: 消费者处理延迟:当消费者的处理能力不足或出现故障时,无法及时消费Kafka中的消息,导致消息堆积。这可能是由于消费者的处理逻辑复杂、处理速度慢,或者消费者的资源不足等原因引起的。 网络故障:当Kafka集群与消费者之间的网络出现故障或不稳定时,可能导致消息传输延迟或中断。这会导致消息在Kafka中堆积,等待网络恢复后才能被消费。 生产者速度超过消费者:当生产者产生消息的速度超过消费者的处理速度时,会导致消息在Kafka中堆积。这可能是由于生产者的速度过快、消费者处理能力不足或者消费者故障等原因引起的。 消费者组调整:当消费者组中的消费者发生变化,如新增或退出消费者,会触发Kafka的重平衡操作。在重平衡期间,消费者无法消费消息,导致消息堆积。这通常发生在消费者扩展或故障恢复时。 高峰期消息涌入:在某些特定的时间段或事件发生时,可能会引发大量的消息涌入Kafka,超过消费者的处理能力。这会导致消息在Kafka中堆积,直到消费者能够跟上消息的处理速度。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        消费组管理
        消费组详情
      • 不使用SASL证书连接
        生产消息 进入Kafka客户端文件的“/bin”目录下,执行如下命令进行生产消息: ./kafkaconsoleproducer.sh brokerlist ​{连接地址} topic 连接地址−−topic​{Topic名称} 参数说明如下: 连接地址:从前提条件中获取的连接地址。 Topic名称:Kafka实例下创建的Topic名称。 示例如下,“10.3.196.45:9094,10.78.42.127:9094,10.4.49.103:9094”为获取的Kafka实例公网连接地址。 执行完命令后,输入需要生产的消息内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。 [root@ecskafka bin] ./kafkaconsoleproducer.sh brokerlist 10.3.196.45:9094,10.78.42.127:9094,10.4.49.103:9094 topic topicdemoHello DMS Kafka! ^C[root@ecskafka bin] 如需停止生产使用Ctrl+C命令退出。 消费消息 执行如下命令进行消费消息: ./kafkaconsoleconsumer.sh bootstrapserver ​{连接地址} topic 连接地址−−topic​{Topic名称} group ${消费组名称} frombeginning 参数说明如下: 连接地址:从前提条件中获取的连接地址。 Topic名称:Kafka实例下创建的Topic名称。 消费组名称:根据您的业务需求,设定消费组名称。 如果已经在配置文件中指定了消费组名称,请确保命令行中的消费组名称与配置文件中的相同,否则可能消费失败。 消费组名称开头包含特殊字符,例如下划线“”、 号“ ”时,监控数据无法展示。 示例如下: [root@ecskafka bin] ./kafkaconsoleconsumer.sh bootstrapserver 10.3.196.45:9094,10.78.42.127:9094,10.4.49.103:9094 topic topicdemo group ordertest frombeginning Kafka! DMS Hello ^CProcessed a total of 3 messages [root@ecskafka bin]
        来自:
        帮助文档
        分布式消息服务Kafka
        快速入门
        步骤四:连接实例生产消费消息
        不使用SASL证书连接
      • Kafka业务迁移
        介绍分布式消息服务Kafka业务迁移内容。 应用场景 Kafka的业务迁移可以应用在多个领域和场景中,包括但不限于以下几个方面: 数据集成和数据仓库:Kafka可以用作数据集成的中间件,将分散的数据源集中到一个统一的平台上。通过使用Kafka的生产者和消费者,可以实现数据的可靠传输和消费,支持实时的数据集成和数据仓库的建设。 实时数据处理和流处理:Kafka提供了流处理功能,可以对实时数据流进行处理和分析。通过使用Kafka Streams、Apache Flink等流处理框架,可以对数据流进行实时的计算、转换、聚合等操作,实现实时数据处理和实时决策。 异步通信和消息队列:Kafka作为消息队列,可以用于异步通信和解耦系统之间的依赖关系。通过将同步的请求和响应转换为异步的消息,可以提高系统的可伸缩性和响应性能,实现松耦合的系统架构。 日志收集和分析:Kafka可以用作日志收集和分析的中间件,用于接收和传递系统和应用程序的日志数据。通过将日志数据发送到Kafka中,可以实现日志的集中存储和分发。同时,可以使用Kafka的消费者来实时消费和分析日志数据,帮助进行故障排查、性能监控和安全审计等工作。 数据同步和复制:在多个数据中心或分布式系统之间进行数据同步和复制时,可以使用Kafka作为数据的中间传输通道。通过使用Kafka的生产者和消费者,可以实现数据的可靠传输和复制,保证数据的一致性和可用性。 这些只是Kafka业务迁移的一部分应用场景,实际应用中还有很多其他的需求和场景。Kafka的高性能、可靠性和可扩展性使其成为业务迁移的理想选择。
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        Kafka业务迁移
      • RabbitMQ触发器
        配置项 操作 示例 触发器类型 选择 RabbitMQ触发器。 RabbitMQ触发器 名称 填写自定义的触发器名称。 rabbitmqtrigger 版本或别名 默认值为 LATEST ,支持选择任意函数版本或函数别名。 LATEST RabbitMQ实例 选择已创建的RabbitMQ实例。 Vhost 选择订阅的目标虚拟主机。 / Queue 选择订阅的目标队列。 myqueue 调用方法 选择函数调用方式。 同步调用 :指触发器消费消息后投递到函数是同步调用,会等待函数响应后继续下一个消息投递。 异步调用 :指触发器消费消息后投递到函数是异步调用,不会等待函数响应,可以快速消费事件。 User name 访问分布式消息服务RabbitMQ 版服务端时,需要传入用户名和密码进行权限认证,认证通过才允许访问服务端。 Password 访问分布式消息服务RabbitMQ 版服务端时,需要传入用户名和密码进行权限认证,认证通过才允许访问服务端。 触发器启用状态 创建触发器后是否立即启用。默认选择开启,即创建触发器后立即启用触发器。 启用 推送配置 批量推送条数:批量推送的最大值,积压值达到后立刻推送,取值范围为 [1, 10000]。 批量推送间隔:批量推送的最大时间间隔,达到后立刻推送,单位秒,取值[0,15]。默认0无需等待,数据直接推送。 推送格式:函数收到的事件格式,详情请查阅 重试策略 消息推送函数失败后重试的策略,共两种: 指数退避:指数退避重试,重试5次,重试周期为2,4,8,16,32(秒)。 线性退避:线性退避重试,重试5次,重试周期为1,2,3,4,5(秒)。 容错策略 当重试次数耗尽后仍然失败时的处理方式: 允许容错:当异常发生并超过重试策略配置时直接丢弃。 禁止容错:当异常发生并超过重试策略配置时继续阻塞执行。 死信队列 当容错策略为:允许容错时,可以额外开启死信队列。当开启死信队列时且异常发生并超过重试策略配置时,消息会被投递到指定的消息队列里,当前只支持投递到kafka和rocketmq
        来自:
      • 功能特性
        本节主要介绍分布式消息服务Kafka的功能特性 消息收发 消息发送,支持指定分区发送、同步发送、异步发送、分区批量累积发送;支持身份认证,客户端连接Broker时使用SSL或SASL进行验证,数据传加密传输;支持ACL机制,提供客户端读、写权限认证。支持通过压缩算法实现消息体压缩,减少网络传输数据量,提高Kafka的消息发送吞吐量。 消息消费,支持指定Partition消费、指定分区的offset消费;采用poll方式,支持批量消费,支持广播消费。 消息有序性 针对消息有序的业务需求,分为全局有序和局部有序。 全局有序:一个Topic下的所有消息都需要按照生产顺序消费。 局部有序:一个Topic下的消息,只需要满足同一业务字段的要按照生产顺序消费。例如:Topic消息是订单的流水表,包含订单orderId,业务要求同一个orderId的消息需要按照生产顺序进行消费。 由于Kafka的一个Topic可以分为了多个Partition,Producer发送消息的时候,是分散在不同 Partition的。当Producer按顺序发消息给Broker,但进入Kafka之后,这些消息就不一定进到哪个Partition,会导致顺序是乱的。因此要满足全局有序,需要1个Topic只能对应1个Partition。 要满足局部有序,只需要在发消息的时候指定Partition Key,Kafka对其进行Hash计算,根据计算结果决定放入哪个Partition。这样Partition Key相同的消息会放在同一个Partition。此时,Partition的数量仍然可以设置多个,提升Topic的整体吞吐量。
        来自:
        帮助文档
        分布式消息服务Kafka
        产品简介
        功能特性
      • 使用SASL证书连接
        生产消息 进入Kafka客户端文件的“/bin”目录下,执行如下命令进行生产消息。 ./kafkaconsoleproducer.sh brokerlist ​{连接地址} topic 连接地址−−topic​{Topic名称} producer.config ../config/producer.properties 参数说明如下: 连接地址:从前提条件获取的连接地址。 Topic名称:Kafka实例下创建的Topic名称。 示例如下,“10.3.196.45:9095,10.78.42.127:9095,10.4.49.103:9095”为Kafka实例连接地址。 执行完命令后,输入需要生产的消息内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。 [root@ecskafka bin] ./kafkaconsoleproducer.sh brokerlist 10.3.196.45:9095,10.78.42.127:9095,10.4.49.103:9095 topic topicdemo producer.config ../config/producer.propertiesHello DMS Kafka! ^C[root@ecskafka bin] 如需停止生产使用Ctrl+C命令退出。 消费消息 执行如下命令消费消息。 ./kafkaconsoleconsumer.sh bootstrapserver ​{连接地址} topic 连接地址−−topic​{Topic名称} group ${消费组名称} frombeginning consumer.config ../config/consumer.properties 参数说明如下: 连接地址:从前提条件获取的连接地址。 Topic名称:Kafka实例下创建的Topic名称。 消费组名称:根据您的业务需求,设定消费组名称。 如果已经在配置文件中指定了消费组名称,请确保命令行中的消费组名称与配置文件中的相同,否则可能消费失败 。 消费组名称开头包含特殊字符,例如下划线“”、 号“ ”时,监控数据无法展示。 示例如下: [root@ecskafka bin]
        来自:
        帮助文档
        分布式消息服务Kafka
        快速入门
        步骤四:连接实例生产消费消息
        使用SASL证书连接
      • Kafka消费者poll的优化
        运行结果 上面的示例代码中,消费者线程会循环调用 poll()方法来拉取消息,并对拉取到的消息进行处理。在处理消息时,示例代码只是简单地打印了消息的值。 因此,示例代码的响应结果将是每个消费者线程在拉取到消息时打印出消息的值。具体的响应结果将取决于你所消费的Kafka主题中的消息内容。 例如,假设你的Kafka主题中有以下两条消息: 1. Key: null, Value: "Hello, Kafka!" 2. Key: null, Value: "How are you?" 当消费者线程拉取到这两条消息时,它们将会打印如下的响应结果: Received message: Hello, Kafka! Received message: How are you? 请注意,示例代码中的打印语句只是简单地将消息值输出到控制台。在实际应用中,你可以根据需要对消息进行进一步的处理,比如将消息存储到数据库、执行业务逻辑等操作。
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        Kafka消费者poll的优化
      • 使用限制
        分布式消息服务Kafka在某些功能做了约束和限制,如下表: 限制项 约束和限制 描述 Kafka Zookeeper 不对外暴露 Kafka实例的Zookeeper目前仅处于自用,不对外提供服务,为Kafka内部使用。 版本 当前服务端版本为1.1.0和2.3.0 兼容0.10以上的客户端版本,推荐使用和服务端一致的版本。 消息大小 生产消息的最大长度为10M 消息长度不要超过10M,否则生产失败。 登录Kafka节点所在机器 不能登录 无 限制Kafka Topic总分区数 限制 Kafka以分区为粒度管理消息,分区多导致生产、存储、消费都碎片化,影响性能稳定性。在使用过程中,当Topic的总分区数达到上限后,用户就无法继续创建Topic。 是否支持自动创建Topic 支持 在创建实例时候,您可以选择是否开启。 当您选择开启,表示生产或消费一个未创建的Topic时,会自动创建一个包含3个分区和3个副本的Topic。 是否需要创建消费组、消费者、生产者 不需要 不需要单独创建消费组、生产者和消费者,在使用时自动生成,实例创建后,直接使用即可 减少分区数 不支持 按照开源Kafka现有逻辑,不支持减少分区数。
        来自:
        帮助文档
        专属云分布式消息服务Kafka
        产品简介
        使用限制
      • 路由到分布式消息服务Kafka
        结果验证 1. 使用事件源触发一个事件。 2. 您可以在分布式消息服务Kafka管理控制台确认是否接收到事件,如图2所示。 1. 登录分布式消息服务Kafka管理控制台,然后在左侧导航栏选择实例列表。 2. 在实例列表页面单击目标实例名称。 3. 在消息查询页面,进行对应消息查询即可查看消息内容。 图2 在分布式消息服务Kafka管理控制台中查看消息详情
        来自:
        帮助文档
        事件总线
        用户指南
        事件流
        事件目标
        路由到分布式消息服务Kafka
      • 支持的监控指标
        操作场景 天翼云分布式消息Kafka自集成了一整套监控方案,对Kafka实例的运行状态进行日常监控,可以通过管理控制台查看Kafka实例各项监控指标。各项监控指标可以分为实例监控、实例节点监控、主题监控、消费组监控和Connect监控,各项监控指标的具体细节如下表所示。 操作前提 已开通天翼云Kafka实例,且实例状态为“运行中” Kafka 实例是天翼云Ⅱ类资源池实例,目前Ⅱ类资源池包括:华东1、上海36、华北2、长沙42、武汉41、西安7、杭州7、青岛20、西南1、西南2、广州4、郑州5、华南2等 监控指标 监控项的数据聚合周期为1分钟,即1分钟计算一次,计算出来每秒字节数。您可以将该数据理解为一分钟内的平均值。 1. 实例监控 指标ID 指标名称 指标含义 取值范围 单位 currentbrokers 存活节点数 该指标用于统计Kafka实例中正常运行的实例节点数 0~50 Count currenttopics 主题数 该指标用于统计Kafka实例中已经创建的主题数量。 0~2000 Count currentpartitions 分区数 该指标用于统计Kafka实例中已经使用的分区数量。 0~2000 Count groupmsgs 堆积消息数 该指标用于统计Kafka实例中所有消费组中总堆积消息数。 >0 Count instancebytesinrate 生产流量 该指标用于统计Kafka实例中每秒生产的字节数。 >0 MB/s instancebytesoutrate 消费流量 该指标用于统计Kafka实例中每秒生产的字节数。 >0 MB/s instancemessagesinrate 消息生产速率 该指标用于统计实例每秒生产的消息数。 >0 Count/s instancemessagesoutrate 消息消费速率 该指标用于统计实例每秒消费的消息数。 注意:2025年1月及以后购买的实例,支持此监控项。 >0 Count/s instancerequestqueuesize 实例请求队列长度 该指标用于统计实例请求队列长度。 >0 Count instanceresponsequeuesize 实例响应队列长度 该指标用于统计实例响应队列长度。 >0 Count instanceconnectionusage 实例连接数使用率 该指标用于统计实例连接数使用率 注意:2025年6月及以后购买的实例,支持此监控项。 0~100 % instancetopicusage 实例用户主题数使用率 该指标用于统计实例租户主题使用率 注意:2025年6月及以后购买的实例,支持此监控项。 0~100 % instancepartitionusage 实例用户主题分区数使用率 该指标用于统计实例租户主题分区使用率 注意:2025年6月及以后购买的实例,支持此监控项。 0~100 % instancegroupusage 实例用户消费组数使用率 该指标用于统计实例租户消费组使用率 注意:2025年6月及以后购买的实例,支持此监控项。 0~100 % instanceproducelimit 实例生产限流次数 该指标用于统计实例生产限流次数 注意:2025年6月及以后购买的实例,支持此监控项。 >0 Count instanceconsumelimit 实例消费限流次数 该指标用于统计实例消费限流次数 注意:2025年6月及以后购买的实例,支持此监控项。 >0 Count
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        可观测
        监控信息
        支持的监控指标
      • 消息监控
        KafkaConsumer 总消费次数&总消费字节数 显示总消费次数和总消费字节数的趋势图。 总消费次数:这个指标表示消费者从Kafka中成功读取并处理的消息数量。它是一个计数器,用于跟踪消费者在其生命周期内接收到的消息总数。这对于监控消费者的工作负载和性能非常有用,可以帮助开发者了解消费者是否有效地从生产者那里获取了数据。 总消费字节数:这个指标表示消费者从Kafka中成功读取的数据总量,以字节为单位。它考虑了每条消息的大小,并累加起来给出一个总和。这对于评估网络带宽使用情况、存储需求以及数据传输效率等方面非常重要。例如,如果一个主题有20个分区和5个消费者,每个消费者需要至少4MB的可用内存来接收记录。因此,通过监控总消费字节数,可以更好地理解数据流的规模和处理速度。 这两个指标共同提供了对Kafka消费者行为的全面视图,使得开发者能够优化系统性能,确保数据处理的高效性和稳定性。 通过消息队列接收消息 显示topic列表,表头如下。 topic:是指消息队列中接收到的特定主题(topic)的消息。主题用于将相关的消息进行归类和分组。 调用次数:是指在一段时间内,通过消息队列接收消息的总调用次数。每次调用消息队列的接收操作(例如KafkaConsumer的poll()方法)并成功获取一条消息都会被计算为一次调用。 平均响应时间(ms):是指从发起接收请求到接收到消息的平均时间。它表示了接收消息的速度和效率。 错误数:表示在接收消息的过程中发生的错误次数。这些错误可能包括接收消息失败、网络连接问题或其他异常情况。 最慢调用(ms):是指在一段时间内,接收消息过程中最耗时的一次调用的时间。它反映了接收消息中的性能瓶颈或延迟情况。 操作:点击详情,出现弹层显示调用次数、平均响应时间(ms)、错误数在筛选时间段内的趋势图。 这些指标共同构成了对消息队列系统性能和可靠性的全面评估,有助于管理员和开发者优化系统配置,提高系统的稳定性和效率。
        来自:
        帮助文档
        应用性能监控 APM
        用户指南
        应用列表
        应用详情
        消息监控
      • 删除消息
        介绍分布式消息服务Kafka删除消息功能操作内容。 场景描述 在以下场景中,可以考虑删除Kafka中的消息: 错误消息:当Kafka中的消息存在错误或异常时,需要将这些消息删除。这可能是由于消息格式错误、消息内容不完整或者其他数据质量问题导致的。删除错误消息可以保持数据的一致性和准确性。 保留策略变更:Kafka中可以设置消息的保留时间或者大小,即消息在主题中的保留期限。当需要更改消息的保留策略时,可能需要删除旧的消息以应用新的策略。例如,如果要缩短消息的保留时间,则需要删除超过新保留期限的旧消息。 清理测试数据:在测试环境中,经常需要清理旧的测试数据,以确保环境的可用空间和性能。当测试数据不再需要时,可以删除相应的消息来释放资源。 故障恢复:当Kafka发生故障或者数据丢失时,可能需要删除一些消息以进行数据恢复。在这种情况下,需要根据故障的范围和影响选择删除相应的消息,以恢复数据的完整性。 需要注意的是,在删除消息之前,需要确保删除的消息不再需要或者已经备份。删除消息是不可逆的操作,一旦删除就无法恢复。因此,在执行删除操作之前,建议先进行备份或者确认消息不再需要。 操作步骤 (1)登录管理控制台。 (2)进入Kafka管理控制台。 (3)在实例列表页在操作列,目标实例行点击“管理”。 (4)点击“Topic管理”后进入主题管理页面。 (5)在Topic所在行,点击其右侧“更多”,在下拉框中单击“消息删除”,并选择确认。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        Topic管理
        删除消息
      • 创建Topic
        本节主要介绍分布式消息服务Kafka的创建Topic说明 背景信息 Kafka的主题(Topic)是对消息进行分类和组织的概念。主题是Kafka中最基本的逻辑单元,用于区分不同类型的消息。 每个主题可以有多个分区(Partition),每个分区都是一个有序的消息日志。分区允许消息在集群中进行并行处理,提高了吞吐量和并发性能。 主题的特点包括: 逻辑分类:主题可以根据业务需求或数据类型进行逻辑分类,例如订单主题、日志主题、用户行为主题等。 多分区:主题可以被分成多个分区,每个分区都有自己的消息顺序。分区可以在不同的节点上进行存储和处理,实现了水平扩展和负载均衡。 副本复制:每个分区可以有多个副本(Replica),副本用于提供高可用性和容错性。副本可以分布在不同的节点上,当节点故障时可以自动进行故障转移。 持久化存储:主题中的消息被持久化存储在磁盘上,确保消息的可靠性和持久性。消费者可以随时消费主题中的历史消息,而不仅仅是最新的消息。 通过使用主题,Kafka能够有效地组织和管理大规模的消息流。主题的分区和副本机制提供了高可用性和容错性,使得Kafka成为处理大规模实时数据流的理想选择。 前提条件 已购买并部署分布式消息服务Kafka 操作步骤 创建Topic (1)登录管理控制台。 (2)进入Kafka管理控制台。 (3)在实例列表页在操作列,目标实例行点击“管理”。 (4)点击“Topic管理”后、点击“创建Topic”。 (5)点击“创建Topic”后,输入Topic名称、分区数等参数。 (6)创建好Topic后,可以点击“更多”按钮中的“生产消息”来测试其是否正常。
        来自:
        帮助文档
        分布式消息服务Kafka
        快速入门
        创建Topic
      • 监控安全风险
        介绍分布式消息服务Kafka监控安全风险 分布式消息服务Kafka提供资源和操作监控能力,可以对每个消息实例实时监控、告警和通知操作。用户可以实时掌握实例、主题、消费组的生产消费情况,包括流量、堆积量、重平衡次数等关键信息。 分布式消息服务Kafka支持的监控指标等内容,参考监控信息。
        来自:
        帮助文档
        分布式消息服务Kafka
        产品简介
        安全
        监控安全风险
      • 按时间查询
        介绍分布式消息服务Kafka按时间查询消息功能操作内容。 场景描述 Kafka按时间查询是指通过指定时间范围来查询Kafka主题中的消息。以下是一些常见的按时间查询的场景描述: 数据分析和报告:在数据分析和报告生成的场景中,经常需要按时间查询Kafka主题中的消息。通过指定起始时间和结束时间,可以获取在特定时间范围内产生的消息,用于进一步的数据分析和报告生成。 故障排查:当出现故障或问题时,按时间查询可以帮助定位问题的发生时间和相关的消息。通过指定故障发生的时间范围,可以获取相关的消息,用于故障排查和问题分析。 监控和警报:按时间查询还可以用于监控和警报系统。通过定期按时间查询Kafka主题中的消息,可以检查是否有异常或异常事件发生,并触发相应的警报机制。 数据回溯和重播:按时间查询功能还可以用于数据回溯和重播。通过指定特定的时间范围,可以获取过去某个时间段内的消息,并进行数据回溯或重播操作。 数据同步和复制:在数据同步和复制的场景中,按时间查询可以帮助确保数据的一致性。通过按时间查询源和目标主题中的消息,可以比较不同时间段内的消息,并进行数据同步和复制操作。 操作步骤 (1)登录管理控制台。 (2)进入Kafka管理控制台。 (3)在实例列表页在操作列,目标实例行点击“管理”。 (4)点击“消息查询”后默认就是按点位查询。 (5)选择按时间查询,选择需要查询的Topic,输入分区号以及时间段,点击”查询“按钮,会返回时间段内的所有消息。 (6)点击消息列表的“消息详情”可查看消息详情信息。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        消息查询
        按时间查询
      • Python
        环境安装 1. 安装Python。(Python版本为2.7或3.X。) 2. 安装依赖库。(使用公网连接需要安装confluentkafka 1.9.2及以下版本的依赖库) pip install confluentkafka1.9.2 3. 下载Demo包kafkaconfluentpythondemo.zip。 配置修改 1. 如果是ssl连接,需要在控制台下载证书。并且解压压缩包得到ssl.client.truststore.jks,执行以下命令生成caRoot.pem文件。 keytool importkeystore srckeystore ssl.client.truststore.jks destkeystore caRoot.p12 deststoretype pkcs12 openssl pkcs12 in caRoot.p12 out caRoot.pem 2. 修改setting.py文件。(calocation仅在ssl连接时需要配置) kafkasetting { 'bootstrapservers': 'XXX', 'topicname': 'XXX', 'groupname': 'XXX' } 生产消息 发送以下命令发送消息。 python kafkaproducer.py 生产消息示例代码如下: from confluentkafka import Producer import setting conf setting.kafkasetting """初始化一个 Producer 对象""" p Producer({'bootstrap.servers': conf['bootstrapservers']}) def deliveryreport(err, msg): """ Called once for each message produced to indicate delivery result. Triggered by poll() or flush(). """ if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) """异步发送消息""" p.produce(conf['topicname'], "Hello".encode('utf8'), callbackdeliveryreport) p.poll(0) """在程序结束时,调用flush""" p.flush() 消费消息 发送以下命令消费消息。 python kafkaconsumer.py 消费消息示例代码如下: from confluentkafka import Consumer, KafkaError import setting conf setting.kafkasetting c Consumer({ 'bootstrap.servers': conf['bootstrapservers'], 'group.id': conf['groupname'], 'auto.offset.reset': 'latest' }) c.subscribe([conf['topicname']]) while True: msg c.poll(1.0) if msg is None: continue if msg.error(): if msg.error().code() KafkaError.PARTITIONEOF: continue else: print("Consumer error: {}".format(msg.error())) continue print('Received message: {}'.format(msg.value().decode('utf8'))) c.close()
        来自:
        帮助文档
        分布式消息服务Kafka
        开发指南
        Python
      • 日志处理工具Logstash接入Kafka
        本文主要介绍日志处理工具Logstash接入Kafka。 最佳实践概述 场景描述 Kafka与Logstash互联,可实现将Kafka作为inputs把消息传递给Logstash进行处理、或者是作为outputs将Kafka用作消息目的地。 技术架构图 暂无。 方案优势 可以异步处理数据:防止突发流量。 解耦:当发生异常的时候不会影响上游工作。 前提条件 需已购买Kafka实例、创建Topic,并且已成功消费消息。 资源规划 本实践方案内容仅涉及Kafka专享版实例和Logstash。 分布式消息服务 Figure 分布式消息服务 资源类型 配置项 配置明细 说明 :::: 企业中间件 DMS Kafka专享实例 需已购买kafka专享实例,创建好Topic,并成功消费消息。 Logstash下载与安装 下载并安装Logstash,验证运行成功。 方案正文 配置Kafka 1、登录分布式消息服务Kafka版控制台,按需要单击“实例名称”,进入实例基本信息页面。 图 Kafka控制台实例列表 2、在“连接地址”模块,获取Kafka连接IP地址。 图 连接IP地址与端口 3、创建Logstashtest的Topic。 图 创建Topic
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        日志处理工具Logstash接入Kafka
      • 分布式消息服务Kafka事件目标
        参数 说明 示例 实例 选择分布式消息服务Kafka实例。 instancexxx Topic 选择分布式消息服务Kafka实例的Topic。 topic1 消息体 选择消息体(Body)的内容,更多信息请参考 完整事件 消息键值 选择消息键值(Key)的内容,更多信息请参考 空
        来自:
        帮助文档
        事件总线
        用户指南
        事件总线
        事件规则
        事件目标
        目标服务类型
        分布式消息服务Kafka事件目标
      • 连接未开启SASL的Kafka专享实例
        目前提供Kafka专享版实例的服务,Kafka专享版实例采用物理隔离的方式部署,租户独占Kafka实例。创建Kafka专享版实例之后,使用开源Kafka客户端向Kafka专享版实例生产消息和消费消息。 本章节介绍如何使用开源的Kafka客户端访问未开启SASL的Kafka专享实例方法。 多语言客户端使用请参考Kafka官网: 说明:Kafka服务器允许客户端单IP连接的个数为200个,如果超过了,会出现连接失败问题。 前提条件 已配置正确的安全组。 访问未开启SASL的Kafka专享实例时,支持VPC内访问。实例需要配置正确的安全组规则,具体安全组配置要求,请参考下表。 已获取连接Kafka专享版实例的地址。 使用VPC内访问,实例端口为9092,实例连接地址获取如下图。 获取VPC内访问Kafka专享实例的连接地址(实例未开启SASL) Kafka专享实例已创建Topic,否则请提前创建Topic。 已下载Kafka命令行工具1.1.0版本或者Kafka命令行工具2.3.0版本,确保Kafka实例版本与命令行工具版本相同。 已在Kafka命令行工具的使用环境中安装Java Development Kit 1.8.111或以上版本,并完成环境变量配置。 命令行模式连接实例 以下操作命令以Linux系统为例进行说明: 解压Kafka命令行工具。 进入文件压缩包所在目录,然后执行以下命令解压文件。 tar zxf [kafkatar] 其中,[kafkatar]表示命令行工具的压缩包名称。 例如: tar zxf kafka2.111.1.0.tgz 进入Kafka命令行工具的“/bin”目录下。 注意,Windows系统下需要进入“/bin/windows”目录下。 执行如下命令进行生产消息。 ./kafkaconsoleproducer.sh brokerlist ${连接地址} topic ${Topic名称} 参数说明如下: 连接地址:从前提条件中获取的连接地址。 Topic名称:Kafka实例下创建的Topic名称。 以获取的Kafka实例连接地址为“10.3.196.45:9092,10.78.42.127:9092,10.4.49.103:9092”为例。执行完命令后输入内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。 [root@ecskafka bin]
        来自:
        帮助文档
        专属云分布式消息服务Kafka
        用户指南
        连接未开启SASL的Kafka专享实例
      • 与其他服务的关系
        专属云(计算独享型) 该服务为专属云Kafka开通前必选服务,在开通了专属云计算独享后,才能在控制台中节点选择此专属云标识的环境,再进入控制台的分布式消息服务,来操作专属云Kafka的开通。 专属云(存储独享型) 该服务为专属云Kafka开通时的可选服务,在进如专属云Kafka的开通过程中,队列实例规格必须在专属云计算独享型,而队列存储的部分支持选择公有云的云盘承载,也可选择已经购买的专属云存储独享型,可根据需求灵活选择。
        来自:
        帮助文档
        专属云分布式消息服务Kafka
        产品简介
        与其他服务的关系
      • 入门指引
        本章节将为您介绍分布式消息服务Kafka入门的基本流程,主要包括环境准备、购买实例、创建Topic以及编译运行Demo Java工程,帮助您快速上手Kafka。 操作流程 操作流程如下: 1. 环境准备 创建Kafka实例先要准备好虚拟私有云、子网和安全组,可选弹性公网IP。 2. 创建实例 在订购分布式消息Kafka填写和确认实例名称、计费模式等信息,确认费用后点击下一步,等待开通流程结果通知成功后完成创建实例。 3. 创建Topic 开通实例后,在控制台相关页面按照指引创建主题,用于发送消息。点击“生产消息”来测试Topic是否正常。 4. 编译运行Demo Java工程 以上工作完成后,在客户端应用编译工程进行生产消费,包括引入依赖,生产消息和消费消息。
        来自:
        帮助文档
        分布式消息服务Kafka
        快速入门
        入门指引
      • 使用Logstash处理数据
        配置处理任务 1. 在云搜索服务实例创建页面选择Logstash实例管理,选择已经加装的Logstash实例。 2. 选择“管道管理”,选择“新建管道”。 3. 在“管道配置”窗口中,配置相关的管道配置。 python input { kafka { autooffsetreset > "latest" bootstrapservers > "ip:port,ip:port,ip:port" consumerthreads > 88 groupid > "yourgroupid" topics > ["yourkafkatopic"] codec > json } } filter { } output { elasticsearch { hosts > ["ip:port"] index > "yourindexname%{+YYYY.MM.dd}" user > "yourusername" password > "yourpassword" } } input部分用于定义数据的来源,即Logstash从哪里获取数据。这里以Kafka为例: autooffsetreset:会决定从哪个位置开始消费消息 bootstrapservers:Kafka集群的地址 consumerthreads:控制 Kafka 消费者线程的数量 groupid:消费者组ID topics:Kafka的主题列表 codec:指定数据格式 filter部分可以进行消息处理、数据解析等操作。根据实际业务规则来处理。 output部分用于将处理后的数据推送到外部系统(如数据库、消息队列、搜索引擎等)。这里以Elasticsearch为例: hosts:指定Elasticsearch的地址。 index:指定存储到Elasticsearch的索引名称,可以使用日期变量%{+YYYY.MM.dd} 来动态生成基于日期的索引。 user/password:提供Elasticsearch用户名和密码 配置完点击“预校验”,预校验会使用Logstash的语法功能来检测基础的语法,不检测逻辑和连通性。 预校验通过后可以执行后续操作。 4. 在“参数配置”中,配置相关的参数。 参数 说明 管道名称 只能包含字母、数字、中划线或下划线,且必须以字母开头,限制430个字符,实例内唯一 管道描述 限制长度50 管道工作线程数 控制Logstash同时处理事件的线程数 管道批处理大小 单个线程从input部分收集的最大事件数 管道批处理延迟 不满足批处理大小最大等待时间 队列类型 memory模式、persisted模式 memory:内存队列 persisted:磁盘队列 队列最大字节数 persisted模式生效 队列最大存储大小 队列检查点写入数 persisted模式生效 队列检查点数量 5. 启动处理任务 配置完参数后点击“保存并部署”。就启动了读取Kafka中的数据转换后写入到Elasticsearch的管道任务。 等到管道“运行中”可以尝试配置的Kafka的topics中导入一些数据,例如使用Filebeat或其他程序。 可以通过Curl命令查看logstash中配置的索引名,在Elasticsearch实例是否有数据进来。 plaintext curl " 6. 停止任务 当任务执行结束后,可以在“管道管理”页面,点击“停止”,会执行停止管道操作。
        来自:
        帮助文档
        云搜索服务
        快速入门
        使用Logstash处理数据
      • 事件规则参数
        分布式消息服务Kafka 当事件目标选择分布式消息服务Kafka时,事件目标的type值为kafka,eventTargets中的resourceKey字段中含义如下表所示。 resourceKey 是否必传 form value template instanceId 是 CONSTANT 分布式消息服务Kafka实例Id。 说明 实例Id可在分布式消息服务Kafka管理控制台实例详情页获取。 无 topic 是 CONSTANT Kafka主题。 无 value 是 CONSTANT ORIGINAL JSONPATH TEMPLATE 消息体。 如果form选择TEMPLATE,则在此处配置模板。详见事件内容转换。 key 是 CONSTANT EMPTY JSONPATH TEMPLATE 消息键值。 如果form选择TEMPLATE,则在此处配置模板。详见事件内容转换。 请求示例: plaintext { "eventBusName": "test", "eventRuleName": "test", "desc": "", "filterPattern": "{}", "eventTargets": [ { "type": "kafka", "eventTargetName": "1gQfuPljs9MJsWM1eHYch", "params": [ { "resourceKey": "instanceId", "value": "356b3496b87xxxxxxxxfe7deef7fe35", "form": "CONSTANT" }, { "resourceKey": "topic", "value": "test2", "form": "CONSTANT" }, { "resourceKey": "value", "value": "", "form": "ORIGINAL" }, { "resourceKey": "key", "value": "", "form": "EMPTY" } ] } ] } 分布式消息服务RocketMQ 当事件目标选择分布式消息服务RocketMQ时,事件目标的type值为rocketmq,eventTargets中的resourceKey字段中含义如下表所示。 resourceKey 是否必传 form value template instanceId 是 CONSTANT 分布式消息服务RocketMQ实例ID。 说明 实例Id可在分布式消息服务RocketMQ管理控制台实例详情页获取。 无 topic 是 CONSTANT RocketMQ主题。 无 body 是 CONSTANT ORIGINAL JSONPATH TEMPLATE 消息体。 如果form选择TEMPLATE,则在此处配置模板。详见事件内容转换。 keys 是 CONSTANT EMPTY JSONPATH TEMPLATE 消息索引。 如果form选择TEMPLATE,则在此处配置模板。详见事件内容转换。 properties 是 CONSTANT EMPTY JSONPATH TEMPLATE 消息自定义属性。 如果form选择TEMPLATE,则在此处配置模板。详见事件内容转换。 tags 是 CONSTANT EMPTY JSONPATH TEMPLATE 消息标签。 如果form选择TEMPLATE,则在此处配置模板。详见事件内容转换。 请求示例: plaintext { "eventBusName": "test", "eventRuleName": "test", "desc": "", "filterPattern": "{}", "eventTargets": [ { "type": "rocketmq", "eventTargetName": "lMVBoqi55L3Q6QaCKFWYA", "params": [ { "resourceKey": "instanceId", "value": "89ceb110331e4c968499744c2ccbdbcc", "form": "CONSTANT" }, { "resourceKey": "topic", "value": "TopicA", "form": "CONSTANT" }, { "resourceKey": "body", "value": "", "form": "ORIGINAL" }, { "resourceKey": "properties", "value": "", "form": "EMPTY" }, { "resourceKey": "keys", "value": "", "form": "EMPTY" }, { "resourceKey": "tags", "value": "", "form": "EMPTY" } ] } ] }
        来自:
        帮助文档
        事件总线
        用户指南
        事件总线
        事件规则
        事件规则参数
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • ...
      • 243
      跳转至
      推荐热词
      天翼云运维管理审计系统天翼云云服务平台云服务备份云日志服务应用运维管理云手机云电脑天翼云云hbase数据库电信云大数据saas服务电信云大数据paas服务轻量型云主机天翼云客户服务电话应用编排服务天翼云云安全解决方案云服务总线CSB天翼云服务器配置天翼云联邦学习产品天翼云云安全天翼云企业上云解决方案天翼云产品天翼云视频云存储

      天翼云最新活动

      天翼云新春焕新季

      云主机开年特惠28.8元/年,0元秒杀等你来抢!

      云上钜惠

      爆款云主机全场特惠,2核4G只要1.8折起!

      中小企业服务商合作专区

      国家云助力中小企业腾飞,高额上云补贴重磅上线

      出海产品促销专区

      爆款云主机低至2折,高性价比,不限新老速来抢购!

      天翼云奖励推广计划

      加入成为云推官,推荐新用户注册下单得现金奖励

      产品推荐

      弹性云主机 ECS

      物理机 DPS

      多活容灾服务

      GPU云主机

      轻量型云主机

      公共算力服务

      模型推理服务

      智算一体机

      动作活体识别

      推荐文档

      服务器迁移相关术语解释

      购买类

      邀请参会方

      VPC安全组删除规则

      修改实例安全组

      产品变更

      监控节点运行状态

      生产者

      • 7*24小时售后
      • 无忧退款
      • 免费备案
      • 专家服务
      售前咨询热线
      400-810-9889转1
      关注天翼云
      • 旗舰店
      • 天翼云APP
      • 天翼云微信公众号
      服务与支持
      • 备案中心
      • 售前咨询
      • 智能客服
      • 自助服务
      • 工单管理
      • 客户公告
      • 涉诈举报
      账户管理
      • 管理中心
      • 订单管理
      • 余额管理
      • 发票管理
      • 充值汇款
      • 续费管理
      快速入口
      • 天翼云旗舰店
      • 文档中心
      • 最新活动
      • 免费试用
      • 信任中心
      • 天翼云学堂
      云网生态
      • 甄选商城
      • 渠道合作
      • 云市场合作
      了解天翼云
      • 关于天翼云
      • 天翼云APP
      • 服务案例
      • 新闻资讯
      • 联系我们
      热门产品
      • 云电脑
      • 弹性云主机
      • 云电脑政企版
      • 天翼云手机
      • 云数据库
      • 对象存储
      • 云硬盘
      • Web应用防火墙
      • 服务器安全卫士
      • CDN加速
      热门推荐
      • 云服务备份
      • 边缘安全加速平台
      • 全站加速
      • 安全加速
      • 云服务器
      • 云主机
      • 智能边缘云
      • 应用编排服务
      • 微服务引擎
      • 共享流量包
      更多推荐
      • web应用防火墙
      • 密钥管理
      • 等保咨询
      • 安全专区
      • 应用运维管理
      • 云日志服务
      • 文档数据库服务
      • 云搜索服务
      • 数据湖探索
      • 数据仓库服务
      友情链接
      • 中国电信集团
      • 天翼云国际站
      • 189邮箱
      • 天翼企业云盘
      • 天翼云盘
      ©2026 天翼云科技有限公司版权所有 增值电信业务经营许可证A2.B1.B2-20090001
      公司地址:北京市东城区青龙胡同甲1号、3号2幢2层205-32室
      • 用户协议
      • 隐私政策
      • 个人信息保护
      • 法律声明
      备案 京公网安备11010802043424号 京ICP备 2021034386号