开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

在本地执行flink是报错TypeDeserializerAdapter not found

本地执行代码如下

EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build(); StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); //创建一个tableEnvironment StreamTableEnvironment fsTableEnv = StreamTableEnvironment.getTableEnvironment(fsEnv); //读取数据源 DataStream ds1 = fsEnv.readTextFile("D:\output.txt"); ds1.print();

    //数据转换
    DataStream<Tuple2<String, String>> ds2 = ds1.map(new MapFunction<String, Tuple2<String, String>>() {
        private static final long serialVersionUID = -3027796541526131219L;

        @Override
        public Tuple2<String, String> map(String s) throws Exception {
            String[] splits =  s.split(",");
            return new Tuple2<>(splits[0], splits[1]);
        }
    });
    ds2.print();

    //DataStream 转sql, 指定字段名
    Table table = fsTableEnv.fromDataStream(ds2, "id,name");
    table.printSchema();

// DataStream<Tuple2<Boolean, Tuple>> stream1 = fsTableEnv.toRetractStream(table, Types.TUPLE(Types.STRING, Types.STRING)); DataStream<Tuple2<Boolean, Row>> sinkStream = fsTableEnv.toRetractStream(table,Row.class);

报错如下 Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/common/typeutils/TypeDeserializerAdapter at org.apache.flink.table.runtime.types.CRowTypeInfo.createSerializer(CRowTypeInfo.scala:57) at org.apache.flink.streaming.api.graph.StreamGraph.createSerializer(StreamGraph.java:864) at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:308) at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:293) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:680) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:253) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:212) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699) at Group1.test.DataStreamTable2.main(DataStreamTable2.java:61) Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.common.typeutils.TypeDeserializerAdapter

展开
收起
游客3yybxutymqvrm 2020-10-07 07:02:44 2488 0
1 条回答
写回答
取消 提交回答
  • 注意import的StreamExecutionEnvironment

    // java 的头是

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

    // scala的头是:

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    如果不小心用了java的环境初始化context, 编译器并不能很好的提示, 会出现一堆看上去在java环境才能有的错误, 比如:

    missing parameter type for expanded function

    cannot resolve symbol with such signature

    类似的很多找不到定义, 不明白类型等等错误, 所以写scala程序的第一步一定要确认StreamExecutionEnvironment是不是选的对

    2 maven-shade-plugin, 打包失败, Error creating shaded jar: null: IllegalArgumentException

    很显然是包依赖版本问题, shade打包是用append的方法. 我出这个错误主要是因为引入了ES的sink, 参照网上的代码, 需要同时引入依赖flink-connector-elasticsearch5_2.11和org.elasticsearch.

    引入org.elasticsearch主要是为了两个定义

    importorg.elasticsearch.action.index.IndexRequest

    importorg.elasticsearch.client.Requests

    没有太好的解决版本, 试验了几个版本后, 发现这两个版本是匹配的

    org.apache.flink flink-connector-elasticsearch5_2.11 flink 1.3.2

    org.elasticsearch elasticsearch 5.5.0

    sink参考代码如下:

    val config =newjava.util.HashMap[String,String]

    config.put("cluster.name","my-cluster-name")

    // This instructs the sink to emit after every element, otherwise they would be buffered

    // config.put("bulk.flush.max.actions", "1")

    val transportAddresses =newjava.util.ArrayList[InetSocketAddress]

    transportAddresses.add(newInetSocketAddress(InetAddress.getByName("127.0.0.1"),9300))

    transportAddresses.add(newInetSocketAddress(InetAddress.getByName("10.2.3.1"),9300))

    qosSliceStream.addSink(newElasticsearchSink(config,transportAddresses,newElasticsearchSinkFunction[String] { def createIndexRequest(element:String): IndexRequest = { val json =newjava.util.HashMap[String,String]

    json.put("data",element)

    Requests.indexRequest().index("my-index").type("my-type").source(json)

    }

    importorg.apache.flink.api.common.functions.RuntimeContext

    importorg.apache.flink.streaming.connectors.elasticsearch.RequestIndexer

    def process(element:String,ctx: RuntimeContext,indexer: RequestIndexer):Unit= { indexer.add(createIndexRequest(element))

    }

    }))

    3 scala: java list to scala list

    问题出现在, 做flatMap, 想把一个二维的java数组展开用于流计算, 但是怎么写都不成, 按理说直接flatMap(x=>x)就可以了, 各种调试发现, log中一直有java.util.list字样, 后来怀疑这个List因为scala不认, 所以不能flatten.

    查了一下, 果然, 需要做的也很简单.

    import scala.collection.JavaConverters._

    .flatMap(x => x.asScala.toList)

    4 TimerException{java.lang.NoSuchMethodError: scala.runtime.LongRef.create(J)Lscala/runtime/LongRef;}

    这个是scala编译版本和运行时版本不一致导致的, 可以看到2.10和2.11对这块的定义明显不一致.

    为了保证编译时的 scala 版本和运行时的 scala 版本一致, 最好使用项目依赖的 scala-library 运行程序.

    最终问题定位为, 提交flink任务的版本为1.3.0, 其使用的scala版本为2.10, 解决方法就是升级到1.3.2, 对应版本为2.11. 就好了

    开源项目的版本使用还是挺重要的, 很多项目都会任性的升级, 即使是顶级项目. 所以大公司才有有专人维护开源代码吧.

    推荐一个工具, IDEA中有个把包依赖用图形表示的功能, 非常实用, 可以支持索引, 和方便的排除操作.

    5 Flink Kafka Connector的特别之处

    这一点其实单独写一篇也够了, 不过还是先简单记录下. 先说现象, 准备把几个flink的任务做迁移, 需要迁到一个不同的物理集群上, 担心稳定性和部署方面的事, 就没有停掉之前的任务, 在新集群起了一个相同group ID的任务, 料想kafka会统一分配patitions给到两个任务, 我们最终得到的数据量是不变的, 每条消息只被处理一次. 但是, 事实并不是这个样子的, 几个小时后, 后端的磁盘就报警了, 看了一下发现, 日志量double啦, 也就是说同一个group name的两个flink 任务, 并没有共享任务.

    2021-02-20 15:40:52
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载