活动

天翼云最新优惠活动,涵盖免费试用,产品折扣等,助您降本增效!
热门活动
  • 云聚517 · 好价翼起拼 NEW 爆款云主机低至25.83元/年,参与拼团享更多优惠,拼成得额外优惠券
  • 安全隔离版OpenClaw NEW OpenClaw云服务器专属“龙虾“套餐低至1.5折起
  • 聚力AI赋能 天翼云大模型专项 大模型特惠专区·Token Plan 轻享包低至9.9元起
  • 青云志云端助力计划 NEW 一站式科研助手,海外资源安全访问平台,助力青年翼展宏图,平步青云
  • 企业出海解决方案 NEW 助力您的业务扬帆出海,通达全球!
  • 天翼云信创专区 NEW “一云多芯、一云多态”,国产化软件全面适配,国产操作系统及硬件芯片支持丰富
  • 中小企业服务商合作专区 国家云助力中小企业腾飞,高额上云补贴重磅上线
  • 云上钜惠 爆款云主机全场特惠,2核4G只要1.8折起!
  • 天翼云奖励推广计划 加入成为云推官,推荐新用户注册下单得现金奖励
免费活动
  • 免费试用中心 HOT 多款云产品免费试用,快来开启云上之旅
  • 天翼云用户体验官 NEW 您的洞察,重塑科技边界

息壤智算

领先开放的智算服务平台,提供算力、平台、数据、模型、应用“五位一体”智算服务体系,构建全流程的AI基础设施能力
AI Store
  • 算力市场
  • 模型市场
  • 应用市场
公共算力服务
  • 裸金属
  • 定制裸金属
训推服务
  • 模型开发
  • 训练任务
  • 服务部署
Token服务
  • 模型广场
  • 体验中心
  • 服务接入
应用托管
  • 应用实例
科研助手
  • 科研智能体
  • 科研服务
  • 开发机
  • 并行计算
大模型
  • DeepSeek-V4-Flash
  • GLM-5.1
  • Qwen3.5-122B-A10B
  • DeepSeek-V3.2(旗舰版)
  • GLM-5(正式版)
智算一体机
  • 智算一体机
智能体引擎
  • 智能体引擎
模型适配专家服务
  • 模型适配专家服务
算力服务商
  • 入驻算力服务商

应用商城

天翼云精选行业优秀合作伙伴及千余款商品,提供一站式云上应用服务
进入甄选商城进入云市场进入AI Store创新解决方案公有云生态专区智云上海应用生态专区
建站工具
  • 新域名服务
  • SSL证书
  • 翼建站
企业办公
  • 安全邮箱
  • WPS 365 天翼云版
  • 天翼企业云盘(标准服务版)
灾备迁移
  • 云管家2.0
  • 翼备份(SaaS版)

定价

协助您快速了解云产品计费模式、价格详情,轻松预估上云成本
价格计算器
  • 动态测算产品价格
定价策略
  • 快速了解计费模式

合作伙伴

天翼云携手合作伙伴,共创云上生态,合作共赢
天翼云生态合作中心
  • 天翼云生态合作中心
天翼云渠道合作伙伴
  • 天翼云代理渠道合作伙伴
天翼云服务合作伙伴
  • 天翼云集成商交付能力认证
天翼云应用合作伙伴
  • 天翼云云市场合作伙伴
  • 天翼云甄选商城合作伙伴
天翼云技术合作伙伴
  • 天翼云OpenAPI中心
天翼云培训认证
  • 天翼云学堂
  • 天翼云市场商学院
天翼云合作计划
  • 云汇计划
天翼信创云专区
  • 信创云专区
  • 适配互认证

开发者

开发者相关功能入口汇聚
技术社区
  • 专栏文章
  • 互动问答
  • 技术视频
资源与工具
  • OpenAPI中心
培训与认证
  • 天翼云学堂
  • 天翼云认证
开源社区
  • 魔乐社区
  • OpenTeleDB

