kafka提供了多种安全认证机制,主要分为SSL和SASL 2大类。其中SASL/PLAIN是基于账号密码的认证方式,比较常用。下面将介绍配置SASL_PLAIN的具体步骤。
本文选择的kafka版本是3.5.0, zookeeper 是kafka内置的版本。
Zookeeper 启动SASL 配置
- 在zoo.cfg中启动SASL支持
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthScheme=sasl jaasLoginRenew=3600000 zookeeper.sasl.client=true
- 添加Zookeeper账号认证信息: 新建/etc/zoo_jaas.conf,username和password是zk集群之间的认证用户密码,user_kafka定义用户“kafka”
,用于一个客户端访问
Server { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="<ADMIN_PASSWORD>" user_kafka="<KAFKA_PASSWORD>"; };
- KAFKA_HEAP_OPTS 中添加jaas配置参数, 启动Zookeeper:
KAFKA_HEAP_OPTS="${KAFKA_HEAP_OPTS} -Djava.security.auth.login.config=/etc/zoo_jaas.conf" zookeeper-server-start.sh /XXX/zoo.cfg
kafka SASK配置和启动
- 在kafka 配置server.properties文件中开启SASL认证
listeners=INNER://:9092,EXTERNAL://:9094 sasl.mechanism.inter.broker.protocol=PLAIN sasl.enabled.mechanisms=PLAIN allow.everyone.if.no.acl.found=false authorizer.class.name=kafka.security.authorizer.AclAuthorizer super.users=User:admin listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,INNER:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT inter.broker.listener.name=INNER
- 添加kafka账号认证信息:新建 kafka_server_jaas.conf,KafkaServer是访问kafka的用户认证,包括admin,producer和consumer。Client为访问Zookeeper的用户信息,KAFKA_PASSWORD需要和Zookeeper的kafka用户保持一致。
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="<ADMIN_PASSWORD>" user_admin="<ADMIN_PASSWORD>" user_producer="<PRODUCER_PASSWORD>" user_consumer="<CONSUMER_PASSWORD>"; }; Client{ org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="<KAFKA_PASSWORD>"; };
- 修改JVM参数, 添加jaas 配置信息,并启动kafka 服务端
export KAFKA_HEAP_OPTS="${KAFKA_HEAP_OPTS} -Djava.security.auth.login.config=/XXX/kafka_server_jaas.conf" ./kafka-server-start.sh /XXX/server.properties
Kafka SASL认证功能验证和使用
- 验证Zookeeper 服务认证功能
- 创建客户端用户认证文件 client_jaas.conf
Client{ org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="KAFKA_PASSWORD"; };
- 启动Zookeeper shell脚本访问Zookeeper
export KAFKA_HEAP_OPTS='${KAFKA_HEAP_OPTS} -Djava.security.auth.login.config=/XXX/client.conf' && ./zookeeper-shell.sh localhost:2181
- 创建客户端用户认证文件 client_jaas.conf
- 验证Kafka 服务认证功能
- 创建kafka 客户端配置client.properties。其中admin用于topic创建删除查询,producer用于生产消息,consumer用于消费消息
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="<ADMIN_PASSWORD>"; security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN
- 测试使用kafka 客户端脚本:
./kafka-topics.sh --bootstrap-server localhost:9092 --list --command-config /XXX/client.properties
- kafka脚本创建topic
./kafka-topics.sh --bootstrap-server localhost:9092 --create --command-config /XXX/client.properties --topic test-topic
- 创建kafka 客户端配置client.properties。其中admin用于topic创建删除查询,producer用于生产消息,consumer用于消费消息