kafka使用javaagent 生产者 c @Service public class KafkaProducerService { private final KafkaTemplate kafkaTemplate; public KafkaProducerService(KafkaTemplate kafkaTemplate) { this.kafkaTemplate kafkaTemplate; } / 发送简单消息 / public void sendMessage(String topic, String message) { kafkaTemplate.send(topic, message); } } 消费者 监听者模式 若使用Spring kafka 的监听者模式,需要加上启动参数 c Dotel.instrumentation.springkafka.enabledfalse subscribe 模式 c @Component public class KafkaConsumerDemo { @Value("${spring.kafka.bootstrapservers:127.0.0.1:9092}") private String bootstrapServers; @Value("${demo.topic:demotopic}") private String topic; @Value("${demo.group:demogroup}") private String group; @PostConstruct public void init(){ new Thread(()>{ Properties props new Properties(); props.put(ConsumerConfig.BOOTSTRAPSERVERSCONFIG, bootstrapServers); props.put(ConsumerConfig.KEYDESERIALIZERCLASSCONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUEDESERIALIZERCLASSCONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.GROUPIDCONFIG, group); // 消费者组ID props.put(ConsumerConfig.AUTOOFFSETRESETCONFIG, "earliest"); // 如果没有偏移量,从最早开始消费 props.put(ConsumerConfig.ENABLEAUTOCOMMITCONFIG, "true"); // 自动提交偏移量 props.put(ConsumerConfig.AUTOCOMMITINTERVALMSCONFIG, "1000"); // 自动提交间隔 // 2. 创建消费者对象 try (KafkaConsumer consumer new KafkaConsumer<>(props)) { // 3. 订阅主题 consumer.subscribe(Collections.singletonList(topic)); // 4. 持续拉取消息 while (true) { ConsumerRecords records consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord record : records) { System.out.printf("消费到消息: topic%s, partition%d, offset%d, key%s, value%s%n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } } catch (Exception e) { e.printStackTrace(); } }).start(); } }