使用KAFKA协议上报日志到LTS(1) ${accessSecret}"; Bind a1.sources.r1.channels c1 a1.sinks.k1.channel c1 SDK 调用示例 1. Java SDK调用示例。 maven依赖(示例kafka协议版本为2.7.1): org.apache.kafka kafkaclients 2.7.1 代码示例: package org.example; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class ProducerDemo { public static void main(String[] args) { Properties props new Properties(); // 配置地址 props.put("bootstrap.servers", "${ip}:${port}"); // 配置消息确认机制 props.put("acks", "0"); // 配置认证方式 props.put("security.protocol", "SASLPLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); // 用户名 projectId 密码 accessKey accessSecret props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username'${projectId}' password'${accessKey} ${accessSecret}';"); // 配置压缩方式 props.put("compression.type", "${compresstype}"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 1.创建一个生产者对象 Producer producer new KafkaProducer<>(props); // 2.调用send方法 for (int i 0; i ("${logGroupId}${logStreamId}", "${message}"); // 配置recordHeader // record.headers().add(new RecordHeader("LTSLOGTYPE","FORMAT".getBytes())); producer.send(record); } // 3.关闭生产者 producer.close(); } } 2. Python SDK调用示例。 from kafka import KafkaProducer producer KafkaProducer(
来自: