Hi all:
I wrote a job against Flink 1.10.2. It runs fine locally, but when I submit it to the Flink cluster as a job, it fails at startup with the exception below. What could be the cause?

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:432)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; class invalid for deserialization
	at java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
	at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:790)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1782)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
The overall logic: I have two pipelines.

1. Multiple Kafka sources are unioned into a single stream, which passes through two operators and is finally sinked to Kafka.
2. A single Kafka source produces a stream, which passes through one operator and is sinked to Kafka.

The code:

StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
List<KafkaInfo> kafkaSourceConfiguration = this.kafkaConfiguration.getSource0();
KafkaInfo kafkaSinkConfiguration = this.kafkaConfiguration.getSink();

RecordTransformOperator transformOperator = new RecordTransformOperator(DefRecordTransform.CHAIN_TYPE_TRANSFORM_COMPUTE);
RecordKeySelector keySelector = new RecordKeySelector();
RecordComputeOperator computeOperator = new RecordComputeOperator();

Properties sinkProperties = new Properties();
sinkProperties.setProperty("bootstrap.servers", kafkaSinkConfiguration.getBootstrapServer());
FlinkKafkaProducer011<Tuple2<String, String>> flinkKafkaProducer =
        new FlinkKafkaProducer011<>(kafkaSinkConfiguration.getTopicName(), new KafkaSerializer(), sinkProperties);
List<SingleOutputStreamOperator<Tuple2<String, String>>> dataStreamList = new ArrayList<>();
for (KafkaInfo kafkaInfo : kafkaSourceConfiguration) {
    Properties sourceProperties = new Properties();
    sourceProperties.setProperty("bootstrap.servers", kafkaInfo.getBootstrapServer());
    sourceProperties.setProperty("group.id", kafkaInfo.getGroupId());
    sourceProperties.setProperty("max.poll.records", kafkaInfo.getMaxPollRecord());
    sourceProperties.put("max.poll.interval.ms", kafkaInfo.getMaxPollIntervalMs());
    String topicName = kafkaInfo.getTopicName();
    FlinkKafkaConsumer011<Tuple2<String, String>> flinkKafkaConsumer =
            new FlinkKafkaConsumer011<>(topicName, new KafkaDeserializer(), sourceProperties);
    SingleOutputStreamOperator<Tuple2<String, String>> singleOutputStreamOperator =
            streamExecutionEnvironment.addSource(flinkKafkaConsumer);
    dataStreamList.add(singleOutputStreamOperator);
}
DataStream<Tuple2<String, String>> unionDataStream = dataStreamList.get(0);
for (int i = 1; i < dataStreamList.size(); i++) {
    unionDataStream = unionDataStream.union(dataStreamList.get(i));
}
unionDataStream.flatMap(transformOperator)
        .keyBy(keySelector)
        .flatMap(computeOperator)
        .addSink(flinkKafkaProducer);
RecordTransformOperator transformOperator1 = new RecordTransformOperator(DefRecordTransform.CHAIN_TYPE_TRANSFORM);
Properties sinkProperties1 = new Properties();
sinkProperties1.setProperty("bootstrap.servers", kafkaSinkConfiguration.getBootstrapServer());
FlinkKafkaProducer011<Tuple2<String, String>> flinkKafkaProducer1 =
        new FlinkKafkaProducer011<>(kafkaSinkConfiguration.getTopicName(), new KafkaSerializer(), sinkProperties1);

KafkaInfo kafkaInfo = this.kafkaConfiguration.getSource1().get(0);
Properties sourceProperties = new Properties();
sourceProperties.setProperty("bootstrap.servers", kafkaInfo.getBootstrapServer());
sourceProperties.setProperty("group.id", kafkaInfo.getGroupId());
sourceProperties.setProperty("max.poll.records", kafkaInfo.getMaxPollRecord());
sourceProperties.put("max.poll.interval.ms", kafkaInfo.getMaxPollIntervalMs());
String topicName = kafkaInfo.getTopicName();
FlinkKafkaConsumer011<Tuple2<String, String>> flinkKafkaConsumer =
        new FlinkKafkaConsumer011<>(topicName, new KafkaDeserializer(), sourceProperties);

streamExecutionEnvironment
        .addSource(flinkKafkaConsumer)
        .flatMap(transformOperator1)
        .addSink(flinkKafkaProducer1);

streamExecutionEnvironment.execute();
Hi,

From the stack trace, the failure is located in Flink's deserialization path:

	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)

InstantiationUtil fails while reading your serialized user function back on the cluster, so this is a deserialization problem. It works in your local test because local execution does not involve this serialize/ship/deserialize step.
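For what it's worth, an InvalidClassException naming FlinkKafkaProducer011 usually means the class serialized into the job graph and the class on the TaskManager's classpath disagree, for example a different flink-connector-kafka-0.11 version bundled in your fat jar than the one visible on the cluster, giving mismatched serialVersionUIDs. Below is a minimal diagnostic sketch, assuming you can run a small main class once with your fat jar and once on the cluster's classpath; the class name SerialVersionCheck is made up for illustration.

import java.io.ObjectStreamClass;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

public class SerialVersionCheck {
    public static void main(String[] args) {
        Class<?> clazz = FlinkKafkaProducer011.class;
        // Which jar this classpath actually resolves the class from.
        System.out.println("loaded from: "
                + clazz.getProtectionDomain().getCodeSource().getLocation());
        // The serialVersionUID Java serialization will use for this class
        // (lookup() is non-null here because the producer is Serializable).
        System.out.println("serialVersionUID: "
                + ObjectStreamClass.lookup(clazz).getSerialVersionUID());
    }
}

If the two runs print different jar locations or different serialVersionUID values, aligning the connector dependency with the cluster's Flink 1.10.2 build (and removing the duplicate copy from the fat jar) should let the user function deserialize.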