您可以通过Kafka协议上报日志到日志服务,目前支持各类Kafka Producer SDK或采集工具,仅依赖于Kafka协议。支持以下场景:
场景1:已有基于开源采集的自建系统,仅修改配置文件便可以将日志上报到LTS,例如Logstash。
场景2:希望通过Kafka producer SDK来采集日志并上报,不必再安装采集ICAgent。
前提条件
确认云日志服务的区域,请用户根据所在区域,获取regionid。
获取需要上报到LTS的日志组ID、日志流ID。
当前仅支持内网上报,需要在ECS主机上使用。
相关限制
当前仅支持内网上报,端口固定为,IP根据所在局点进行配置。
支持 Kafka 协议版本为:1.0.X,2.X.X,3.X.X。
支持压缩方式:gzip,snappy,lz4。
KAFKA认证方式为 SASL_PLAINTEXT 认证。
KAFKA协议的ACKS参数必须设置为0。
配置方式
使用Kafka协议上报日志时,需要使用到的通用参数如下。
通用参数
| 参数名称 | 描述 | 类型 |
|---|---|---|
| projectId | 用户账号的项目ID(project id) | String |
| logGroupId | LTS的日志组ID | String |
| logStreamId | LTS的日志流ID | String |
| regionName | 云日志服务的区域 | String |
| accessKey | 用户账号的AK | String |
| accessSecret | 用户账号的SK | String |
使用Kafka协议上报日志时,需要配置以下参数。
配置参数
| 参数名称 | 说明 |
|---|---|
| 连接类型 | 当前支持SASL_PLAINTEXT |
| hosts | Kafka的IP和PORT地址,格式为lts-kafka.${regionName}.${external_global_domain_name}:9095 其中IP根据局点进行配置,PORT固定为。 |
| topic | Kafka的topic名称,格式为${日志组ID}_${日志流ID},即LTS的日志组ID和日志流ID通过下划线连接,作为topic的名称。 |
| username | Kafka访问用户名,配置为用户账号的项目ID。 |
| password | Kafka访问密码,格式为${accessKey}#${accessSecret},即用户账号的AK和SK通过#连接,作为Kafka的访问密码。 |
${message}日志格式
仅当headers中添加了key为LTS_LOG_TYPE,value为FORMAT的header时,日志需要符合该格式规范。
日志参数
| 参数名称 | 是否必选 | 参数类型 | 描述 |
|---|---|---|---|
| tenant_project_id | 是 | String | 用户账号的项目ID。 |
| tenant_group_id | 是 | String | LTS的日志组ID。 |
| tenant_stream_id | 是 | String | LTS的日志流ID。 |
| log_time_ns | 是 | Long | 日志数据采集时间,UTC时间(纳秒)。 采集时间需在日志存储时间范围之内,否则上报日志会被删除。比如日志组的日志存储时间是7天,则此参数不应早于当前时间的7天前。 |
| contents | 是 | Array of String | 日志内容。 |
| labels | 是 | Object | 用户自定义label。 请不要将字段名称设置为内置保留字段,否则可能会造成字段名称重复、查询不精确等问题。 |
日志示例
{ "tenant_project_id": "${projectId}", "tenant_group_id": "${logGroupId}", "tenant_stream_id": "${logStreamId}", "log_time_ns": "XXXXXXXXXXXXXXXXXXX", "contents": [ "This is a log 1", "This is a log 2" ], "labels": { "type": "kafka" } }
调用示例
Beat系列软件调用(FileBeat等)。以FileBeat为例,配置参数如下:
output.kafka: hosts: ["${ip}:${port}"] partition.round_robin: reachable_only: false username: "${projectId}" password: "${accessKey}#${accessSecret}" topic: "${logGroupId}_${logStreamId}" sasl.mechanism: "PLAIN" security.protocol: "SASL_PLAINTEXT" acks: "0" compression: gzip
通过Logstash软件上报日志。
input { stdin {} } output { kafka { # 配置地址bootstrap_servers => "${ip}:${port}" # 配置topic topic_id => "${logGroupId}_${logStreamId}" # 配置消息确认机制acks => "0" # 配置压缩方式compression_type => "gzip" # 配置认证方式security_protocol => "SASL_PLAINTEXT" sasl_mechanism => "PLAIN" # 用户名 projectId 密码 accessKey#accessSecret sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${projectId}' password='${accessKey}#${accessSecret}';" } }
通过Flume软件上报日志。
#Name a1.sources = r1 a1.channels = c1 a1.sinks = k1 #Source a1.sources.r1.type = TAILDIR a1.sources.r1.channels = c1 a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /tmp/test.txt a1.sources.r1.fileHeader = true a1.sources.r1.maxBatchCount = 1000 #Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; #Bind a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
SDK 调用示例
Java SDK调用示例。
maven依赖(示例kafka协议版本为2.7.1):
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.7.1</version> </dependency> </dependencies>
代码示例:
package org.example; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class ProducerDemo { public static void main(String[] args) { Properties props = new Properties(); // 配置地址props.put("bootstrap.servers", "${ip}:${port}"); // 配置消息确认机制props.put("acks", "0"); // 配置认证方式props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); // 用户名 projectId 密码 accessKey#accessSecret props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='${projectId}' password='${accessKey}#${accessSecret}';"); // 配置压缩方式props.put("compression.type", "${compress_type}"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 1.创建一个生产者对象Producer<String, String> producer = new KafkaProducer<>(props); // 2.调用send方法for (int i = 0; i < 1; i++) { ProducerRecord record = new ProducerRecord<>("${logGroupId}_${logStreamId}", "${message}"); // 配置recordHeader // record.headers().add(new RecordHeader("LTS_LOG_TYPE","FORMAT".getBytes())); producer.send(record); } // 3.关闭生产者producer.close(); } }
Python SDK调用示例。
from kafka import KafkaProducer producer = KafkaProducer( # 配置地址bootstrap_servers="${ip}:${port}", # 配置消息确认机制acks="0", # 配置压缩方式compression_type ="${compression_type}" # 配置认证方式sasl_mechanism="PLAIN", security_protocol="SASL_PLAINTEXT", # 用户名 projectId 密码 accessKey#accessSecret sasl_plain_username="${projectId}", sasl_plain_password="${accessKey}#${accessSecret}" ) print('start producer') for i in range(0, 3): data = bytes("${message}", encoding="utf-8") future = producer.send("${logGroupId}_{logStreamId}", data) result = future.get(timeout=10) print(result) print('end producer')
报错说明
当参数错误或不匹配时,会有相应的报错提示。
报错说明
| 报错信息 | 报错原因 |
|---|---|
| TopicAuthorizationException | projectId(项目ID)、accessKey(AK)、accessSecret(SK)参数错误或者不匹配。 |
| UnknownTopicOrPartitionException | logGroupId(日志组ID)、logStreamId(日志流ID)参数错误或者不匹配。 |
| InvalidRecordException | 日志格式错误或者日志中的projectId(项目ID)、logGroupId(日志组ID)、logStreamId(日志流ID)与外部设置参数不一致。 |