各种算子简介
以单词计数为例
先要将字符串数据解析成单词和次数 使用tuple2表示 第一个字段是单词 第二个字段是次数 次数初始值设置成1
flatmap
flatmap来做解析的工作 一行数据可能有多个单词
keyBy
将数据流按照单词字段即0号索引字段做分组 keyBy(int index) 得到一个以单词为key的tuple2数据流
timeWindow
在流上指定想要的窗口 并根据窗口中的数据计算结果 每5秒聚合一次单词数 每个窗口都是从零开始统计的 timeWindow 指定想要5秒的翻滚窗口(Tumble)
sum
第三个调用为每个key每个窗口指定了sum聚合函数 按照次数字段(即1号索引字段想家) 得到结果数据流 将每5秒输出一次 这5秒内每个单词出现的次数
将数据打印到控制台 所有算子操作(创建源、聚合、打印)只是构建了内部算子操作的图形 只有在execute被调用时才会在提交到集群或本地计算机上执行
执行报错 找不到代码异常
具体异常信息
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: org.apache.flink.streaming.scala.examples.remotejob.RemoteJobTest$$anon$2
解决方法
- 将当前目录文件夹打包成jar包
使用maven插件maven-jar-plugin
- 第三个参数指向该jar包
在FLink Web UI查看该任务的执行过程
编译异常
无效的标记
--add-exports=java.base/sun.net.util=ALL-UNNAMED
不支持hdfs文件系统
具体异常信息
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded
处理方式
- 下载 flink hadoop资源jar包
https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-7.0/flink-shaded-hadoop-2-uber-2.7.5-7.0.jar
- 放入flink 安装包 lib目录下
每个节点都需要放上该jar包 然后重启flink集群环境
当前操作节点hadoop namenode节点为standby状态
具体详细信息
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby
解决方法
重新格式化2个namenode节点即可 具体详见
遗留问题
flink数据源来自于socket数据
问题是
Flink并没有监听到该socket数据 暂时还没有找到原因 了解的朋友们请联系我 指导我一下哦
如果本地环境是可以监听到的
后记
为了解决这个问题 我请教了下 “Apache Flink China社区”钉钉群里面的谢波老师 他告诉我: 通过java或scala一般创建本地执行环境 即 'final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();' 很少有 'final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(ip,port,jarfiles);' 这样用的 若使用flink分布式环境 那么通过web ui界面 上传jar包的方式来完成
这也就解释了为什么我没有找到相关资料 只能靠自己'摸着石头过河'了
结语
在了解一件新事物的时候 按照自己的想法 一番努力和挣扎之后 也许方向是错误的 但也会对它更进一步的了解了