前提
最近在使用运维团队提供的kafka集群时,需要通过sasl认证进行连接,这里记录一下
将运维人员给的证书文件(SSL信任库)client_truststore.jks放在项目的resources文件夹下
配置consumer
@Configuration public class SSLKafkaConsumerConfig { @Value("${outer.kafka.bootstrap.servers}") private String bootStrapServers; @Value("${outer.kafka.enable.auto.commit}") private String enableAutoCommit; @Value("${outer.kafka.auto.commit.interval}") private String autoCommitInterval; @Value("${outer.kafka.session.timeout}") private String sessionTimeout; @Value("${outer.kafka.auto.offset.reset}") private String autoOffsetReset; @Value("${outer.kafka.consumer.concurrency}") private Integer consumerConcurrency; @Value("${outer.kafka.consumer.poll.timeout}") private Integer consumerPollTimeout; @Value("${outer.kafka.security.protocol}") private String securityProtocol; //@Value("${outer.kafka.ssl.algorithm}") private String sslAlgorithm; //@Value("${outer.kafka.ssl.password}") private String sslPassword; @Value("${outer.kafka.sasl.mechanism}") private String saslMechanism; @Value("${outer.kafka.sasl.jaas.config}") private String saslJaasConfig; //@Value("${outer.kafka.sasl.truststore}") private String trustStore; @Value("${outer.kafka.consumer.service.product.consumer.group}") private String groupId; @Value("${outer.kafka.consumer.service.product.topic}") private String serviceProductTopic; public Map<String, Object> kafkaConfigs() { Map<String, Object> configMap = new HashMap<>(); configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers); configMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); configMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); configMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); configMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); 
configMap.put("security.protocol", securityProtocol); configMap.put("sasl.mechanism", saslMechanism); configMap.put("sasl.jaas.config", saslJaasConfig); configMap.put("ssl.endpoint.identification.algorithm", ""); configMap.put("ssl.truststore.location", ClassUtils.getDefaultClassLoader().getResource("").getPath() + "client_truststore.jks"); configMap.put("ssl.truststore.password", sslPassword); return configMap; } @Bean public KafkaConsumer<String, String> sslKafkaConsumer() { KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConfigs()); consumer.subscribe(Arrays.asList(serviceProductTopic)); return consumer; } }
业务类中注入使用即可
// Injects the KafkaConsumer bean registered under the name "sslKafkaConsumer".
// NOTE(review): the Kafka documentation describes KafkaConsumer as not safe for multi-threaded
// access - confirm that only one thread uses this injected instance.
@Autowired
@Qualifier("sslKafkaConsumer")
private KafkaConsumer<String, String> sslKafkaConsumer;
比如消费
// Poll forever; handle each batch and commit its offsets only after it has been handled, so a
// failure while handling does not mark the records as consumed.
while (true) {
    try {
        ConsumerRecords<String, String> records = sslKafkaConsumer.poll(timeoutMs);
        if (Objects.isNull(records)) {
            continue;
        }
        System.out.print(records);
        sslKafkaConsumer.commitSync();
    } catch (ConcurrentModificationException e) {
        // Previously ignored silently; log it so it is visible, then keep polling.
        LOGGER.warn("ssl kafka consumer threw ConcurrentModificationException", e);
    } catch (Exception e) {
        // Pass the exception to the logger so the stack trace lands in the log.
        LOGGER.error("eService消费ssl kafka 消息失败。", e);
    }
}
上面只是介绍了使用,如果要给kafka配置ssl 以及acl权限,参考文章
https://blog.csdn.net/wsdc0521/article/details/108618997