E-MapReduce Kafka Kerberos集群授权

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介:

Kafka授权

如果没有开启Kafka认证(如Kerberos认证或者简单的用户名密码),即使开启了Kafka授权,用户也可以伪造身份访问服务。所以建议创建高安全模式(即支持Kerberos)的Kafka集群,详见Kerberos安全文档

备注:

本文的权限配置只针对E-MapReduce的高安全模式集群,即Kafka以Kerberos的方式启动。

1. 添加配置

在Kafka集群的配置管理->Kafka->配置->server.properties->自定义配置

添加如下几个参数:

key value 备注
authorizer.class.name kafka.security.auth.SimpleAclAuthorizer
super.users User:kafka User:kafka是必须的,可添加其它用户用分号(;)隔开

备注:

zookeeper.set.acl用来设置kafka在zookeeper中数据的权限,E-MapReduce集群中已经设置为true,所以上述不需要再添加该配置。该配置打开后,在Kerberos环境中,只有用户名称为kafka且通过Kerberos认证后才能执行kafka-topics.sh命令(kafka-topics.sh会直接读写/修改zookeeper中的数据)。

2. 重启Kafka集群

在Kafka集群的配置管理->HBase->操作->RESTART All Components

3. 授权(ACL)

3.1 基本概念

Kafka官方文档定义:

Kafka acls are defined in the general format of "Principal P is [Allowed/Denied] Operation O From Host H On Resource R"

即ACL过程涉及Principal Allowed/Denied Operation Host Resource

  • Principal:用户名
安全协议 value
PLAINTEXT ANONYMOUS
SSL ANONYMOUS
SASL_PLAINTEXT mechanism为PLAIN时,用户名是client_jaas.conf指定的用户名,mechanism为GSSAPI时,用户名为client_jaas.conf指定的principal
SASL_SSL
  • Allowed/Denied: 允许/拒绝
  • Operation: 操作

Read,Write,Create,DeleteAlter,Describe,ClusterAction,AlterConfigs,DescribeConfigs,IdempotentWrite,All

  • Host: 针对的机器
  • Resource: 权限作用的资源对象

Topic, Group, Cluster, TransactionalId

Operation/Resource的一些详细对应关系,如哪些Resource支持哪些Operation的授权,详见KIP-11 - Authorization Interface

3.2 授权命令

-使用脚本 kafka-acls.sh (/usr/lib/kafka-current/bin/kafka-acls.sh) 进行授权
具体是使用方式可以直接执行 kafka-acls.sh --help进行查看。

4. 操作示例

在已经创建的E-MapReduce高安全Kafka集群的master节点上进行相关示例操作。

4.1 新建用户test

useradd test

4.2 创建topic

第一节添加配置的备注中提到zookeeper.set.acl=true,kafka-topics.sh需要在kafka账号执行,而且kafka账号下要通过Kerberos认证。

#kafka_client_jaas.conf中已经设置了kafka的Kerberos认证相关信息
export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf" 
# zookeeper地址改成自己集群的对应地址(执行hostnamed的输出)
kafka-topics.sh --create --zookeeper emr-header-1:2181/kafka-1.0.0 --replication-factor 3 --partitions 1 --topic test

4.3 test用户执行kafka-console-producer.sh

  • 创建test用户的keytab文件,用户zookeeper/kafka的认证

    su root
    sh /usr/lib/has-current/bin/hadmin-local.sh /etc/ecm/has-conf -k /etc/ecm/has-conf/admin.keytab
    HadminLocalTool.local: #直接按回车可以看到一些命令的用法
    HadminLocalTool.local: addprinc #输入命令按回车可以看到具体命令的用法
    HadminLocalTool.local: addprinc -pw 123456 test #添加test的princippal,密码设置为123456
    HadminLocalTool.local: ktadd -k /home/test/test.keytab test #导出keytab文件,后续可使用该文件
  • 添加kafka_client_test.conf

    如文件放到/home/test/kafka_client_test.conf,内容如下:

    KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName="kafka"
    keyTab="/home/test/test.keytab"
    principal="test";
       };
    
       // Zookeeper client authentication
       Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    useTicketCache=false
    serviceName="zookeeper"
    keyTab="/home/test/test.keytab"
    principal="test";
     };
  • 添加producer.conf

    如文件放到/home/test/producer.conf,内容如下:

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=GSSAPI
  • 执行kafka-console-producer.sh

    su test
    export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf"
    kafka-console-producer.sh --producer.config /home/test/producer.conf --topic test --broker-list emr-worker-1:9092

    由于没有设置ACL,所以上述会报错:

    org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test]

  • 设置ACL

    同样kafka-acls.sh也需要kafka账号执行

     su kafka
     export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf" 
     kafka-acls.sh --authorizer-properties zookeeper.connect=emr-header-1:2181/kafka-1.0.0 --add --allow-principal User:test --operation Write --topic test
  • 再执行kafka-console-producer.sh

    su test
    export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf"
     kafka-console-producer.sh --producer.config /home/test/producer.conf --topic test --broker-list emr-worker-1:9092

    正常:

    [2018-02-28 22:25:36,178] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser)
    >alibaba
    >E-MapReduce
    >

