Java客户端接入示例 生产消息 import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.concurrent.Future; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.config.SslConfigs; public class KafkaProducerDemo { public static void main(String args[]) { //加载kafka.properties Properties kafkaProperties JavaKafkaConfigurer.getKafkaProperties(); Properties props new Properties(); //设置接入点,请通过控制台获取对应Topic的接入点 props.put(ProducerConfig.BOOTSTRAPSERVERSCONFIG, kafkaProperties.getProperty("bootstrap.servers")); //设置SSL根证书的路径,请记得将XXX修改为自己的路径 props.put(SslConfigs.SSLTRUSTSTORELOCATIONCONFIG, kafkaProperties.getProperty("ssl.truststore.location")); //根证书store的密码,保持不变 props.put(SslConfigs.SSLTRUSTSTOREPASSWORDCONFIG, "c24f5210"); //接入协议,目前支持使用SSL协议接入 props.put(CommonClientConfigs.SECURITYPROTOCOLCONFIG, "SSL"); //Kafka消息的序列化方式 props.put(ProducerConfig.KEYSERIALIZERCLASSCONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUESERIALIZERCLASSCONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //请求的最长等待时间 props.put(ProducerConfig.MAXBLOCKMSCONFIG, 30 1000); //设置客户端内部重试次数 props.put(ProducerConfig.RETRIESCONFIG, 5); //设置客户端内部重试间隔 props.put(ProducerConfig.RECONNECTBACKOFFMSCONFIG, 3000); //hostname校验改成空 props.put(SslConfigs.SSLENDPOINTIDENTIFICATIONALGORITHMCONFIG, ""); //构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可; //如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个 KafkaProducer producer new KafkaProducer (props); //构造一个Kafka消息 String topic kafkaProperties.getProperty("topic"); //消息所属的Topic,请在控制台申请之后,填写在这里 String value "this is the message's value"; //消息的内容 try { //批量获取 futures 可以加快速度, 但注意,批量不要太大 List > futures new ArrayList >(128); for (int i 0; i kafkaMessage new ProducerRecord (topic, value + ": " + i); Future metadataFuture producer.send(kafkaMessage); futures.add(metadataFuture); } producer.flush(); for (Future future: futures) { //同步获得Future对象的结果 try { RecordMetadata recordMetadata future.get(); System.out.println("Produce ok:" + recordMetadata.toString()); } catch (Throwable t) { t.printStackTrace(); } } } catch (Exception e) { System.out.println("error occurred"); e.printStackTrace(); } } }