云主机开年特惠28.8元/年,0元秒杀等你来抢!
查看详情

活动

天翼云最新优惠活动,涵盖免费试用,产品折扣等,助您降本增效!
热门活动
  • 安全隔离版OpenClaw NEW OpenClaw云服务器专属“龙虾“套餐低至1.5折起
  • 天翼云新春焕新季 NEW 云主机开年特惠28.8元/年,0元秒杀等你来抢!
  • 云上钜惠 爆款云主机全场特惠,2核4G只要1.8折起!
  • 中小企业服务商合作专区 国家云助力中小企业腾飞,高额上云补贴重磅上线
  • 出海产品促销专区 NEW 爆款云主机低至2折,高性价比,不限新老速来抢购!
  • 天翼云奖励推广计划 加入成为云推官,推荐新用户注册下单得现金奖励
免费活动
  • 免费试用中心 HOT 多款云产品免费试用,快来开启云上之旅
  • 天翼云用户体验官 NEW 您的洞察,重塑科技边界

息壤智算

领先开放的智算服务平台,提供算力、平台、数据、模型、应用“五位一体”智算服务体系,构建全流程的AI基础设施能力
AI Store
  • 算力市场
  • 模型市场
  • 应用市场
  • MCP市场
公共算力服务
  • 裸金属
  • 定制裸金属
训推服务
  • 模型开发
  • 训练任务
  • 服务部署
模型推理服务
  • 模型广场
  • 体验中心
  • 服务接入
应用托管
  • 应用实例
科研助手
  • 科研智能体
  • 科研服务
  • 开发机
  • 并行计算
大模型
  • DeepSeek-V3.1
  • DeepSeek-R1-0528
  • DeepSeek-V3-0324
  • Qwen3-235B-A22B
  • Qwen3-32B
智算一体机
  • 智算一体机
模型适配专家服务
  • 模型适配专家服务
算力服务商
  • 入驻算力服务商

应用商城

天翼云精选行业优秀合作伙伴及千余款商品,提供一站式云上应用服务
进入甄选商城进入云市场进入AI Store创新解决方案公有云生态专区智云上海应用生态专区
建站工具
  • 新域名服务
  • SSL证书
  • 翼建站
企业办公
  • 安全邮箱
  • WPS 365 天翼云版
  • 天翼企业云盘(标准服务版)
灾备迁移
  • 云管家2.0
  • 翼备份(SaaS版)

定价

协助您快速了解云产品计费模式、价格详情,轻松预估上云成本
价格计算器
  • 动态测算产品价格
定价策略
  • 快速了解计费模式

合作伙伴

天翼云携手合作伙伴,共创云上生态,合作共赢
天翼云生态合作中心
  • 天翼云生态合作中心
天翼云渠道合作伙伴
  • 天翼云代理渠道合作伙伴
天翼云服务合作伙伴
  • 天翼云集成商交付能力认证
天翼云应用合作伙伴
  • 天翼云云市场合作伙伴
  • 天翼云甄选商城合作伙伴
天翼云技术合作伙伴
  • 天翼云OpenAPI中心
天翼云培训认证
  • 天翼云学堂
  • 天翼云市场商学院
天翼云合作计划
  • 云汇计划
天翼信创云专区
  • 信创云专区
  • 适配互认证

开发者

开发者相关功能入口汇聚
技术社区
  • 专栏文章
  • 互动问答
  • 技术视频
资源与工具
  • OpenAPI中心
培训与认证
  • 天翼云学堂
  • 天翼云认证
开源社区
  • 魔乐社区
  • OpenTeleDB

支持与服务

为您提供全方位支持与服务,全流程技术保障,助您轻松上云,安全无忧
文档与工具
  • 文档中心
  • 新手上云
  • 自助服务
  • OpenAPI中心
定价
  • 价格计算器
  • 定价策略
基础服务
  • 售前咨询
  • 在线支持
  • 在线支持
  • 工单服务
  • 服务保障
  • 会员中心
增值服务
  • 红心服务
  • 首保服务
  • 客户支持计划
  • 专家技术服务
  • 备案管家
我要反馈
  • 建议与反馈
  • 用户体验官
信息公告
  • 客户公告

了解天翼云

天翼云秉承央企使命,致力于成为数字经济主力军,投身科技强国伟大事业,为用户提供安全、普惠云服务
品牌介绍
  • 关于天翼云
  • 智算云
  • 天翼云4.0
  • 新闻资讯
  • 天翼云APP
基础设施
  • 全球基础设施
  • 信任中心
最佳实践
  • 精选案例
  • 超级探访
  • 云杂志
  • 分析师和白皮书
  • 天翼云·创新直播间
市场活动
  • 2025智能云生态大会
  • 2024智算云生态大会
  • 2023云生态大会
  • 2022云生态大会
  • 天翼云中国行
