从零编写第一个 Flink 应用

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink 是一个流计算引擎。本文主要介绍如何从零编写一个统计单词出现次数的 Flink 应用(后面简称为 WordCount)。由于 Flink 概念很多,对初学者会造成极大困扰,所以本文不会涉及太多概念,即使没有 Flink 基础也可以完成本文的示例应用。

Apache Flink 是一个流计算引擎。本文主要介绍如何从零编写一个统计单词出现次数的 Flink 应用(后面简称为 WordCount)。由于 Flink 概念很多,对初学者会造成极大困扰,所以本文不会涉及太多概念,即使没有 Flink 基础也可以完成本文的示例应用。


开发环境准备


要编写 Flink 应用,需要安装 Java (Java 8 或 Java 11) 和 Maven。


你可以使用下面的命令查看当前已安装的 Java 版本信息:


$ java-versionjava version "1.8.0_181"Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)


使用下面的命令查看当前已安装的 Maven 版本信息:


$ aliyun mvn -versionApache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /usr/local/Cellar/maven/3.6.3_1/libexec
Java version: 1.8.0_181, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home/jre
Default locale: en_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.16", arch: "x86_64", family: "mac"

此外,我也建议你使用 IntelliJ IDEA 来编写 Java 应用,我当前的版本是 2020.3.2。


项目初始化


我们可以使用 Flink  Maven Archetype 快速创建一个项目。该模板中包含了 Flink 应用需要的 flink-streaming-java、flink-clients 等依赖。


$ mvn archetype:generate  \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.12.0 \
-DgroupId=com.flink.demo \
-DartifactId=first-flink-project \
-Dversion=0.0.1-SNAPSHOT \
-Dpackage=com.flink.demo \
-DinteractiveMode=false


你可以将上面的 groupId、artifactId、package 等参数改为你喜欢的值。


使用上面参数生成的项目结构如下:


$ tree first-flink-project
first-flink-project
├── first-flink-project.iml
├── pom.xml
└── src
    └── main
        ├── java
        │   └── com
        │       └── flink
        │           └── demo
        │               ├── BatchJob.java
        │               └── StreamingJob.java
        └── resources
            └── log4j2.properties


其中 StreamingJob.java 和  BatchJob.java 是 Flink 模板中默认的流处理任务和批处理任务代码示例,我们先不管它。


接下来我们使用 IntelliJ IDEA 导入项目,你也也可以使用 idea 命令直接打开项目。


$ idea first-flink-project


然后我们简单修改一下 pom.xml,删除其中的 lifecycle-mapping 插件,这是为了解决 Eclipse 报错而添加的,在  IntelliJ IDEA 中不需要。


编写 Flink 代码


src/main/java/com/flink/demo 中添加一个 WindowWordCount.java 类,输入下面的这段代码。这就是 WordCount 应用的所有代码了,现在看不懂没有关系,后面我会一一讲解。


packagecom.flink.demo;
importorg.apache.flink.api.common.functions.FlatMapFunction;
importorg.apache.flink.api.java.tuple.Tuple2;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.datastream.DataStreamSource;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
importorg.apache.flink.streaming.api.windowing.time.Time;
importorg.apache.flink.util.Collector;
publicclassWindowWordCount {
publicstaticvoidmain(String[] args) throwsException {
StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String>source=env.socketTextStream("localhost", 9999);
DataStream<Tuple2<String, Integer>>dataStream=source                .flatMap(newSplitter())
                .keyBy(value->value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1);
dataStream.print();
env.execute("Window WordCount");
    }
publicstaticclassSplitterimplementsFlatMapFunction<String, Tuple2<String, Integer>> {
@OverridepublicvoidflatMap(Stringsentence, Collector<Tuple2<String, Integer>>out) throwsException {
for (Stringword : sentence.split(" ")) {
out.collect(newTuple2<String, Integer>(word, 1));
            }
        }
    }
}


这段代码的主要功能就是从 Socket 中读取数据(即单词),然后每 5s 统计一次所有单词出现的次数,然后输出。


WordCount 应用的完整代码可以在这里看到 first-flink-project


在讲解代码前,我们可以先运行看看效果。


运行 Flink 应用


首先我们使用 netcat 启动一个 Socket 输入流:


$ nc-lk9999


然后通过 IntelliJ IDEA  来运行 WindowWordCount 类:


image.png


第一次运行可能有报错 NoClassDefFoundError :


Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/common/functions/FlatMapFunction
 at java.lang.Class.getDeclaredMethods0(Native Method)
 at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
 at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
 at java.lang.Class.getMethod0(Class.java:3018)
 at java.lang.Class.getMethod(Class.java:1784)
 at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
 at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)

原因在 pom.xml 中, flink-streaming-javaflink-clients 这两个依赖的 scope 是 provided,所以它们不会被打包到 jar 中。这些依赖是 flink 的核心依赖,当把 jar 包部署到 flink 上运行时,运行环境中已经内置这些依赖了,所以我们也不需要把它们打包到 jar 中。


