【Kafka SASL/SCRAM动态认证集群部署

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: 【Kafka SASL/SCRAM动态认证集群部署

目的:配置 SASL/PLAIN 验证,实现了对 Kafka 的权限控制。但 SASL/PLAIN 验证有一个问题:只能在 JAAS 文件 KafkaServer 中配置用户,一旦 Kafka 启动,无法动态新增用户。SASL/SCRAM 验证可以动态新增用户并分配权限。

1. 服务端配置

1、解压安装包

tar -zxvf kafka_2.11-2.4.1.tgz -C /home/xyp9x/

2、改名

mv kafka_2.11-2.4.1 kafka_scram

3、在 kafka_scram 目录下创建 logs、kafka-logs 文件夹

mkdir logs kafka-logs

4、修改配置文件

vi server.properties
将原内容全部清空,输入以下内容:
#broker的全局唯一编号,不能重复
broker.id=0
#是否允许删除topic
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的最大缓冲区大小
socket.request.max.bytes=104857600
#kafka数据存放的路径
log.dirs=/home/xyp9x/kafka_scram/kafka-logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=bigdata111:2181,bigdata112:2181,bigdata113:2181
#kafka连接zookeeper超时时间90s
zookeeper.connection.timeout.ms=90000

5、配置环境变量

vi /etc/profile
# Kafka_scram
export KAFKA_HOME=/home/xyp9x/kafka_scram
export PATH=$PATH:$KAFKA_HOME/bin

6、分发

rsync -r /etc/profile bigdata112:/etc/
rsync -r /etc/profile bigdata113:/etc/
rsync -r kafka_scram bigdata112:/home/xyp9x/
rsync -r kafka_scram bigdata113:/home/xyp9x/

7)分别在 112 和 113 上修改配置文件

vi server.properties
broker.id=1
vi server.properties
broker.id=2

8)更新 111、112、113 的环境变量

source /etc/profile

9、启动集群(依次在 111、112、113 节点上启动 kafka)

bin/kafka-server-start.sh -daemon config/server.properties

10、创建 SCRAM 证书(SCRAM-SHA-256/SCRAM-SHA-512 是对密码加密的算法,二者有其一即可)

此方法是把凭证(credential)存储在 Zookeeper,可以使用 kafka-configs.sh 在 Zookeeper 中创建凭据。对于每个 SCRAM 机制,必须添加具有机制名称的配置来创建凭证,在启动 Kafka broker 之前创建代理间通信的凭据。所以第一步,在没有设置任何权限的配置下启动 Kafka。

bin/kafka-configs.sh --zookeeper x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181 --alter --entity-type users --entity-name admin --add-config 'SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]'

11、查看 SCRAM 证书

bin/kafka-configs.sh --zookeeper x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181 --describe --entity-type users --entity-name admin

12、在 config 目录中创建 kafka_server_jaas.conf 文件

vi kafka_server_jaas.conf
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin";
};

13、修改配置文件

vi server.properties
在原有基础上添加以下内容:
#认证配置
listeners=SASL_PLAINTEXT://:9092
#PLAINTEXT不加密明文传输, 省事, 性能也相对好
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
#ACL配置
#设置为true,ACL机制为黑名单机制,只有黑名单中的用户无法访问
#设置为false,ACL机制为白名单机制,只有白名单中的用户可以访问
allow.everyone.if.no.acl.found=false
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

14、在 Kafka 启动脚本中添加配置文件路径

vi kafka-server-start.sh
#!/bin/bash
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/xyp9x/kafka_scram/config/kafka_server_jaas.conf"

15、分发

rsync -r config/kafka_server_jaas.conf bigdata112:/home/xyp9x/kafka_scram/config/
rsync -r config/kafka_server_jaas.conf bigdata113:/home/xyp9x/kafka_scram/config/
rsync -r config/server.properties bigdata112:/home/xyp9x/kafka_scram/config/
rsync -r config/server.properties bigdata113:/home/xyp9x/kafka_scram/config/
rsync -r bin/kafka-server-start.sh bigdata112:/home/xyp9x/kafka_scram/bin/
rsync -r bin/kafka-server-start.sh bigdata113:/home/xyp9x/kafka_scram/bin/

