开发者社区> 问答> 正文

关于canal支持kafka sasl问题

我查看canal的connector.kafka源码发现没有开放对kafka sasl的支持,所以我自己为生产者和消费者添加了sasl功能。 生产者添加编译完是可以同步sasl认证正常连接到kafka的消费者添加时,我遇到了问题,错误信息如下

`2020-09-27 16:49:04.464 [Thread-4] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - process error! org.apache.kafka.common.KafkaException: Failed to construct kafka consumer at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:793) ~[na:na] at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:644) ~[na:na] at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:624) ~[na:na] at com.alibaba.otter.canal.connector.kafka.consumer.CanalKafkaConsumer.connect(CanalKafkaConsumer.java:77) ~[na:na] at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.process(AdapterProcessor.java:184) ~[client-adapter.launcher-1.1.5-SNAPSHOT.jar:na] at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_141]

Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:125) ~[na:na] at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140) ~[na:na] at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65) ~[na:na] at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88) ~[na:na] at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:710) ~[na:na] ... 5 common frames omitted

Caused by: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule at javax.security.auth.login.LoginContext.invoke(LoginContext.java:794) ~[na:1.8.0_141] at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195) ~[na:1.8.0_141] at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682) ~[na:1.8.0_141] at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680) ~[na:1.8.0_141] at java.security.AccessController.doPrivileged(Native Method) ~[na:1.8.0_141] at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680) ~[na:1.8.0_141] at javax.security.auth.login.LoginContext.login(LoginContext.java:587) ~[na:1.8.0_141] at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:52) ~[na:na] at org.apache.kafka.common.security.authenticator.LoginManager.(LoginManager.java:53) ~[na:na] at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:89) ~[na:na] at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:114) ~[na:na] ... 9 common frames omitted`

而我的sasl.jaas.config等参数配置已经在生产者中验证成功了,配置代码如下,添加于CanalKafkaConsumer的init方法尾部

if (Boolean.parseBoolean(properties.getProperty(KafkaConstants.CANAL_MQ_KAFKA_SASL_ENABLE))){ kafkaProperties.put("security.protocol", properties.getProperty(KafkaConstants.CANAL_MQ_KAFKA_SECURITY_PROTOCOL)); kafkaProperties.put("sasl.mechanism", properties.getProperty(KafkaConstants.CANAL_MQ_KAFKA_SASL_MECHANISM)); kafkaProperties.put("sasl.jaas.config", properties.getProperty(KafkaConstants.CANAL_MQ_KAFKA_SASL_JAAS_CONFIG)); }

我尝试了三种sasl配置添加方式:

1、sasl.jaas.config

2、应用启动脚本中-D追加系统参数

3、System.setProperty

我对java的JAAS身份验证原理了解还很浅显,观察canal的client-adapter的启动代码发现kafka consumer是线程启动的,是否可能于此有关。 恳请大佬解惑,感谢。

原提问者GitHub用户q2515045

展开
收起
山海行 2023-04-27 20:04:56 109 0
1 条回答
写回答
取消 提交回答
  • 我找到问题了 Adapter中实例化consumer使用的静态实例化,这个和生产端代码一样,。 但是,这个实例化调取的init没有new consumer对象,new操作在下面声明的子线程中做的,我修改connector代码将new操作移至init方法内,运行通过。 再跟进验证是否存在其它问题。。

    回答18.png

    原回答者GitHub用户q2515045

    2023-04-28 14:36:02
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载