E-MapReduce Kafka Kerberos集群授权

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介:

Kafka授权

如果没有开启Kafka认证(如Kerberos认证或者简单的用户名密码),即使开启了Kafka授权,用户也可以伪造身份访问服务。所以建议创建高安全模式(即支持Kerberos)的Kafka集群,详见Kerberos安全文档

备注:

本文的权限配置只针对E-MapReduce的高安全模式集群,即Kafka以Kerberos的方式启动。

1. 添加配置

在Kafka集群的配置管理->Kafka->配置->server.properties->自定义配置

添加如下几个参数:

key value 备注
authorizer.class.name kafka.security.auth.SimpleAclAuthorizer
super.users User:kafka User:kafka是必须的,可添加其它用户用分号(;)隔开

备注:

zookeeper.set.acl用来设置kafka在zookeeper中数据的权限,E-MapReduce集群中已经设置为true,所以上述不需要再添加该配置。该配置打开后,在Kerberos环境中,只有用户名称为kafka且通过Kerberos认证后才能执行kafka-topics.sh命令(kafka-topics.sh会直接读写/修改zookeeper中的数据)。

2. 重启Kafka集群

在Kafka集群的配置管理->HBase->操作->RESTART All Components

3. 授权(ACL)

3.1 基本概念

Kafka官方文档定义:

Kafka acls are defined in the general format of "Principal P is [Allowed/Denied] Operation O From Host H On Resource R"

即ACL过程涉及Principal Allowed/Denied Operation Host Resource

  • Principal:用户名
安全协议 value
PLAINTEXT ANONYMOUS
SSL ANONYMOUS
SASL_PLAINTEXT mechanism为PLAIN时,用户名是client_jaas.conf指定的用户名,mechanism为GSSAPI时,用户名为client_jaas.conf指定的principal
SASL_SSL
  • Allowed/Denied: 允许/拒绝
  • Operation: 操作

Read,Write,Create,DeleteAlter,Describe,ClusterAction,AlterConfigs,DescribeConfigs,IdempotentWrite,All

  • Host: 针对的机器
  • Resource: 权限作用的资源对象

Topic, Group, Cluster, TransactionalId

Operation/Resource的一些详细对应关系,如哪些Resource支持哪些Operation的授权,详见KIP-11 - Authorization Interface

3.2 授权命令

-使用脚本 kafka-acls.sh (/usr/lib/kafka-current/bin/kafka-acls.sh) 进行授权
具体是使用方式可以直接执行 kafka-acls.sh --help进行查看。

4. 操作示例

在已经创建的E-MapReduce高安全Kafka集群的master节点上进行相关示例操作。

4.1 新建用户test

useradd test

4.2 创建topic

第一节添加配置的备注中提到zookeeper.set.acl=true,kafka-topics.sh需要在kafka账号执行,而且kafka账号下要通过Kerberos认证。

#kafka_client_jaas.conf中已经设置了kafka的Kerberos认证相关信息
export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf" 
# zookeeper地址改成自己集群的对应地址(执行hostnamed的输出)
kafka-topics.sh --create --zookeeper emr-header-1:2181/kafka-1.0.0 --replication-factor 3 --partitions 1 --topic test

4.3 test用户执行kafka-console-producer.sh

  • 创建test用户的keytab文件,用户zookeeper/kafka的认证

    su root
    sh /usr/lib/has-current/bin/hadmin-local.sh /etc/ecm/has-conf -k /etc/ecm/has-conf/admin.keytab
    HadminLocalTool.local: #直接按回车可以看到一些命令的用法
    HadminLocalTool.local: addprinc #输入命令按回车可以看到具体命令的用法
    HadminLocalTool.local: addprinc -pw 123456 test #添加test的princippal,密码设置为123456
    HadminLocalTool.local: ktadd -k /home/test/test.keytab test #导出keytab文件,后续可使用该文件
  • 添加kafka_client_test.conf

    如文件放到/home/test/kafka_client_test.conf,内容如下:

    KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName="kafka"
    keyTab="/home/test/test.keytab"
    principal="test";
       };
    
       // Zookeeper client authentication
       Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    useTicketCache=false
    serviceName="zookeeper"
    keyTab="/home/test/test.keytab"
    principal="test";
     };
  • 添加producer.conf

    如文件放到/home/test/producer.conf,内容如下:

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=GSSAPI
  • 执行kafka-console-producer.sh

    su test
    export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf"
    kafka-console-producer.sh --producer.config /home/test/producer.conf --topic test --broker-list emr-worker-1:9092

    由于没有设置ACL,所以上述会报错:

    org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test]

  • 设置ACL

    同样kafka-acls.sh也需要kafka账号执行

     su kafka
     export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf" 
     kafka-acls.sh --authorizer-properties zookeeper.connect=emr-header-1:2181/kafka-1.0.0 --add --allow-principal User:test --operation Write --topic test
  • 再执行kafka-console-producer.sh

    su test
    export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf"
     kafka-console-producer.sh --producer.config /home/test/producer.conf --topic test --broker-list emr-worker-1:9092

    正常:

    [2018-02-28 22:25:36,178] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser)
    >alibaba
    >E-MapReduce
    >

