本地执行代码如下
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build(); StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); //创建一个tableEnvironment StreamTableEnvironment fsTableEnv = StreamTableEnvironment.getTableEnvironment(fsEnv); //读取数据源 DataStream ds1 = fsEnv.readTextFile("D:\output.txt"); ds1.print();
//数据转换
DataStream<Tuple2<String, String>> ds2 = ds1.map(new MapFunction<String, Tuple2<String, String>>() {
private static final long serialVersionUID = -3027796541526131219L;
@Override
public Tuple2<String, String> map(String s) throws Exception {
String[] splits = s.split(",");
return new Tuple2<>(splits[0], splits[1]);
}
});
ds2.print();
//DataStream 转sql, 指定字段名
Table table = fsTableEnv.fromDataStream(ds2, "id,name");
table.printSchema();
// DataStream<Tuple2<Boolean, Tuple>> stream1 = fsTableEnv.toRetractStream(table, Types.TUPLE(Types.STRING, Types.STRING)); DataStream<Tuple2<Boolean, Row>> sinkStream = fsTableEnv.toRetractStream(table,Row.class);
报错如下 Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/common/typeutils/TypeDeserializerAdapter at org.apache.flink.table.runtime.types.CRowTypeInfo.createSerializer(CRowTypeInfo.scala:57) at org.apache.flink.streaming.api.graph.StreamGraph.createSerializer(StreamGraph.java:864) at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:308) at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:293) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:680) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:253) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:212) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699) at Group1.test.DataStreamTable2.main(DataStreamTable2.java:61) Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.common.typeutils.TypeDeserializerAdapter
注意import的StreamExecutionEnvironment
// java 的头是
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
// scala的头是:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
如果不小心用了java的环境初始化context, 编译器并不能很好的提示, 会出现一堆看上去在java环境才能有的错误, 比如:
missing parameter type for expanded function
cannot resolve symbol with such signature
类似的很多找不到定义, 不明白类型等等错误, 所以写scala程序的第一步一定要确认StreamExecutionEnvironment是不是选的对
2 maven-shade-plugin, 打包失败, Error creating shaded jar: null: IllegalArgumentException
很显然是包依赖版本问题, shade打包是用append的方法. 我出这个错误主要是因为引入了ES的sink, 参照网上的代码, 需要同时引入依赖flink-connector-elasticsearch5_2.11和org.elasticsearch.
引入org.elasticsearch主要是为了两个定义
importorg.elasticsearch.action.index.IndexRequest
importorg.elasticsearch.client.Requests
没有太好的解决版本, 试验了几个版本后, 发现这两个版本是匹配的
org.apache.flink flink-connector-elasticsearch5_2.11 flink 1.3.2
org.elasticsearch elasticsearch 5.5.0
sink参考代码如下:
val config =newjava.util.HashMap[String,String]
config.put("cluster.name","my-cluster-name")
// This instructs the sink to emit after every element, otherwise they would be buffered
// config.put("bulk.flush.max.actions", "1")
val transportAddresses =newjava.util.ArrayList[InetSocketAddress]
transportAddresses.add(newInetSocketAddress(InetAddress.getByName("127.0.0.1"),9300))
transportAddresses.add(newInetSocketAddress(InetAddress.getByName("10.2.3.1"),9300))
qosSliceStream.addSink(newElasticsearchSink(config,transportAddresses,newElasticsearchSinkFunction[String] { def createIndexRequest(element:String): IndexRequest = { val json =newjava.util.HashMap[String,String]
json.put("data",element)
Requests.indexRequest().index("my-index").type
("my-type").source(json)
}
importorg.apache.flink.api.common.functions.RuntimeContext
importorg.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
def process(element:String,ctx: RuntimeContext,indexer: RequestIndexer):Unit= { indexer.add(createIndexRequest(element))
}
}))
3 scala: java list to scala list
问题出现在, 做flatMap, 想把一个二维的java数组展开用于流计算, 但是怎么写都不成, 按理说直接flatMap(x=>x)就可以了, 各种调试发现, log中一直有java.util.list字样, 后来怀疑这个List因为scala不认, 所以不能flatten.
查了一下, 果然, 需要做的也很简单.
import scala.collection.JavaConverters._
.flatMap(x => x.asScala.toList)
4 TimerException{java.lang.NoSuchMethodError: scala.runtime.LongRef.create(J)Lscala/runtime/LongRef;}
这个是scala编译版本和运行时版本不一致导致的, 可以看到2.10和2.11对这块的定义明显不一致.
为了保证编译时的 scala 版本和运行时的 scala 版本一致, 最好使用项目依赖的 scala-library 运行程序.
最终问题定位为, 提交flink任务的版本为1.3.0, 其使用的scala版本为2.10, 解决方法就是升级到1.3.2, 对应版本为2.11. 就好了
开源项目的版本使用还是挺重要的, 很多项目都会任性的升级, 即使是顶级项目. 所以大公司才有有专人维护开源代码吧.
推荐一个工具, IDEA中有个把包依赖用图形表示的功能, 非常实用, 可以支持索引, 和方便的排除操作.
5 Flink Kafka Connector的特别之处
这一点其实单独写一篇也够了, 不过还是先简单记录下. 先说现象, 准备把几个flink的任务做迁移, 需要迁到一个不同的物理集群上, 担心稳定性和部署方面的事, 就没有停掉之前的任务, 在新集群起了一个相同group ID的任务, 料想kafka会统一分配patitions给到两个任务, 我们最终得到的数据量是不变的, 每条消息只被处理一次. 但是, 事实并不是这个样子的, 几个小时后, 后端的磁盘就报警了, 看了一下发现, 日志量double啦, 也就是说同一个group name的两个flink 任务, 并没有共享任务.
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。