实战Kafka ACL机制

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
MSE Nacos/ZooKeeper 企业版试用,1600元额度,限量50份
简介: 1.概述   在Kafka0.9版本之前,Kafka集群时没有安全机制的。Kafka Client应用可以通过连接Zookeeper地址,例如zk1:2181:zk2:2181,zk3:2181等。来获取存储在Zookeeper中的Kafka元数据信息。

1.概述

  在Kafka0.9版本之前,Kafka集群时没有安全机制的。Kafka Client应用可以通过连接Zookeeper地址,例如zk1:2181:zk2:2181,zk3:2181等。来获取存储在Zookeeper中的Kafka元数据信息。拿到Kafka Broker地址后,连接到Kafka集群,就可以操作集群上的所有主题了。由于没有权限控制,集群核心的业务主题时存在风险的。

2.内容

2.2 身份认证

  Kafka的认证范围包含如下:

  • Client与Broker之间
  • Broker与Broker之间
  • Broker与Zookeeper之间

  当前Kafka系统支持多种认证机制,如SSL、SASL(Kerberos、PLAIN、SCRAM)。

2.3  SSL认证流程

  在Kafka系统中,SSL协议作为认证机制默认是禁止的,如果需要使用,可以手动启动SSL机制。安装和配置SSL协议的步骤,如下所示:

  1. 在每个Broker中Create一个Tmp密钥库
  2. 创建CA
  3. 给证书签名
  4. 配置Server和Client

  执行脚本如下所示:

#! /bin/bash

# 1.Create rsa
keytool -keystore server.keystore.jks -alias dn1 -validity 365 -genkey -keyalg RSA
# 2.Create CA
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
# 3.Import client
keytool -keystore client.truststore.jks -alias CAROOT -import -file ca-cert
# 4.Import server
keytool -keystore server.truststore.jks -alias CAROOT -import -file ca-cert
# 5.Export
keytool -keystore server.keystore.jks -alias dn1 -certreq -file cert-file
# 6.Signed
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:123456
# 7.Import ca-cert
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
# 8.Import cert-signed
keytool -keystore server.keystore.jks -alias dn1 -import -file cert-signed

2.4 SASL认证流程

  在Kafka系统中,SASL机制包含三种,它们分别是Kerberos、PLAIN、SCRAM。以PLAIN认证为示例,下面给大家介绍PLAIN认证流程。

2.4.1  配置Server

  首先,在$KAFKA_HOME/config目录中新建一个文件,名为kafka_server_jaas.conf,配置内容如下:

KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="smartloli"
   password="smartloli-secret"
   user_admin="smartloli-secret";
};

Client {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="smartloli"
   password="smartloli-secret";
};

  然后在Kafka启动脚本(kafka-server-start.sh)中添加配置文件路径,设置内容如下:

[hadoop@dn1 bin]$ vi kafka-server-start.sh

# Add jaas file
export KAFKA_OPTS="-Djava.security.auth.login.config=/data/soft/new/kafka/config/kafka_server_jaas.conf"

  接下来,配置server.properties文件,内容如下:

# Set ip & port
listeners=SASL_PLAINTEXT://dn1:9092
advertised.listeners=SASL_PLAINTEXT://dn1:9092
# Set protocol
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN

# Add acl
allow.everyone.if.no.acl.found=true
auto.create.topics.enable=false
delete.topic.enable=true
advertised.host.name=dn1
super.users=User:admin

# Add class
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

2.4.2 配置Client

  当Kafka Server端配置启用了SASL/PLAIN,那么Client连接的时候需要配置认证信息,Client配置一个kafka_client_jaas.conf文件,内容如下:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret";
};

  然后,在producer.properties和consumer.properties文件中设置认证协议,内容如下:

security.protocol=SASL_PLAINTEXT 
sasl.mechanism=PLAIN

  最后,在kafka-console-producer.sh脚本和kafka-console-producer.sh脚本中添加JAAS文件的路径,内容如下:

# For example: kafka-console-producer.sh
hadoop@dn1 bin]$ vi kafka-console-producer.sh

# Add jaas file
export KAFKA_OPTS="-Djava.security.auth.login.config=/data/soft/new/kafka\
/config/kafka_client_jaas.conf"

