Kafka SASL集群部署

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Kafka SASL集群部署

在Kafka中,SASL机制包含三种,它们分别是Kerberos、PLAIN、SCRAM。以PLAIN认证为示例

1.配置Server

1)解压安装包

tar -zxvf kafka_2.11-2.4.1.tgz -C /home/xyp9x/

2)改名

mv kafka_2.11-2.4.1 kafka_sasl

2)在kafka目录下创建logs、kafka-logs文件夹

mkdir logs kafka-logs

3)config目录中创建kafka_server_jaas.conf文件,前三行是配置管理员账户(该账户与上面server.properties中配置的super.users一样)后面的即 user_用户名="该用户的密码"。之后配置ACL的时候需要用到这里配置的用户

vi kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_xyp9x="xyp9x";
};

4)修改配置文件

vi server.properties
将原内容全部清空,输入以下内容:
#broker的全局唯一编号,不能重复
broker.id=0
#是否允许删除topic
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的最大缓冲区大小
socket.request.max.bytes=104857600
#kafka数据存放的路径
log.dirs=/home/xyp9x/kafka_sasl/kafka-logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=bigdata111:2181,bigdata112:2181,bigdata113:2181
#kafka连接zookeeper超时时间90s
zookeeper.connection.timeout.ms=90000
#SASL_PLAINTEXT 
#这里的listener中的hostname在三台机器上换成每台机器对应的hostname:ip
listeners=SASL_PLAINTEXT://x.x.1.111:9092
#使用的认证协议
security.inter.broker.protocol=SASL_PLAINTEXT
#SASL机制
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
#完成身份验证的类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#如果没有找到ACL(访问控制列表)配置,则允许任何操作。
#设置为true,ACL机制为黑名单机制,只有黑名单中的用户无法访问
#默认为false,ACL机制为白名单机制,只有白名单中的用户可以访问
#allow.everyone.if.no.acl.found=true
#设置admin超级用户
super.users=User:admin

5)然后在Kafka启动脚本中添加配置文件路径

vi kafka-server-start.sh
#!/bin/bash
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/xyp9x/kafka_sasl/config/kafka_server_jaas.conf"

6)配置环境变量

vi /etc/profile
# Kafka_sasl
export KAFKA_HOME=/home/xyp9x/kafka_sasl
export PATH=$PATH:$KAFKA_HOME/bin

7)分发

rsync -r /etc/profile bigdata112:/etc/
rsync -r /etc/profile bigdata113:/etc/
rsync -r kafka_sasl bigdata112:`pwd`
rsync -r kafka_sasl bigdata113:`pwd`

8)分别在112和113上修改配置文件

vi server.properties
broker.id=1
listeners=SASL_PLAINTEXT://x.x.1.112:9092
vi server.properties
broker.id=2
listeners=SASL_PLAINTEXT://x.x.1.113:9092

9)更新111、112、113的环境变量

source /etc/profile
2.配置Client(当Kafka Server端配置启用了SASL/PLAIN,那么Client连接的时候需要配置认证信息)

1)在config目录创建kafka_client_jaas.conf

vi kafka_client_jaas.conf
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin";
};

2)在producer.properties和consumer.properties文件中设置认证协议

vi producer.properties
security.protocol=SASL_PLAINTEXT 
sasl.mechanism=PLAIN
vi consumer.properties
security.protocol=SASL_PLAINTEXT 
sasl.mechanism=PLAIN

3)在kafka-console-producer.sh脚本和kafka-console-consumer.sh脚本中添加JAAS文件的路径

vi kafka-console-producer.sh
#!/bin/bash
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/xyp9x/kafka_sasl/config/kafka_client_jaas.conf"
vi kafka-console-consumer.sh
#!/bin/bash
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/xyp9x/kafka_sasl/config/kafka_client_jaas.conf"

4)分发

rsync -r config/kafka_client_jaas.conf bigdata112:/home/xyp9x/kafka_sasl/config/
rsync -r config/kafka_client_jaas.conf bigdata113:/home/xyp9x/kafka_sasl/config/
rsync -r config/producer.properties bigdata112:/home/xyp9x/kafka_sasl/config/
rsync -r config/producer.properties bigdata113:/home/xyp9x/kafka_sasl/config/
rsync -r config/consumer.properties bigdata112:/home/xyp9x/kafka_sasl/config/
rsync -r config/consumer.properties bigdata113:/home/xyp9x/kafka_sasl/config/
rsync -r bin/kafka-console-producer.sh bigdata112:/home/xyp9x/kafka_sasl/bin/
rsync -r bin/kafka-console-producer.sh bigdata113:/home/xyp9x/kafka_sasl/bin/
rsync -r bin/kafka-console-consumer.sh bigdata112:/home/xyp9x/kafka_sasl/bin/
rsync -r bin/kafka-console-consumer.sh bigdata113:/home/xyp9x/kafka_sasl/bin/

