在Kafka中,SASL机制包含三种,它们分别是Kerberos、PLAIN、SCRAM。以PLAIN认证为示例
1.配置Server
1)解压安装包
tar -zxvf kafka_2.11-2.4.1.tgz -C /home/xyp9x/
2)改名
mv kafka_2.11-2.4.1 kafka_sasl
2)在kafka目录下创建logs、kafka-logs文件夹
mkdir logs kafka-logs
3)config目录中创建kafka_server_jaas.conf文件,前三行是配置管理员账户(该账户与上面server.properties中配置的super.users一样)后面的即 user_用户名="该用户的密码"。之后配置ACL的时候需要用到这里配置的用户
vi kafka_server_jaas.conf KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin" user_admin="admin" user_xyp9x="xyp9x"; };
4)修改配置文件
vi server.properties 将原内容全部清空,输入以下内容: #broker的全局唯一编号,不能重复 broker.id=0 #是否允许删除topic delete.topic.enable=true #处理网络请求的线程数量 num.network.threads=3 #用来处理磁盘IO的线程数量 num.io.threads=8 #发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 #接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 #请求套接字的最大缓冲区大小 socket.request.max.bytes=104857600 #kafka数据存放的路径 log.dirs=/home/xyp9x/kafka_sasl/kafka-logs #topic在当前broker上的分区个数 num.partitions=1 #用来恢复和清理data下数据的线程数量 num.recovery.threads.per.data.dir=1 #segment文件保留的最长时间,超时将被删除 log.retention.hours=168 #配置连接Zookeeper集群地址 zookeeper.connect=bigdata111:2181,bigdata112:2181,bigdata113:2181 #kafka连接zookeeper超时时间90s zookeeper.connection.timeout.ms=90000 #SASL_PLAINTEXT #这里的listener中的hostname在三台机器上换成每台机器对应的hostname:ip listeners=SASL_PLAINTEXT://x.x.1.111:9092 #使用的认证协议 security.inter.broker.protocol=SASL_PLAINTEXT #SASL机制 sasl.enabled.mechanisms=PLAIN sasl.mechanism.inter.broker.protocol=PLAIN #完成身份验证的类 authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer #如果没有找到ACL(访问控制列表)配置,则允许任何操作。 #设置为true,ACL机制为黑名单机制,只有黑名单中的用户无法访问 #默认为false,ACL机制为白名单机制,只有白名单中的用户可以访问 #allow.everyone.if.no.acl.found=true #设置admin超级用户 super.users=User:admin
5)然后在Kafka启动脚本中添加配置文件路径
vi kafka-server-start.sh #!/bin/bash export KAFKA_OPTS="-Djava.security.auth.login.config=/home/xyp9x/kafka_sasl/config/kafka_server_jaas.conf"
6)配置环境变量
vi /etc/profile # Kafka_sasl export KAFKA_HOME=/home/xyp9x/kafka_sasl export PATH=$PATH:$KAFKA_HOME/bin
7)分发
rsync -r /etc/profile bigdata112:/etc/ rsync -r /etc/profile bigdata113:/etc/ rsync -r kafka_sasl bigdata112:`pwd` rsync -r kafka_sasl bigdata113:`pwd`
8)分别在112和113上修改配置文件
vi server.properties broker.id=1 listeners=SASL_PLAINTEXT://x.x.1.112:9092 vi server.properties broker.id=2 listeners=SASL_PLAINTEXT://x.x.1.113:9092
9)更新111、112、113的环境变量
source /etc/profile
2.配置Client(当Kafka Server端配置启用了SASL/PLAIN,那么Client连接的时候需要配置认证信息)
1)在config目录创建kafka_client_jaas.conf
vi kafka_client_jaas.conf KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin"; };
2)在producer.properties和consumer.properties文件中设置认证协议
vi producer.properties security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN vi consumer.properties security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN
3)在kafka-console-producer.sh脚本和kafka-console-consumer.sh脚本中添加JAAS文件的路径
vi kafka-console-producer.sh #!/bin/bash export KAFKA_OPTS="-Djava.security.auth.login.config=/home/xyp9x/kafka_sasl/config/kafka_client_jaas.conf" vi kafka-console-consumer.sh #!/bin/bash export KAFKA_OPTS="-Djava.security.auth.login.config=/home/xyp9x/kafka_sasl/config/kafka_client_jaas.conf"
4)分发
rsync -r config/kafka_client_jaas.conf bigdata112:/home/xyp9x/kafka_sasl/config/ rsync -r config/kafka_client_jaas.conf bigdata113:/home/xyp9x/kafka_sasl/config/ rsync -r config/producer.properties bigdata112:/home/xyp9x/kafka_sasl/config/ rsync -r config/producer.properties bigdata113:/home/xyp9x/kafka_sasl/config/ rsync -r config/consumer.properties bigdata112:/home/xyp9x/kafka_sasl/config/ rsync -r config/consumer.properties bigdata113:/home/xyp9x/kafka_sasl/config/ rsync -r bin/kafka-console-producer.sh bigdata112:/home/xyp9x/kafka_sasl/bin/ rsync -r bin/kafka-console-producer.sh bigdata113:/home/xyp9x/kafka_sasl/bin/ rsync -r bin/kafka-console-consumer.sh bigdata112:/home/xyp9x/kafka_sasl/bin/ rsync -r bin/kafka-console-consumer.sh bigdata113:/home/xyp9x/kafka_sasl/bin/
5)以安全认证的方式启动 kafka-server:
bin/kafka-server-start.sh -daemon config/server.properties
6)创建一个 topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test
7)以安全认证的方式启动 kafka-producer
bin/kafka-console-producer.sh --broker-list x.x.1.111:9092,x.x.1.112:9092,x.x.1.113:9092 --topic test --producer.config config/producer.properties
8)以安全认证的方式启动 kafka-consumer
bin/kafka-console-consumer.sh --bootstrap-server x.x.1.111:9092,x.x.1.112:9092,x.x.1.113:9092 --topic test --consumer.config config/consumer.properties
3.ACL操作(在配置好SASL后,启动Zookeeper集群和Kafka集群之后,就可以使用kafka-acls.sh脚本来操作ACL机制)
1)查看:在kafka-acls.sh脚本中传入list参数来查看ACL授权
bin/kafka-acls.sh --list --authorizer-properties zookeeper.connect=x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181
2)创建:创建待授权主题之前,在kafka-acls.sh脚本中指定JAAS文件路径,然后在执行创建操作
bin/kafka-topics.sh --create --zookeeper x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181 --replication-factor 3 --partitions 3 --topic kafka_acl_topic
3)生产者授权:对生产者执行授权操作
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181 --add --allow-principal User:producer --operation Write --topic kafka_acl_topic
4)消费者授权:对生产者执行授权后,通过消费者来进行验证
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181 --add --allow-principal User:consumer --operation Read --topic kafka_acl_topic
5)删除:通过remove参数来回收生产者权限
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181 --remove --allow-principal User:producer --operation Write --topic kafka_acl_topic
6)删除:通过remove参数来回收消费者权限
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=x.x.1.111:2181,x.x.1.112:2181,x.x.1