2.5 ACL操作

  在配置好SASL后,启动Zookeeper集群和Kafka集群之后,就可以使用kafka-acls.sh脚本来操作ACL机制。

  (1)查看:在kafka-acls.sh脚本中传入list参数来查看ACL授权新

[hadoop@dn1 bin]$ kafka-acls.sh --list --authorizer-properties zookeeper.connect=dn1:2181

  (2)创建:创建待授权主题之前,在kafka-acls.sh脚本中指定JAAS文件路径,然后在执行创建操作

[hadoop@dn1 bin]$ kafka-topics.sh --create --zookeeper dn1:2181 --replication-factor 1 --partitions 1 --topic kafka_acl_topic

  (3)生产者授权:对生产者执行授权操作

[hadoop@dn1 ~]$ kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=dn1:2181 --add --allow-principalUser:producer --operation Write --topic kafka_acl_topic

  (4)消费者授权:对生产者执行授权后,通过消费者来进行验证

[hadoop@dn1 ~]$ kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=dn1:2181 --add --allow-principalUser:consumer --operation Read --topic kafka_acl_topic

  (5)删除:通过remove参数来回收相关权限

[hadoop@dn1 bin]$ kafka-acls.sh --authorizer-properties zookeeper.connect=dn1:2181 --remove --allow-principal User:producer --operation Write --topic kafka_acl_topic3

3.总结

  在处理一些核心的业务数据时,Kafka的ACL机制还是非常重要的,对核心业务主题进行权限管控,能够避免不必要的风险。

4.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

联系方式:
邮箱:smartloli.org@gmail.com
Twitter: https://twitter.com/smartloli
QQ群(Hadoop - 交流社区1): 424769183
温馨提示:请大家加群的时候写上加群理由(姓名+公司/学校),方便管理员审核,谢谢!

热爱生活,享受编程,与君共勉!


作者:哥不是小萝莉 [关于我][犒赏

出处:http://www.cnblogs.com/smartloli/

转载请注明出处,谢谢合作!

目录
相关文章
|
3月前
|
消息中间件 Java Kafka
Java 事件驱动架构设计实战与 Kafka 生态系统组件实操全流程指南
本指南详解Java事件驱动架构与Kafka生态实操,涵盖环境搭建、事件模型定义、生产者与消费者实现、事件测试及高级特性,助你快速构建高可扩展分布式系统。
233 7
|
6月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
507 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
6月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
224 12
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
682 5
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
206 3
|
消息中间件 Java 大数据
Kafka ISR机制详解!
本文详细解析了Kafka的ISR(In-Sync Replicas)机制,阐述其工作原理及如何确保消息的高可靠性和高可用性。ISR动态维护与Leader同步的副本集,通过不同ACK确认机制(如acks=0、acks=1、acks=all),平衡可靠性和性能。此外,ISR机制支持故障转移,当Leader失效时,可从ISR中选取新的Leader。文章还包括实例分析,展示了ISR在不同场景下的变化,并讨论了其优缺点,帮助读者更好地理解和应用ISR机制。
1000 0
Kafka ISR机制详解!
|
消息中间件 负载均衡 Java
揭秘Kafka背后的秘密!Kafka 架构设计大曝光:深入剖析Kafka机制,带你一探究竟!
【8月更文挑战第24天】Apache Kafka是一款专为实时数据处理及流传输设计的高效率消息系统。其核心特性包括高吞吐量、低延迟及出色的可扩展性。Kafka采用分布式日志模型,支持数据分区与副本,确保数据可靠性和持久性。系统由Producer(消息生产者)、Consumer(消息消费者)及Broker(消息服务器)组成。Kafka支持消费者组,实现数据并行处理,提升整体性能。通过内置的故障恢复机制,即使部分节点失效,系统仍能保持稳定运行。提供的Java示例代码展示了如何使用Kafka进行消息的生产和消费,并演示了故障转移处理过程。
199 3
|
消息中间件 Java Kafka
Kafka ACK机制详解!
本文深入剖析了Kafka的ACK机制,涵盖其原理、源码分析及应用场景,并探讨了acks=0、acks=1和acks=all三种级别的优缺点。文中还介绍了ISR(同步副本)的工作原理及其维护机制,帮助读者理解如何在性能与可靠性之间找到最佳平衡。适合希望深入了解Kafka消息传递机制的开发者阅读。
1328 0
下一篇
oss云网关配置