flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的
1、版本说明 flink版本:1.10.2 kafka版本:1.1.0
2、kafka鉴权说明 仅使用了sasl鉴权方式 在kafka客户端有配置 kafka_server-jass.conf、 server.properties、producer.properties、consumer.properties
3、主要配置参数 sasl.mechanism=PLAIN security.protocol=SASL_PLAINTEXT sasl.jaas.config=org.apache.kafka.comon.security.plain.PlainLoginModule required username="xx" password="xx-secret"; 当前配置方式,通过java程序来消费鉴权后的kafka,可以正常消费。
4、用于flink SQL连接的jar包 flink-sql-connector-kafka_2.11-1.10.2.jar flink-jdbc_2.11-1.10.2.jar flink-csv-1.10.2-sql-jar.jar
5、我的思路 类比java程序中添加kafka认证的相关参数(3中的参数,添加完毕即可),我直接在flink SQL中建立kafka table时,就指定了相关的3中所说明的参数。但是用flink SQL客户端来实现消费kafka,则会报错。
6、启动客户端 ./bin/sql-client.sh embedded -l sql_lib/ 其中sql_lib文件夹放的是4中提及的flink-connector相关的jar包
7、建表语句: create table test_hello ( name string ) with ( ... ... 'connector.properties.sasl.mechanism' = 'PLAIN', 'connector.properties.security.protocol' = 'SASL_PLAINTEXT', 'connector.properties.sasl.jaas.config' = 'org.apache.kafka.comon.security.plain.PlainLoginModule required username="xx" password="xx-secret";', 'format.type' = 'csv' );
建表没有问题,可以正常建表。
查询表的时候,就会报错,select * from test_hello; 报错如下: could not execute sql statement. Reason: javax.security.auth.login.loginException: unable to find loginModule class: org.apache.kafka.common.security.plain.PlainLoginModule 但是这个类我在flink-sql-connector-kafka_2.11-1.10.2.jar包中是可以发现的,所以不知道什么原因?
kafka添加鉴权后,我使用flink SQL的方式不对吗?希望有知道能帮忙解答一下,谢谢。*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
我感觉还是jar的问题。如下: 将 org.apache.kafka.common.security.plain.PlainLoginModule 替换为 org.apache.flink.kafka.shaded.org.apache.kafka.common.securi ty.plain.PlainLoginModule
因为你用的是sql-connector-kafka,这个包把kafka-clients的包shade了。*来自志愿者整理的flink邮件归档