爆款云主机低至25.83元/年
查看详情

活动

天翼云最新优惠活动,涵盖免费试用,产品折扣等,助您降本增效!
热门活动
  • 云聚517 · 好价翼起拼 NEW 爆款云主机低至25.83元/年,参与拼团享更多优惠,拼成得额外优惠券
  • 安全隔离版OpenClaw NEW OpenClaw云服务器专属“龙虾“套餐低至1.5折起
  • 聚力AI赋能 天翼云大模型专项 大模型特惠专区·Token Plan 轻享包低至9.9元起
  • 青云志云端助力计划 NEW 一站式科研助手,海外资源安全访问平台,助力青年翼展宏图,平步青云
  • 企业出海解决方案 NEW 助力您的业务扬帆出海,通达全球!
  • 天翼云信创专区 NEW “一云多芯、一云多态”,国产化软件全面适配,国产操作系统及硬件芯片支持丰富
  • 中小企业服务商合作专区 国家云助力中小企业腾飞,高额上云补贴重磅上线
  • 云上钜惠 爆款云主机全场特惠,2核4G只要1.8折起!
  • 天翼云奖励推广计划 加入成为云推官,推荐新用户注册下单得现金奖励
免费活动
  • 免费试用中心 HOT 多款云产品免费试用,快来开启云上之旅
  • 天翼云用户体验官 NEW 您的洞察,重塑科技边界

息壤智算

领先开放的智算服务平台,提供算力、平台、数据、模型、应用“五位一体”智算服务体系,构建全流程的AI基础设施能力
AI Store
  • 算力市场
  • 模型市场
  • 应用市场
公共算力服务
  • 裸金属
  • 定制裸金属
训推服务
  • 模型开发
  • 训练任务
  • 服务部署
Token服务
  • 模型广场
  • 体验中心
  • 服务接入
应用托管
  • 应用实例
科研助手
  • 科研智能体
  • 科研服务
  • 开发机
  • 并行计算
大模型
  • DeepSeek-V4-Flash
  • GLM-5.1
  • Qwen3.5-122B-A10B
  • DeepSeek-V3.2(旗舰版)
  • GLM-5(正式版)
智算一体机
  • 智算一体机
智能体引擎
  • 智能体引擎
智算安全专区
  • 大模型安全评测
  • 大模型安全护栏
模型适配专家服务
  • 模型适配专家服务
算力服务商
  • 入驻算力服务商

应用商城

天翼云精选行业优秀合作伙伴及千余款商品,提供一站式云上应用服务
进入甄选商城进入云市场进入AI Store创新解决方案公有云生态专区智云上海应用生态专区
建站工具
  • 新域名服务
  • SSL证书
  • 翼建站
企业办公
  • 安全邮箱
  • WPS 365 天翼云版
  • 天翼企业云盘(标准服务版)
灾备迁移
  • 云管家2.0
  • 翼备份(SaaS版)

定价

协助您快速了解云产品计费模式、价格详情,轻松预估上云成本
价格计算器
  • 动态测算产品价格
定价策略
  • 快速了解计费模式

合作伙伴

天翼云携手合作伙伴,共创云上生态,合作共赢
天翼云生态合作中心
  • 天翼云生态合作中心
天翼云渠道合作伙伴
  • 天翼云代理渠道合作伙伴
天翼云服务合作伙伴
  • 天翼云集成商交付能力认证
天翼云应用合作伙伴
  • 天翼云云市场合作伙伴
  • 天翼云甄选商城合作伙伴
天翼云技术合作伙伴
  • 天翼云OpenAPI中心
天翼云培训认证
  • 天翼云学堂
  • 天翼云市场商学院
天翼云合作计划
  • 云汇计划
天翼信创云专区
  • 信创云专区
  • 适配互认证

开发者

开发者相关功能入口汇聚
技术社区
  • 专栏文章
  • 互动问答
  • 技术视频
资源与工具
  • OpenAPI中心
培训与认证
  • 天翼云学堂
  • 天翼云认证
开源社区
  • 魔乐社区
  • OpenTeleDB

支持与服务

为您提供全方位支持与服务,全流程技术保障,助您轻松上云,安全无忧
文档与工具
  • 文档中心
  • 新手上云
  • 自助服务
  • OpenAPI中心
定价
  • 价格计算器
  • 定价策略
基础服务
  • 售前咨询
  • 在线支持
  • 在线支持
  • 工单服务
  • 服务保障
  • 会员中心
增值服务
  • 红心服务
  • 首保服务
  • 客户支持计划
  • 专家技术服务
  • 备案管家
我要反馈
  • 建议与反馈
  • 用户体验官
信息公告
  • 客户公告

了解天翼云

天翼云秉承央企使命,致力于成为数字经济主力军,投身科技强国伟大事业,为用户提供安全、普惠云服务
品牌介绍
  • 关于天翼云
  • 智算云
  • 天翼云4.0
  • 新闻资讯
  • 天翼云APP
基础设施
  • 全球基础设施
  • 信任中心
最佳实践
  • 精选案例
  • 超级探访
  • 云杂志
  • 分析师和白皮书
  • 天翼云·创新直播间
市场活动
  • 2026智能云生态大会
  • 2025智能云生态大会
  • 2024智算云生态大会
  • 2023云生态大会
  • 2022云生态大会
  • 天翼云中国行
