Kafka安全机制与授权你了解吗?

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Kafka安全机制与授权你了解吗?

Kafka安全机制与授权(linux)

下载kafka

地址:https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz

下载:wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz

解压:tar zxvf kafka_2.12-2.5.0.tgz

进入目录:cd kafka_2.12-2.5.0/

[root@localhost kafka_2.12-2.5.0]# ll
总用量 56
drwxr-xr-x 3 root root  4096 4月   8 09:16 bin
drwxr-xr-x 2 root root  4096 4月   8 09:16 config
drwxr-xr-x 2 root root  8192 6月  30 13:01 libs
-rw-r--r-- 1 root root 32216 4月   8 09:13 LICENSE
-rw-r--r-- 1 root root   337 4月   8 09:13 NOTICE
drwxr-xr-x 2 root root    44 4月   8 09:16 site-docs
[root@localhost kafka_2.12-2.5.0]# 

bin目录放的都是可执行文件,config目录放的都是配置文件。

首先我们先创建一个证书
创建证书

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin  

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=test],SCRAM-SHA-512=[password=test]' --entity-type users --entity-name test  

验证证书

bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name admin  

bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name test  

这地方的admin就是下面的admin

我们先进入config目录下,创建一个名为kafka_server_jaas.conf文件,内容如下:

KafkaServer {
  org.apache.kafka.common.security.scram.ScramLoginModule required
  username="admin"
  password="admin";
};

具体操作

[root@localhost kafka_2.12-2.5.0]# cd config/
[root@localhost config]# touch kafka_server_jaas.conf
[root@localhost config]# vim kafka_server_jaas.conf 
[root@localhost config]# 

创建好这个文件后,然后我们去修改server.properties

# 设置监听使用SASL而不是SSL
listeners=SASL_PLAINTEXT://192.168.0.98:9092
advertised.listeners=SASL_PLAINTEXT://192.168.0.98:9092

security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256

# ACL
allow.everyone.if.no.acl.found=false
# 设置超级账号,如果是多个需要分号分割,例如:User:admin;User:root
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

#这里加上这句话就不需要kafka_server_jaas.conf这个文件了
#如果没加则需要执行
#export KAFKA_OPTS="-Djava.security.auth.login.config=/你的目录/kafka_2.12-2.5.0/config/kafka_server_jaas.conf"
listener.name.sasl_plaintext.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="admin" \
    password="admin";

启动zookeeper

cd bin/
./zookeeper-server-start.sh ../config/zookeeper.properties 

启动kafka

./kafka-server-start.sh ../config/server.properties 

创建Topic

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka_test

添加读写权限

# 添加读权限  
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --operation Read --topic kafka_test  

# 添加写权限  
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --operation Write --topic kafka_test  

# 添加消费者组权限  
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --operation Read --group group_test   

# 查看权限列表
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --list

生产者

/**
 * 生产者配置
 */
@Configuration
public class KafkaProducerConfig {


    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }

    public ProducerFactory<String,String> producerFactory() {
        return new DefaultKafkaProducerFactory(producerConfigs());
    }

    private Map<String, Object> producerConfigs() {

        HashMap<String, Object> properties = new HashMap<String, Object>();
        //配置的是kafka的端口
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.98:9092");
        //配置key的序列化类
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //配置value的序列化类
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求,
        //这有助于提升客户端和服务端之间的性能,此配置控制默认批量大小(以字节为单位),默认值为16384
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
        //producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
        //0生产者在写入消息之前不会等待任何来自服务器的响应,容易丢消息,但是吞吐量高。
        //1只要集群的首领节点收到消息,生产者会收到来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新首领没有选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。默认使用这个配置。
        //all或-1只有当所有参与复制的节点都收到消息,生产者才会收到一个来自服务器的成功响应。延迟高。
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
        //producer用于压缩数据的压缩类型。默认是无压缩。正确的选项值是none、gzip、snappy。压缩最好用于批量处理,批量处理消息越多,压缩性能越好
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"none");
        //
        //配置文件设置sasl_plaintext认证
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_PLAINTEXT");
        properties.put(SaslConfigs.SASL_MECHANISM,"SCRAM-SHA-256");
        properties.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"test\" password=\"test\";");
        return properties;
    }
    
}

生产消息

@RestController
public class TestController {

    private static Logger logger = LoggerFactory.getLogger(TestController.class);

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @GetMapping("test")
    public String test(){
        HashMap<String,String> hashMap = new HashMap<>();
        hashMap.put("name","zd");
        hashMap.put("age","18");
        //这地方如果有topic:testTopic会往这个topic添加消息,如果没有则创建这个testTopic然后往里面添加消息
        ListenableFuture<SendResult<String, String>> testTopic = kafkaTemplate.send("kafka_test", JSONObject.toJSONString(hashMap));

        //可有可无看自己需求
        testTopic.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.info("发送失败");
                //失败do something
            }

            @Override
            public void onSuccess(SendResult<String, String> integerStringSendResult) {
                logger.info("发送成功");
                //成功do something
            }
        });

        return "ok";
    }
}

消费者

/**
 * 消费者配置
 */
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() { Map<String, Object> properties = new HashMap<>();

        //配置的是kafka的端口
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.98:9092");
        //消息反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group,默认:""
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
        //如果为真,consumer所fetch的消息的offset将会自动的同步到zookeeper。这项提交的offset将在进程挂掉时,由新的consumer使用,默认:true
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        //consumer自动向zookeeper提交offset的频率,默认:5000
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,5000);
        //没有初始化的offset时,可以设置以下三种情况:(默认:latest)
        //earliest
        //当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
        //latest
        //当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
        //none
        //topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //消费者进程的标识。如果设置一个人为可读的值,跟踪问题会比较方便。。默认:""
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "zdpower");

        //配置文件设置sasl_plaintext认证
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_PLAINTEXT");
        properties.put(SaslConfigs.SASL_MECHANISM,"SCRAM-SHA-256");
        properties.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"test1\" password=\"test1\";");
        return properties;
    }
}