16、分别在 112 和 113 上修改配置文件

vi server.properties
broker.id=1
vi server.properties
broker.id=2

2. 客户端配置

1、为 admin 用户创建一个 jaas 文件

vi kafka_client_jaas_admin.conf
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin";
};

2、在 producer.properties 文件中添加认证协议

vi producer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256

3、在 consumer.properties 文件中添加认证协议

vi consumer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256

4、在 kafka-console-producer.sh 脚本中添加 JAAS 文件的路径

vi kafka-console-producer.sh
#!/bin/bash
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/xyp9x/kafka_scram/config/kafka_client_jaas_admin.conf"

5、在 kafka-console-consumer.sh 脚本中添加 JAAS 文件的路径

vi kafka-console-consumer.sh
#!/bin/bash
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/xyp9x/kafka_scram/config/kafka_client_jaas_admin.conf"

6、分发

rsync -r config/kafka_client_jaas_admin.conf bigdata112:/home/xyp9x/kafka_scram/config/
rsync -r config/kafka_client_jaas_admin.conf bigdata113:/home/xyp9x/kafka_scram/config/
rsync -r config/producer.properties bigdata112:/home/xyp9x/kafka_scram/config/
rsync -r config/producer.properties bigdata113:/home/xyp9x/kafka_scram/config/
rsync -r config/consumer.properties bigdata112:/home/xyp9x/kafka_scram/config/
rsync -r config/consumer.properties bigdata113:/home/xyp9x/kafka_scram/config/
rsync -r bin/kafka-console-producer.sh bigdata112:/home/xyp9x/kafka_scram/bin/
rsync -r bin/kafka-console-producer.sh bigdata113:/home/xyp9x/kafka_scram/bin/
rsync -r bin/kafka-console-consumer.sh bigdata112:/home/xyp9x/kafka_scram/bin/
rsync -r bin/kafka-console-consumer.sh bigdata113:/home/xyp9x/kafka_scram/bin/

7、启动 kafka-broker

bin/kafka-server-start.sh -daemon config/server.properties

8、创建 topic

bin/kafka-topics.sh --zookeeper x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181 --create --partitions 3 --replication-factor 3 --topic test

9、超级用户启动生产者

bin/kafka-console-producer.sh --broker-list x.x.1.111:9092,x.x.1.112:9092,x.x.1.113:9092 --topic test --producer.config config/producer.properties

10、超级用户启动消费者

bin/kafka-console-consumer.sh --bootstrap-server x.x.1.111:9092,x.x.1.112:9092,x.x.1.113:9092 --topic test --consumer.config config/consumer.properties

3. ACL 操作(在配置好 SASL 后,启动 Zookeeper 集群和 Kafka 集群之后,就可以使用 kafka-acls.sh 脚本来操作 ACL 机制)

1、创建普通用户 SCRAM 证书

bin/kafka-configs.sh --zookeeper x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181 --alter --entity-type users --entity-name xyp9x --add-config 'SCRAM-SHA-256=[iterations=8192,password=xyp9x],SCRAM-SHA-512=[password=xyp9x]'

2、查看 SCRAM 证书

bin/kafka-configs.sh --zookeeper x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181 --describe --entity-type users --entity-name xyp9x

3、删除 SCRAM 证书

bin/kafka-configs.sh --zookeeper x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181 --alter --delete-config 'SCRAM-SHA-512' --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name xyp9x

4、查看 ACL 授权

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181 --list

5、对普通用户生产者授写权限

//允许xyp9x用户从所有IP地址写
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181 --add --allow-principal User:xyp9x --operation Write --topic test
//禁止xyp9x用户从所有IP地址写
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181 --remove --allow-principal User:xyp9x --operation Write --topic test
//允许xyp9x用户从x.x.1.111和x.x.1.112写
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181 --add --allow-principal User:xyp9x --allow-host x.x.1.111 --allow-host x.x.1.112 --operation Write --topic test
//禁止xyp9x用户从x.x.1.111和x.x.1.112写
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181 --remove --allow-principal User:xyp9x --allow-host x.x.1.111 --allow-host x.x.1.112 --operation Write --topic test

6、对普通用户消费者组授权

