连接已开启SASL的Kafka实例 命令行模式连接实例 以下操作命令以Linux系统为例进行说明。 步骤 1 在客户端所在主机的“/etc/hosts”文件中配置host和IP的映射关系,以便客户端能够快速解析实例的Broker。 其中,IP地址必须为实例连接地址(从前提条件获取的连接地址),host为每个实例主机的名称(主机的名称由您自行设置,但不能重复)。 例如: 10.154.48.120 server01 10.154.48.121 server02 10.154.48.122 server03 步骤 2 解压Kafka命令行工具。 进入文件压缩包所在目录,然后执行以下命令解压文件。 tar zxf [kafkatar] 其中, [kafkatar] 表示命令行工具的压缩包名称。 例如: tar zxf kafka2.122.7.2.tgz 步骤 3 根据SASL认证机制,修改Kafka命令行工具配置文件。 1、PLAIN机制: 在Kafka命令行工具的“/config”目录中找到“consumer.properties”和“producer.properties”文件,并分别在文件中增加如下内容。 sasl.jaas.configorg.apache.kafka.common.security.plain.PlainLoginModule required username"" password""; sasl.mechanismPLAINsecurity.protocolSASLSSL ssl.truststore.location{ssltruststorepath} ssl.truststore.passworddms@kafka ssl.endpoint.identification.algorithm 参数说明: username和password为创建Kafka实例过程中开启SASLSSL时填入的用户名和密码,或者创建SASLSSL用户时设置的用户名和密码。 ssl.truststore.location配置为client.jks证书的存放路径。注意,Windows系统下证书路径中也必须使用“/”,不能使用Windows系统中复制路径时的“”,否则客户端获取证书失败。 ssl.truststore.password为服务器证书密码,不可更改,需要保持为dms@kafka 。 ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭。这里需要 保持关闭状态,必须设置为空 。 2、SCRAMSHA512机制: 在Kafka命令行工具的“/config”目录中找到“consumer.properties”和“producer.properties”文件,并分别在文件中增加如下内容。 sasl.jaas.configorg.apache.kafka.common.security.scram.ScramLoginModule required username"" password""; sasl.mechanismSCRAMSHA512 security.protocolSASLSSL ssl.truststore.location{ssltruststorepath} ssl.truststore.passworddms@kafka ssl.endpoint.identification.algorithm 参数说明: username和password为创建Kafka实例过程中开启SASLSSL时填入的用户名和密码,或者创建SASLSSL用户时设置的用户名和密码。 ssl.truststore.location配置为client.jks证书的存放路径。注意,Windows系统下证书路径中也必须使用“/”,不能使用Windows系统中复制路径时的“”,否则客户端获取证书失败。 ssl.truststore.password为服务器证书密码,不可更改,需要保持为dms@kafka 。 ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭。这里需要 保持关闭状态,必须设置为空 。 步骤 4 进入Kafka命令行工具的“/bin”目录下。 注意,Windows系统下需要进入“/bin/windows”目录下。 步骤 5 执行如下命令进行生产消息。 ./kafkaconsoleproducer.sh brokerlist {连接地址} topic {Topic名称} producer.config ../config/producer.properties 参数说明如下: 连接地址:从前提条件获取的连接地址,如果是公网访问,请使用“公网连接地址”,如果是VPC内访问,请使用“内网连接地址”,请根据实际情况选择。 Topic名称:Kafka实例下创建的Topic名称。如果Kafka实例开启了自动创建Topic功能,此参数值可以填写已创建的Topic名称,也可以填写未创建的Topic名称。 本文以公网访问为例,Kafka实例连接地址为“10.3.196.45:9095,10.78.42.127:9095,10.4.49.103:9095”。 执行完命令后,输入需要生产的消息内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。 [root@ecskafka bin]