支持与服务

为您提供全方位支持与服务,全流程技术保障,助您轻松上云,安全无忧
文档与工具
  • 文档中心
  • 新手上云
  • 自助服务
  • OpenAPI中心
定价
  • 价格计算器
  • 定价策略
基础服务
  • 售前咨询
  • 在线支持
  • 在线支持
  • 工单服务
  • 服务保障
  • 会员中心
增值服务
  • 红心服务
  • 首保服务
  • 客户支持计划
  • 专家技术服务
  • 备案管家
我要反馈
  • 建议与反馈
  • 用户体验官
信息公告
  • 客户公告

了解天翼云

天翼云秉承央企使命,致力于成为数字经济主力军,投身科技强国伟大事业,为用户提供安全、普惠云服务
品牌介绍
  • 关于天翼云
  • 智算云
  • 天翼云4.0
  • 新闻资讯
  • 天翼云APP
基础设施
  • 全球基础设施
  • 信任中心
最佳实践
  • 精选案例
  • 超级探访
  • 云杂志
  • 分析师和白皮书
  • 天翼云·创新直播间
市场活动
  • 2026智能云生态大会
  • 2025智能云生态大会
  • 2024智算云生态大会
  • 2023云生态大会
  • 2022云生态大会
  • 天翼云中国行