//允许xyp9x用户使用test-consumer-group消费组消费消息
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181 --add --allow-principal User:xyp9x --operation Read --group test-consumer-group
//禁止xyp9x用户使用test-consumer-group消费组消费消息
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181 --remove --allow-principal User:xyp9x --operation Read --group test-consumer-group

7、对普通用户消费者授读权限

//允许xyp9x用户从所有IP地址读
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181 --add --allow-principal User:xyp9x --operation Read --topic test
//禁止xyp9x用户从所有IP地址读
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181 --remove --allow-principal User:xyp9x --operation Read --topic test
//允许xyp9x用户从x.x.1.111和x.x.1.112读
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181 --add --allow-principal User:xyp9x --allow-host x.x.1.111 --allow-host x.x.1.112 --operation Read --topic test
//禁止xyp9x用户从x.x.1.111和x.x.1.112读
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181 --remove --allow-principal User:xyp9x --allow-host x.x.1.111 --allow-host x.x.1.112 --operation Read --topic test

8、总结

SASL/SCRAM验证方法可以在Kafka服务启动之后,动态的新增用户分并配权限,在业务变动频繁,开发人员多的
相关文章
|
7月前
|
消息中间件 运维 Kafka
kafka使用SASL认证
kafka使用SASL认证
116 0
|
4月前
|
消息中间件 安全 Java
Spring Boot 基于 SCRAM 认证集成 Kafka 的详解
【8月更文挑战第4天】本文详解Spring Boot结合SCRAM认证集成Kafka的过程。SCRAM为Kafka提供安全身份验证。首先确认Kafka服务已启用SCRAM,并准备认证凭据。接着,在`pom.xml`添加`spring-kafka`依赖,并在`application.properties`中配置Kafka属性,包括SASL_SSL协议与SCRAM-SHA-256机制。创建生产者与消费者类以实现消息的发送与接收功能。最后,通过实际消息传递测试集成效果与认证机制的有效性。
153 4
|
4月前
|
消息中间件 Kafka
一文吃透企业级elk技术栈:4. kafka 集群部署
一文吃透企业级elk技术栈:4. kafka 集群部署
|
4月前
|
消息中间件 安全 Kafka
Flink与Kafka的终极联盟:揭秘如何在一瞬间切换SASL机制,保护您的数据不受黑客侵袭!
【8月更文挑战第7天】Apache Flink作为高性能流处理框架,在与Kafka集成时确保数据安全至关重要。通过配置`KafkaConsumer`使用SASL机制如SCRAM-SHA-256或PLAIN,可有效防止未授权访问。SCRAM-SHA-256采用强化的身份验证流程提高安全性,而PLAIN机制则相对简单。配置涉及设置`properties`参数,包括指定`sasl.mechanism`、`security.protocol`及JAAS认证信息。合理选择和配置这些参数对于保护Flink应用与Kafka间的数据通信安全至关重要。
82 0
|
5月前
|
消息中间件 Kubernetes Kafka
AutoMQ 产品动态 | 发布 1.1.0,兼容至 Apache Kafka 3.7,支持 Kaf
AutoMQ 产品动态 | 发布 1.1.0,兼容至 Apache Kafka 3.7,支持 Kaf
72 0
AutoMQ 产品动态 | 发布 1.1.0,兼容至 Apache Kafka 3.7,支持 Kaf
|
消息中间件 JSON 负载均衡
kafka 动态扩容现有 topic 的分区数和副本数
kafka 动态扩容现有 topic 的分区数和副本数
1824 0
|
消息中间件 Kafka Linux
115 Kafka集群部署
115 Kafka集群部署
78 0
|
7月前
|
消息中间件 存储 Java
分布式实时消息队列Kafka(二)Kafka分布式集群部署
分布式实时消息队列Kafka(二)Kafka分布式集群部署
213 0
|
消息中间件 Java Kafka
Apache Kafka - 灵活控制Kafka消费_动态开启/关闭监听实现
Apache Kafka - 灵活控制Kafka消费_动态开启/关闭监听实现
977 1
|
消息中间件 Java Kafka
SpringBoot整合Kafka(SASL认证配置、处理毒丸消息)
SpringBoot整合Kafka(SASL认证配置、处理毒丸消息)
1452 0
下一篇
无影云桌面