编译运行Demo Java工程 介绍连接Kafka编译运行Demo Java工程 kafkaclients引入依赖 在使用Kafka时,你需要在你的项目中引入相应的依赖。具体的依赖项可能会因你的项目和需求而有所不同。在使用Kafka之前,请确保查阅官方文档以获取最新的依赖项和使用说明。 以Java编程语言为例,可以使用Kafka的Java客户端库。你可以在Maven或Gradle构建工具中添加以下依赖项: org.apache.kafka kafkaclients 示例代码 1. 从控制台获取以下信息 连接地址 实例连接地址从控制台实例详情菜单处获取,在实例详情页面的接入点信息一栏。 Topic名称 在Topic管理页面,选择需要的Topic名称。 消费组名称 在消费组管理页面,选择需要的消费组名称。 2. 在实例代码中替换以上信息即可实现消息。 import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class Producer { private final KafkaProducer producer; public final static String TOPIC "testtopic"; public final static String BROKERADDR "192.168.0.11:8090,192.168.0.9:8090,192.168.0.10:8090"; public Producer() { Properties props new Properties(); props.put(ProducerConfig.BOOTSTRAPSERVERSCONFIG, BROKERADDR); props.put(ProducerConfig.VALUESERIALIZERCLASSCONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.KEYSERIALIZERCLASSCONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.ACKSCONFIG, "all"); props.put("retries",3); producer new KafkaProducer<>(props); } public void produce() { try { for (int i 0; i (TOPIC, data), new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception ! null) { // TODO: 异常处理 exception.printStackTrace(); return; } System.out.println("produce msg completed, partition id " + metadata.partition()); } }); } } catch (Exception e) { // TODO: 异常处理 e.printStackTrace(); } producer.flush(); producer.close(); } public static void main(String[] args) { Producer producer new Producer(); producer.produce(); } } 3. 同样在实例代码中替换以上信息即可消费消息。 import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class Consumer { private org.apache.kafka.clients.consumer.Consumer consumer; private static final String GROUPID "testgroup"; private static final String TOPIC "testtopic"; public final static String BROKERADDR "192.168.0.11:8090,192.168.0.9:8090,192.168.0.10:8090"; public Consumer() { Properties props new Properties(); props.put(ConsumerConfig.BOOTSTRAPSERVERSCONFIG, BROKERADDR); props.put(ConsumerConfig.GROUPIDCONFIG, GROUPID); props.put(ConsumerConfig.AUTOOFFSETRESETCONFIG, "earliest"); props.put(ConsumerConfig.ENABLEAUTOCOMMITCONFIG, "false"); props.put(ConsumerConfig.KEYDESERIALIZERCLASSCONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUEDESERIALIZERCLASSCONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); consumer new KafkaConsumer<>(props); } public void consume() { consumer.subscribe(Arrays.asList(TOPIC)); while (true){ try { ConsumerRecords records consumer.poll(1000); System.out.println("the numbers of topic:" + records.count()); for (ConsumerRecord record : records) { System.out.println("the data is " + record.value()); } }catch (Exception e){ // TODO: 异常处理 e.printStackTrace(); } } } public static void main(String[] args) { new Consumer().consume(); } }