天翼云
  • 活动
  • 息壤智算
  • 产品
  • 解决方案
  • 应用商城
  • 定价
  • 合作伙伴
  • 开发者
  • 支持与服务
  • 了解天翼云
      • 文档
      • 控制中心
      • 备案
      • 管理中心
      消息队列Kafka版_相关内容
      • 业务过载最佳实践
        本节介绍Kafka业务过载最佳实践 方案概述 Kafka业务过载,一般表现为CPU使用率高、磁盘写满的现象。 当CPU使用率过高时,系统的运行速度会降低,并有加速硬件损坏的风险。 当磁盘写满时,相应磁盘上的Kafka日志目录会出现offline问题。此时,该磁盘上的分区副本不可读写,降低了分区的可用性和容错能力。同时由于Leader分区迁移到其他节点,会增加其他节点的负载。 CPU使用率高的原因 数据操作相关线程数(num.io.threads、num.network.threads、num.replica.fetchers)过多,导致CPU繁忙。 分区设置不合理,所有的生产和消费都集中在某个节点上,导致CPU利用率高。 磁盘写满的原因 业务数据增长较快,已有的磁盘空间不能满足业务数据需要。 节点内磁盘使用率不均衡,生产的消息集中在某个分区,导致分区所在的磁盘写满。 Topic的数据老化时间设置过大,保存了过多的历史数据,容易导致磁盘写满。 实施步骤 CPU使用率高的处理措施: 优化线程参数num.io.threads、num.network.threads和num.replica.fetchers的配置。 num.io.threads和num.network.threads建议配置为磁盘个数的倍数,但不能超过CPU核数。 num.replica.fetchers建议配置不超过5。 合理设置Topic的分区,分区一般设置为节点个数的倍数。 生产消息时,给消息Key加随机后缀,使消息均衡分布到不同分区上。 磁盘写满的处理措施: 扩容磁盘,使磁盘具备更大的存储空间。 迁移分区,将已写满的磁盘中的分区迁移到本节点的其他磁盘中。 合理设置Topic的数据老化时间,减少历史数据的容量大小。 在CPU资源情况可控的情况下,使用压缩算法对数据进行压缩。 常用的压缩算法包括:ZIP,GZIP,SNAPPY,LZ4等。选择压缩算法时,需考虑数据的压缩率和压缩耗时。通常压缩率越高的算法,压缩耗时也越高。对于性能要求高的系统,可以选择压缩速度快的算法,比如LZ4;对于需要更高压缩比的系统,可以选择压缩率高的算法,比如GZIP。 可以在生产者端配置“compression.type”参数来启用指定类型的压缩算法。 Properties props new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 开启GZIP压缩 props.put("compression.type", "gzip"); Producer producer new KafkaProducer<>(props);
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        业务过载最佳实践
      • 安全方案
        数据保护技术 RabbitMQ提供了多种数据保护技术,以确保数据在传输和存储过程中的机密性和完整性。 1. 跨AZ容灾:在不同的可用区部署多个RabbitMQ节点,确保节点之间的数据复制和同步。这样,当一个可用区发生故障时,其他可用区上的节点可以继续提供服务。 2. 副本冗余:RabbitMQ副本冗余是一种保证消息队列的高可用性和数据冗余的策略。通过在多个节点上创建副本,可以确保即使一个节点发生故障,其他节点上的副本仍然可以提供服务。 3. 数据持久化:RabbitMQ数据持久化是通过将队列、消息和交换器进行持久化,确保消息队列中的数据在节点重启或故障时不会丢失。 服务韧性 RabbitMQ服务的韧性是指其在面对各种故障和异常情况时能够保持可用性和可靠性的能力。以下是保障RabbitMQ服务韧性的关键方面: 1. AZ内实例容灾:在不同的可用区内部署多个RabbitMQ节点,使它们能够相互复制和同步消息。这样即使一个可用区发生故障,其他可用区的节点仍然可以提供服务。 2. 数据容灾:RabbitMQ数据容灾是通过持久化消息、队列和交换器、备份和复制以及高可用性集群等策略,保护数据免受损失和故障影响的措施。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        产品简介
        安全方案
      • 分布式消息服务Kafka服务协议
        分布式消息服务Kafka服务协议,请点击这里
        来自:
        帮助文档
        分布式消息服务Kafka
        相关协议
        分布式消息服务Kafka服务协议
      • 消息重复和消费幂等
        Kafka 消费的语义是 “at least once”, 也就是至少投递一次,保证消息不丢,但是不会保证消息不重复。在出现网络问题、客户端重启时均有可能出现少量重复消息,此时应用消费端如果对消息重复比较敏感(比如说订单交易类),则应该做到消息幂等。 以数据库类应用为例,常用做法是: 发送消息时,传入 key 作为唯一流水号ID; 消费消息时,判断 key 是否已经消费过,如果已经消费过了,则忽略,如果没消费过,则消费一次; 当然,如果应用本身对少量消息重复不敏感,则不需要做此类幂等检查。
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        消费者实践
        消息重复和消费幂等
      • 产品服务等级协议(SLA)
        视频 天翼云媒体存储服务等级协议 容器与企业中间件 云容器引擎服务等级协议 微服务云应用平台服务等级协议 分布式消息服务RabbitMQ服务等级协议 分布式消息服务RocketMQ服务等级协议 分布式消息服务Kafka服务等级协议 天翼云软件开发生产线CodeArts服务等级协议 天翼云应用性能监控APM服务等级协议 天翼云容器安全卫士服务等级协议 天翼云应用服务网格服务等级协议 天翼云微服务引擎服务等级协议 天翼云容器镜像服务等级协议 天翼云函数计算服务等级协议 安全 Web应用防火墙(边缘云版)服务等级协议 天翼云高防(边缘云版)服务等级协议 天翼云网站安全监测服务等级协议 天翼云爬虫管理平台服务等级协议 天翼云容器安全平台服务等级协议 天翼云托管检测与响应服务(原生版)服务等级协议 天翼云密评专区服务等级协议 天翼云日志审计服务等级协议 天翼云证书管理服务等级协议 天翼云云堡垒机服务等级协议 天翼云密钥管理服务等级协议 天翼云云安全中心服务等级协议 天翼云Web应用防火墙(原生版)服务等级协议 天翼云云防火墙(原生版)服务等级协议 天翼云Web应用防火墙(独享版)服务等级协议 天翼云漏洞扫描(专业版)服务等级协议 天翼云企业主机安全服务等级协议 天翼云态势感知(专业版)服务等级协议 天翼云云防火墙服务等级协议 天翼云运维安全中心(云堡垒机)服务等级协议
        来自:
        帮助文档
        法律声明
        服务等级协议
        产品服务等级协议(SLA)
      • 入门实践
        本文主要介绍分布式消息服务RabbitMQ的入门实践。 当您购买了RabbitMQ实例后,可以根据自身的业务需求使用分布式消息服务RabbitMQ提供的一系列常用实践。 表1 常用最佳实践 实践 描述 RabbitMQ业务迁移 介绍将线下单机或集群实例迁移到天翼云RabbitMQ实例的方案。 RabbitMQ队列迁移 扩容节点、删除队列可能会导致队列在各个节点分布不均衡,本文介绍如何设置队列负载均衡。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        快速入门
        入门实践
      • 事件源概述
        事件源是事件的来源,负责将生产的事件发布到事件总线。 事件总线EventBridge支持以下事件源: 天翼云官方事件源 作为事件源接入时,只要开通相应的天翼云服务,就可以自动接入事件总线EventBridge。通过配置预定义的事件源与事件规则,实现从事件源发布事件到云服务专用总线,经过事件模式过滤后把事件路由到事件目标。 自定义事件源 作为事件源接入时,您可以配置自定义应用事件源,使用SDK接入事件总线EventBridge。通过创建自定义总线、配置事件规则,把自定义应用产生的事件发布到自定义总线,经过事件模式过滤后把事件路由到事件目标。 作为事件源接入时,您可以配置消息类事件提供方,把事件主动拉取到事件总线EventBridge。例如,当您配置事件提供方为分布式消息服务Kafka时,事件总线EventBridge会把分布式消息服务Kafka对应资源产生的消息主动拉取到自定义总线中,经过事件模式过滤后把事件路由到事件目标。
        来自:
        帮助文档
        事件总线
        用户指南
        事件总线
        事件源
        事件源概述
      • 服务等级协议
        产品服务等级协议请查看: 分布式消息服务Kafka服务等级协议
        来自:
        帮助文档
        分布式消息服务Kafka
        相关协议
        服务等级协议
      • 使用KAFKA协议上报日志到LTS
        ${accessSecret}" topic: "${logGroupId}${logStreamId}" sasl.mechanism: "PLAIN" security.protocol: "SASLPLAINTEXT" acks: "0" compression: gzip 2. 通过Logstash软件上报日志。 input { stdin {} } output { kafka { 配置地址 bootstrapservers > "${ip}:${port}" 配置topic topicid > "${logGroupId}${logStreamId}" 配置消息确认机制 acks > "0" 配置压缩方式 compressiontype > "gzip" 配置认证方式 securityprotocol > "SASLPLAINTEXT" saslmechanism > "PLAIN" 用户名 projectId 密码 accessKey accessSecret sasljaasconfig > "org.apache.kafka.common.security.plain.PlainLoginModule required username'${projectId}' password'${accessKey} ${accessSecret}';" } } 3. 通过Flume软件上报日志。 Name a1.sources r1 a1.channels c1 a1.sinks k1 Source a1.sources.r1.type TAILDIR a1.sources.r1.channels c1 a1.sources.r1.filegroups f1 a1.sources.r1.filegroups.f1 /tmp/test.txt a1.sources.r1.fileHeader true a1.sources.r1.maxBatchCount 1000 Channel a1.channels.c1.type memory a1.channels.c1.capacity 10000 a1.channels.c1.transactionCapacity 100 Sink a1.sinks.k1.type org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic ${logGroupId}${logStreamId} a1.sinks.k1.kafka.bootstrap.servers ${ip}:${port} a1.sinks.k1.kafka.producer.acks 0 a1.sinks.k1.kafka.producer.security.protocol SASLPLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism PLAIN a1.sinks.k1.kafka.producer.compression.type gzip a1.sinks.k1.kafka.producer.sasl.jaas.configorg.apache.kafka.common.security.plain.PlainLoginModule required username"${projectId}" password"${accessKey}
        来自:
        帮助文档
        云日志服务
        用户指南
        日志接入
        其他方式接入
        使用KAFKA协议上报日志到LTS
      • 日志、告警外发设置
        本节指导用户通过Kafka将日志、告警外发至第三方。 日志外发功能为公测功能,新购用户可以限时免费体验。 为日志类型添加转发目标 1. 登录云安全中心控制台。 2. 在左侧导航栏,选择“设置 > 集成配置”,进入集成配置页面。 3. 选择日志类型,点击“转发配置”列的【+】按钮,弹出“新增转发配置”窗口。 4. 选择“转发目标”,单击“保存”,自动返回集成配置页面。此时转发配置 开关为“关闭”,投递状态为“未开启”。 5. 单击“转发配置”开关,打开“转发配置”。 配置转发目标 1. 在新增转发配置页面,单击“转发目标配置”进入转发目标配置页面,单击“新增”,进入“转发目标编辑”页面。 2. 输入相关信息,单击“检测”。 说明 “加密类型”选择“不加密”时,会自动在目标Kafka创建相应的“队列名(topic)”。 “加密类型”选择“SASL加密”时,需要先手动在目标Kafka创建相应的“队列名(topic)”。 3. 检测通过后,单击“保存”。 保存成功,返回转发目标配置页面,可以看到已新增的转发目标。
        来自:
        帮助文档
        云安全中心
        快速入门
        日志、告警外发设置
      • 队列管理概述
        专属队列 专属队列是指队列对应的资源为专属资源,空闲时不释放,即无论是否使用均保留资源的队列类型。专属队列可以保证提交作业时资源一定存在。 队列弹性扩缩容 DLI提供了队列弹性扩缩容的功能。用户在创建指定规格队列后,可根据需要进行弹性扩缩容。 根据业务情况,手动更改队列规格。具体操作请参考队列弹性扩缩容。 说明 新创建的队列需要运行作业后才可进行扩缩容。 队列弹性扩缩容定时任务 DLI提供了队列弹性扩缩容定时任务的功能。用户在创建队列后,可根据需要进行弹性扩缩容定时任务。 根据业务情况,设置队列自动扩缩容的时间,由系统定时触发队列扩缩容。具体操作请参考弹性扩缩容定时任务。 说明 新创建的队列需要运行作业后才可进行扩缩容。 队列自动扩缩容 Flink作业使用队列,DLI可根据作业大小自动触发扩缩容,用户无需进行操作。 说明 新创建的队列需要运行作业后才可进行扩缩容。 队列管理页面 队列管理主要包括如下功能: 队列权限管理 创建队列 删除队列 修改队列网段 队列弹性扩缩容 弹性扩缩容定时任务 测试地址连通性 创建消息通知主题 说明 DLI作业执行失败需要通过SMN发送通知消息,因此需要获得访问和使用SMN(消息通知服务)的SMN Administrator权限。 队列管理页面显示用户创建所有的队列和服务预置的default队列。队列列表默认按创建时间排列,创建时间最近的队列显示在最前端。 队列管理参数 参数 参数说明 名称 队列的名称。 类型 队列的类型。 SQL队列 通用队列 Spark队列(兼容老版本) 规格 队列大小,单位:CUs。 CUs是队列的计价单位。1CUs1Core 4GMem。不同规格的队列对应的计算能力不一样,规格越高计算能力越好。 实际CUs 当前队列实际大小值。 弹性扩缩容 定时扩缩容的目标CU值,或当前规格CU值的最大值和最小值。 用户名 队列所有者。 描述 创建队列时,对队列的描述。如果无描述,则显示“”。 操作 删除:删除所选队列。如果队列中有正在提交或者正在运行的作业,将不支持删除操作。 权限管理:查看队列对应的用户权限信息以及对其他用户授权。 更多 − 重启:强制重启队列。 说明 只有SQL队列有“重启”操作。 − 弹性扩缩容:可以根据需要选择“扩容”或“缩容”,目标值大小必须为16CU的整数倍。 − 弹性扩缩容定时任务:可以根据业务周期或使用情况,在不同的时间或周期内设置不同的队列大小,系统将定时自动进行“扩容”或“缩容”。目标值大小必须为16CU的整数倍。 − 修改网段:使用DLI增强型跨源时,DLI队列网段与数据源网段不能重合,可根据需要进行修改。 − 测试地址连通性:测试队列到指定地址是否可达,支持域名和ip,可指定端口。
        来自:
        帮助文档
        数据湖探索
        用户指南
        队列管理
        队列管理概述
      • 消息查询
        根据ID查询 根据消息ID查询唯一消息,选择消费组后,可以查询到该消息是否被该消费组消费过。 点击“查看”,可以查询该消息的包体内容。 消费状态标志含义: (1)Toconsume:未消费。 (2)Consumed:已签收。 (3)Consuming:已拉取,未签收。 根据offset查询 根据指定队列指定偏移量查询唯一消息,选择订阅组后,可以查询到该消息是否被该订阅组消费过。 点击“查看”,可以查询该消息的包体内容。 基于Topic查询 死信队列查询
        来自:
        帮助文档
        分布式消息服务RocketMQ
        用户指南
        管理消息
        消息查询
      • 查询Topic信息
        本章节介绍了如何查询分布式消息服务RocketMQ实例的Topic。 操作场景 Topic创建成功后,查询Topic相关的配置和状态信息。 操作步骤 1. 登录分布式消息服务RocketMQ控制台。 2. 单击RocketMQ实例的名称,进入实例详情页面。 3. 在左侧导航栏,单击“Topic管理”,进入“Topic管理”页面。 4. 单击需要查询的Topic名称,进入Topic详情页面。 在详情页上方可以查看Topic名称、关联代理、读队列个数、写队列个数和权限。 在详情页下方可以查看Topic在每个代理上的队列状态,包括队列ID、最小偏移量、最大偏移量和消息更新时间。还可以查看消费组消费此Topic的情况,包括消费组名称、最大重试次数和广播消费。 图 Topic详情
        来自:
        帮助文档
        分布式消息服务RocketMQ
        用户指南
        Topic管理
        查询Topic信息
      • 消费组列表
        介绍分布式消息服务Kafka消费组查看功能操作内容。 场景描述 在以下场景中,可以考虑查看Kafka的消费组列表: 监控消费组:通过查看消费组列表,可以监控和管理Kafka中的消费者。可以查看消费组的健康状态等信息,以确保消费者正常工作并及时发现潜在的问题。 动态调整消费者数量:通过查看消费组列表,可以了解当前的消费者数量和状态。根据实际需求,可以动态增加或减少消费者的数量,以适应不同的负载和流量变化。 操作步骤 (1)登录管理控制台。 (2)进入Kafka管理控制台。 (3)在实例列表页在操作列,目标实例行点击“管理” (4)点击“消费组管理”后出现如下图列表 (5)右上角输入消费组名称,可查询对应消费组 消费组状态说明见下表: 状态 说明 Empty 消费组没有分配到任何分区进行消费。 PreparingRebalance 消费组中有新的消费者加入或者有消费者离开,正在进行重新分配分区的准备工作。 CompletingRebalance 1. 重新分配分区的过程已经完成,消费组即将开始消费。 Stable 消费组中的消费者正常消费消息,并且消费进度与分区分配是一致的。 Dead 消费组已经停止工作,没有任何活跃的消费者。 PreparingSync 消费组正在准备同步消费进度,以确保消费者之间的一致性。 AwaitingSync 消费组中的消费者正在等待同步消费进度的完成。 Rebalancing 消费组正在进行重新分配分区的操作。 DeadAndEmpty 消费组已经停止工作,且没有分配到任何分区进行消费。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        消费组管理
        消费组列表
      • 使用KAFKA协议上报日志到LTS(1)
        ${accessSecret}"; Bind a1.sources.r1.channels c1 a1.sinks.k1.channel c1 SDK 调用示例 1. Java SDK调用示例。 maven依赖(示例kafka协议版本为2.7.1): org.apache.kafka kafkaclients 2.7.1 代码示例: package org.example; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class ProducerDemo { public static void main(String[] args) { Properties props new Properties(); // 配置地址 props.put("bootstrap.servers", "${ip}:${port}"); // 配置消息确认机制 props.put("acks", "0"); // 配置认证方式 props.put("security.protocol", "SASLPLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); // 用户名 projectId 密码 accessKey accessSecret props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username'${projectId}' password'${accessKey} ${accessSecret}';"); // 配置压缩方式 props.put("compression.type", "${compresstype}"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 1.创建一个生产者对象 Producer producer new KafkaProducer<>(props); // 2.调用send方法 for (int i 0; i ("${logGroupId}${logStreamId}", "${message}"); // 配置recordHeader // record.headers().add(new RecordHeader("LTSLOGTYPE","FORMAT".getBytes())); producer.send(record); } // 3.关闭生产者 producer.close(); } } 2. Python SDK调用示例。 from kafka import KafkaProducer producer KafkaProducer(
        来自:
      • 如何提高消息处理效率
        消息可以批量生产和消费 为提高消息发送和消息消费效率,推荐使用批量消息发送和消费。通常,默认消息消费为批量消费,而消息发送尽可能采用批量发送。同时批量方式可有效减少API调用次数,减少服务使用费用。 如下面两张示意图对比所示,消息批量生产与消费,可以减少API调用次数,节约资源。 图 消息批量生产(发送)与消费 说明 批量发送消息时,单次不能超过10条消息,总大小不能超过512KB。 批量生产(发送)消息可以灵活使用,在消息并发多的时候,批量发送,并发少时,单条发送。这样能够在减少调用次数的同时保证消息发送的实时性。 图 消息逐条生产(发送)与消费 此外,批量消费消息时,消费者应按照接收的顺序对消息进行处理、确认,当对某一条消息处理失败时,不再需要继续处理本批消息中的后续消息,直接对已正确处理的消息进行确认即可。 巧用消费组协助运维 用户使用DMS服务作为消息管理系统,查看队列的消息内容对于定位问题与调试服务是至关重要的。 当消息的生产和消费过程中遇到疑难问题时,通过创建不同消费组可以帮助定位分析问题或调试服务对接。用户可以创建一个新的消费组,对队列中的消息进行消费并分析消费过程,这样不会影响其他服务对消息的处理。
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        如何提高消息处理效率
      • 名词解释
        本节介绍了分布式消息服务RabbitMQ产品常见的名词解释。 Vhost 虚拟主机(Virtual Host),类似于Namespace命名空间的概念,逻辑隔离,每个用户里可以创建多个Vhost,每个Vhost可以创建若干个Exchange和Queue。 Queue 消息队列,每个消息都会被投入到一个或者多个Queue里。 Producer 消息生产者,即投递消息的程序。 Consumer 消息消费者,即接受消息的程序。 Connection TCP 连接,Producer 或 Consumer 与消息队列间的物理 TCP 连接。 Connection将应用与分布式消息服务RabbitMQ连接在一起。Connection会执行认证、IP解析、路由等底层网络任务。应用与分布式消息服务RabbitMQ建立Connection需要多个TCP报文交互,因而会消耗较多的网络资源和分布式消息服务RabbitMQ资源。大量的Connection会对分布式消息服务RabbitMQ造成巨大压力,甚至触发分布式消息服务RabbitMQ SYN洪水攻击防护,导致分布式消息服务RabbitMQ无响应,进而影响业务。 Channel 在客户端的每个物理TCP连接里,可建立多个Channel,每个Channel代表一个会话任务。 Channel是物理TCP连接中的虚拟连接。当应用通过Connection与分布式消息服务RabbitMQ建立连接后,所有的AMQP协议操作(例如创建队列、发送消息、接收消息等)都会通过Connection中的Channel完成。Channel可以复用Connection,即一个Connection下可以建立多个Channel。Channel不能脱离Connection独立存在,而必须存活在Connection中。当某个Connection断开时,该Connection下的所有Channel都会断开。当大量应用需要与分布式消息服务RabbitMQ建立多个连接时,建议您使用Channel来复用Connection,从而减少网络资源和分布式消息服务RabbitMQ资源消耗。 Connection和Channel的使用建议 保持Connection长连接,请勿频繁开启或关闭Connection。如果确实需要频繁开启或关闭连接,请使用Channel。 一个进程对应一个Connection,一个进程中的多个线程则分别对应一个Connection中的多个Channel。 Producer和Consumer分别使用不同的Connection进行消息发送和消费。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        产品简介
        名词解释
      • 天翼云官方事件源接入流程
        步骤二:产生对象存储事件 1. 登录对象存储管理控制台。 2. 单击前提条件中您已打开的桶名,点击文件管理,点击上传文件,等待文件上传完成,如图1。 图1 在对象存储管理控制台上传文件 步骤三:查看官方事件源事件示例 1. 登陆事件总线EventBridge管理控制台。 2. 点击default 总线,进入云服务专用事件总线详情页,单击左侧事件源按钮。 3. 单击名为ctyun.zos的事件源,查看事件示例,如图2。 图2 查看对象存储官方事件源的事件列表 步骤四:结果验证 1. 登录分布式消息服务Kafka管理控制台。 2. 在左侧导航栏,单击实例列表,选择目标实例。 3. 在左侧导航栏点击消息查询页面,点击按位点查询,查看事件内容与步骤三中查看的示例是否相符。 图3 查看分布式消息服务Kafka管理控制台中的消息详情
        来自:
        帮助文档
        事件总线
        快速入门
        天翼云官方事件源接入流程
      • 安全方案
        介绍分布式消息服务RocketMQ的安全方案,包括支持TLS传输加密、权限控制、Topic资源访问权限控制等内容。 安全价值 RocketMQ的安全对用户有以下几个重要价值: 1. 保护数据安全:RocketMQ的安全机制可以保护消息的机密性和完整性,防止敏感数据泄露或被篡改。这对于处理包含个人信息、商业机密等敏感数据的应用程序非常重要。 2. 防止未经授权的访问:RocketMQ的访问控制功能可以限制对消息队列的访问权限,只有具有相应权限的用户才能发送和消费消息。这可以防止未经授权的用户访问和操作消息队列,保护系统的安全性。 3. 合规性要求:对于一些行业和法规要求较高的场景,如金融、医疗等,RocketMQ的安全特性可以帮助用户满足合规性要求,确保数据的安全和合规性。 4. 提供安全审计功能:RocketMQ的安全审计功能可以记录和追踪对消息队列的操作,包括发送、消费、订阅等。这可以帮助用户监控和检测潜在的安全风险,及时发现和应对安全事件。 5. 增强用户信任:通过提供安全性能和功能,RocketMQ可以增强用户对系统的信任感。用户可以放心地使用RocketMQ来处理重要的消息传输和处理任务,而不必担心数据的安全问题。 综上所述,RocketMQ的安全性对用户来说具有重要的价值,可以保护数据安全,防止未经授权的访问,满足合规性要求,提供安全审计功能,并增强用户对系统的信任感。
        来自:
        帮助文档
        分布式消息服务RocketMQ
        产品简介
        安全方案
      • RocketMQ .NET SDK
        说明 分布式消息服务RocketMQ兼容了社区版 HTTP SDK,您可以使用社区版 HTTP SDK接入分布式消息服务RocketMQ。 前提条件 1. 下载社区 C SDK到本地并解压。 2. 使用Visual Studio打开sln文件导入工程。 发送普通消息 using System; using Aliyun.MQ; using Aliyun.MQ.Model; namespace MQ.Sample { public class Producer { // 填写分布式消息服务RocketMQ控制台HTTP接入点 private const string endpoint "${HTTPENDPOINT}"; // 填写AccessKey,在管理控制台创建 private const string accessKeyId "${ACCESSKEY}"; // 填写SecretKey 在管理控制台创建 private const string secretAccessKey "${SECRETKEY}"; // 消息所属的Topic,在消息队列RocketMQ版控制台创建。 private const string topicName "${TOPIC}"; // Topic所属实例ID,默认实例为空 private const string instanceId "${INSTANCEID}"; private static MQClient client new Aliyun.MQ.MQClient(accessKeyId, secretAccessKey, endpoint); static MQProducer producer client.GetProducer(instanceId, topicName); static void Main(string[] args) { try { // 循环发送4条消息。 for (int i 0; i messages null; try { messages consumer.ConsumeMessage( 3, // 一次最多消费3条(最多可设置为16条) 3 // 长轮询时间3秒(最多可设置为30秒) ); } catch (Exception exp1) { if (exp1 is MessageNotExistException) { Console.WriteLine(Thread.CurrentThread.Name + " No new message, " + ((MessageNotExistException)exp1).RequestId); continue; } Console.WriteLine(exp1); Thread.Sleep(2000); } if (messages null) { continue; } List handlers new List<>(); Console.WriteLine(Thread.CurrentThread.Name + " Receive Messages:"); // 处理业务逻辑 foreach (Message message in messages) { Console.WriteLine(message); handlers.Add(message.ReceiptHandle); } // Message.nextConsumeTime前若不确认消息消费成功,则消息会重复消费 // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样 try { consumer.AckMessage(handlers); Console.WriteLine("Ack message success:"); foreach (string handle in handlers) { Console.Write("t" + handle); } Console.WriteLine(); } catch (Exception exp2) { // 某些消息的句柄可能超时了会导致确认不成功 if (exp2 is AckMessageException) { AckMessageException ackExp (AckMessageException)exp2; Console.WriteLine("Ack message fail, RequestId:" + ackExp.RequestId); foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems) { Console.WriteLine("tErrorHandle:" + errorItem.ReceiptHandle + ",ErrorCode:" + errorItem.ErrorCode + ",ErrorMsg:" + errorItem.ErrorMessage); } } } } catch (Exception ex) { Console.WriteLine(ex); Thread.Sleep(2000); } } } }}
        来自:
        帮助文档
        分布式消息服务RocketMQ
        SDK参考
        RocketMQ .NET SDK
      • Kafka客户端参数配置建议
        本文主要 Kafka客户端参数配置建议。 Kafka客户端的配置参数很多,以下提供producer和consumer几个常用参数配置。其他参数配置,请参考Kafka配置。 表 Producer参数 参数 默认值 推荐值 说明 acks 1 高可靠:all或者1高吞吐:1 收到Server端确认信号个数,表示producer需要收到多少个这样的确认信号,算消息发送成功。 acks参数代表了数据备份的可用性。常用选项:acks0:表示producer不需要等待任何确认收到的信息,副本将立即加到socket buffer并认为已经发送。没有任何保障可以保证此种情况下server已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)回馈的offset会总是设置为1。 acks1:这意味着至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。如果follower没有成功备份数据,而此时leader又无法提供服务,则消息会丢失。 acksall或者1:这意味着leader需要等待ISR中所有备份都成功写入日志。只要任何一个备份存活,数据都不会丢失。min.insync.replicas指定必须确认写入才能被认为成功的副本的最小数量。 retries 0 结合实际业务调整 客户端发送消息的重试次数。值大于0时,这些数据发送失败后,客户端会重新发送。 注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。针对网络闪断场景,生产者建议配置重试能力,推荐重试次数retries3,重试间隔retry.backoff.ms1000。 request.timeout.ms 30000 结合实际业务调整 设置一个请求最大等待时间,超过这个时间则会抛Timeout异常。超时时间如果设置大一些,如127000(127秒),高并发的场景中,能减少发送失败的情况。 block.on.buffer.full TRUE TRUE TRUE表示当我们内存用尽时,停止接收新消息记录或者抛出错误。默认情况下,这个设置为TRUE。 然而某些阻塞可能不值得期待,因此立即抛出错误更好。如果设置为false,则producer抛出一个异常错误:BufferExhaustedException batch.size 16384 262144 默认的批量处理消息字节数上限。producer将试图批处理消息记录,以减少请求次数。 这将改善client与server之间的性能。不会试图处理大于这个字节数的消息字节数。 发送到brokers的请求将包含多个批量处理,其中会包含对每个partition的一个请求。较小的批量处理数值比较少用,并且可能降低吞吐量(0则会仅用批量处理)。 较大的批量处理数值将会浪费更多内存空间,这样就需要分配特定批量处理数值的内存大小。 buffer.memory 33554432 67108864 producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。 这项设置将和producer能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求。 表 Consumer参数 参数 默认值 推荐值 说明 :::: auto.commit.enable TRUE FALSE 如果为真,consumer所fetch的消息的offset将会自动的同步到zookeeper。这项提交的offset将在进程无法提供服务时,由新的consumer使用。约束:设置为false后,需要先成功消费再提交,这样可以避免消息丢失。 auto.offset.reset latest earliest 没有初始化offset或者offset被删除时,可以设置以下值: earliest:自动复位offset为最早 latest:自动复位offset为最新 none:如果没有发现offset则向消费者抛出异常 anything else:向消费者抛出异常。 说明 如果将此配置设置为latest,新增分区时,生产者可能会在消费者重置初始偏移量之前开始向新增加的分区发送消息,从而导致部分消息丢失。 connections.max.idle.ms 600000 30000 空连接的超时时间,设置为30000可以在网络异常场景下减少请求卡顿的时间。
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        Kafka客户端参数配置建议
      • 磁盘水位处理
        操作步骤 磁盘自动扩容 1. 登录管理控制台。 2. 进入Kafka管理控制台。 3. 在实例列表页的操作列,目标实例行点击“管理”按钮。 4. 点击“智能运维”、“弹性伸缩”菜单进入磁盘水位处理页面。 5. 在磁盘水位处理页面开启磁盘自动扩容。 6. 点击“配置”按钮,可调整配置参数。 动态策略:当磁盘使用量达到指定比例后,系统自动扩容磁盘指定比例的容量,单节点最低扩容100GB。 最高磁盘:实例磁盘总容量扩容到指定值后不再扩容。 7. 点击“确定”按钮保存配置。 8. 点击调整记录列的“查看”按钮跳转至任务列表查看磁盘自动扩容调整记录。 动态消息保留策略 1. 登录管理控制台。 2. 进入Kafka管理控制台。 3. 在实例列表页的操作列,目标实例行点击“管理”按钮。 4. 点击“智能运维”、“弹性伸缩”菜单进入磁盘水位处理页面。 5. 在磁盘水位处理页面开启动态消息保留策略。 6. 点击“配置”按钮,可调整配置参数。 动态策略:当磁盘使用量达到指定比例后,系统自动调整主题的消息保留时长,删除指定比例的最早消息数据。 保底时长:主题消息的最低保留时长,当某个主题的保留时长调整到该值后,该主题不会再被调整。 7. 点击“确定”按钮保存配置。 8. 点击调整记录列的“查看”按钮跳转至任务列表查看动态消息保留记录。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        智能运维
        弹性伸缩
        磁盘水位处理
      • 名词解释
        名词 说明 环境 用于隔离不同应用的逻辑单元。 应用 一组资源的逻辑集合,通常代表一个业务系统。应用是进行演练和管理的核心对象。 资源 构成应用的组件节点,例如云主机、容器、分布式缓存服务Redis版、分布式消息服务Kafka等实例。 演练 通过向应用的特定资源注入指定故障,并观察其影响,从而验证系统稳定性与韧性的过程。 动作 注入到目标资源上的一个原子性故障,例如“CPU高负载”或“网络延迟”。用户可以在一次演练中对多个动作进行自由组合和编排。 动作组 一个或多个动作的逻辑分组,通常代表一个完整的故障场景。在一个演练任务中,不同的动作组之间可以并行执行。 探针 安装在目标资源(如云主机)上,用于执行具体故障注入动作的代理程序(Agent)。 保护策略 一种自动化的安全机制,用于控制演练的“爆炸半径”。当触发预设条件时,系统会依据此策略自动中止演练并回滚故障。
        来自:
        帮助文档
        应用高可用
        产品简介
        故障演练服务
        名词解释
      • 编译运行Demo Java工程
        介绍连接Kafka编译运行Demo Java工程 kafkaclients引入依赖 在使用Kafka时,你需要在你的项目中引入相应的依赖。具体的依赖项可能会因你的项目和需求而有所不同。在使用Kafka之前,请确保查阅官方文档以获取最新的依赖项和使用说明。 以Java编程语言为例,可以使用Kafka的Java客户端库。你可以在Maven或Gradle构建工具中添加以下依赖项: org.apache.kafka kafkaclients 示例代码 1. 从控制台获取以下信息 连接地址 实例连接地址从控制台实例详情菜单处获取,在实例详情页面的接入点信息一栏。 Topic名称 在Topic管理页面,选择需要的Topic名称。 消费组名称 在消费组管理页面,选择需要的消费组名称。 2. 在实例代码中替换以上信息即可实现消息。 import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class Producer { private final KafkaProducer producer; public final static String TOPIC "testtopic"; public final static String BROKERADDR "192.168.0.11:8090,192.168.0.9:8090,192.168.0.10:8090"; public Producer() { Properties props new Properties(); props.put(ProducerConfig.BOOTSTRAPSERVERSCONFIG, BROKERADDR); props.put(ProducerConfig.VALUESERIALIZERCLASSCONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.KEYSERIALIZERCLASSCONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.ACKSCONFIG, "all"); props.put("retries",3); producer new KafkaProducer<>(props); } public void produce() { try { for (int i 0; i (TOPIC, data), new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception ! null) { // TODO: 异常处理 exception.printStackTrace(); return; } System.out.println("produce msg completed, partition id " + metadata.partition()); } }); } } catch (Exception e) { // TODO: 异常处理 e.printStackTrace(); } producer.flush(); producer.close(); } public static void main(String[] args) { Producer producer new Producer(); producer.produce(); } } 3. 同样在实例代码中替换以上信息即可消费消息。 import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class Consumer { private org.apache.kafka.clients.consumer.Consumer consumer; private static final String GROUPID "testgroup"; private static final String TOPIC "testtopic"; public final static String BROKERADDR "192.168.0.11:8090,192.168.0.9:8090,192.168.0.10:8090"; public Consumer() { Properties props new Properties(); props.put(ConsumerConfig.BOOTSTRAPSERVERSCONFIG, BROKERADDR); props.put(ConsumerConfig.GROUPIDCONFIG, GROUPID); props.put(ConsumerConfig.AUTOOFFSETRESETCONFIG, "earliest"); props.put(ConsumerConfig.ENABLEAUTOCOMMITCONFIG, "false"); props.put(ConsumerConfig.KEYDESERIALIZERCLASSCONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUEDESERIALIZERCLASSCONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); consumer new KafkaConsumer<>(props); } public void consume() { consumer.subscribe(Arrays.asList(TOPIC)); while (true){ try { ConsumerRecords records consumer.poll(1000); System.out.println("the numbers of topic:" + records.count()); for (ConsumerRecord record : records) { System.out.println("the data is " + record.value()); } }catch (Exception e){ // TODO: 异常处理 e.printStackTrace(); } } } public static void main(String[] args) { new Consumer().consume(); } }
        来自:
        帮助文档
        分布式消息服务Kafka
        快速入门
        编译运行Demo Java工程
      • Topic管理
        本章节介绍 Topic管理 。 操作场景 Topic创建成功后,查询Topic相关的配置和状态信息。 操作步骤 1. 登录分布式消息服务RocketMQ控制台。 2. 单击RocketMQ实例的名称,进入实例详情页面。 3. 在左侧导航栏,单击“Topic管理”,进入“Topic管理”页面。 4. 单击需要查询的Topic名称,进入Topic详情页面。 在详情页上方可以查看Topic名称、关联代理、读队列个数、写队列个数和权限。 在详情页下方可以查看Topic在每个代理上的队列状态,包括队列ID、最小偏移量、最大偏移量和消息更新时间。还可以查看消费组消费此Topic的情况,包括消费组名称、最大重试次数和广播消费。 图1 Topic详情
        来自:
        帮助文档
        分布式消息服务RocketMQ
        快速入门
        Topic管理
      • 性能调优
        本章节主要介绍Flink作业相关问题中有关性能调优的问题。 Flink作业如何进行性能调优 概念说明及监控查看 消费组积压 消费组积压可通过topic最新数据offset减去该消费组已提交最大offset计算得出,说明的是该消费组当前待消费的数据总量。 如果Flink作业对接的是kafka专享版,则可通过云监控服务(CES)进行查看。具体可选择“云服务监控 > 分布式消息服务 > kafka专享版” ,单击“kafka实例名称 > 消费组” ,选择具体的消费组名称,查看消费组的指标信息。 反压状态 反压状态是通过周期性对taskManager线程的栈信息采样,计算被阻塞在请求输出Buffer的线程比率来确定,默认情况下,比率在0.1以下为OK,0.1到0.5为LOW,超过0.5则为HIGH。 时延 Source端会周期性地发送带当前时间戳的LatencyMarker,下游算子接收到该标记后,通过当前时间减去标记中带的时间戳的方式,计算时延指标。算子的反压状态和时延可以通过Flink UI或者作业任务列表查看,一般情况下反压和高时延成对出现: 性能分析 由于Flink的反压机制,流作业在存在性能问题的情况下,会导致数据源消费速率跟不上生产速率,从而引起Kafka消费组的积压。在这种情况下,可以通过算子的反压和时延,确定算子的性能瓶颈点。 作业最后一个算子(Sink)反压正常(绿色),前面算子反压高(红色) 该场景说明性能瓶颈点在sink,此时需要根据具体数据源具体优化,比如对于JDBC数据源,可以通过调整写出批次(connector.write.flush.maxrows)、JDBC参数重写(rewriteBatchedStatementstrue)等进行优化。 作业非倒数第二个算子反压高(红色) 该场景说明性能瓶颈点在Vertex2算子,可以通过查看该算子描述,确认该算子具体功能,以进行下一步优化。 所有算子反压都正常(绿色),但存在数据堆积 该场景说明性能瓶颈点在Source,主要是受数据读取速度影响,此时可以通过增加Kafka分区数并增加source并发解决。 作业一个算子反压高(红色),而其后续的多个并行算子都不存在反压(绿色) 该场景说明性能瓶颈在Vertex2或者Vertex3,为了进一步确定具体瓶颈点算子,可以在FlinkUI页面开启inPoolUsage监控。如果某个算子并发对应的inPoolUsage长时间为100%,则该算子大概率为性能瓶颈点,需分析该算子以进行下一步优化。 inPoolUsage监控
        来自:
        帮助文档
        数据湖探索
        常见问题
        操作类
        Flink作业相关问题
        性能调优
      • 与其他云服务的关系
        本文主要介绍与其他云服务的关系。 云审计(Cloud Trace Service) 云审计为您提供云服务资源的操作记录,记录内容包括您从管理控制台或者开放API发起的云服务资源操作请求以及每次请求的结果,供您查询、审计和回溯使用。 当前CTS记录的操作,请参考支持云审计的操作列表。 虚拟私有云(Virtual Private Cloud) Kafka实例运行于虚拟私有云,需要使用虚拟私有云创建的IP和带宽。通过虚拟私有云安全组的功能可以增强访问Kafka实例的安全性。 弹性云主机(Elastic Cloud Server) 弹性云主机是由CPU、内存、操作系统、云硬盘组成的基础的计算组件。Kafka实例运行在弹性云主机上,一个代理对应一台弹性云主机。 云硬盘(Elastic Volume Service) 云硬盘为云服务器提供块存储服务,Kafka的所有数据(如消息、元数据和日志等)都保存在云硬盘中。 云监控(Cloud Eye) 云监控是一个开放性的监控平台,提供资源的实时监控、告警、通知等服务。 说明 Kafka实例向CloudEye上报监控数据的更新周期为1分钟。 弹性公网IP(Elastic IP) 弹性公网IP提供独立的公网IP资源,包括公网IP地址与公网出口带宽服务。Kafka实例绑定弹性公网IP后,可以通过公网访问Kafka实例。
        来自:
        帮助文档
        分布式消息服务Kafka
        产品简介
        与其他云服务的关系
      • 计费类常见问题
        基准带宽和磁盘可选择哪些? 资源、规格等信息可参考产品规格文档。 哪些资源池可以订购分布式消息服务Kafka? 天翼云分布式消息服务Kafka可以在以下资源池订购: (1)华南2、华东1、华北2、西南1、上海36、青岛20、长沙42、南昌5、南宁23、武汉41、太原4 (2)芜湖2、福州4、重庆2 、北京5 、佛山3、贵州3、杭州2、内蒙6、上海7、上海6、西安3、昆明2、保定、北京4、合肥2 、乌鲁木齐27 账户里面有钱,为什么无法创建按需付费Kafka实例? 请检查您的账户金额是否小于100元,如果小于100元,则无法创建新的Kafka实例。 (1)预存后付费方式:提前充值现金到天翼云账户中,现金账户余额不低于100元,之后系统按照用户实际使用量进行结算。 (2)计费周期:按小时计费,以自然小时为计费单位(均以北京时间为准),不满一小时按照一小时计费。费用从用户账户现金余额中扣费。开通时间建议整点开通,开通不足一个自然小时,按一小时收费。提前删除也按照自然小时收费。 欠费后如何重新启用? 如果您在15天内充值补足欠款,服务会自动启用。 欠费后,资源进入保留期,您将不能正常访问及使用云服务(资源冻结),但对于您存储在云服务中的数据予以保留。 若您在保留期内充值,充值后系统会自动扣减欠费金额。 若保留期到期您仍未充值,存储在云服务中的数据将被删除、云服务资源将被释放。
        来自:
        帮助文档
        分布式消息服务Kafka
        常见问题
        计费与购买类
        计费类常见问题
      • 计费模式
        变更计费模式 天翼云分布式消息服务Kafka支持计费模式变更,具体操作请参考计费互转。
        来自:
      • 将MySQL同步到Kafka
        参数 描述 同步Topic策略 同步Topic策略,可选择 集中投递到一个Topic:适合源库业务量不大的场景。 自动生成Topic名字:适合每张表数据量都较大的场景,按每一张库表来确定一个Topic。 Topic 选择目标端需要同步到的Topic,同步Topic策略选择集中投递到一个Topic时可见。 Topic名字格式 Topic名字格式,同步Topic策略选择自动生成Topic名字时可见。 Topic名字格式支持database和tablename两个变量,其他字符都当做常量。分别用$database$代替数据库名,$tablename$代替表名。 例如:配置成$database$$tablename$时,如果数据库名称为db1,表名为tab1,则Topic名字为db1tab1。如果是DDL语句,$tablename$为空,则Topic名字为db1。 由于kafka的机制,Topic名字格式不能以"" , "."开头,或以".internal","internal" 结尾,这些命名格式的Topic会被当做为kafka的内部Topic,业务无法使用。 同步到kafka partition策略 同步到kafka partition策略。 按库名+表名的hash值投递到不同Partition:适用于单表的查询场景,可以提高单表读写性能,推荐使用此选项。 全部投递到Partition 0:适用于有事务要求的场景,写入性能比较差,如果没有强事务要求,不推荐使用此选项。 按表的主键值hash值投递到不同的Partion:适用于一个表一个Topic的场景,避免该表都写到同一个分区,消费者可以并行从各分区获取数据。 投送到kafka的数据格式 选择MySQL投送到kafka的数据格式。 Avro:可以显示Avro二进制编码,高效获取数据。 JSON:为Json消息格式,方便解释格式,但需要占用更多的空间。 JSONC:一种能够兼容多个批量,流式计算框架的数据格式。 详细格式可参考 Kafka消息格式。 同步对象 同步对象支持表级同步、库级同步,您可以根据业务场景选择对应的数据进行同步。 选择对象的时候支持搜索,以便您快速选择需要的数据库对象。 在同步对象右侧已选对象框中,可以使用对象名映射功能进行源数据库和目标数据库中的同步对象映射,具体操作可参考对象名映射。
        来自:
        帮助文档
        数据库复制
        用户指南
        实时同步
        自建到自建
        将MySQL同步到Kafka
      • 鉴权接入超时问题
        本节介绍分布式消息服务Kafka使用安全接入点,公网接入点等需要鉴权的接入点时,可能会遇到连接超时的问题。 使用安全接入点,公网接入点等需要鉴权的接入点时,可能会遇到连接超时等问题。 解决方法为可以在客户端机器的host文件上配置Kafka的ip。 例如用户连接Kafka的公网接入点34.28.112.101:8094,34.28.112.102:8094,34.28.112.103:8094时,出现连接超时的报错,可以在客户端机器上的/etc/hosts文件里面加上如下配置: plaintext 34.28.112.101 34.28.112.101 34.28.112.102 34.28.112.102 34.28.112.103 34.28.112.103 然后再重启客户端服务,看是否能正常生产消费。
        来自:
        帮助文档
        分布式消息服务Kafka
        常见问题
        连接问题
        鉴权接入超时问题
      • 查询消息
        查询消息轨迹 1. 登录分布式消息服务RocketMQ控制台。 2. 单击RocketMQ实例的名称,进入实例详情页面。 3. 在左侧导航栏,单击“消息查询”,进入“消息查询”页面。 4. 选择以下任意一种方法,查询消息。 按Topic查询:“Topic”选择待查询消息的Topic名称,“队列”选择待查询消息的队列,“存储时间”选择待查询消息的时间段,单击“查询”。 按Message ID查询:“Topic”选择待查询消息所在的Topic名称,“消息ID”输入待查询消息的Message ID,单击“查询”。 按Message Key查询:“Topic”选择待查询消息所在的Topic名称,“消息ID”输入待查询消息的Message Key,单击“查询”。 5. 在待查询消息所在行,单击“消息轨迹”,查看消息的轨迹,确定是否生产/消费成功。 消息轨迹的参数说明如表1所示。 表1 消息轨迹的参数说明 参数 参数说明 生产者状态 生产者状态如下: 1.发送成功:消息发送成功,服务端已经成功存储消息。 2.提交成功:允许消费者消费此事务消息。 3.回滚:事务消息将被丢弃,不允许消费者消费此事务消息。 4.未知,待确认:事务消息状态暂时无法确定,等待固定时间后,服务端向生产者进行消息回查。 生产耗时 生产者发送消息的耗时。 生产地址 生产者的IP地址和端口号。 消费者状态 消费者状态如下: 1.消费成功消费超时消费异常 2.消费返回NULL消费失败 消费时间 消费消息的时间。 消费耗时 消费者消费消息的耗时。 消费地址 消费者的IP地址和端口号。
        来自:
        帮助文档
        分布式消息服务RocketMQ
        用户指南
        消息查询
        查询消息
      • 1
      • ...
      • 10
      • 11
      • 12
      • 13
      • 14
      • ...
      • 262
      跳转至
      推荐热词
      天翼云运维管理审计系统天翼云云服务平台云服务备份云日志服务应用运维管理云手机云电脑天翼云云hbase数据库电信云大数据saas服务电信云大数据paas服务轻量型云主机天翼云客户服务电话应用编排服务天翼云云安全解决方案云服务总线CSB天翼云服务器配置天翼云联邦学习产品天翼云云安全天翼云企业上云解决方案天翼云产品天翼云视频云存储

      天翼云最新活动

      云聚517 · 好价翼起拼

      爆款云主机低至25.83元/年,参与拼团享更多优惠,拼成得额外优惠券

      安全隔离版OpenClaw

      OpenClaw云服务器专属“龙虾“套餐低至1.5折起

      聚力AI赋能 天翼云大模型专项

      大模型特惠专区·Token Plan 轻享包低至9.9元起

      青云志云端助力计划

      一站式科研助手,海外资源安全访问平台,助力青年翼展宏图,平步青云

      企业出海解决方案

      助力您的业务扬帆出海,通达全球!

      天翼云信创专区

      “一云多芯、一云多态”,国产化软件全面适配,国产操作系统及硬件芯片支持丰富

      中小企业服务商合作专区

      国家云助力中小企业腾飞,高额上云补贴重磅上线

      云上钜惠

      爆款云主机全场特惠,2核4G只要1.8折起!

      产品推荐

      弹性云主机 ECS

      物理机 DPS

      GPU云主机

      镜像服务 IMS

      轻量型云主机

      天翼云CTyunOS系统

      AI Store

      Token服务

      科研助手

      推荐文档

      迁移准备

      代金券查询与使用

      应用场景

      • 7*24小时售后
      • 无忧退款
      • 免费备案
      • 专家服务
      售前咨询热线
      400-810-9889转1
      关注天翼云
      • 旗舰店
      • 天翼云APP
      • 天翼云微信公众号
      服务与支持
      • 备案中心
      • 售前咨询
      • 智能客服
      • 自助服务
      • 工单管理
      • 客户公告
      • 涉诈举报
      账户管理
      • 管理中心
      • 订单管理
      • 余额管理
      • 发票管理
      • 充值汇款
      • 续费管理
      快速入口
      • 天翼云旗舰店
      • 文档中心
      • 最新活动
      • 免费试用
      • 信任中心
      • 天翼云学堂
      云网生态
      • 甄选商城
      • 渠道合作
      • 云市场合作
      了解天翼云
      • 关于天翼云
      • 天翼云APP
      • 服务案例
      • 新闻资讯
      • 联系我们
      热门产品
      • 云电脑
      • 弹性云主机
      • 云电脑政企版
      • 天翼云手机
      • 云数据库
      • 对象存储
      • 云硬盘
      • Web应用防火墙
      • 息壤智算平台
      • CDN加速
      热门推荐
      • 云服务备份
      • 边缘安全加速平台
      • 全站加速
      • 安全加速
      • 云服务器
      • 云主机
      • 智能边缘云
      • 应用编排服务
      • 微服务引擎
      • 共享流量包
      更多推荐
      • web应用防火墙
      • 密钥管理
      • 等保咨询
      • 安全专区
      • 应用运维管理
      • 云日志服务
      • 文档数据库服务
      • 云搜索服务
      • 数据湖探索
      • 数据仓库服务
      友情链接
      • 中国电信集团
      • 天翼云国际站
      • 189邮箱
      • 天翼企业云盘
      • 天翼云盘
      ©2026 天翼云科技有限公司版权所有 增值电信业务经营许可证A2.B1.B2-20090001
      公司地址:北京市东城区青龙胡同甲1号、3号2幢2层205-32室
      • 用户协议
      • 隐私政策
      • 个人信息保护
      • 法律声明
      备案 京公网安备11010802043424号 京ICP备 2021034386号