天翼云
  • 活动
  • 息壤智算
  • 产品
  • 解决方案
  • 应用商城
  • 定价
  • 合作伙伴
  • 开发者
  • 支持与服务
  • 了解天翼云
      • 文档
      • 控制中心
      • 备案
      • 管理中心
      消息队列Kafka版_相关内容
      • 磁盘水位处理
        操作步骤 磁盘自动扩容 1. 登录管理控制台。 2. 进入Kafka管理控制台。 3. 在实例列表页的操作列,目标实例行点击“管理”按钮。 4. 点击“智能运维”、“弹性伸缩”菜单进入磁盘水位处理页面。 5. 在磁盘水位处理页面开启磁盘自动扩容。 6. 点击“配置”按钮,可调整配置参数。 动态策略:当磁盘使用量达到指定比例后,系统自动扩容磁盘指定比例的容量,单节点最低扩容100GB。 最高磁盘:实例磁盘总容量扩容到指定值后不再扩容。 7. 点击“确定”按钮保存配置。 8. 点击调整记录列的“查看”按钮跳转至任务列表查看磁盘自动扩容调整记录。 动态消息保留策略 1. 登录管理控制台。 2. 进入Kafka管理控制台。 3. 在实例列表页的操作列,目标实例行点击“管理”按钮。 4. 点击“智能运维”、“弹性伸缩”菜单进入磁盘水位处理页面。 5. 在磁盘水位处理页面开启动态消息保留策略。 6. 点击“配置”按钮,可调整配置参数。 动态策略:当磁盘使用量达到指定比例后,系统自动调整主题的消息保留时长,删除指定比例的最早消息数据。 保底时长:主题消息的最低保留时长,当某个主题的保留时长调整到该值后,该主题不会再被调整。 7. 点击“确定”按钮保存配置。 8. 点击调整记录列的“查看”按钮跳转至任务列表查看动态消息保留记录。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        智能运维
        弹性伸缩
        磁盘水位处理
      • 事件源概述
        事件源是事件的来源,负责将生产的事件发布到事件总线。 事件总线EventBridge支持以下事件源: 天翼云官方事件源 作为事件源接入时,只要开通相应的天翼云服务,就可以自动接入事件总线EventBridge。通过配置预定义的事件源与事件规则,实现从事件源发布事件到云服务专用总线,经过事件模式过滤后把事件路由到事件目标。 自定义事件源 作为事件源接入时,您可以配置自定义应用事件源,使用SDK接入事件总线EventBridge。通过创建自定义总线、配置事件规则,把自定义应用产生的事件发布到自定义总线,经过事件模式过滤后把事件路由到事件目标。 作为事件源接入时,您可以配置消息类事件提供方,把事件主动拉取到事件总线EventBridge。例如,当您配置事件提供方为分布式消息服务Kafka时,事件总线EventBridge会把分布式消息服务Kafka对应资源产生的消息主动拉取到自定义总线中,经过事件模式过滤后把事件路由到事件目标。
        来自:
        帮助文档
        事件总线
        用户指南
        事件总线
        事件源
        事件源概述
      • 业务过载最佳实践
        本节介绍Kafka业务过载最佳实践 方案概述 Kafka业务过载,一般表现为CPU使用率高、磁盘写满的现象。 当CPU使用率过高时,系统的运行速度会降低,并有加速硬件损坏的风险。 当磁盘写满时,相应磁盘上的Kafka日志目录会出现offline问题。此时,该磁盘上的分区副本不可读写,降低了分区的可用性和容错能力。同时由于Leader分区迁移到其他节点,会增加其他节点的负载。 CPU使用率高的原因 数据操作相关线程数(num.io.threads、num.network.threads、num.replica.fetchers)过多,导致CPU繁忙。 分区设置不合理,所有的生产和消费都集中在某个节点上,导致CPU利用率高。 磁盘写满的原因 业务数据增长较快,已有的磁盘空间不能满足业务数据需要。 节点内磁盘使用率不均衡,生产的消息集中在某个分区,导致分区所在的磁盘写满。 Topic的数据老化时间设置过大,保存了过多的历史数据,容易导致磁盘写满。 实施步骤 CPU使用率高的处理措施: 优化线程参数num.io.threads、num.network.threads和num.replica.fetchers的配置。 num.io.threads和num.network.threads建议配置为磁盘个数的倍数,但不能超过CPU核数。 num.replica.fetchers建议配置不超过5。 合理设置Topic的分区,分区一般设置为节点个数的倍数。 生产消息时,给消息Key加随机后缀,使消息均衡分布到不同分区上。 磁盘写满的处理措施: 扩容磁盘,使磁盘具备更大的存储空间。 迁移分区,将已写满的磁盘中的分区迁移到本节点的其他磁盘中。 合理设置Topic的数据老化时间,减少历史数据的容量大小。 在CPU资源情况可控的情况下,使用压缩算法对数据进行压缩。 常用的压缩算法包括:ZIP,GZIP,SNAPPY,LZ4等。选择压缩算法时,需考虑数据的压缩率和压缩耗时。通常压缩率越高的算法,压缩耗时也越高。对于性能要求高的系统,可以选择压缩速度快的算法,比如LZ4;对于需要更高压缩比的系统,可以选择压缩率高的算法,比如GZIP。 可以在生产者端配置“compression.type”参数来启用指定类型的压缩算法。 Properties props new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 开启GZIP压缩 props.put("compression.type", "gzip"); Producer producer new KafkaProducer<>(props);
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        业务过载最佳实践
      • 快速创建Kafka流式集群
        本章节主要介绍如何快速创建Kafka流式集群。 本章节为您介绍如何快速创建一个Kafka流式集群,Kafka集群使用Kafka和Storm组件提供一个开源高吞吐量,可扩展性的消息系统。广泛用于日志收集、监控数据聚合等场景,实现高效的流式数据采集,实时数据处理存储等。 快速创建Kafka流式集群 1.登录MRS管理控制台。 2.单击“创建集群”,进入“创建集群”页面。 3.在创建集群页面,选择“快速创建”页签。 4.参考下列参数说明配置集群基本信息,参数详细信息请参考创建自定义集群。 区域:默认即可。 集群名称:可以设置为系统默认名称,但为了区分和记忆,建议带上项目拼音缩写或者日期等。例如:“mrs20200321”。 版本类型:默认选择普通版(不同版本提供的组件有所不同,请根据需要选择版本类型)。 集群版本:不同版本集群提供的组件有所不同,请根据需要选择集群版本。 组件选择:选择“Kafka流式集群”。 可用区:默认即可。 虚拟私有云:默认即可。如果没有虚拟私有云,请单击“查看虚拟私有云”进入虚拟私有云,创建一个新的虚拟私有云。 子网:默认即可。 CPU架构:默认即可。 集群节点:请根据自身需要选择集群节点规格数量等。MRS 3.x及之后版本集群Master节点规格不能小于64GB。 集群高可用:默认即可。MRS 3.x版本暂时不支持该参数。 LVM:默认即可。MRS 3.x版本暂时不支持该参数。 用户名:默认为“root/admin”,root用于远程登录ECS机器,admin用于登录集群管理页面。 密码:设置root用户和admin用户密码。 确认密码:再次输入设置的root用户和admin用户密码。 5.勾选“确认授权”开通通信安全授权,通信安全授权详情请参考授权安全通信。 6.单击“立即申请”。 当集群开启Kerberos认证时,需要确认是否需要开启Kerberos认证,若确认开启请单击“继续”,若无需开启Kerberos认证请单击“返回”关闭Kerberos认证后再创建集群。 7.单击“返回集群列表”,可以查看到集群创建的状态。单击“访问集群”,可以查看集群详情。 集群创建的状态过程请参见 集群概览章节查看集群状态部分 集群列表参数中的“状态”参数说明。 集群创建需要时间,所创集群的初始状态为“启动中”,创建成功后状态更新为“运行中”,请您耐心等待。 MRS系统界面支持同一时间并发创建10个集群,且最多支持管理100个集群。
        来自:
        帮助文档
        翼MapReduce
        用户指南
        配置集群
        快速创建集群
        快速创建Kafka流式集群
      • 获取Topic列表信息
        参数 参数类型 说明 示例 下级对象 topicName String 主题名称 topic001 readQueueNums Integer 读队列数量 4 writeQueueNums Integer 写队列数量 4 perm Integer 读写权限 2 6 topicFilterType String 主题过滤标志 topicSysFlag Integer 主题系统标识 0 表示非系统主题 1 系统内部主题 0 order Boolean 是否为顺序消息 false remark String 备注 clusterName String 集群名 brokerName String 集群名称 brokerId String 代理id 0 messageType String 消息类型 NORMAL普通消息 NORMAL
        来自:
        帮助文档
        分布式消息服务RocketMQ
        API参考
        API
        2022-04-06
        Topic管理
        获取Topic列表信息
      • 分布式消息服务Kafka服务等级协议(新)
        自2021年11月11日起,新版分布式消息服务Kafka服务等级协议(SLA)生效。详情请参见这里。
        来自:
        帮助文档
        分布式消息服务Kafka
        相关协议
        分布式消息服务Kafka服务等级协议(新)
      • RocketMQ .NET SDK
        说明 分布式消息服务RocketMQ兼容了社区版 HTTP SDK,您可以使用社区版 HTTP SDK接入分布式消息服务RocketMQ。 前提条件 1. 下载社区 C SDK到本地并解压。 2. 使用Visual Studio打开sln文件导入工程。 发送普通消息 using System; using Aliyun.MQ; using Aliyun.MQ.Model; namespace MQ.Sample { public class Producer { // 填写分布式消息服务RocketMQ控制台HTTP接入点 private const string endpoint "${HTTPENDPOINT}"; // 填写AccessKey,在管理控制台创建 private const string accessKeyId "${ACCESSKEY}"; // 填写SecretKey 在管理控制台创建 private const string secretAccessKey "${SECRETKEY}"; // 消息所属的Topic,在消息队列RocketMQ版控制台创建。 private const string topicName "${TOPIC}"; // Topic所属实例ID,默认实例为空 private const string instanceId "${INSTANCEID}"; private static MQClient client new Aliyun.MQ.MQClient(accessKeyId, secretAccessKey, endpoint); static MQProducer producer client.GetProducer(instanceId, topicName); static void Main(string[] args) { try { // 循环发送4条消息。 for (int i 0; i messages null; try { messages consumer.ConsumeMessage( 3, // 一次最多消费3条(最多可设置为16条) 3 // 长轮询时间3秒(最多可设置为30秒) ); } catch (Exception exp1) { if (exp1 is MessageNotExistException) { Console.WriteLine(Thread.CurrentThread.Name + " No new message, " + ((MessageNotExistException)exp1).RequestId); continue; } Console.WriteLine(exp1); Thread.Sleep(2000); } if (messages null) { continue; } List handlers new List<>(); Console.WriteLine(Thread.CurrentThread.Name + " Receive Messages:"); // 处理业务逻辑 foreach (Message message in messages) { Console.WriteLine(message); handlers.Add(message.ReceiptHandle); } // Message.nextConsumeTime前若不确认消息消费成功,则消息会重复消费 // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样 try { consumer.AckMessage(handlers); Console.WriteLine("Ack message success:"); foreach (string handle in handlers) { Console.Write("t" + handle); } Console.WriteLine(); } catch (Exception exp2) { // 某些消息的句柄可能超时了会导致确认不成功 if (exp2 is AckMessageException) { AckMessageException ackExp (AckMessageException)exp2; Console.WriteLine("Ack message fail, RequestId:" + ackExp.RequestId); foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems) { Console.WriteLine("tErrorHandle:" + errorItem.ReceiptHandle + ",ErrorCode:" + errorItem.ErrorCode + ",ErrorMsg:" + errorItem.ErrorMessage); } } } } catch (Exception ex) { Console.WriteLine(ex); Thread.Sleep(2000); } } } }}
        来自:
        帮助文档
        分布式消息服务RocketMQ
        SDK参考
        RocketMQ .NET SDK
      • 编译运行Demo Java工程
        介绍连接Kafka编译运行Demo Java工程 kafkaclients引入依赖 在使用Kafka时,你需要在你的项目中引入相应的依赖。具体的依赖项可能会因你的项目和需求而有所不同。在使用Kafka之前,请确保查阅官方文档以获取最新的依赖项和使用说明。 以Java编程语言为例,可以使用Kafka的Java客户端库。你可以在Maven或Gradle构建工具中添加以下依赖项: org.apache.kafka kafkaclients 示例代码 1. 从控制台获取以下信息 连接地址 实例连接地址从控制台实例详情菜单处获取,在实例详情页面的接入点信息一栏。 Topic名称 在Topic管理页面,选择需要的Topic名称。 消费组名称 在消费组管理页面,选择需要的消费组名称。 2. 在实例代码中替换以上信息即可实现消息。 import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class Producer { private final KafkaProducer producer; public final static String TOPIC "testtopic"; public final static String BROKERADDR "192.168.0.11:8090,192.168.0.9:8090,192.168.0.10:8090"; public Producer() { Properties props new Properties(); props.put(ProducerConfig.BOOTSTRAPSERVERSCONFIG, BROKERADDR); props.put(ProducerConfig.VALUESERIALIZERCLASSCONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.KEYSERIALIZERCLASSCONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.ACKSCONFIG, "all"); props.put("retries",3); producer new KafkaProducer<>(props); } public void produce() { try { for (int i 0; i (TOPIC, data), new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception ! null) { // TODO: 异常处理 exception.printStackTrace(); return; } System.out.println("produce msg completed, partition id " + metadata.partition()); } }); } } catch (Exception e) { // TODO: 异常处理 e.printStackTrace(); } producer.flush(); producer.close(); } public static void main(String[] args) { Producer producer new Producer(); producer.produce(); } } 3. 同样在实例代码中替换以上信息即可消费消息。 import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class Consumer { private org.apache.kafka.clients.consumer.Consumer consumer; private static final String GROUPID "testgroup"; private static final String TOPIC "testtopic"; public final static String BROKERADDR "192.168.0.11:8090,192.168.0.9:8090,192.168.0.10:8090"; public Consumer() { Properties props new Properties(); props.put(ConsumerConfig.BOOTSTRAPSERVERSCONFIG, BROKERADDR); props.put(ConsumerConfig.GROUPIDCONFIG, GROUPID); props.put(ConsumerConfig.AUTOOFFSETRESETCONFIG, "earliest"); props.put(ConsumerConfig.ENABLEAUTOCOMMITCONFIG, "false"); props.put(ConsumerConfig.KEYDESERIALIZERCLASSCONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUEDESERIALIZERCLASSCONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); consumer new KafkaConsumer<>(props); } public void consume() { consumer.subscribe(Arrays.asList(TOPIC)); while (true){ try { ConsumerRecords records consumer.poll(1000); System.out.println("the numbers of topic:" + records.count()); for (ConsumerRecord record : records) { System.out.println("the data is " + record.value()); } }catch (Exception e){ // TODO: 异常处理 e.printStackTrace(); } } } public static void main(String[] args) { new Consumer().consume(); } }
        来自:
        帮助文档
        分布式消息服务Kafka
        快速入门
        编译运行Demo Java工程
      • 消息查询
        根据ID查询 根据消息ID查询唯一消息,选择消费组后,可以查询到,该消息是否被该消费组消费过,查看“消费状态”。 点击“查看”,可以查询该消息的包体内容。 消费状态标志含义: (1)Toconsume:未消费。 (2)Consumed:已签收。 (3)Consuming:已拉取,未签收。 根据offset查询 根据指定队列指定偏移量查询唯一消息,选择订阅组后,可以查询到,该消息是否被该订阅组消费过,查看“消费状态”。 点击“查看”,可以查询该消息的包体内容。 基于Topic查询 死信队列查询
        来自:
        帮助文档
        分布式消息服务RocketMQ
        用户指南
        管理消息
        消息查询
      • 计费类常见问题
        基准带宽和磁盘可选择哪些? 资源、规格等信息可参考产品规格文档。 哪些资源池可以订购分布式消息服务Kafka? 天翼云分布式消息服务Kafka可以在以下资源池订购: (1)华南2、华东1、华北2、西南1、上海36、青岛20、长沙42、南昌5、南宁23、武汉41、太原4 (2)芜湖2、福州4、重庆2 、北京5 、佛山3、贵州3、杭州2、内蒙6、上海7、上海6、西安3、昆明2、保定、北京4、合肥2 、乌鲁木齐27 账户里面有钱,为什么无法创建按需付费Kafka实例? 请检查您的账户金额是否小于100元,如果小于100元,则无法创建新的Kafka实例。 (1)预存后付费方式:提前充值现金到天翼云账户中,现金账户余额不低于100元,之后系统按照用户实际使用量进行结算。 (2)计费周期:按小时计费,以自然小时为计费单位(均以北京时间为准),不满一小时按照一小时计费。费用从用户账户现金余额中扣费。开通时间建议整点开通,开通不足一个自然小时,按一小时收费。提前删除也按照自然小时收费。 欠费后如何重新启用? 如果您在15天内充值补足欠款,服务会自动启用。 欠费后,资源进入保留期,您将不能正常访问及使用云服务(资源冻结),但对于您存储在云服务中的数据予以保留。 若您在保留期内充值,充值后系统会自动扣减欠费金额。 若保留期到期您仍未充值,存储在云服务中的数据将被删除、云服务资源将被释放。
        来自:
        帮助文档
        分布式消息服务Kafka
        常见问题
        计费与购买类
        计费类常见问题
      • 队列管理概述
        专属队列 专属队列是指队列对应的资源为专属资源,空闲时不释放,即无论是否使用均保留资源的队列类型。专属队列可以保证提交作业时资源一定存在。 队列弹性扩缩容 DLI提供了队列弹性扩缩容的功能。用户在创建指定规格队列后,可根据需要进行弹性扩缩容。 根据业务情况,手动更改队列规格。具体操作请参考队列弹性扩缩容。 说明 新创建的队列需要运行作业后才可进行扩缩容。 队列弹性扩缩容定时任务 DLI提供了队列弹性扩缩容定时任务的功能。用户在创建队列后,可根据需要进行弹性扩缩容定时任务。 根据业务情况,设置队列自动扩缩容的时间,由系统定时触发队列扩缩容。具体操作请参考弹性扩缩容定时任务。 说明 新创建的队列需要运行作业后才可进行扩缩容。 队列自动扩缩容 Flink作业使用队列,DLI可根据作业大小自动触发扩缩容,用户无需进行操作。 说明 新创建的队列需要运行作业后才可进行扩缩容。 队列管理页面 队列管理主要包括如下功能: 队列权限管理 创建队列 删除队列 修改队列网段 队列弹性扩缩容 弹性扩缩容定时任务 测试地址连通性 创建消息通知主题 说明 DLI作业执行失败需要通过SMN发送通知消息,因此需要获得访问和使用SMN(消息通知服务)的SMN Administrator权限。 队列管理页面显示用户创建所有的队列和服务预置的default队列。队列列表默认按创建时间排列,创建时间最近的队列显示在最前端。 队列管理参数 参数 参数说明 名称 队列的名称。 类型 队列的类型。 SQL队列 通用队列 Spark队列(兼容老版本) 规格 队列大小,单位:CUs。 CUs是队列的计价单位。1CUs1Core 4GMem。不同规格的队列对应的计算能力不一样,规格越高计算能力越好。 实际CUs 当前队列实际大小值。 弹性扩缩容 定时扩缩容的目标CU值,或当前规格CU值的最大值和最小值。 用户名 队列所有者。 描述 创建队列时,对队列的描述。如果无描述,则显示“”。 操作 删除:删除所选队列。如果队列中有正在提交或者正在运行的作业,将不支持删除操作。 权限管理:查看队列对应的用户权限信息以及对其他用户授权。 更多 − 重启:强制重启队列。 说明 只有SQL队列有“重启”操作。 − 弹性扩缩容:可以根据需要选择“扩容”或“缩容”,目标值大小必须为16CU的整数倍。 − 弹性扩缩容定时任务:可以根据业务周期或使用情况,在不同的时间或周期内设置不同的队列大小,系统将定时自动进行“扩容”或“缩容”。目标值大小必须为16CU的整数倍。 − 修改网段:使用DLI增强型跨源时,DLI队列网段与数据源网段不能重合,可根据需要进行修改。 − 测试地址连通性:测试队列到指定地址是否可达,支持域名和ip,可指定端口。
        来自:
        帮助文档
        数据湖探索
        用户指南
        队列管理
        队列管理概述
      • 如何提高消息处理效率
        消息可以批量生产和消费 为提高消息发送和消息消费效率,推荐使用批量消息发送和消费。通常,默认消息消费为批量消费,而消息发送尽可能采用批量发送。同时批量方式可有效减少API调用次数,减少服务使用费用。 如下面两张示意图对比所示,消息批量生产与消费,可以减少API调用次数,节约资源。 图 消息批量生产(发送)与消费 说明 批量发送消息时,单次不能超过10条消息,总大小不能超过512KB。 批量生产(发送)消息可以灵活使用,在消息并发多的时候,批量发送,并发少时,单条发送。这样能够在减少调用次数的同时保证消息发送的实时性。 图 消息逐条生产(发送)与消费 此外,批量消费消息时,消费者应按照接收的顺序对消息进行处理、确认,当对某一条消息处理失败时,不再需要继续处理本批消息中的后续消息,直接对已正确处理的消息进行确认即可。 巧用消费组协助运维 用户使用DMS服务作为消息管理系统,查看队列的消息内容对于定位问题与调试服务是至关重要的。 当消息的生产和消费过程中遇到疑难问题时,通过创建不同消费组可以帮助定位分析问题或调试服务对接。用户可以创建一个新的消费组,对队列中的消息进行消费并分析消费过程,这样不会影响其他服务对消息的处理。
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        如何提高消息处理效率
      • 消费组列表
        介绍分布式消息服务Kafka消费组查看功能操作内容。 场景描述 在以下场景中,可以考虑查看Kafka的消费组列表: 监控消费组:通过查看消费组列表,可以监控和管理Kafka中的消费者。可以查看消费组的健康状态等信息,以确保消费者正常工作并及时发现潜在的问题。 动态调整消费者数量:通过查看消费组列表,可以了解当前的消费者数量和状态。根据实际需求,可以动态增加或减少消费者的数量,以适应不同的负载和流量变化。 操作步骤 (1)登录管理控制台。 (2)进入Kafka管理控制台。 (3)在实例列表页在操作列,目标实例行点击“管理” (4)点击“消费组管理”后出现如下图列表 (5)右上角输入消费组名称,可查询对应消费组 消费组状态说明见下表: 状态 说明 Empty 消费组没有分配到任何分区进行消费。 PreparingRebalance 消费组中有新的消费者加入或者有消费者离开,正在进行重新分配分区的准备工作。 CompletingRebalance 1. 重新分配分区的过程已经完成,消费组即将开始消费。 Stable 消费组中的消费者正常消费消息,并且消费进度与分区分配是一致的。 Dead 消费组已经停止工作,没有任何活跃的消费者。 PreparingSync 消费组正在准备同步消费进度,以确保消费者之间的一致性。 AwaitingSync 消费组中的消费者正在等待同步消费进度的完成。 Rebalancing 消费组正在进行重新分配分区的操作。 DeadAndEmpty 消费组已经停止工作,且没有分配到任何分区进行消费。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        消费组管理
        消费组列表
      • 产品定义
        本文主要介绍天翼云分布式消息服务RabbitMQ的产品定义。 天翼云分布式消息服务RabbitMQ完全兼容开源RabbitMQ,为您提供即开即用、消息特性丰富、灵活路由、高可用、监控和告警等特性,广泛应用于秒杀、流控、系统解耦等场景。 即开即用 天翼云分布式消息服务RabbitMQ提供单机和集群的消息实例,拥有丰富内存规格,您可以通过控制台直接下单购买并创建,无需单独准备服务器资源。 消息特性丰富 支持AMQP协议,支持普通消息、广播消息、死信、延迟消息等特性。 灵活路由 在RabbitMQ中,生产者将消息发送到交换器,由交换器将消息路由到队列中。交换器支持Direct、Topic、Headers和Fanout四种路由方式,同时支持交换机组合和自定义。 高可用 RabbitMQ集群提供镜像队列,可通过镜像在其他节点同步数据,单节点宕机时,仍可通过唯一的访问地址对外提供服务,数据不丢失。 监控和告警 支持对RabbitMQ实例状态进行监控,支持对集群每个代理的内存、CPU、网络流量等进行监控。如果集群或节点状态异常,将触发告警。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        产品简介
        产品定义
      • 步骤四:配置必须的监控告警
        本文主要介绍如何配置分布式消息服务RabbitMQ必须的监控告警。 本章节主要介绍部分监控指标的告警策略,以及配置操作。在实际业务中,建议按照表1告警策略,配置监控指标的告警规则。 表1 RabbitMQ实例配置告警的指标 指标名称 告警策略 指标说明 解决方案 内存高水位状态 告警阈值:原始值>1 连续触发次数:1 告警级别:致命 告警阈值为1表示触发内存高水位,会阻塞消息生产 加快消费 采用生产者确认的发送模式,并监控生产端消息生产速度和时长,当消息生产时长有明显增加时进行流控措施 磁盘高水位状态 告警阈值:原始值>1 连续触发次数:1 告警级别:致命 告警阈值为1表示触发磁盘高水位,会阻塞消息生产 减少惰性队列的消息堆积 减少持久化队列的消息堆积 删除队列 内存使用率 告警阈值:原始值>业务预期使用率(推荐30%) 连续触发次数:连续3~5个周期 告警级别:重要 该指标需要分别为每个节点设置内存使用率告警,避免触发内存高水位阻塞生产 加快消费 采用生产者确认的发送模式,并监控生产端消息生产速度和时长,当消息生产时长有明显增加时进行流控措施 CPU使用率 告警阈值:原始值>业务预期使用率(推荐70%) 连续触发次数:连续3~5个周期 告警级别:重要 该指标需要分别为每个节点设置CPU使用率告警,CPU使用率过高可能会影响生产速度 减少镜像队列个数 对于集群实例,建议扩容节点个数,然后进行节点间重平衡 可消费消息数 告警阈值:原始值>业务预期可消费消息数 连续触发次数:1 告警级别:重要 可消费消息数过多表示消息堆积 请参考消息堆积的解决办法 未确认消息数 告警阈值:原始值>业务预期未确认消息数 连续触发次数:1 告警级别:重要 未确认消息数过多可能会导致消息堆积 检查消费者是否异常 检查消费者逻辑是否消耗时间过长 连接数 告警阈值:原始值>业务预期连接数 连续触发次数:1 告警级别:重要 连接数突增可能是流量变大的预警 需检查业务是否正常,可参考其他告警 通道数 告警阈值:原始值>业务预期通道数 连续触发次数:1 告警级别:重要 通道数突增可能是流量变大的预警 需检查业务是否正常,可参考其他告警 Erlang进程数 告警阈值:原始值>业务预期进程数 连续触发次数:1 告警级别:重要 进程数突增可能是流量变大的预警 需检查业务是否正常,可参考其他告警
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        快速入门
        步骤四:配置必须的监控告警
      • 修改Topic参数
        本章节介绍了如何修改分布式消息服务RocketMQ实例的Topic。 操作场景 Topic创建成功后,您可以根据业务需要修改以下参数:读队列个数、写队列个数、权限和添加关联代理。 操作步骤 1. 登录分布式消息服务RocketMQ控制台。 2. 单击RocketMQ实例的名称,进入实例详情页面。 3. 在左侧导航栏,单击“Topic管理”,进入“Topic管理”页面。 4. 选择以下任意一种方法修改Topic参数。 在待修改的Topic所在行,单击“编辑”,弹出“编辑Topic”页面。 单击待修改的Topic名称,进入Topic详情页面。在页面右上角,单击“编辑”,弹出“编辑Topic”页面。 5. 修改如表1所示配置信息。 表 Topic参数说明 参数 说明 权限 Topic的权限,包括发布+订阅、发布和订阅。 关联代理 修改读队列个数或写队列个数。 lic读数读队列个数:Top据可用的总队列数。 写队列个数:Topic写数据可用的总队列数。 如果创建Topic时未关联所有代理,单击“添加关联代理”,可以在其他代理上创建Topic,并设置读队列个数和写队列个数。 6. 修改完成后,单击“确定”。
        来自:
        帮助文档
        分布式消息服务RocketMQ
        用户指南
        Topic管理
        修改Topic参数
      • 名词解释
        本节介绍了分布式消息服务RabbitMQ产品常见的名词解释。 Vhost 虚拟主机(Virtual Host),类似于 Namespace 命名空间的概念,逻辑隔离,每个用户里可以创建多个 Vhost,每个 Vhost 可以创建若干个 Exchange 和 Queue。 Queue 消息队列,每个消息都会被投入到一个或者多个 Queue 里。 Producer 消息生产者,即投递消息的程序。 Consumer 消息消费者,即接受消息的程序。 Connection TCP 连接,Producer 或 Consumer 与消息队列间的物理 TCP 连接。 Connection将应用与分布式消息服务RabbitMQ连接在一起。Connection会执行认证、IP解析、路由等底层网络任务。应用与分布式消息服务RabbitMQ建立Connection需要多个TCP报文交互,因而会消耗较多的网络资源和分布式消息服务RabbitMQ资源。大量的Connection会对分布式消息服务RabbitMQ造成巨大压力,甚至触发分布式消息服务RabbitMQ SYN洪水攻击防护,导致分布式消息服务RabbitMQ无响应,进而影响业务。 Channel 在客户端的每个物理 TCP 连接里,可建立多个 Channel,每个 Channel 代表一个会话任务。 Channel是物理TCP连接中的虚拟连接。当应用通过Connection与分布式消息服务RabbitMQ建立连接后,所有的AMQP协议操作(例如创建队列、发送消息、接收消息等)都会通过Connection中的Channel完成。Channel可以复用Connection,即一个Connection下可以建立多个Channel。Channel不能脱离Connection独立存在,而必须存活在Connection中。当某个Connection断开时,该Connection下的所有Channel都会断开。当大量应用需要与分布式消息服务RabbitMQ建立多个连接时,建议您使用Channel来复用Connection,从而减少网络资源和分布式消息服务RabbitMQ资源消耗。 Connection和Channel的使用建议 保持Connection长连接,请勿频繁开启或关闭Connection。如果确实需要频繁开启或关闭连接,请使用Channel。 一个进程对应一个Connection,一个进程中的多个线程则分别对应一个Connection中的多个Channel。 Producer和Consumer分别使用不同的Connection进行消息发送和消费。
        来自:
        帮助文档
        分布式消息服务RabbitMQ
        产品简介
        名词解释
      • 云审计服务支持的DMS for Kafka操作列表
        本文主要介绍云审计服务支持的DMS for Kafka操作列表。 通过云审计服务,您可以记录与分布式消息服务Kafka相关的操作事件,便于日后的查询、审计和回溯。 表 云审计服务支持的DMS for Kafka操作列表 操作名称 资源类型 事件名称 创建DMS实例订单成功 kafka createDMSInstanceOrderSuccess 创建DMS实例任务执行成功 kafka createDMSInstanceTaskSuccess 创建DMS实例订单失败 kafka createDMSInstanceOrderFailure 创建DMS实例任务执行失败 kafka createDMSInstanceTaskFailure 删除创建失败的DMS实例成功 kafka deleteDMSCreateFailureInstancesSuccess 删除创建失败的DMS实例失败 kafka deleteDMSCreateFailureInstancesFailure 删除DMS实例任务执行成功 kafka deleteDMSInstanceTaskSuccess 删除DMS实例任务执行失败 kafka deleteDMSInstanceTaskFailure 批量删除DMS实例任务 kafka batchDeleteDMSInstanceTask 提交批量删除DMS实例请求成功 kafka batchDeleteDMSInstanceSuccess 批量删除DMS实例任务执行成功 kafka batchDeleteDMSInstanceTaskSuccess 提交批量删除DMS实例请求失败 kafka batchDeleteDMSInstanceFailure 批量删除DMS实例任务执行失败 kafka batchDeleteDMSInstanceTaskFailure 提交修改DMS实例订单请求成功 kafka modifyDMSInstanceOrderSuccess 提交修改DMS实例订单请求失败 kafka modifyDMSInstanceOrderFailure 提交扩容实例请求成功 kafka extendDMSInstanceSuccess 扩容DMS实例任务执行成功 kafka extendDMSInstanceTaskSuccess 提交扩容实例请求失败 kafka extendDMSInstanceFailure 扩容DMS实例任务执行失败 kafka extendDMSInstanceTaskFailure 提交重置DMS实例密码请求成功 kafka resetDMSInstancePasswordSuccess 提交重置DMS实例密码请求失败 kafka resetDMSInstancePasswordFailure 提交重启DMS实例请求成功 kafka restartDMSInstanceSuccess 重启DMS实例任务执行成功 kafka restartDMSInstanceTaskSuccess 提交重启DMS实例请求失败 kafka restartDMSInstanceFailure 重启DMS实例任务执行失败 kafka restartDMSInstanceTaskFailure 提交批量重启DMS实例请求成功 kafka batchRestartDMSInstanceSuccess 批量重启DMS实例任务执行成功 kafka batchRestartDMSInstanceTaskSuccess 提交批量重启DMS实例请求失败 kafka batchRestartDMSInstanceFailure 批量重启DMS实例任务执行失败 kafka batchRestartDMSInstanceTaskFailure 提交修改DMS实例信息请求成功 kafka modifyDMSInstanceInfoSuccess 修改DMS实例信息任务执行成功 kafka modifyDMSInstanceInfoTaskSuccess 提交修改DMS实例信息请求失败 kafka modifyDMSInstanceInfoFailure 修改DMS实例信息任务执行失败 kafka modifyDMSInstanceInfoTaskFailure 删除后台任务成功 kafka deleteDMSBackendJobSuccess 删除后台任务失败 kafka deleteDMSBackendJobFailure 冻结DMS实例任务执行成功 kafka freezeDMSInstanceTaskSuccess 冻结DMS实例任务执行失败 kafka freezeDMSInstanceTaskFailure 解冻DMS实例任务执行成功 kafka unfreezeDMSInstanceTaskSuccess 解冻DMS实例任务执行失败 kafka unfreezeDMSInstanceTaskFailure Kafka专享实例创建Topic成功 kafka KafkacreatetopicSuccess Kafka专享实例创建Topic失败 kafka KafkacreatetopicFailure Kafka专享实例删除Topic成功 kafka KafkadeletetopicsSuccess Kafka专享实例删除Topic失败 kafka KafkadeletetopicsFailure 开启自动创建Topic成功 kafka enableautotopicSuccess 开启自动创建Topic失败 kafka enableautotopicFailure 重置消费组偏移量成功 kafka KafkaresetconsumeroffsetSuccess 重置消费组偏移量失败 kafka KafkaresetconsumeroffsetFailure 创建用户成功 kafka createUserSuccess 创建用户失败 kafka createUserFailure 删除用户成功 kafka deleteUserSuccess 删除用户失败 kafka deleteUserFailure 更新用户策略成功 kafka updateUserPoliciesTaskSuccess 更新用户策略失败 kafka updateUserPoliciesTaskFailure
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        云审计服务支持的关键操作
        云审计服务支持的DMS for Kafka操作列表
      • 修改用户信息
        介绍分布式消息服务Kafka修改用户的功能操作内容。 场景描述 当前主要支持重置用户密码:当用户忘记密码或需要更改密码时,可以通过修改用户密码来实现。这有助于保护用户账户的安全性,防止未经授权的访问和操作。 操作步骤 (1)登录管理控制台。 (2)进入Kafka管理控制台。 (3)在实例列表页在操作列,目标实例行点击“管理”。 (4)点击“用户管理”后进入应用用户管理页面,点击“修改”。 (5)点击“修改”后,在弹窗中可以修改密码和对应描述。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        用户管理
        修改用户信息
      • 性能调优
        本章节主要介绍Flink作业相关问题中有关性能调优的问题。 Flink作业如何进行性能调优 概念说明及监控查看 消费组积压 消费组积压可通过topic最新数据offset减去该消费组已提交最大offset计算得出,说明的是该消费组当前待消费的数据总量。 如果Flink作业对接的是kafka专享版,则可通过云监控服务(CES)进行查看。具体可选择“云服务监控 > 分布式消息服务 > kafka专享版” ,单击“kafka实例名称 > 消费组” ,选择具体的消费组名称,查看消费组的指标信息。 反压状态 反压状态是通过周期性对taskManager线程的栈信息采样,计算被阻塞在请求输出Buffer的线程比率来确定,默认情况下,比率在0.1以下为OK,0.1到0.5为LOW,超过0.5则为HIGH。 时延 Source端会周期性地发送带当前时间戳的LatencyMarker,下游算子接收到该标记后,通过当前时间减去标记中带的时间戳的方式,计算时延指标。算子的反压状态和时延可以通过Flink UI或者作业任务列表查看,一般情况下反压和高时延成对出现: 性能分析 由于Flink的反压机制,流作业在存在性能问题的情况下,会导致数据源消费速率跟不上生产速率,从而引起Kafka消费组的积压。在这种情况下,可以通过算子的反压和时延,确定算子的性能瓶颈点。 作业最后一个算子(Sink)反压正常(绿色),前面算子反压高(红色) 该场景说明性能瓶颈点在sink,此时需要根据具体数据源具体优化,比如对于JDBC数据源,可以通过调整写出批次(connector.write.flush.maxrows)、JDBC参数重写(rewriteBatchedStatementstrue)等进行优化。 作业非倒数第二个算子反压高(红色) 该场景说明性能瓶颈点在Vertex2算子,可以通过查看该算子描述,确认该算子具体功能,以进行下一步优化。 所有算子反压都正常(绿色),但存在数据堆积 该场景说明性能瓶颈点在Source,主要是受数据读取速度影响,此时可以通过增加Kafka分区数并增加source并发解决。 作业一个算子反压高(红色),而其后续的多个并行算子都不存在反压(绿色) 该场景说明性能瓶颈在Vertex2或者Vertex3,为了进一步确定具体瓶颈点算子,可以在FlinkUI页面开启inPoolUsage监控。如果某个算子并发对应的inPoolUsage长时间为100%,则该算子大概率为性能瓶颈点,需分析该算子以进行下一步优化。 inPoolUsage监控
        来自:
        帮助文档
        数据湖探索
        常见问题
        操作类
        Flink作业相关问题
        性能调优
      • 安全方案
        介绍分布式消息服务RocketMQ的安全方案,包括支持TLS传输加密、权限控制、Topic资源访问权限控制等内容。 安全价值 RocketMQ的安全对用户有以下几个重要价值: 1. 保护数据安全:RocketMQ的安全机制可以保护消息的机密性和完整性,防止敏感数据泄露或被篡改。这对于处理包含个人信息、商业机密等敏感数据的应用程序非常重要。 2. 防止未经授权的访问:RocketMQ的访问控制功能可以限制对消息队列的访问权限,只有具有相应权限的用户才能发送和消费消息。这可以防止未经授权的用户访问和操作消息队列,保护系统的安全性。 3. 合规性要求:对于一些行业和法规要求较高的场景,如金融、医疗等,RocketMQ的安全特性可以帮助用户满足合规性要求,确保数据的安全和合规性。 4. 提供安全审计功能:RocketMQ的安全审计功能可以记录和追踪对消息队列的操作,包括发送、消费、订阅等。这可以帮助用户监控和检测潜在的安全风险,及时发现和应对安全事件。 5. 增强用户信任:通过提供安全性能和功能,RocketMQ可以增强用户对系统的信任感。用户可以放心地使用RocketMQ来处理重要的消息传输和处理任务,而不必担心数据的安全问题。 综上所述,RocketMQ的安全性对用户来说具有重要的价值,可以保护数据安全,防止未经授权的访问,满足合规性要求,提供安全审计功能,并增强用户对系统的信任感。
        来自:
        帮助文档
        分布式消息服务RocketMQ
        产品简介
        安全方案
      • 名词解释
        名词 说明 环境 用于隔离不同应用的逻辑单元。 应用 一组资源的逻辑集合,通常代表一个业务系统。应用是进行演练和管理的核心对象。 资源 构成应用的组件节点,例如云主机、容器、分布式缓存服务Redis版、分布式消息服务Kafka等实例。 演练 通过向应用的特定资源注入指定故障,并观察其影响,从而验证系统稳定性与韧性的过程。 动作 注入到目标资源上的一个原子性故障,例如“CPU高负载”或“网络延迟”。用户可以在一次演练中对多个动作进行自由组合和编排。 动作组 一个或多个动作的逻辑分组,通常代表一个完整的故障场景。在一个演练任务中,不同的动作组之间可以并行执行。 探针 安装在目标资源(如云主机)上,用于执行具体故障注入动作的代理程序(Agent)。 保护策略 一种自动化的安全机制,用于控制演练的“爆炸半径”。当触发预设条件时,系统会依据此策略自动中止演练并回滚故障。
        来自:
        帮助文档
        应用高可用
        产品简介
        故障演练服务
        名词解释
      • 名词解释(1)
        名词 说明 环境 用于隔离不同应用的逻辑单元。 应用 一组资源的逻辑集合,通常代表一个业务系统。应用是进行演练和管理的核心对象。 资源 构成应用的组件节点,例如云主机、容器、分布式缓存服务Redis版、分布式消息服务Kafka等实例。 演练 通过向应用的特定资源注入指定故障,并观察其影响,从而验证系统稳定性与韧性的过程。 动作 注入到目标资源上的一个原子性故障,例如“CPU高负载”或“网络延迟”。用户可以在一次演练中对多个动作进行自由组合和编排。 动作组 一个或多个动作的逻辑分组,通常代表一个完整的故障场景。在一个演练任务中,不同的动作组之间可以并行执行。 探针 安装在目标资源(如云主机)上,用于执行具体故障注入动作的代理程序(Agent)。 保护策略 一种自动化的安全机制,用于控制演练的“爆炸半径”。当触发预设条件时,系统会依据此策略自动中止演练并回滚故障。
        来自:
      • Topic管理
        本章节介绍 Topic管理 。 操作场景 Topic创建成功后,查询Topic相关的配置和状态信息。 操作步骤 1. 登录分布式消息服务RocketMQ控制台。 2. 单击RocketMQ实例的名称,进入实例详情页面。 3. 在左侧导航栏,单击“Topic管理”,进入“Topic管理”页面。 4. 单击需要查询的Topic名称,进入Topic详情页面。 在详情页上方可以查看Topic名称、关联代理、读队列个数、写队列个数和权限。 在详情页下方可以查看Topic在每个代理上的队列状态,包括队列ID、最小偏移量、最大偏移量和消息更新时间。还可以查看消费组消费此Topic的情况,包括消费组名称、最大重试次数和广播消费。 图1 Topic详情
        来自:
        帮助文档
        分布式消息服务RocketMQ
        快速入门
        Topic管理
      • Kafka客户端参数配置建议
        本文主要 Kafka客户端参数配置建议。 Kafka客户端的配置参数很多,以下提供producer和consumer几个常用参数配置。其他参数配置,请参考Kafka配置。 表 Producer参数 参数 默认值 推荐值 说明 acks 1 高可靠:all或者1高吞吐:1 收到Server端确认信号个数,表示producer需要收到多少个这样的确认信号,算消息发送成功。 acks参数代表了数据备份的可用性。常用选项:acks0:表示producer不需要等待任何确认收到的信息,副本将立即加到socket buffer并认为已经发送。没有任何保障可以保证此种情况下server已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)回馈的offset会总是设置为1。 acks1:这意味着至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。如果follower没有成功备份数据,而此时leader又无法提供服务,则消息会丢失。 acksall或者1:这意味着leader需要等待ISR中所有备份都成功写入日志。只要任何一个备份存活,数据都不会丢失。min.insync.replicas指定必须确认写入才能被认为成功的副本的最小数量。 retries 0 结合实际业务调整 客户端发送消息的重试次数。值大于0时,这些数据发送失败后,客户端会重新发送。 注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。针对网络闪断场景,生产者建议配置重试能力,推荐重试次数retries3,重试间隔retry.backoff.ms1000。 request.timeout.ms 30000 结合实际业务调整 设置一个请求最大等待时间,超过这个时间则会抛Timeout异常。超时时间如果设置大一些,如127000(127秒),高并发的场景中,能减少发送失败的情况。 block.on.buffer.full TRUE TRUE TRUE表示当我们内存用尽时,停止接收新消息记录或者抛出错误。默认情况下,这个设置为TRUE。 然而某些阻塞可能不值得期待,因此立即抛出错误更好。如果设置为false,则producer抛出一个异常错误:BufferExhaustedException batch.size 16384 262144 默认的批量处理消息字节数上限。producer将试图批处理消息记录,以减少请求次数。 这将改善client与server之间的性能。不会试图处理大于这个字节数的消息字节数。 发送到brokers的请求将包含多个批量处理,其中会包含对每个partition的一个请求。较小的批量处理数值比较少用,并且可能降低吞吐量(0则会仅用批量处理)。 较大的批量处理数值将会浪费更多内存空间,这样就需要分配特定批量处理数值的内存大小。 buffer.memory 33554432 67108864 producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。 这项设置将和producer能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求。 表 Consumer参数 参数 默认值 推荐值 说明 :::: auto.commit.enable TRUE FALSE 如果为真,consumer所fetch的消息的offset将会自动的同步到zookeeper。这项提交的offset将在进程无法提供服务时,由新的consumer使用。约束:设置为false后,需要先成功消费再提交,这样可以避免消息丢失。 auto.offset.reset latest earliest 没有初始化offset或者offset被删除时,可以设置以下值: earliest:自动复位offset为最早 latest:自动复位offset为最新 none:如果没有发现offset则向消费者抛出异常 anything else:向消费者抛出异常。 说明 如果将此配置设置为latest,新增分区时,生产者可能会在消费者重置初始偏移量之前开始向新增加的分区发送消息,从而导致部分消息丢失。 connections.max.idle.ms 600000 30000 空连接的超时时间,设置为30000可以在网络异常场景下减少请求卡顿的时间。
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        Kafka客户端参数配置建议
      • 查询消息
        查询消息轨迹 1. 登录分布式消息服务RocketMQ控制台。 2. 单击RocketMQ实例的名称,进入实例详情页面。 3. 在左侧导航栏,单击“消息查询”,进入“消息查询”页面。 4. 选择以下任意一种方法,查询消息。 按Topic查询:“Topic”选择待查询消息的Topic名称,“队列”选择待查询消息的队列,“存储时间”选择待查询消息的时间段,单击“查询”。 按Message ID查询:“Topic”选择待查询消息所在的Topic名称,“消息ID”输入待查询消息的Message ID,单击“查询”。 按Message Key查询:“Topic”选择待查询消息所在的Topic名称,“消息ID”输入待查询消息的Message Key,单击“查询”。 5. 在待查询消息所在行,单击“消息轨迹”,查看消息的轨迹,确定是否生产/消费成功。 消息轨迹的参数说明如表1所示。 表1 消息轨迹的参数说明 参数 参数说明 生产者状态 生产者状态如下: 1.发送成功:消息发送成功,服务端已经成功存储消息。 2.提交成功:允许消费者消费此事务消息。 3.回滚:事务消息将被丢弃,不允许消费者消费此事务消息。 4.未知,待确认:事务消息状态暂时无法确定,等待固定时间后,服务端向生产者进行消息回查。 生产耗时 生产者发送消息的耗时。 生产地址 生产者的IP地址和端口号。 消费者状态 消费者状态如下: 1.消费成功消费超时消费异常 2.消费返回NULL消费失败 消费时间 消费消息的时间。 消费耗时 消费者消费消息的耗时。 消费地址 消费者的IP地址和端口号。
        来自:
        帮助文档
        分布式消息服务RocketMQ
        用户指南
        消息查询
        查询消息
      • 与其他云服务的关系
        本文主要介绍与其他云服务的关系。 云审计(Cloud Trace Service) 云审计为您提供云服务资源的操作记录,记录内容包括您从管理控制台或者开放API发起的云服务资源操作请求以及每次请求的结果,供您查询、审计和回溯使用。 当前CTS记录的操作,请参考支持云审计的操作列表。 虚拟私有云(Virtual Private Cloud) Kafka实例运行于虚拟私有云,需要使用虚拟私有云创建的IP和带宽。通过虚拟私有云安全组的功能可以增强访问Kafka实例的安全性。 弹性云主机(Elastic Cloud Server) 弹性云主机是由CPU、内存、操作系统、云硬盘组成的基础的计算组件。Kafka实例运行在弹性云主机上,一个代理对应一台弹性云主机。 云硬盘(Elastic Volume Service) 云硬盘为云服务器提供块存储服务,Kafka的所有数据(如消息、元数据和日志等)都保存在云硬盘中。 云监控(Cloud Eye) 云监控是一个开放性的监控平台,提供资源的实时监控、告警、通知等服务。 说明 Kafka实例向CloudEye上报监控数据的更新周期为1分钟。 弹性公网IP(Elastic IP) 弹性公网IP提供独立的公网IP资源,包括公网IP地址与公网出口带宽服务。Kafka实例绑定弹性公网IP后,可以通过公网访问Kafka实例。
        来自:
        帮助文档
        分布式消息服务Kafka
        产品简介
        与其他云服务的关系
      • 创建用户并授权使用DMS for Kafka
        本文主要介绍 创建用户并授权使用DMS for Kafka。 如果您需要对您所拥有的DMS for Kafka服务进行精细的权限管理,您可以使用统一身份认证服务(Identity and Access Management,简称IAM),通过IAM,您可以: 根据企业的业务组织,在您的帐号中,给企业中不同职能部门的员工创建IAM用户,让员工拥有唯一安全凭证,并使用DMS for Kafka资源。 根据企业用户的职能,设置不同的访问权限,以达到用户之间的权限隔离。 将DMS for Kafka资源委托给更专业、高效的其他帐号或者云服务,这些帐号或者云服务可以根据权限进行代运维。 如果帐号已经能满足您的要求,不需要创建独立的IAM用户,您可以跳过本章节,不影响您使用DMS for Kafka服务的其它功能。 本章节为您介绍对用户授权的方法,操作流程如下图所示。 前提条件 给用户组授权之前,请您了解用户组可以添加的DMS for Kafka系统策略,并结合实际需求进行选择,DMS for Kafka支持的系统策略及策略间的对比,请参见:DMS for Kafka权限管理。 示例流程 图 给用户授权DMS for Kafka权限流程 1. 创建用户组并授权 在IAM控制台创建用户组,并授予DMS for Kafka的只读权限“DMS ReadOnlyAccess”。 2. 创建用户并加入用户组 在IAM控制台创建用户,并将其加入 1 中创建的用户组。 3. 用户登录并验证权限 新创建的用户登录控制台,切换至授权区域,验证权限: 在“服务列表”中选择分布式消息服务Kafka,进入Kafka实例主界面,单击右上角“购买Kafka实例”,尝试购买Kafka实例,如果无法购买Kafka实例(假设当前权限仅包含DMS ReadOnlyAccess),表示“DMS ReadOnlyAccess”已生效。 在“服务列表”中选择云硬盘(假设当前策略仅包含DMS ReadOnlyAccess),若提示权限不足,表示“DMS ReadOnlyAccess”已生效。
        来自:
        帮助文档
        分布式消息服务Kafka
        用户指南
        权限管理
        创建用户并授权使用DMS for Kafka
      • 鉴权接入超时问题
        本节介绍分布式消息服务Kafka使用安全接入点,公网接入点等需要鉴权的接入点时,可能会遇到连接超时的问题。 使用安全接入点,公网接入点等需要鉴权的接入点时,可能会遇到连接超时等问题。 解决方法为可以在客户端机器的host文件上配置Kafka的ip。 例如用户连接Kafka的公网接入点34.28.112.101:8094,34.28.112.102:8094,34.28.112.103:8094时,出现连接超时的报错,可以在客户端机器上的/etc/hosts文件里面加上如下配置: plaintext 34.28.112.101 34.28.112.101 34.28.112.102 34.28.112.102 34.28.112.103 34.28.112.103 然后再重启客户端服务,看是否能正常生产消费。
        来自:
        帮助文档
        分布式消息服务Kafka
        常见问题
        连接问题
        鉴权接入超时问题
      • 如何设置消息堆积数超过阈值时,发送告警短信/邮件
        本文主要介绍如何设置消息堆积数超过阈值时,发送告警短信/邮件。 操作场景 如果您想要在消费组的消息堆积数超过阈值时,通过短信/邮件及时收到通知信息,可以参考本章节设置告警通知。 您还可以参考分布式消息服务Kafka的支持的监控指标,设置告警通知。 前提条件 已购买Kafka实例、创建Topic,并且已成功消费消息。 操作步骤 1、 登录分布式消息服务Kafka控制台,单击待创建告警通知的实例名称,进入实例详情页。 2、 在左侧导航栏,选择“监控”,进入监控页面。 3、 在“消费组”页签,设置需要创建告警通知的消费组。 图 选择需要创建告警通知的消费组 消费组:选择需要创建告警通知的消费组。 主题:选择“全部Topic”。 4、 选中“消息堆积数(消费组可消费消息数)”图表,单击,创建告警规则。 图 消息堆积数图表 5、 在“创建告警规则”界面,设置告警名称。 图 设置告警名称 名称:您自定义的告警名称,用于识别不同的告警。 描述:告警规则描述,可以不填。 6、 在“创建告警规则”界面,设置告警策略。 图 设置告警策略 触发规则:选择“自定义创建”。 告警策略:触发告警规则的告警策略,是否触发告警取决于连续周期的数据是否达到阈值。 告警级别:根据实际情况选择告警等级。 7、 在“创建告警规则”界面,设置告警通知对象。 图 设置告警通知对象 发送通知:选择开启。 通知方式:选择“主题订阅”。 通知对象:选择云帐号联系人或已创建的告警通知主题,告警通知主题的订阅信息中包含需要接收告警信息的手机号/邮箱地址。 图 创建告警通知主题 图 添加订阅 生效时间:该告警规则仅在生效时间内发送通知消息。 触发条件:触发告警通知的条件。 8、 在“创建告警规则”界面,设置企业项目和标签。 图 设置企业项目和标签 归属企业项目:告警规则所属的企业项目。只有拥有该企业项目权限的用户才可以查看和管理该告警规则。 标签:标签用于标识云资源,当您拥有相同类型的许多云资源时,可以使用标签按各种维度(例如用途、所有者或环境)对云资源进行分类。 9、 单击“立即创建”,完成告警规则的设置。 告警规则创建完成后,在云监控服务的“告警 > 告警规则”界面,查看新创建的告警规则。 图 查看新创建的告警规则
        来自:
        帮助文档
        分布式消息服务Kafka
        最佳实践
        如何设置消息堆积数超过阈值时,发送告警短信/邮件
      • 产品类
        顺序消息和普通消息的区别是什么 最大的区别在于是否能保证消息生产和消费的顺序一致。 对于顺序消息,消息均根据ShardingKey进行区块分区,同一分区内的消息消费满足先进先出,保证分区有序,不同分区的消息消费顺序不做要求。 普通消息则没有该项保证,消息消费的顺序跟生产的顺序不一定保证一致性。 RocketMQ集群消费和广播消费区别是什么 使用集群消费模式时,MQ内任意一条消息只需被订阅组集群内的任意一个消费者消费即可。 使用广播消费模式时,MQ内每条消息都会投递到订阅组集群的所有消费者,每条消息至少被每个消费者消费一次。 多个订阅组订阅同一个主题时消息如何被消费 RocketMQ中订阅关系并非是一对一的,一个主题可以被一个或多个订阅组订阅,但不同订阅组之间的消费是互不影响的,它们各自维护自己在当前主题的消费偏移信息,每一条消息都会被订阅该主题的订阅组接收到。 消息消费失败是否会有重试机制 在push消费模式下,RocketMQ在消费者消费消息失败后会通过将消息重新投递到该订阅组的重试队列在一定时间后会被消费者重新消费到,如果多次失败则会多次重复上述的重试过程,超过最大次数之后(创建订阅组时可配置)会将消息投递到死信队列。
        来自:
        帮助文档
        分布式消息服务RocketMQ
        常见问题
        产品类
      • 将MySQL同步到Kafka
        参数 描述 同步Topic策略 同步Topic策略,可选择 集中投递到一个Topic:适合源库业务量不大的场景。 自动生成Topic名字:适合每张表数据量都较大的场景,按每一张库表来确定一个Topic。 Topic 选择目标端需要同步到的Topic,同步Topic策略选择集中投递到一个Topic时可见。 Topic名字格式 Topic名字格式,同步Topic策略选择自动生成Topic名字时可见。 Topic名字格式支持database和tablename两个变量,其他字符都当做常量。分别用$database$代替数据库名,$tablename$代替表名。 例如:配置成$database$$tablename$时,如果数据库名称为db1,表名为tab1,则Topic名字为db1tab1。如果是DDL语句,$tablename$为空,则Topic名字为db1。 由于kafka的机制,Topic名字格式不能以"" , "."开头,或以".internal","internal" 结尾,这些命名格式的Topic会被当做为kafka的内部Topic,业务无法使用。 同步到kafka partition策略 同步到kafka partition策略。 按库名+表名的hash值投递到不同Partition:适用于单表的查询场景,可以提高单表读写性能,推荐使用此选项。 全部投递到Partition 0:适用于有事务要求的场景,写入性能比较差,如果没有强事务要求,不推荐使用此选项。 按表的主键值hash值投递到不同的Partion:适用于一个表一个Topic的场景,避免该表都写到同一个分区,消费者可以并行从各分区获取数据。 投送到kafka的数据格式 选择MySQL投送到kafka的数据格式。 Avro:可以显示Avro二进制编码,高效获取数据。 JSON:为Json消息格式,方便解释格式,但需要占用更多的空间。 JSONC:一种能够兼容多个批量,流式计算框架的数据格式。 详细格式可参考 Kafka消息格式。 同步对象 同步对象支持表级同步、库级同步,您可以根据业务场景选择对应的数据进行同步。 选择对象的时候支持搜索,以便您快速选择需要的数据库对象。 在同步对象右侧已选对象框中,可以使用对象名映射功能进行源数据库和目标数据库中的同步对象映射,具体操作可参考对象名映射。
        来自:
        帮助文档
        数据库复制
        用户指南
        实时同步
        自建到自建
        将MySQL同步到Kafka
      • 1
      • ...
      • 10
      • 11
      • 12
      • 13
      • 14
      • ...
      • 244
      跳转至
      推荐热词
      天翼云运维管理审计系统天翼云云服务平台云服务备份云日志服务应用运维管理云手机云电脑天翼云云hbase数据库电信云大数据saas服务电信云大数据paas服务轻量型云主机天翼云客户服务电话应用编排服务天翼云云安全解决方案云服务总线CSB天翼云服务器配置天翼云联邦学习产品天翼云云安全天翼云企业上云解决方案天翼云产品天翼云视频云存储

      天翼云最新活动

      安全隔离版OpenClaw

      OpenClaw云服务器专属“龙虾“套餐低至1.5折起

      天翼云新春焕新季

      云主机开年特惠28.8元/年,0元秒杀等你来抢!

      云上钜惠

      爆款云主机全场特惠,2核4G只要1.8折起!

      中小企业服务商合作专区

      国家云助力中小企业腾飞,高额上云补贴重磅上线

      出海产品促销专区

      爆款云主机低至2折,高性价比,不限新老速来抢购!

      天翼云奖励推广计划

      加入成为云推官,推荐新用户注册下单得现金奖励

      产品推荐

      弹性云主机 ECS

      GPU云主机

      镜像服务 IMS

      模型推理服务

      科研助手

      知识库问答

      人脸检测

      人脸比对

      动作活体识别

      推荐文档

      android端

      产品特点

      • 7*24小时售后
      • 无忧退款
      • 免费备案
      • 专家服务
      售前咨询热线
      400-810-9889转1
      关注天翼云
      • 旗舰店
      • 天翼云APP
      • 天翼云微信公众号
      服务与支持
      • 备案中心
      • 售前咨询
      • 智能客服
      • 自助服务
      • 工单管理
      • 客户公告
      • 涉诈举报
      账户管理
      • 管理中心
      • 订单管理
      • 余额管理
      • 发票管理
      • 充值汇款
      • 续费管理
      快速入口
      • 天翼云旗舰店
      • 文档中心
      • 最新活动
      • 免费试用
      • 信任中心
      • 天翼云学堂
      云网生态
      • 甄选商城
      • 渠道合作
      • 云市场合作
      了解天翼云
      • 关于天翼云
      • 天翼云APP
      • 服务案例
      • 新闻资讯
      • 联系我们
      热门产品
      • 云电脑
      • 弹性云主机
      • 云电脑政企版
      • 天翼云手机
      • 云数据库
      • 对象存储
      • 云硬盘
      • Web应用防火墙
      • 服务器安全卫士
      • CDN加速
      热门推荐
      • 云服务备份
      • 边缘安全加速平台
      • 全站加速
      • 安全加速
      • 云服务器
      • 云主机
      • 智能边缘云
      • 应用编排服务
      • 微服务引擎
      • 共享流量包
      更多推荐
      • web应用防火墙
      • 密钥管理
      • 等保咨询
      • 安全专区
      • 应用运维管理
      • 云日志服务
      • 文档数据库服务
      • 云搜索服务
      • 数据湖探索
      • 数据仓库服务
      友情链接
      • 中国电信集团
      • 天翼云国际站
      • 189邮箱
      • 天翼企业云盘
      • 天翼云盘
      ©2026 天翼云科技有限公司版权所有 增值电信业务经营许可证A2.B1.B2-20090001
      公司地址:北京市东城区青龙胡同甲1号、3号2幢2层205-32室
      • 用户协议
      • 隐私政策
      • 个人信息保护
      • 法律声明
      备案 京公网安备11010802043424号 京ICP备 2021034386号