Python 环境安装 1. 安装Python。(Python版本为2.7或3.X。) 2. 安装依赖库。(使用公网连接需要安装confluentkafka 1.9.2及以下版本的依赖库) pip install confluentkafka1.9.2 3. 下载Demo包kafkaconfluentpythondemo.zip。 配置修改 1. 如果是ssl连接,需要在控制台下载证书。并且解压压缩包得到ssl.client.truststore.jks,执行以下命令生成caRoot.pem文件。 keytool importkeystore srckeystore ssl.client.truststore.jks destkeystore caRoot.p12 deststoretype pkcs12 openssl pkcs12 in caRoot.p12 out caRoot.pem 2. 修改setting.py文件。(calocation仅在ssl连接时需要配置) kafkasetting { 'bootstrapservers': 'XXX', 'topicname': 'XXX', 'groupname': 'XXX' } 生产消息 发送以下命令发送消息。 python kafkaproducer.py 生产消息示例代码如下: from confluentkafka import Producer import setting conf setting.kafkasetting """初始化一个 Producer 对象""" p Producer({'bootstrap.servers': conf['bootstrapservers']}) def deliveryreport(err, msg): """ Called once for each message produced to indicate delivery result. Triggered by poll() or flush(). """ if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) """异步发送消息""" p.produce(conf['topicname'], "Hello".encode('utf8'), callbackdeliveryreport) p.poll(0) """在程序结束时,调用flush""" p.flush() 消费消息 发送以下命令消费消息。 python kafkaconsumer.py 消费消息示例代码如下: from confluentkafka import Consumer, KafkaError import setting conf setting.kafkasetting c Consumer({ 'bootstrap.servers': conf['bootstrapservers'], 'group.id': conf['groupname'], 'auto.offset.reset': 'latest' }) c.subscribe([conf['topicname']]) while True: msg c.poll(1.0) if msg is None: continue if msg.error(): if msg.error().code() KafkaError.PARTITIONEOF: continue else: print("Consumer error: {}".format(msg.error())) continue print('Received message: {}'.format(msg.value().decode('utf8'))) c.close()