4.4 test用户执行kafka-console-consumer.sh

上面成功执行kafka-console-producer.sh,并往topic里面写入一些数据后,就可以执行kafka-console-consumer.sh进行消费测试.

  • 添加consumer.conf

    如文件放到/home/test/consumer.conf,内容如下:

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=GSSAPI
  • 执行kafka-console-consumer.sh

    su test
    #kafka_client_test.conf跟上面producer使用的是一样的
    export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf"
    kafka-console-consumer.sh --consumer.config consumer.conf  --topic test --bootstrap-server emr-worker-1:9092 --group test-group  --from-beginning

    由于未设置权限,会报错:

    org.apache.kafka.common.errors.GroupAuthorizationException: 
    Not authorized to access group: test-group
  • 设置ACL

    su kafka
    export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf" 
    #test-group权限
    kafka-acls.sh --authorizer-properties zookeeper.connect=emr-header-1:2181/kafka-1.0.0 --add --allow-principal User:test --operation Read --group test-group
    # topic权限
     kafka-acls.sh --authorizer-properties zookeeper.connect=emr-header-1:2181/kafka-1.0.0 --add --allow-principal User:test --operation Read --topic test
  • 再执行kafka-console-consumer.sh

    su test
    #kafka_client_test.conf跟上面producer使用的是一样的
    export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf"
    kafka-console-consumer.sh --consumer.config consumer.conf  --topic test --bootstrap-server emr-worker-1:9092 --group test-group  --from-beginning

    正常输出:

     alibaba
     E-MapReduce
目录
相关文章
|
3月前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
135 4
|
4月前
|
消息中间件 监控 数据可视化
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
182 2
|
2月前
|
消息中间件 Java Kafka
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
本文介绍了Kafka集群的搭建过程,涵盖从虚拟机安装到集群测试的详细步骤。首先规划了集群架构,包括三台Kafka Broker节点,并说明了分布式环境下的服务进程配置。接着,通过VMware导入模板机并克隆出三台虚拟机(kafka-broker1、kafka-broker2、kafka-broker3),分别设置IP地址和主机名。随后,依次安装JDK、ZooKeeper和Kafka,并配置相应的环境变量与启动脚本,确保各组件能正常运行。最后,通过编写启停脚本简化集群的操作流程,并对集群进行测试,验证其功能完整性。整个过程强调了自动化脚本的应用,提高了部署效率。
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
|
2月前
|
消息中间件 存储 Kafka
2024最全Kafka集群方案汇总
Apache Kafka 是一个高吞吐量、可扩展、可靠的分布式消息系统,广泛应用于数据驱动的应用场景。Kafka 支持集群架构,具备高可用性和容错性。其核心组件包括 Broker(服务器实例)、Topic(消息分类)、Partition(有序消息序列)、Producer(消息发布者)和 Consumer(消息消费者)。每个分区有 Leader 和 Follower,确保数据冗余和高可用。Kafka 2.8+ 引入了不依赖 Zookeeper 的 KRaft 协议,进一步简化了集群管理。常见的集群部署方案包括单节点和多节点集群,后者适用于生产环境以确保高可用性。
79 0
|
3月前
|
消息中间件 存储 Prometheus
Kafka集群如何配置高可用性
Kafka集群如何配置高可用性
|
4月前
|
消息中间件 分布式计算 监控
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
134 6
|
6月前
|
消息中间件 Kafka 测试技术
【Kafka揭秘】Leader选举大揭秘!如何打造一个不丢失消息的强大Kafka集群?
【8月更文挑战第24天】Apache Kafka是一款高性能分布式消息系统,利用分区机制支持数据并行处理。每个分区含一个Leader处理所有读写请求,并可有多个副本确保数据安全与容错。关键的Leader选举机制保障了系统的高可用性和数据一致性。选举发生于分区创建、Leader故障或被手动移除时。Kafka提供多种选举策略:内嵌机制自动选择最新数据副本为新Leader;Unclean选举快速恢复服务但可能丢失数据;Delayed Unclean选举则避免短暂故障下的Unclean选举;Preferred选举允许基于性能或地理位置偏好指定特定副本为首选Leader。
136 5
|
6月前
|
消息中间件 监控 Java
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
|
6月前
|
消息中间件 监控 Java
【Kafka节点存活大揭秘】如何让Kafka集群时刻保持“心跳”?探索Broker、Producer和Consumer的生死关头!
【8月更文挑战第24天】在分布式系统如Apache Kafka中,确保节点的健康运行至关重要。Kafka通过Broker、Producer及Consumer间的交互实现这一目标。文章介绍Kafka如何监测节点活性,包括心跳机制、会话超时与故障转移策略。示例Java代码展示了Producer如何通过定期发送心跳维持与Broker的连接。合理配置这些机制能有效保障Kafka集群的稳定与高效运行。
144 2
|
6月前
|
消息中间件 Java Kafka
Linux——Kafka集群搭建
Linux——Kafka集群搭建
78 0