解决这个报错的办法就是在  IntelliJ IDEA  运行配置中勾选 Include dependencies with "Provided" scope


image.png


成功运行后,就可以输入数据进入测试了。


image.png


如图所示,你可以看到输出了 helloworld 等单词出现的次数。


代码分析


创建执行环境


main 方法中的第一行代码用于创建执行环境。执行环境可以用来定义任务属性(如并发度)、创建数据源以及启动任务。


StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();


创建数据源


接下来就是创建数据源,数据源的作用是从外部系统如 Kafka、Rabbit MQ 或日志服务等系统中接收数据,然后将数据传输到 Flink 任务中。在 WordCount 应用中,我们从本地端口号为 9999 的 socket 中读取数据。env.socketTextStream 数据源默认是按行读取输入的数据,也就是说在 netcat 启动的输入流中输入一行数据,按下回撤(即 \n 字符),env.socketTextStream 就会读取这一行数据。当然,你也可以自定义数据的分隔符。


DataStreamSource<String>source=env.socketTextStream("localhost", 9999);

有了数据后,我们就可以对数据进行处理。Flink 提供了大量的 算子(operators) 用来处理数据,比如 Map、FlatMap、KeyBy、Reduce、Window 等等。


处理数据 - 转换


数据处理第一步就是使用 Flink 的 flatMap 算子处理输入的数据。我们使用了自定义的 Splitter 类将输入的一行数据按空格拆分为多个数据。因为我们可能在一行中输入多个单词。

publicstaticclassSplitterimplementsFlatMapFunction<String, Tuple2<String, Integer>> {
@OverridepublicvoidflatMap(Stringsentence, Collector<Tuple2<String, Integer>>out) throwsException {
for (Stringword : sentence.split(" ")) {
out.collect(newTuple2<String, Integer>(word, 1));
            }
        }
    }

Splitter 类继承了 FlatMapFunction 类,然后重载了  flatMap 方法。flatMap 有两个参数,第一个参数 sentence 就是输入的一行数据,out 是用来保存处理结果的变量 。out 中的数据类型是 Tuple2<String, Integer> ,最终我们输入的所有数据都会保存在 out 中。


举个例子,假设输入的第一行数据(也就是 sentence)为 hello hello world,则当前 out 为:

(hello, 1)
(hello, 1)
(world, 1)


接下来再输入一行新的数据 java flink,则最新的 out 为:

(hello, 1)
(hello, 1)
(world, 1)
(java, 1)
(flink, 1)


处理数据 - 分组


使用 flatMap 处理了输入的数据后,得到是输入的所有单词。要统计单词出现的次数,接下来我们还需要对单词进行分组,这时就可以使用 flink 的 keyBy 算子。


.keyBy(value->value.f0)


上面是 Lambda 的写法, 可能不方便理解。上面这行代码就等同于下面的代码:

.keyBy(newKeySelector<Tuple2<String, Integer>, Object>() {
@OverridepublicStringgetKey(Tuple2<String, Integer>value) throwsException {
returnvalue.f0;
                    }
})


这下看起来应该就清晰很多了,在 keyBy 中,我们主要是实现了 KeySelector 类并重载了 getKey,最终返回分组的 key,也就是输入的单词,这样 keyBy 算子就会根据单词去分组。


处理数据 - 窗口


在 WordCount 应用中,我们的需求是 5s 统计一次单词次数,所以需要用到 Flink 的窗口。这里我们使用的 TumblingProcessingTimeWindows ,代码如下:


.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))


其作用是每 5s 对数据流做一次切分。


可能理解起来比较抽象,举个例子,假设输入的单词及时间如下:

2021-01-2400:00:00hello2021-01-2400:00:00hello2021-01-2400:00:01word2021-01-2400:00:06java2021-01-2400:00:07flink2021-01-2400:00:07hello2021-01-2400:00:13hello

则会生成 3 个窗口(window),分别如下:

window1:   [ (hello, 1), (hello, 1), (wold, 1) ]
window2:   [ (java, 1), (flink, 1), (hello, 1) ]
window3:   [ (hello, 1) ]

处理数据 - 聚合


对数据按照时间窗口进行分组后,接下来就可以对每个窗口中每个分组的单词进行聚合了。在 WordCount 应用中,聚合就是对单词出现次数求和,求和也很简单,使用 sum 就可以:


.sum(1)


sum 会作用域于每个 window,这样就可以分别求出每个 window 中的单词出现次数,我们就可以计算出 5s 内每个单词出现的次数了。


输出结果


数据处理完毕后,我们就可以将数据输出。通常我们会将数据输出到另一个外部系统,比如 Kafka、Rabbit MQ 或日志服务等。


这里我们直接使用 print 将数据打印出来。


dataStream.print();


运行 Flink 任务