4.4 test用户执行kafka-console-consumer.sh

上面成功执行kafka-console-producer.sh,并往topic里面写入一些数据后,就可以执行kafka-console-consumer.sh进行消费测试.

  • 添加consumer.conf

    如文件放到/home/test/consumer.conf,内容如下:

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=GSSAPI
  • 执行kafka-console-consumer.sh

    su test
    #kafka_client_test.conf跟上面producer使用的是一样的
    export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf"
    kafka-console-consumer.sh --consumer.config consumer.conf  --topic test --bootstrap-server emr-worker-1:9092 --group test-group  --from-beginning

    由于未设置权限,会报错:

    org.apache.kafka.common.errors.GroupAuthorizationException: 
    Not authorized to access group: test-group
  • 设置ACL

    su kafka
    export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf" 
    #test-group权限
    kafka-acls.sh --authorizer-properties zookeeper.connect=emr-header-1:2181/kafka-1.0.0 --add --allow-principal User:test --operation Read --group test-group
    # topic权限
     kafka-acls.sh --authorizer-properties zookeeper.connect=emr-header-1:2181/kafka-1.0.0 --add --allow-principal User:test --operation Read --topic test
  • 再执行kafka-console-consumer.sh

    su test
    #kafka_client_test.conf跟上面producer使用的是一样的
    export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf"
    kafka-console-consumer.sh --consumer.config consumer.conf  --topic test --bootstrap-server emr-worker-1:9092 --group test-group  --from-beginning

    正常输出:

     alibaba
     E-MapReduce
目录
相关文章
|
10月前
|
数据库 数据安全/隐私保护
阿里云E-MapReduce集群-开源Ldap密码不安全问题解决方案
社区开源Ldap密码不安全问题解决方案
|
SQL 消息中间件 Kafka
flink 读取kafka 写入带kerberos认证的hive环境
flink 读取kafka 写入带kerberos认证的hive环境
|
11月前
|
分布式计算 资源调度 Hadoop
Hadoop基础学习---5、MapReduce概述和WordCount实操(本地运行和集群运行)、Hadoop序列化
Hadoop基础学习---5、MapReduce概述和WordCount实操(本地运行和集群运行)、Hadoop序列化
|
分布式计算 Ubuntu Hadoop
【集群模式】执行MapReduce程序-wordcount
因为是在hadoop集群下通过jar包的方式运行我们自己写的wordcount案例,所以需要传递的是 HDFS中的文件路径,所以我们需要修改上一节【本地模式】中 WordCountRunner类 的代码
|
存储 分布式计算 监控
Hadoop, Hadoop涉及到的一些常见概念(分布式与集群、HDFS、MapReduce等),Hadoop怎么用?
Hadoop, Hadoop涉及到的一些常见概念(分布式与集群、HDFS、MapReduce等),Hadoop怎么用?
455 0
|
消息中间件 安全 Kafka
Kafka安全机制与授权你了解吗?
Kafka安全机制与授权你了解吗?
|
弹性计算 分布式计算 Java
E-MapReduce集群-JAVA客户端远程连接HDFS
阿里云E-MapReduce集群-JAVA客户端远程连接HDFS
|
分布式计算 资源调度 Java
Hadoop中的MapReduce概述、优缺点、核心思想、编程规范、进程、官方WordCount源码、提交到集群测试、常用数据序列化类型、WordCount案例实操
Hadoop中的MapReduce概述、优缺点、核心思想、编程规范、进程、官方WordCount源码、提交到集群测试、常用数据序列化类型、WordCount案例实操
Hadoop中的MapReduce概述、优缺点、核心思想、编程规范、进程、官方WordCount源码、提交到集群测试、常用数据序列化类型、WordCount案例实操
|
SQL druid 数据可视化
【Druid】(九)E-MapReduce Druid 集群集成 Superset(数据探查与可视化平台 )1
【Druid】(九)E-MapReduce Druid 集群集成 Superset(数据探查与可视化平台 )1
309 0
【Druid】(九)E-MapReduce Druid 集群集成 Superset(数据探查与可视化平台 )1
|
SQL JSON druid
【Druid】(九)E-MapReduce Druid 集群集成 Superset(数据探查与可视化平台 )2
【Druid】(九)E-MapReduce Druid 集群集成 Superset(数据探查与可视化平台 )2
173 0
【Druid】(九)E-MapReduce Druid 集群集成 Superset(数据探查与可视化平台 )2