Java客户端接入示例 消费消息 import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SslConfigs; import java.util.ArrayList; import java.util.List; import java.util.Properties; public class KafkaConsumerDemo { public static void main(String args[]) { //加载kafka.properties Properties kafkaProperties JavaKafkaConfigurer.getKafkaProperties(); Properties props new Properties(); //设置接入点,请通过控制台获取对应Topic的接入点 props.put(ProducerConfig.BOOTSTRAPSERVERSCONFIG, kafkaProperties.getProperty("bootstrap.servers")); //设置SSL根证书的路径,请记得将XXX修改为自己的路径 props.put(SslConfigs.SSLTRUSTSTORELOCATIONCONFIG, kafkaProperties.getProperty("ssl.truststore.location")); //根证书store的密码,保持不变 props.put(SslConfigs.SSLTRUSTSTOREPASSWORDCONFIG, "c24f5210"); //接入协议,目前支持使用SSL协议接入 props.put(CommonClientConfigs.SECURITYPROTOCOLCONFIG, "SSL"); //两次poll之间的最大允许间隔 //可更加实际拉去数据和客户的版本等设置此值,默认30s props.put(ConsumerConfig.SESSIONTIMEOUTMSCONFIG, 30000); //设置单次拉取的量,走公网访问时,该参数会有较大影响 props.put(ConsumerConfig.MAXPARTITIONFETCHBYTESCONFIG, 32000); props.put(ConsumerConfig.FETCHMAXBYTESCONFIG, 32000); //每次poll的最大数量 //注意该值不要改得太大,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿 props.put(ConsumerConfig.MAXPOLLRECORDSCONFIG, 30); //消息的反序列化方式 props.put(ConsumerConfig.KEYDESERIALIZERCLASSCONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUEDESERIALIZERCLASSCONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //当前消费实例所属的消费组,请在控制台申请之后填写 //属于同一个组的消费实例,会负载消费消息 props.put(ConsumerConfig.GROUPIDCONFIG, kafkaProperties.getProperty("group.id")); //hostname校验改成空 props.put(SslConfigs.SSLENDPOINTIDENTIFICATIONALGORITHMCONFIG, ""); //构造消息对象,也即生成一个消费实例 KafkaConsumer consumer new org.apache.kafka.clients.consumer.KafkaConsumer (props); //设置消费组订阅的Topic,可以订阅多个 //如果GROUPIDCONFIG是一样,则订阅的Topic也建议设置成一样 List subscribedTopics new ArrayList (); //如果需要订阅多个Topic,则在这里add进去即可 //每个Topic需要先在控制台进行创建 subscribedTopics.add(kafkaProperties.getProperty("topic")); consumer.subscribe(subscribedTopics); //循环消费消息 while (true){ try { ConsumerRecords records consumer.poll(1000); //必须在下次poll之前消费完这些数据, 且总耗时不得超过SESSIONTIMEOUTMSCONFIG //建议开一个单独的线程池来消费消息,然后异步返回结果 for (ConsumerRecord record : records) { System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset())); } } catch (Exception e) { try { Thread.sleep(1000); } catch (Throwable ignore) { } e.printStackTrace(); } } } }