最后,我们通过 env.execute() 方法来运行任务。


env.execute("Window WordCount");


其参数是任务名称。


Flink 任务只有在 execute() 被调用后,才会提交到集群或本地计算机上执行。execute() 调用前,Flink 只是生成了数据处理流图。


在本地 Flink 集群运行任务


现在我们已经可以通过 IntelliJ IDEA 来运行应用,你肯定也很想知道如何在本地 Flink 集群中运行 WordCount 应用。


其实也很简单。


编译代码


编译代码前,需要先修改  pom.xml 中的 mainClass。生成项目时默认是 com.flink.demo.StreamingJob,我们需要将其修改为 com.flink.demo.WindowWordCount ,这样通过 java -jar <name>.jar 命令执行 jar 包时,就会运行 WindowWordCount 类了。


<transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.flink.demo.WindowWordCount</mainClass></transformer></transformers>

修改后,使用 maven 编译项目:


$ mvn  clean package


编译成功后,会在 target 目录下生成 first-flink-project-0.0.1-SNAPSHOT.jar 文件。


启动 Flink 集群


接下来我们在本地启动 Flink 集群。如果你之前已经启动了,则不用执行该步骤了。


启动集群步骤可以参考 本地模式安装 ,下面我简单讲解一下。


首先需要先下载 Flink 集群的代码,你可以在这个页面 Flink Downloads 找到所有版本的 Flink。


下载后解压:


$ tar-xzf flink-1.12.1-bin-scala_2.11.tgz
$ cd flink-1.12.1


然后启动集群。flink 代码中附带了一个 bash 脚本,可以用来启动集群。


$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host.
Starting taskexecutor daemon on host.


提交任务


在本地启动 Flink 集群后,然后我们就可以通过 flink run 命令将 WordCount 任务提交到本地集群了:


$ ./bin/flink run jobs/first-flink-project-0.0.1-SNAPSHOT.jar


jobs/first-flink-project-0.0.1-SNAPSHOT.jar 是前面编译后的 jar 包,我将其移到了 jobs 目录下。


任务启动后,可以在 log/ 目录下查看对应日志。


image.png


此外,你也可以通过 Web UI 来管理集群,在浏览器打开 localhost:8081 就可以看到管理界面。


image.png


停止集群


任务运行结束后,你可以通过 stop-cluster.sh 这个脚本来停止 flink 集群。


$ ./bin/stop-cluster.sh


总结


至此,我们的 WordCount 应用就开发并部署完成了。Flink 本身的概念非常多,本文基本没有涉及概念介绍,只是讲解了程序运行过程,希望通过本文的介绍, 能让你对 Flink 有个初步的印象。这样接下来再去接触 Flink 的概念,应该就会容易一些了。


参考


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3月前
|
分布式计算 数据处理 Apache
Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
【10月更文挑战第10天】Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
402 1
|
20天前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
368 2
探索Flink动态CEP:杭州银行的实战案例
|
6月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
895 7
阿里云实时计算Flink在多行业的应用和实践
|
7月前
|
SQL 搜索推荐 OLAP
Flink 流批一体场景应用及落地情况
本文由阿里云 Flink 团队苏轩楠老师撰写,旨在介绍 Flink 流批一体在几个常见场景下的应用。
68158 11
Flink 流批一体场景应用及落地情况
|
5月前
|
机器学习/深度学习 监控 Serverless
Serverless 应用的监控与调试问题之Flink在内部使用的未来规划,以及接下来有什么打算贡献社区的创新技术
Serverless 应用的监控与调试问题之Flink在内部使用的未来规划,以及接下来有什么打算贡献社区的创新技术
|
5月前
|
机器学习/深度学习 监控 大数据
Serverless 应用的监控与调试问题之Flink在整个开源大数据生态中应该如何定位,差异化该如何保持
Serverless 应用的监控与调试问题之Flink在整个开源大数据生态中应该如何定位,差异化该如何保持
|
5月前
|
存储 监控 Serverless
Serverless 应用的监控与调试问题之Pravega和Flink实现端到端的auto-scaling要如何操作
Serverless 应用的监控与调试问题之Pravega和Flink实现端到端的auto-scaling要如何操作
|
5月前
|
SQL 监控 大数据
Serverless 应用的监控与调试问题之Flink流式数仓对于工商银行的数据链路要如何简化
Serverless 应用的监控与调试问题之Flink流式数仓对于工商银行的数据链路要如何简化
|
5月前
|
存储 监控 Cloud Native
Serverless 应用的监控与调试问题之Flink流批一体在架构层面有什么演进
Serverless 应用的监控与调试问题之Flink流批一体在架构层面有什么演进
|
5月前
|
存储 监控 Serverless
Serverless 应用的监控与调试问题之Flink对于Checkpoint Barrier流动缓慢的问题要如何解决
Serverless 应用的监控与调试问题之Flink对于Checkpoint Barrier流动缓慢的问题要如何解决