E-MapReduce Kafka Kerberos集群授权

简介:

Kafka授权

如果没有开启Kafka认证(如Kerberos认证或者简单的用户名密码),即使开启了Kafka授权,用户也可以伪造身份访问服务。所以建议创建高安全模式(即支持Kerberos)的Kafka集群,详见Kerberos安全文档

备注:

本文的权限配置只针对E-MapReduce的高安全模式集群,即Kafka以Kerberos的方式启动。

1. 添加配置

在Kafka集群的配置管理->Kafka->配置->server.properties->自定义配置

添加如下几个参数:

key value 备注
authorizer.class.name kafka.security.auth.SimpleAclAuthorizer
super.users User:kafka User:kafka是必须的,可添加其它用户用分号(;)隔开

备注:

zookeeper.set.acl用来设置kafka在zookeeper中数据的权限,E-MapReduce集群中已经设置为true,所以上述不需要再添加该配置。该配置打开后,在Kerberos环境中,只有用户名称为kafka且通过Kerberos认证后才能执行kafka-topics.sh命令(kafka-topics.sh会直接读写/修改zookeeper中的数据)。

2. 重启Kafka集群

在Kafka集群的配置管理->HBase->操作->RESTART All Components

3. 授权(ACL)

3.1 基本概念

Kafka官方文档定义:

Kafka acls are defined in the general format of "Principal P is [Allowed/Denied] Operation O From Host H On Resource R"

即ACL过程涉及Principal Allowed/Denied Operation Host Resource

  • Principal:用户名
安全协议 value
PLAINTEXT ANONYMOUS
SSL ANONYMOUS
SASL_PLAINTEXT mechanism为PLAIN时,用户名是client_jaas.conf指定的用户名,mechanism为GSSAPI时,用户名为client_jaas.conf指定的principal
SASL_SSL
  • Allowed/Denied: 允许/拒绝
  • Operation: 操作

Read,Write,Create,DeleteAlter,Describe,ClusterAction,AlterConfigs,DescribeConfigs,IdempotentWrite,All

  • Host: 针对的机器
  • Resource: 权限作用的资源对象

Topic, Group, Cluster, TransactionalId

Operation/Resource的一些详细对应关系,如哪些Resource支持哪些Operation的授权,详见KIP-11 - Authorization Interface

3.2 授权命令

-使用脚本 kafka-acls.sh (/usr/lib/kafka-current/bin/kafka-acls.sh) 进行授权
具体是使用方式可以直接执行 kafka-acls.sh --help进行查看。

4. 操作示例

在已经创建的E-MapReduce高安全Kafka集群的master节点上进行相关示例操作。

4.1 新建用户test

useradd test

4.2 创建topic

第一节添加配置的备注中提到zookeeper.set.acl=true,kafka-topics.sh需要在kafka账号执行,而且kafka账号下要通过Kerberos认证。

#kafka_client_jaas.conf中已经设置了kafka的Kerberos认证相关信息
export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf" 
# zookeeper地址改成自己集群的对应地址(执行hostnamed的输出)
kafka-topics.sh --create --zookeeper emr-header-1:2181/kafka-1.0.0 --replication-factor 3 --partitions 1 --topic test

4.3 test用户执行kafka-console-producer.sh

  • 创建test用户的keytab文件,用户zookeeper/kafka的认证

    su root
    sh /usr/lib/has-current/bin/hadmin-local.sh /etc/ecm/has-conf -k /etc/ecm/has-conf/admin.keytab
    HadminLocalTool.local: #直接按回车可以看到一些命令的用法
    HadminLocalTool.local: addprinc #输入命令按回车可以看到具体命令的用法
    HadminLocalTool.local: addprinc -pw 123456 test #添加test的princippal,密码设置为123456
    HadminLocalTool.local: ktadd -k /home/test/test.keytab test #导出keytab文件,后续可使用该文件
  • 添加kafka_client_test.conf

    如文件放到/home/test/kafka_client_test.conf,内容如下:

    KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName="kafka"
    keyTab="/home/test/test.keytab"
    principal="test";
       };
    
       // Zookeeper client authentication
       Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    useTicketCache=false
    serviceName="zookeeper"
    keyTab="/home/test/test.keytab"
    principal="test";
     };
  • 添加producer.conf

    如文件放到/home/test/producer.conf,内容如下:

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=GSSAPI
  • 执行kafka-console-producer.sh

    su test
    export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf"
    kafka-console-producer.sh --producer.config /home/test/producer.conf --topic test --broker-list emr-worker-1:9092

    由于没有设置ACL,所以上述会报错:

    org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test]

  • 设置ACL

    同样kafka-acls.sh也需要kafka账号执行

     su kafka
     export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf" 
     kafka-acls.sh --authorizer-properties zookeeper.connect=emr-header-1:2181/kafka-1.0.0 --add --allow-principal User:test --operation Write --topic test
  • 再执行kafka-console-producer.sh

    su test
    export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf"
     kafka-console-producer.sh --producer.config /home/test/producer.conf --topic test --broker-list emr-worker-1:9092

    正常:

    [2018-02-28 22:25:36,178] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser)
    >alibaba
    >E-MapReduce
    >