5)以安全认证的方式启动 kafka-server:

bin/kafka-server-start.sh -daemon config/server.properties

6)创建一个 topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test

7)以安全认证的方式启动 kafka-producer

bin/kafka-console-producer.sh --broker-list x.x.1.111:9092,x.x.1.112:9092,x.x.1.113:9092 --topic test --producer.config config/producer.properties

8)以安全认证的方式启动 kafka-consumer

bin/kafka-console-consumer.sh --bootstrap-server x.x.1.111:9092,x.x.1.112:9092,x.x.1.113:9092 --topic test --consumer.config config/consumer.properties
3.ACL操作(在配置好SASL后,启动Zookeeper集群和Kafka集群之后,就可以使用kafka-acls.sh脚本来操作ACL机制)

1)查看:在kafka-acls.sh脚本中传入list参数来查看ACL授权

bin/kafka-acls.sh --list --authorizer-properties zookeeper.connect=x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181

2)创建:创建待授权主题之前,在kafka-acls.sh脚本中指定JAAS文件路径,然后在执行创建操作

bin/kafka-topics.sh --create --zookeeper x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181 --replication-factor 3 --partitions 3 --topic kafka_acl_topic

3)生产者授权:对生产者执行授权操作

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181 --add --allow-principal User:producer --operation Write --topic kafka_acl_topic

4)消费者授权:对生产者执行授权后,通过消费者来进行验证

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181 --add --allow-principal User:consumer --operation Read --topic kafka_acl_topic

5)删除:通过remove参数来回收生产者权限

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=x.x.1.111:2181,x.x.1.112:2181,x.x.1.113:2181 --remove --allow-principal User:producer --operation Write --topic kafka_acl_topic

6)删除:通过remove参数来回收消费者权限

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=x.x.1.111:2181,x.x.1.112:2181,x.x.1
相关文章
|
6月前
|
消息中间件 运维 Kafka
kafka使用SASL认证
kafka使用SASL认证
116 0
|
3月前
|
消息中间件 Kafka
一文吃透企业级elk技术栈:4. kafka 集群部署
一文吃透企业级elk技术栈:4. kafka 集群部署
|
3月前
|
消息中间件 安全 Kafka
Flink与Kafka的终极联盟:揭秘如何在一瞬间切换SASL机制,保护您的数据不受黑客侵袭!
【8月更文挑战第7天】Apache Flink作为高性能流处理框架,在与Kafka集成时确保数据安全至关重要。通过配置`KafkaConsumer`使用SASL机制如SCRAM-SHA-256或PLAIN,可有效防止未授权访问。SCRAM-SHA-256采用强化的身份验证流程提高安全性,而PLAIN机制则相对简单。配置涉及设置`properties`参数,包括指定`sasl.mechanism`、`security.protocol`及JAAS认证信息。合理选择和配置这些参数对于保护Flink应用与Kafka间的数据通信安全至关重要。
82 0
|
消息中间件 Kafka Linux
115 Kafka集群部署
115 Kafka集群部署
78 0
|
6月前
|
消息中间件 存储 Java
分布式实时消息队列Kafka(二)Kafka分布式集群部署
分布式实时消息队列Kafka(二)Kafka分布式集群部署
213 0
|
消息中间件 Java Kafka
SpringBoot整合Kafka(SASL认证配置、处理毒丸消息)
SpringBoot整合Kafka(SASL认证配置、处理毒丸消息)
1451 0
|
消息中间件 存储 分布式计算
Kafka集群部署
Kafka集群部署
Kafka集群部署
|
消息中间件 Kafka Apache
Kafka3.0集群部署(内附zookeeper3.5.7集群部署)
Kafka3.0集群部署(内附zookeeper3.5.7集群部署)
200 0
|
消息中间件 存储 安全
基于SASL和ACL的Kafka安全性解析
本文主要介绍基于SCRAM进行身份验证,使用Kafka ACL进行授权,SSL进行加密以及使用camel-Kafka连接Kafka群集以使用camel路由生产和消费消息的过程。
448 0
|
消息中间件 存储 分布式计算
消息队列KafKa的集群部署
🍅程序员小王的博客:程序员小王的博客 🍅 欢迎点赞 👍 收藏 ⭐留言 📝 🍅 如有编辑错误联系作者,如果有比较好的文章欢迎分享给我,我会取其精华去其糟粕 🍅java自学的学习路线:java自学的学习路线
311 0
消息队列KafKa的集群部署
下一篇
无影云桌面