4、输入数据 & 验证结果
前面验证了程序正常启动,接下来我们来验证输入和输出
先来监听输出,进入 Flink
的日志目录,接着通过 tail
命令监听任务执行者 TaskWorkder
(默认会启动一个任务执行者,所以编码为 0) 的日志输出
$ usr/local/Cellar/apache-flink/1.9.0/libexec/log $ tail -400f flink*-taskexecutor-0*.out
接着,在 nc -l 9000
对应的终端窗口中输入如下数据:
$ nc -l 9000 hello world test world test hello hello my world
最后就能够看到以下输出结果:
(hello,1) (world,1) (test,1) (world,2) (test,2) (hello,2) (hello,3) (my,1) (world,3)
每行字符以空格进行分割,然后分别进行汇总统计,得到的输出结果一致。
扩展阅读
如果你在官网阅览,应该也曾看到过 TimeWindow
时间窗口的例子,下面是 Demo
代码
publicclass SocketWindowWordCount { public static void main(String[] args) throws Exception { // the port to connect to String hostName = "127.0.0.1"; int port = 9000; // get the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // get input data by connecting to the socket DataStream<String> text = env.socketTextStream("localhost", port, "\n"); // parse the data, group it, window it, and aggregate the counts DataStream<WordWithCount> windowCounts = text .flatMap(new FlatMapFunction<String, WordWithCount>() { @Override public void flatMap(String value, Collector<WordWithCount> out) { for (String word : value.split("\\s")) { out.collect(new WordWithCount(word, 1L)); } } }) .keyBy("word") .timeWindow(Time.seconds(5), Time.seconds(1)) .reduce(new ReduceFunction<WordWithCount>() { @Override public WordWithCount reduce(WordWithCount a, WordWithCount b) { returnnew WordWithCount(a.getWord(), a.getCount() + b.getCount()); } }); // print the results with a single thread, rather than in parallel windowCounts.print().setParallelism(1); env.execute("Socket Window WordCount"); } }
这里的程序代码核心点在于,比之前的多了一个算子 timeWindow
,并且有两个参数,分别是时间窗口大小以及滑动窗口大小(Time size, Time slide
),下面是简单的输入和输出示意图:
由于滑动窗口大小是 1s,窗口是有重合的部分,然后每秒统计自己所在窗口的数据(5s 内传输过来的数据),可以看到第 6s 时,已经舍弃掉第 0s 输入的字符串数据。
小伙伴们也可以修改一下时间窗口大小和滑动窗口大小,然后输入自定义的数据,进行不同参数的设置,看下输出效果如何,是否有达到自己的预期。
这里先初步接触一下 时间(Time)和窗口(Window)概念,之后慢慢接触逐步加深理解吧。
总结
本文基于 Mac
系统、 Apache Flink 1.9
版本进行了项目搭建和 Demo
编写,介绍了 Suorce -> Transformation -> Sink
的流程。简单的实现了一个字符计数器,往套接字数据源 SocketTextStream
,源源不断的输入,然后进行统计出现的次数,如有疑惑或不对之处请与我讨论~