
Error running multiple streams in a single job

Hi all:

I'm using Flink 1.10.2 and wrote a program that runs fine locally, but when I submit it to Flink as a job it fails on startup with the exception below. What could be the cause?

```
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:432)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; class invalid for deserialization
	at java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
	at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:790)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1782)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
```

The rough logic is as follows. I have two pipelines:

1. Multiple Kafka sources are unioned into a single stream, which passes through two operators and is sinked to Kafka.
2. A single Kafka source produces a stream that passes through one operator and is sinked to Kafka.

The code:

```java
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
List<KafkaInfo> kafkaSourceConfiguration = this.kafkaConfiguration.getSource0();
KafkaInfo kafkaSinkConfiguration = this.kafkaConfiguration.getSink();
RecordTransformOperator transformOperator = new RecordTransformOperator(DefRecordTransform.CHAIN_TYPE_TRANSFORM_COMPUTE);
RecordKeySelector keySelector = new RecordKeySelector();
RecordComputeOperator computeOperator = new RecordComputeOperator();
Properties sinkProperties = new Properties();
sinkProperties.setProperty("bootstrap.servers", kafkaSinkConfiguration.getBootstrapServer());
FlinkKafkaProducer011<Tuple2<String, String>> flinkKafkaProducer = new FlinkKafkaProducer011<>(
        kafkaSinkConfiguration.getTopicName(), new KafkaSerializer(), sinkProperties);

// Pipeline 1: build one stream per source, union them, transform, key, compute, sink.
List<SingleOutputStreamOperator<Tuple2<String, String>>> dataStreamList = new ArrayList<>();
for (KafkaInfo kafkaInfo : kafkaSourceConfiguration) {
    Properties sourceProperties = new Properties();
    sourceProperties.setProperty("bootstrap.servers", kafkaInfo.getBootstrapServer());
    sourceProperties.setProperty("group.id", kafkaInfo.getGroupId());
    sourceProperties.setProperty("max.poll.records", kafkaInfo.getMaxPollRecord());
    sourceProperties.put("max.poll.interval.ms", kafkaInfo.getMaxPollIntervalMs());
    String topicName = kafkaInfo.getTopicName();
    FlinkKafkaConsumer011<Tuple2<String, String>> flinkKafkaConsumer =
            new FlinkKafkaConsumer011<>(topicName, new KafkaDeserializer(), sourceProperties);
    SingleOutputStreamOperator<Tuple2<String, String>> singleOutputStreamOperator =
            streamExecutionEnvironment.addSource(flinkKafkaConsumer);
    dataStreamList.add(singleOutputStreamOperator);
}

DataStream<Tuple2<String, String>> unionDataStream = dataStreamList.get(0);
for (int i = 1; i < dataStreamList.size(); i++) {
    unionDataStream = unionDataStream.union(dataStreamList.get(i));
}
unionDataStream.flatMap(transformOperator)
        .keyBy(keySelector)
        .flatMap(computeOperator)
        .addSink(flinkKafkaProducer);

// Pipeline 2: single source, one transform, sink.
RecordTransformOperator transformOperator1 = new RecordTransformOperator(DefRecordTransform.CHAIN_TYPE_TRANSFORM);
Properties sinkProperties1 = new Properties();
sinkProperties1.setProperty("bootstrap.servers", kafkaSinkConfiguration.getBootstrapServer());
FlinkKafkaProducer011<Tuple2<String, String>> flinkKafkaProducer1 = new FlinkKafkaProducer011<>(
        kafkaSinkConfiguration.getTopicName(), new KafkaSerializer(), sinkProperties1);
KafkaInfo kafkaInfo = this.kafkaConfiguration.getSource1().get(0);
Properties sourceProperties = new Properties();
sourceProperties.setProperty("bootstrap.servers", kafkaInfo.getBootstrapServer());
sourceProperties.setProperty("group.id", kafkaInfo.getGroupId());
sourceProperties.setProperty("max.poll.records", kafkaInfo.getMaxPollRecord());
sourceProperties.put("max.poll.interval.ms", kafkaInfo.getMaxPollIntervalMs());
String topicName = kafkaInfo.getTopicName();
FlinkKafkaConsumer011<Tuple2<String, String>> flinkKafkaConsumer =
        new FlinkKafkaConsumer011<>(topicName, new KafkaDeserializer(), sourceProperties);
streamExecutionEnvironment
        .addSource(flinkKafkaConsumer)
        .flatMap(transformOperator1)
        .addSink(flinkKafkaProducer1);
streamExecutionEnvironment.execute();
```

(From the volunteer-collected Flink Q&A)

毛毛虫雨, 2021-12-06 15:29:58
1 answer
  • Hi, based on the error message, the failure is located in these frames:

    ```
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    ```

    InstantiationUtil fails while reading the user function back out of the job configuration, so this is a deserialization problem. A `java.io.InvalidClassException` on `FlinkKafkaProducer011` usually means the connector class on the cluster's classpath is not the same version as the one your job was serialized against (for example, a conflicting flink-connector-kafka-0.11 jar in the cluster's lib directory versus the one bundled in your job jar), so check that both sides use the same connector version. Your local test passes because local execution does not involve serializing the operators and shipping them to remote TaskManagers.
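    The mechanism can be reproduced outside Flink with a minimal sketch using plain JDK serialization (which is what `InstantiationUtil` wraps). Here `Fn` is a hypothetical stand-in for the connector class; patching the `serialVersionUID` stored in the byte stream simulates the receiving side having a different version of the class, and deserialization then fails with the same `java.io.InvalidClassException`:

    ```java
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.InvalidClassException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class InvalidClassDemo {

        // Hypothetical stand-in for a user function / connector class
        // that Flink ships to TaskManagers as serialized bytes.
        static class Fn implements Serializable {
            private static final long serialVersionUID = 1L;
        }

        public static void main(String[] args) throws Exception {
            // 1) Serialize, as the client does when packing operators
            //    into the job configuration at submission time.
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(new Fn());
            }
            byte[] bytes = bos.toByteArray();

            // 2) Corrupt the stored serialVersionUID to simulate the reading
            //    side having a *different* version of the class. Stream layout
            //    (Java serialization spec): 4-byte header, TC_OBJECT,
            //    TC_CLASSDESC, 2-byte name length, name bytes, then the
            //    8-byte serialVersionUID.
            int suidOffset = 4 + 1 + 1 + 2 + Fn.class.getName().length();
            bytes[suidOffset] ^= 0x01;

            // 3) Deserialize, as the TaskManager does before running the task.
            try (ObjectInputStream ois =
                     new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                ois.readObject();
            } catch (InvalidClassException e) {
                System.out.println("caught: " + e.getClass().getName());
            }
        }
    }
    ```

    The same mismatch happens on the cluster when the class bytes come from a different connector jar than the one used at submission, which is why aligning the connector versions resolves it.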

    2021-12-06 16:02:38
