请教一个Kafka Consumer反序列问题: 一个kafka consumer job 提交到Flink session cluster时运行稳定,但是独立提交到到Flink per-job cluster 就报kafka反序列化错,报错信息如下: 其中flink版本为1.10,kafka版本为kafka_2.12-2.1.0;代码中consumer配置为val data = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)) 2020-05-27 17:05:22 org.apache.kafka.common.KafkaException: Failed to construct kafka consumer at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:811) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:659) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:639) at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58) at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.common.KafkaException: org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:705)*来自志愿者整理的FLINK邮件归档
应该是maven-shade那边配置问题, 原始class文件org.apache.kafka.common.serialization.ByteArrayDeserializer被改写为org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer之后,所实现的接口Deserializer也被shade处理了,可能被是org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer,所以导致异常*来自志愿者整理的FLINK邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。