本地执行代码如下
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build(); StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); //创建一个tableEnvironment StreamTableEnvironment fsTableEnv = StreamTableEnvironment.getTableEnvironment(fsEnv); //读取数据源 DataStream ds1 = fsEnv.readTextFile("D:\output.txt"); ds1.print();
//数据转换
DataStream<Tuple2<String, String>> ds2 = ds1.map(new MapFunction<String, Tuple2<String, String>>() {
private static final long serialVersionUID = -3027796541526131219L;
@Override
public Tuple2<String, String> map(String s) throws Exception {
String[] splits = s.split(",");
return new Tuple2<>(splits[0], splits[1]);
}
});
ds2.print();
//DataStream 转sql, 指定字段名
Table table = fsTableEnv.fromDataStream(ds2, "id,name");
table.printSchema();
// DataStream<Tuple2<Boolean, Tuple>> stream1 = fsTableEnv.toRetractStream(table, Types.TUPLE(Types.STRING, Types.STRING)); DataStream<Tuple2<Boolean, Row>> sinkStream = fsTableEnv.toRetractStream(table,Row.class);
报错如下 Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/common/typeutils/TypeDeserializerAdapter at org.apache.flink.table.runtime.types.CRowTypeInfo.createSerializer(CRowTypeInfo.scala:57) at org.apache.flink.streaming.api.graph.StreamGraph.createSerializer(StreamGraph.java:864) at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:308) at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:293) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:680) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:253) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:212) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699) at Group1.test.DataStreamTable2.main(DataStreamTable2.java:61) Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.common.typeutils.TypeDeserializerAdapter
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
注意import的StreamExecutionEnvironment
// java 的头是
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
// scala的头是:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
如果不小心用了java的环境初始化context, 编译器并不能很好的提示, 会出现一堆看上去在java环境才能有的错误, 比如:
missing parameter type for expanded function
cannot resolve symbol with such signature
类似的很多找不到定义, 不明白类型等等错误, 所以写scala程序的第一步一定要确认StreamExecutionEnvironment是不是选的对
2 maven-shade-plugin, 打包失败, Error creating shaded jar: null: IllegalArgumentException
很显然是包依赖版本问题, shade打包是用append的方法. 我出这个错误主要是因为引入了ES的sink, 参照网上的代码, 需要同时引入依赖flink-connector-elasticsearch5_2.11和org.elasticsearch.
引入org.elasticsearch主要是为了两个定义
importorg.elasticsearch.action.index.IndexRequest
importorg.elasticsearch.client.Requests
没有太好的解决版本, 试验了几个版本后, 发现这两个版本是匹配的
org.apache.flink flink-connector-elasticsearch5_2.11 flink 1.3.2
org.elasticsearch elasticsearch 5.5.0
sink参考代码如下:
val config =newjava.util.HashMap[String,String]
config.put("cluster.name","my-cluster-name")
// This instructs the sink to emit after every element, otherwise they would be buffered
// config.put("bulk.flush.max.actions", "1")
val transportAddresses =newjava.util.ArrayList[InetSocketAddress]
transportAddresses.add(newInetSocketAddress(InetAddress.getByName("127.0.0.1"),9300))
transportAddresses.add(newInetSocketAddress(InetAddress.getByName("10.2.3.1"),9300))
qosSliceStream.addSink(newElasticsearchSink(config,transportAddresses,newElasticsearchSinkFunction[String] { def createIndexRequest(element:String): IndexRequest = { val json =newjava.util.HashMap[String,String]
json.put("data",element)
Requests.indexRequest().index("my-index").type
("my-type").source(json)
}
importorg.apache.flink.api.common.functions.RuntimeContext
importorg.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
def process(element:String,ctx: RuntimeContext,indexer: RequestIndexer):Unit= { indexer.add(createIndexRequest(element))
}
}))
3 scala: java list to scala list
问题出现在, 做flatMap, 想把一个二维的java数组展开用于流计算, 但是怎么写都不成, 按理说直接flatMap(x=>x)就可以了, 各种调试发现, log中一直有java.util.list字样, 后来怀疑这个List因为scala不认, 所以不能flatten.
查了一下, 果然, 需要做的也很简单.
import scala.collection.JavaConverters._
.flatMap(x => x.asScala.toList)
4 TimerException{java.lang.NoSuchMethodError: scala.runtime.LongRef.create(J)Lscala/runtime/LongRef;}
这个是scala编译版本和运行时版本不一致导致的, 可以看到2.10和2.11对这块的定义明显不一致.
为了保证编译时的 scala 版本和运行时的 scala 版本一致, 最好使用项目依赖的 scala-library 运行程序.
最终问题定位为, 提交flink任务的版本为1.3.0, 其使用的scala版本为2.10, 解决方法就是升级到1.3.2, 对应版本为2.11. 就好了
开源项目的版本使用还是挺重要的, 很多项目都会任性的升级, 即使是顶级项目. 所以大公司才有有专人维护开源代码吧.
推荐一个工具, IDEA中有个把包依赖用图形表示的功能, 非常实用, 可以支持索引, 和方便的排除操作.
5 Flink Kafka Connector的特别之处
这一点其实单独写一篇也够了, 不过还是先简单记录下. 先说现象, 准备把几个flink的任务做迁移, 需要迁到一个不同的物理集群上, 担心稳定性和部署方面的事, 就没有停掉之前的任务, 在新集群起了一个相同group ID的任务, 料想kafka会统一分配patitions给到两个任务, 我们最终得到的数据量是不变的, 每条消息只被处理一次. 但是, 事实并不是这个样子的, 几个小时后, 后端的磁盘就报警了, 看了一下发现, 日志量double啦, 也就是说同一个group name的两个flink 任务, 并没有共享任务.
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。