如何为Kafka加上账号密码(二)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本小节我们就为Kafka添加最简单的认证方式,也就是SASL_PLAINTEXT(即SASL/PLAIN+ 非加密通道)。

认证策略SASL/PLAIN

上篇文章中我们讲解了Kafka认证方式和基础概念,并比较了不同方式的使用场景。

我们在《2024年了,如何更好的搭建Kafka集群?》中集群统一使用PLAINTEXT通信。Kafka通常是在内网使用,但也有特殊的使用场景需要暴漏到公网上,如果未设置认证的Kafka集群允许通过公网访问,或暴漏给全部研发人员是极不安全的方式。

本小节我们就为Kafka添加最简单的认证方式,也就是SASL_PLAINTEXT(即SASL/PLAIN+ 非加密通道)。

配置服务集群节点

Kafka有三个地方可以做认证:
borker节点之间的认证、controller节点间的认证、外部客户端连接集群认证。公司内部使用我们只需要配置外部客户端连接时认证就可以了。本文基于kraft单节点部署Kafka,该节点既是controller又是broker节点。

在开始前,请先前往官网下载安装包(本文使用 Kafka-v3.6.1):
https://kafka.apache.org/downloads

1. 创建jaas配置文件

文件:volume/config/kafka-server-jaas.conf

KafkaServer {
   
    org.apache.kafka.common.security.plain.PlainLoginModule required
    user_admin="123456";
};

账号密码以user_{USERNAME}="PASSWORD"这种形式配置,固定写死在jaas的配置文件中。

2. 修改配置文件

修改config/kraft/server.properties如下:

# 修改listeners和advertised
listeners=PLAINTEXT://:9092,CONTROLLER://:9093,SASL_PLAINTEXT://:19092
advertised.listeners=PLAINTEXT://192.168.56.103:9092,SASL_PLAINTEXT://192.168.56.103:19092
# 添加mechanisms配置
sasl.enabled.mechanisms=PLAIN

虽然该实验我们只有一个节点,但我们按照集群的方式分配端口:

  • 9092:用于broker节点内部通讯和内网客户端通信
  • 19092:用于broker节点外部通讯和外网客户端通信
  • 9093:用于多个controller节点之间的通信

我们只需要对外部流量做认证即可,也就是所有通过19092端口的通信需要经过认证。

配置文件中有三处需要注意:

  1. sasl.enabled.mechanisms=PLAIN 意思是采用PLAIN这种方式做认证。
  2. listeners中的SASL_PLAINTEXT监听器,表示监听19092端口,并对经过该端口的通信做认证。
  3. advertised.listeners中的SASL_PLAINTEXT监听器,表示仅允许符合192.168.56.103:19092的流量包抵达SASL_PLAINTEXT监听器。

关于advertised.listeners,Kafka有一套复杂的监听器系统。Kafka允许定义多个监听器监听不同端口,将不同的流量划分到不同的端口。这样的好处是当一个端口流量较大时,不影响其它端口的通信。

3. 启动节点

执行下面这条命令启动节点:

KAFKA_OPTS=-Djava.security.auth.login.config=config/kraft/jaas.conf  bin/kafka-server-start.sh config/kraft/server.properties

环境变量KAFKA_OPTS用于指定JAAS配置文件。

经过上面3个步骤服务端就配置好了,下面我们使用客户端验证认证是否生效。

配置客户端

我们将介绍三种客户端连接方式:

  1. kafka命令工具测试
  2. offset explorer图形工具连接kafka
  3. flink连接kafka

kafka命令工具测试

在config目录中创建admin.conf

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123456";

提示:确保账号密码与服务端配置一致


# 创建topic
bin/kafka-topics.sh --create --topic test-topic --bootstrap-server=192.168.56.103:19092 --command-config=config/admin.conf
# 查看topic
bin/kafka-topics.sh --list --bootstrap-server=192.168.56.103:19092 --command-config=config/admin.conf