天翼云
  • 活动
  • 息壤智算
  • 产品
  • 解决方案
  • 应用商城
  • 定价
  • 合作伙伴
  • 开发者
  • 支持与服务
  • 了解天翼云
      • 文档
      • 控制中心
      • 备案
      • 管理中心
      文档中心

      云日志服务

      云日志服务

      • 云日志服务

      无数据

        • 产品动态
        • 产品简介
        • 产品定义
        • 产品优势
        • 应用场景
        • 使用限制
        • 权限管理
        • 采集器隐私声明
        • 与其他云服务关系
        • 计费说明
        • 产品价格和计费项说明
        • 余额不足
        • 快速入门
        • 入门概览
        • 步骤1:创建日志组和日志流
        • 步骤2:安装ICAgent
        • 步骤3:接入日志
        • 步骤4:查看实时日志
        • 用户指南
        • 权限管理
        • 日志管理
        • 控制台首页
        • 通过关键字查询日志
        • 资源统计
        • 日志组
        • 日志流
        • 标签管理
        • 日志接入
        • 使用ICAgent插件采集容器日志
        • ICAgent采集容器场景日志概述
        • 云容器引擎CCE应用日志接入LTS
        • 自建K8s应用日志接入LTS
        • 云服务接入
        • 主机接入
        • APIG接入
        • CBH接入
        • CCE接入
        • CTS接入
        • DWS接入
        • ELB接入
        • VPC接入
        • WAF接入
        • 对象存储服务OBS接入LTS
        • 其他方式接入
        • 跨账号接入
        • 使用KAFKA协议上报日志到LTS
        • 主机管理
        • 主机组
        • 主机
        • 安装ICAgent(区域内主机)
        • 安装ICAgent(区域外主机)
        • 升级ICAgent
        • 卸载ICAgent
        • ICAgent状态
        • 日志搜索与分析
        • 日志搜索
        • 内置保留字段
        • 索引配置
        • 云端结构化解析
        • 日志结构化配置
        • 结构化方式
        • 结构化模板
        • 结构化配置字段
        • 自定义日志时间
        • 搜索语法与功能
        • 搜索语法
        • 短语搜索
        • 实时查看日志
        • 快速分析
        • 快速查询
        • 日志告警
        • 告警规则
        • 关键词告警
        • 告警通知
        • 消息模板
        • 告警列表
        • 配置中心
        • 配额设置
        • 分词设置
        • 日志采集
        • 日志转储
        • 概述
        • 转储至OBS
        • 一次性转储
        • LTS配置中心管理
        • 设置LTS日志采集配额
        • 设置LTS日志内容分词
        • 设置ICAgent日志采集开关
        • 日志可视化
        • 日志可视化概述
        • 使用统计图表将日志可视化
        • 使用仪表盘将日志可视化
        • 常见问题
        • ICAgent安装
        • ICAgent安装失败怎么办?
        • 界面上单击升级ICAgent操作失败怎么办?
        • 安装ICAgent完成后显示离线如何解决?
        • 安装ICAgent完成后不显示如何解决?
        • 日志采集
        • 使用ICAgent过程中,CPU占用较高怎么处理?
        • 云日志服务可以采集哪类日志?支持采集哪些文件类型?
        • 在AOM(应用运维管理)中关闭超额继续采集日志开关,会影响LTS(云日志服务)收集日志吗?
        • CCE标准输出日志默认采集到AOM,应该如何关闭?
        • 日志搜索与查看
        • 实时查看最新日志,每一次加载数据时延是多久?
        • 在云日志服务控制台查看不到原始日志怎么办?
        • 如何手动删除日志?
        • 日志搜索相关问题
        • 日志转储
        • 日志转储后,LTS会删除转储的内容吗?
        • 日志转储页面,转储状态异常是什么原因?
        • 如何转储云审计服务CTS的日志?
        • 配置OBS转储后,OBS桶无法查看历史数据?
        • 其他问题
        • 如何获取服务AK/SK?
        • 如何通过创建委托授权安装ICAgent?
        • LTS配置日志接入后多久有日志?
        • 500MB免费额度怎么算?
        • 文档下载
        • 用户手册
        • 相关协议
        • 天翼云云日志服务产品服务协议
        • 天翼云云日志服务等级协议
          无相关产品

          本页目录

          帮助中心云日志服务用户指南日志接入其他方式接入使用KAFKA协议上报日志到LTS
          使用KAFKA协议上报日志到LTS
          更新时间 2026-05-11 13:57:54
          • 新浪微博
          • 微信
            扫码分享
          • 复制链接
          最近更新时间: 2026-05-11 13:57:54
          分享文章
          • 新浪微博
          • 微信
            扫码分享
          • 复制链接

          您可以通过Kafka协议上报日志到日志服务,目前支持各类Kafka Producer SDK或采集工具,仅依赖于Kafka协议。支持以下场景:

          • 场景1:已有基于开源采集的自建系统,仅修改配置文件便可以将日志上报到LTS,例如Logstash。

          • 场景2:希望通过Kafka producer SDK来采集日志并上报,不必再安装采集ICAgent。

          前提条件

          • 确认云日志服务的区域,请用户根据所在区域,获取regionid。

          • 获取需要上报到LTS的日志组ID、日志流ID。

          • 当前仅支持内网上报,需要在ECS主机上使用。

          相关限制

          • 当前仅支持内网上报,端口固定为,IP根据所在局点进行配置。

          • 支持 Kafka 协议版本为:1.0.X,2.X.X,3.X.X。

          • 支持压缩方式:gzip,snappy,lz4。

          • KAFKA认证方式为 SASL_PLAINTEXT 认证。

          • KAFKA协议的ACKS参数必须设置为0。

          配置方式

          • 使用Kafka协议上报日志时,需要使用到的通用参数如下。

          通用参数

          参数名称描述类型
          projectId用户账号的项目ID(project id)String
          logGroupIdLTS的日志组IDString
          logStreamIdLTS的日志流IDString
          regionName云日志服务的区域String
          accessKey用户账号的AKString
          accessSecret用户账号的SKString
          • 使用Kafka协议上报日志时,需要配置以下参数。

          配置参数

          参数名称说明
          连接类型当前支持SASL_PLAINTEXT
          hosts

          Kafka的IP和PORT地址,格式为lts-kafka.${regionName}.${external_global_domain_name}:9095

          其中IP根据局点进行配置,PORT固定为。

          topicKafka的topic名称,格式为${日志组ID}_${日志流ID},即LTS的日志组ID和日志流ID通过下划线连接,作为topic的名称。
          usernameKafka访问用户名,配置为用户账号的项目ID。
          passwordKafka访问密码,格式为${accessKey}#${accessSecret},即用户账号的AK和SK通过#连接,作为Kafka的访问密码。
          • ${message}日志格式

            仅当headers中添加了key为LTS_LOG_TYPE,value为FORMAT的header时,日志需要符合该格式规范。

          日志参数

          参数名称是否必选参数类型描述
          tenant_project_id是String用户账号的项目ID。
          tenant_group_id是StringLTS的日志组ID。
          tenant_stream_id是StringLTS的日志流ID。
          log_time_ns是Long

          日志数据采集时间,UTC时间(纳秒)。

          采集时间需在日志存储时间范围之内,否则上报日志会被删除。比如日志组的日志存储时间是7天,则此参数不应早于当前时间的7天前。

          contents是Array of String日志内容。
          labels是Object

          用户自定义label。

          请不要将字段名称设置为内置保留字段,否则可能会造成字段名称重复、查询不精确等问题。

          日志示例

          { 
          "tenant_project_id": "${projectId}", 
          "tenant_group_id": "${logGroupId}", 
          "tenant_stream_id": "${logStreamId}", 
          "log_time_ns": "XXXXXXXXXXXXXXXXXXX", 
          "contents": [ 
          "This is a log 1", 
          "This is a log 2" 
          ], 
          "labels": { 
          "type": "kafka" 
          } 
          }

          调用示例

          1. Beat系列软件调用(FileBeat等)。以FileBeat为例,配置参数如下:

          output.kafka: 
          hosts: ["${ip}:${port}"] 
          partition.round_robin: 
          reachable_only: false 
          username: "${projectId}" 
          password: "${accessKey}#${accessSecret}" 
          topic: "${logGroupId}_${logStreamId}" 
          sasl.mechanism: "PLAIN" 
          security.protocol: "SASL_PLAINTEXT" 
          acks: "0" 
          compression: gzip

          1. 通过Logstash软件上报日志。

          input { 
          stdin {} 
          } 
          output { 
          kafka { 
          # 配置地址
          bootstrap_servers => "${ip}:${port}" 
          # 配置topic 
          topic_id => "${logGroupId}_${logStreamId}" 
          # 配置消息确认机制
          acks => "0" 
          # 配置压缩方式
          compression_type => "gzip" 
          # 配置认证方式
          security_protocol => "SASL_PLAINTEXT" 
          sasl_mechanism => "PLAIN" 
          # 用户名 projectId 密码 accessKey#accessSecret 
          sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${projectId}' password='${accessKey}#${accessSecret}';" 
          } 
          }

          1. 通过Flume软件上报日志。

          #Name 
          a1.sources = r1 
          a1.channels = c1 
          a1.sinks = k1 
          #Source 
          a1.sources.r1.type = TAILDIR 
          a1.sources.r1.channels = c1 
          a1.sources.r1.filegroups = f1 
          a1.sources.r1.filegroups.f1 = /tmp/test.txt 
          a1.sources.r1.fileHeader = true 
          a1.sources.r1.maxBatchCount = 1000 
          #Channel 
          a1.channels.c1.type = memory 
          a1.channels.c1.capacity = 10000 
          a1.channels.c1.transactionCapacity = 100 
          #Sink 
          a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink 
          a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} 
          a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} 
          a1.sinks.k1.kafka.producer.acks = 0 
          a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT 
          a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN 
          a1.sinks.k1.kafka.producer.compression.type = gzip 
          a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; 
          #Bind 
          a1.sources.r1.channels = c1 
          a1.sinks.k1.channel = c1

          SDK 调用示例

          1. Java SDK调用示例。

          maven依赖(示例kafka协议版本为2.7.1):

          <dependencies> 
          <dependency> 
          <groupId>org.apache.kafka</groupId> 
          <artifactId>kafka-clients</artifactId> 
          <version>2.7.1</version> 
          </dependency> 
          </dependencies>

          代码示例:

          package org.example; 
          import org.apache.kafka.clients.producer.KafkaProducer; 
          import org.apache.kafka.clients.producer.Producer; 
          import org.apache.kafka.clients.producer.ProducerRecord; 
          import java.util.Properties; 
          public class ProducerDemo { 
          public static void main(String[] args) { 
          Properties props = new Properties(); 
          // 配置地址
          props.put("bootstrap.servers", "${ip}:${port}"); 
          // 配置消息确认机制
          props.put("acks", "0"); 
          // 配置认证方式
          props.put("security.protocol", "SASL_PLAINTEXT"); 
          props.put("sasl.mechanism", "PLAIN"); 
          // 用户名 projectId 密码 accessKey#accessSecret 
          props.put("sasl.jaas.config", 
          "org.apache.kafka.common.security.plain.PlainLoginModule required username='${projectId}' password='${accessKey}#${accessSecret}';"); 
          // 配置压缩方式
          props.put("compression.type", "${compress_type}"); 
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
          // 1.创建一个生产者对象
          Producer<String, String> producer = new KafkaProducer<>(props); 
          // 2.调用send方法
          for (int i = 0; i < 1; i++) { 
          ProducerRecord record = new ProducerRecord<>("${logGroupId}_${logStreamId}", "${message}"); 
          // 配置recordHeader 
          // record.headers().add(new RecordHeader("LTS_LOG_TYPE","FORMAT".getBytes())); 
          producer.send(record); 
          } 
          // 3.关闭生产者
          producer.close(); 
          } 
          }

          1. Python SDK调用示例。

          from kafka import KafkaProducer 
          producer = KafkaProducer( 
          # 配置地址
          bootstrap_servers="${ip}:${port}", 
          # 配置消息确认机制
          acks="0", 
          # 配置压缩方式
          compression_type ="${compression_type}" 
          # 配置认证方式
          sasl_mechanism="PLAIN", 
          security_protocol="SASL_PLAINTEXT", 
          # 用户名 projectId 密码 accessKey#accessSecret 
          sasl_plain_username="${projectId}", 
          sasl_plain_password="${accessKey}#${accessSecret}" 
          ) 
          print('start producer') 
          for i in range(0, 3): 
          data = bytes("${message}", encoding="utf-8") 
          future = producer.send("${logGroupId}_{logStreamId}", data) 
          result = future.get(timeout=10) 
          print(result) 
          print('end producer')

          报错说明

          当参数错误或不匹配时,会有相应的报错提示。

          报错说明

          报错信息报错原因
          TopicAuthorizationExceptionprojectId(项目ID)、accessKey(AK)、accessSecret(SK)参数错误或者不匹配。
          UnknownTopicOrPartitionExceptionlogGroupId(日志组ID)、logStreamId(日志流ID)参数错误或者不匹配。
          InvalidRecordException日志格式错误或者日志中的projectId(项目ID)、logGroupId(日志组ID)、logStreamId(日志流ID)与外部设置参数不一致。

           

          文档反馈

          建议您登录后反馈,可在建议与反馈里查看问题处理进度

          鼠标选中文档,精准反馈问题

          选中存在疑惑的内容,即可快速反馈问题,我们会跟进处理

          知道了

          上一篇 :  跨账号接入
          下一篇 :  主机管理
          搜索 关闭
          ©2026 天翼云科技有限公司版权所有 增值电信业务经营许可证A2.B1.B2-20090001
          公司地址:北京市东城区青龙胡同甲1号、3号2幢2层205-32室
          备案 京公网安备11010802043424号 京ICP备 2021034386号
          ©2026天翼云科技有限公司版权所有
          京ICP备 2021034386号
          备案 京公网安备11010802043424号
          增值电信业务经营许可证A2.B1.B2-20090001
          用户协议 隐私政策 法律声明