4.4 test用户执行kafka-console-consumer.sh

上面成功执行kafka-console-producer.sh,并往topic里面写入一些数据后,就可以执行kafka-console-consumer.sh进行消费测试.

  • 添加consumer.conf

    如文件放到/home/test/consumer.conf,内容如下:

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=GSSAPI
  • 执行kafka-console-consumer.sh

    su test
    #kafka_client_test.conf跟上面producer使用的是一样的
    export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf"
    kafka-console-consumer.sh --consumer.config consumer.conf  --topic test --bootstrap-server emr-worker-1:9092 --group test-group  --from-beginning

    由于未设置权限,会报错:

    org.apache.kafka.common.errors.GroupAuthorizationException: 
    Not authorized to access group: test-group
  • 设置ACL

    su kafka
    export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf" 
    #test-group权限
    kafka-acls.sh --authorizer-properties zookeeper.connect=emr-header-1:2181/kafka-1.0.0 --add --allow-principal User:test --operation Read --group test-group
    # topic权限
     kafka-acls.sh --authorizer-properties zookeeper.connect=emr-header-1:2181/kafka-1.0.0 --add --allow-principal User:test --operation Read --topic test
  • 再执行kafka-console-consumer.sh

    su test
    #kafka_client_test.conf跟上面producer使用的是一样的
    export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf"
    kafka-console-consumer.sh --consumer.config consumer.conf  --topic test --bootstrap-server emr-worker-1:9092 --group test-group  --from-beginning

    正常输出:

     alibaba
     E-MapReduce
目录
相关文章
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
474 4
|
消息中间件 监控 数据可视化
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
493 2
|
10月前
|
消息中间件 运维 Java
搭建Zookeeper、Kafka集群
本文详细介绍了Zookeeper和Kafka集群的搭建过程,涵盖系统环境配置、IP设置、主机名设定、防火墙与Selinux关闭、JDK安装等基础步骤。随后深入讲解了Zookeeper集群的安装与配置,包括数据目录创建、节点信息设置、SASL认证配置及服务启动管理。接着描述了Kafka集群的安装,涉及配置文件修改、安全认证设置、生产消费认证以及服务启停操作。最后通过创建Topic、发送与查看消息等测试验证集群功能。全网可搜《小陈运维》获取更多信息。
889 1
|
消息中间件 Java Kafka
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
本文介绍了Kafka集群的搭建过程,涵盖从虚拟机安装到集群测试的详细步骤。首先规划了集群架构,包括三台Kafka Broker节点,并说明了分布式环境下的服务进程配置。接着,通过VMware导入模板机并克隆出三台虚拟机(kafka-broker1、kafka-broker2、kafka-broker3),分别设置IP地址和主机名。随后,依次安装JDK、ZooKeeper和Kafka,并配置相应的环境变量与启动脚本,确保各组件能正常运行。最后,通过编写启停脚本简化集群的操作流程,并对集群进行测试,验证其功能完整性。整个过程强调了自动化脚本的应用,提高了部署效率。
3104 1
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
|
11月前
|
消息中间件 人工智能 安全
秒级灾备恢复:Kafka 2025 AI自愈集群下载及跨云Topic迁移终极教程
Apache Kafka 2025作为企业级实时数据中枢,实现五大革新:量子安全传输(CRYSTALS-Kyber抗量子加密算法)、联邦学习总线(支持TensorFlow Federated/Horizontal FL框架)、AI自愈集群(MTTR缩短至30秒内)、多模态数据处理(原生支持视频流、3D点云等)和跨云弹性扩展(AWS/GCP/Azure间自动迁移)。平台采用混合云基础设施矩阵与软件依赖拓扑设计,提供智能部署架构。安装流程涵盖抗量子安装包获取、量子密钥配置及联邦学习总线设置。
|
消息中间件 存储 Prometheus
Kafka集群如何配置高可用性
Kafka集群如何配置高可用性
501 1
|
消息中间件 存储 Kafka
2024最全Kafka集群方案汇总
Apache Kafka 是一个高吞吐量、可扩展、可靠的分布式消息系统,广泛应用于数据驱动的应用场景。Kafka 支持集群架构,具备高可用性和容错性。其核心组件包括 Broker(服务器实例)、Topic(消息分类)、Partition(有序消息序列)、Producer(消息发布者)和 Consumer(消息消费者)。每个分区有 Leader 和 Follower,确保数据冗余和高可用。Kafka 2.8+ 引入了不依赖 Zookeeper 的 KRaft 协议,进一步简化了集群管理。常见的集群部署方案包括单节点和多节点集群,后者适用于生产环境以确保高可用性。
859 0
|
消息中间件 分布式计算 监控
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
340 6
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
370 0
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
229 0