使用图形工具Offset explorer连接kafka

![[attach/Pasted image 20240204135019.png]]
此处只需要填写好集群名称即可,我们没有用zk版,zookeeper相关配置无需修改。
![[attach/Pasted image 20240127151018.png]]
![[attach/Pasted image 20240127151059.png]]
![[attach/Pasted image 20240127151109.png]]

这三处设置好,就可以正常连接了。

Flink连接kafka

在无认证的基础上,额外添加三个认证参数:

  • SECURITY_PROTOCOL_CONFIG
  • SASL_MECHANISM
  • SASL_JAAS_CONFIG
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5");
        properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        properties.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
        properties.setProperty(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='123456';");
        /*
        org.apache.flink.streaming包下面的消费者和生产者已被标记不建议使用
        org.apache.flink.connector.kafka 推荐使用该包中的工具类
         */
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("192.168.56.103:19092")
                .setTopics("test-topic")
                .setGroupId("test")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setProperties(properties)
                .build();

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000);
        env.setParallelism(1);
        DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "test-tip");
        stream.print();

        env.execute("Kafka to Print");

我们已经为Kafka集群配置好SASL/PLAIN的认证方式。但这个方式的缺点也是显而易见,它只能在启动Kafka前,固定填写好用户名和密码,无法做到灵活修改账号密码。若要修改,只能重启集群。下一篇文章,我们介绍一种可以灵活修改的认证方式,即SASL/SCRAM。


转载请保留本文连接:程序饲养员

相关文章
|
3月前
|
消息中间件 安全 Java
如何为Kafka加上账号密码(一)
一直以来,我们公司内网的Kafka集群都是在裸奔,只要知道端口号,任何人都能连上集群操作一番。直到有个主题莫名消失,才引起我们的警觉,是时候该考虑为它添加一套认证策略了。
304 2
|
3月前
|
消息中间件 安全 Kafka
2024年了,如何更好的搭建Kafka集群?
我们基于Kraft模式和Docker Compose同时采用最新版Kafka v3.6.1来搭建集群。
485 2
2024年了,如何更好的搭建Kafka集群?
|
4月前
|
消息中间件 存储 数据可视化
kafka高可用集群搭建
kafka高可用集群搭建
49 0
|
7月前
|
消息中间件 存储 Kubernetes
Helm方式部署 zookeeper+kafka 集群 ——2023.05
Helm方式部署 zookeeper+kafka 集群 ——2023.05
266 0
|
4月前
|
消息中间件 Kafka Linux
Apache Kafka-初体验Kafka(03)-Centos7下搭建kafka集群
Apache Kafka-初体验Kafka(03)-Centos7下搭建kafka集群
72 0
|
4月前
|
消息中间件 数据可视化 关系型数据库
ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统
ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统
154 0
|
5月前
|
消息中间件 存储 算法
Kafka Raft集群搭建
Kafka Raft集群搭建
83 0
|
2月前
|
消息中间件 存储 缓存
Kafka【基础知识 02】集群+副本机制+数据请求+物理存储+数据存储设计(图片来源于网络)
【2月更文挑战第20天】Kafka【基础知识 02】集群+副本机制+数据请求+物理存储+数据存储设计(图片来源于网络)
49 1
|
2月前
|
消息中间件 存储 Kafka
Kafka【环境搭建 02】kafka_2.11-2.4.1 基于 zookeeper 搭建高可用伪集群(一台服务器实现三个节点的 Kafka 集群)
【2月更文挑战第19天】Kafka【环境搭建 02】kafka_2.11-2.4.1 基于 zookeeper 搭建高可用伪集群(一台服务器实现三个节点的 Kafka 集群)
140 1
|
6月前
|
消息中间件 数据可视化 Kafka
消息中间件系列教程(21) -Kafka- 集群搭建(自带Zookeeper)
消息中间件系列教程(21) -Kafka- 集群搭建(自带Zookeeper)
68 0