kafka使用SASL认证

简介: kafka使用SASL认证

前提

最近在使用运维团队给到的kafka集群时,需要使用sasl证人连接,这里记录一下

将运维人员给的sasl证书文件client_truststore.jks放在项目resource文件夹下

配置consumer

@Configuration
public class SSLKafkaConsumerConfig {
 
    @Value("${outer.kafka.bootstrap.servers}")
    private String bootStrapServers;
 
    @Value("${outer.kafka.enable.auto.commit}")
    private String enableAutoCommit;
 
    @Value("${outer.kafka.auto.commit.interval}")
    private String autoCommitInterval;
 
    @Value("${outer.kafka.session.timeout}")
    private String sessionTimeout;
 
    @Value("${outer.kafka.auto.offset.reset}")
    private String autoOffsetReset;
 
    @Value("${outer.kafka.consumer.concurrency}")
    private Integer consumerConcurrency;
 
    @Value("${outer.kafka.consumer.poll.timeout}")
    private Integer consumerPollTimeout;
 
 
    @Value("${outer.kafka.security.protocol}")
    private String securityProtocol;
 
    //@Value("${outer.kafka.ssl.algorithm}")
    private String sslAlgorithm;
 
    //@Value("${outer.kafka.ssl.password}")
    private String sslPassword;
 
    @Value("${outer.kafka.sasl.mechanism}")
    private String saslMechanism;
 
    @Value("${outer.kafka.sasl.jaas.config}")
    private String saslJaasConfig;
 
    //@Value("${outer.kafka.sasl.truststore}")
    private String trustStore;
 
 
    @Value("${outer.kafka.consumer.service.product.consumer.group}")
    private String groupId;
 
    @Value("${outer.kafka.consumer.service.product.topic}")
    private String serviceProductTopic;
 
 
    public Map<String, Object> kafkaConfigs() {
        Map<String, Object> configMap = new HashMap<>();
 
        configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
        configMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        configMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        configMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
 
        configMap.put("security.protocol", securityProtocol);
        configMap.put("sasl.mechanism", saslMechanism);
        configMap.put("sasl.jaas.config", saslJaasConfig);
 
        configMap.put("ssl.endpoint.identification.algorithm", "");
        configMap.put("ssl.truststore.location", ClassUtils.getDefaultClassLoader().getResource("").getPath() + "client_truststore.jks");
        configMap.put("ssl.truststore.password", sslPassword);
 
        return configMap;
    }
 
    @Bean
    public KafkaConsumer<String, String> sslKafkaConsumer() {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConfigs());
        consumer.subscribe(Arrays.asList(serviceProductTopic));
        return consumer;
    }
}

业务类中注入使用即可

   @Autowired

   @Qualifier("sslKafkaConsumer")

   private KafkaConsumer<String, String> sslKafkaConsumer;

比如消费

while (true) {
                ConsumerRecords<String, String> records = null;
                try {
                    records = sslKafkaConsumer.poll(timeoutMs);
                    sslKafkaConsumer.commitSync();
                } catch (ConcurrentModificationException e) {
                    //e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                    LOGGER.error("eService消费ssl kafka 消息失败。");
                    LOGGER.error(e.getMessage());
                }
                if (Objects.isNull(records)) {
                    continue;
                }
                System.out.print(records)
}

上面只是介绍了使用,如果要给kafka配置ssl 以及acl权限,参考文章

https://blog.csdn.net/wsdc0521/article/details/108618997


相关文章
|
4月前
|
消息中间件 安全 Java
Spring Boot 基于 SCRAM 认证集成 Kafka 的详解
【8月更文挑战第4天】本文详解Spring Boot结合SCRAM认证集成Kafka的过程。SCRAM为Kafka提供安全身份验证。首先确认Kafka服务已启用SCRAM,并准备认证凭据。接着,在`pom.xml`添加`spring-kafka`依赖,并在`application.properties`中配置Kafka属性,包括SASL_SSL协议与SCRAM-SHA-256机制。创建生产者与消费者类以实现消息的发送与接收功能。最后,通过实际消息传递测试集成效果与认证机制的有效性。
174 4
|
4月前
|
消息中间件 安全 Kafka
Flink与Kafka的终极联盟:揭秘如何在一瞬间切换SASL机制,保护您的数据不受黑客侵袭!
【8月更文挑战第7天】Apache Flink作为高性能流处理框架,在与Kafka集成时确保数据安全至关重要。通过配置`KafkaConsumer`使用SASL机制如SCRAM-SHA-256或PLAIN,可有效防止未授权访问。SCRAM-SHA-256采用强化的身份验证流程提高安全性,而PLAIN机制则相对简单。配置涉及设置`properties`参数,包括指定`sasl.mechanism`、`security.protocol`及JAAS认证信息。合理选择和配置这些参数对于保护Flink应用与Kafka间的数据通信安全至关重要。
101 0
|
SQL 消息中间件 Kafka
flink 读取kafka 写入带kerberos认证的hive环境
flink 读取kafka 写入带kerberos认证的hive环境
|
消息中间件 Java Kafka
SpringBoot整合Kafka(SASL认证配置、处理毒丸消息)
SpringBoot整合Kafka(SASL认证配置、处理毒丸消息)
1510 0
|
消息中间件 存储 安全
基于SASL和ACL的Kafka安全性解析
本文主要介绍基于SCRAM进行身份验证,使用Kafka ACL进行授权,SSL进行加密以及使用camel-Kafka连接Kafka群集以使用camel路由生产和消费消息的过程。
466 0
|
消息中间件 Cloud Native 物联网
阿里云消息队列 RocketMQ、Kafka 荣获金融级产品稳定性测评 “先进级” 认证
在混沌工程技术沙龙--金融行业精品专场的分布式系统稳定性评估体系获奖名单中,阿里云分布式消息队列服务成为通过首批消息队列服务稳定性认证,荣获最高级别 “先进级” 认证的消息队列服务。
518 0
阿里云消息队列 RocketMQ、Kafka 荣获金融级产品稳定性测评 “先进级” 认证
|
消息中间件 Ubuntu Java
Kafka安装并配置SASL_PLAINTEXT认证
Kafka安装并配置SASL_PLAINTEXT认证
1171 0
|
消息中间件 存储 算法
【Kafka SASL/SCRAM动态认证集群部署
【Kafka SASL/SCRAM动态认证集群部署
1201 0
|
消息中间件 安全 Kafka
Kafka SASL集群部署
Kafka SASL集群部署
274 0
|
2月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
102 1