消费消息

@Component
public class MyKafkaConsumer {

    public static Logger logger = LoggerFactory.getLogger(MyKafkaConsumer.class);

    @KafkaListener(containerFactory="kafkaListenerContainerFactory",topics = {"kafka_test"})
    public void preCommandTicket1(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            logger.info("----------------- record =" + record);
            logger.info("------------------ message =" + message);
        }
    }
}
相关文章
|
6月前
|
消息中间件 存储 算法
深入了解Kafka的数据持久化机制
深入了解Kafka的数据持久化机制
387 0
|
1月前
|
消息中间件 Java 大数据
Kafka ISR机制详解!
本文详细解析了Kafka的ISR(In-Sync Replicas)机制,阐述其工作原理及如何确保消息的高可靠性和高可用性。ISR动态维护与Leader同步的副本集,通过不同ACK确认机制(如acks=0、acks=1、acks=all),平衡可靠性和性能。此外,ISR机制支持故障转移,当Leader失效时,可从ISR中选取新的Leader。文章还包括实例分析,展示了ISR在不同场景下的变化,并讨论了其优缺点,帮助读者更好地理解和应用ISR机制。
48 0
Kafka ISR机制详解!
|
1月前
|
消息中间件 Java Kafka
Kafka ACK机制详解!
本文深入剖析了Kafka的ACK机制,涵盖其原理、源码分析及应用场景,并探讨了acks=0、acks=1和acks=all三种级别的优缺点。文中还介绍了ISR(同步副本)的工作原理及其维护机制,帮助读者理解如何在性能与可靠性之间找到最佳平衡。适合希望深入了解Kafka消息传递机制的开发者阅读。
172 0
|
3月前
|
消息中间件 负载均衡 Java
揭秘Kafka背后的秘密!Kafka 架构设计大曝光:深入剖析Kafka机制,带你一探究竟!
【8月更文挑战第24天】Apache Kafka是一款专为实时数据处理及流传输设计的高效率消息系统。其核心特性包括高吞吐量、低延迟及出色的可扩展性。Kafka采用分布式日志模型,支持数据分区与副本,确保数据可靠性和持久性。系统由Producer(消息生产者)、Consumer(消息消费者)及Broker(消息服务器)组成。Kafka支持消费者组,实现数据并行处理,提升整体性能。通过内置的故障恢复机制,即使部分节点失效,系统仍能保持稳定运行。提供的Java示例代码展示了如何使用Kafka进行消息的生产和消费,并演示了故障转移处理过程。
52 3
|
3月前
|
消息中间件 Java Kafka
如何在Kafka分布式环境中保证消息的顺序消费?深入剖析Kafka机制,带你一探究竟!
【8月更文挑战第24天】Apache Kafka是一款专为实时数据管道和流处理设计的分布式平台,以其高效的消息发布与订阅功能著称。在分布式环境中确保消息按序消费颇具挑战。本文首先介绍了Kafka通过Topic分区实现消息排序的基本机制,随后详细阐述了几种保证消息顺序性的策略,包括使用单分区Topic、消费者组搭配单分区消费、幂等性生产者以及事务支持等技术手段。最后,通过一个Java示例演示了如何利用Kafka消费者确保消息按序消费的具体实现过程。
123 3
|
3月前
|
消息中间件 负载均衡 Java
"深入Kafka核心:探索高效灵活的Consumer机制,以Java示例展示数据流的优雅消费之道"
【8月更文挑战第10天】在大数据领域,Apache Kafka凭借其出色的性能成为消息传递与流处理的首选工具。Kafka Consumer作为关键组件,负责优雅地从集群中提取并处理数据。它支持消息的负载均衡与容错,通过Consumer Group实现消息的水平扩展。下面通过一个Java示例展示如何启动Consumer并消费数据,同时体现了Kafka Consumer设计的灵活性与高效性,使其成为复杂消费场景的理想选择。
120 4
|
3月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
67 3
|
4月前
|
消息中间件 存储 监控
深入理解Kafka核心设计及原理(六):Controller选举机制,分区副本leader选举机制,再均衡机制
深入理解Kafka核心设计及原理(六):Controller选举机制,分区副本leader选举机制,再均衡机制
93 1
|
3月前
|
消息中间件 Java Kafka
SpringBoot Kafka SSL接入点PLAIN机制收发消息
SpringBoot Kafka SSL接入点PLAIN机制收发消息
38 0
|
3月前
|
消息中间件 安全 Kafka
Flink与Kafka的终极联盟:揭秘如何在一瞬间切换SASL机制,保护您的数据不受黑客侵袭!
【8月更文挑战第7天】Apache Flink作为高性能流处理框架,在与Kafka集成时确保数据安全至关重要。通过配置`KafkaConsumer`使用SASL机制如SCRAM-SHA-256或PLAIN,可有效防止未授权访问。SCRAM-SHA-256采用强化的身份验证流程提高安全性,而PLAIN机制则相对简单。配置涉及设置`properties`参数,包括指定`sasl.mechanism`、`security.protocol`及JAAS认证信息。合理选择和配置这些参数对于保护Flink应用与Kafka间的数据通信安全至关重要。
74 0