从零编写第一个 Flink 应用

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink 是一个流计算引擎。本文主要介绍如何从零编写一个统计单词出现次数的 Flink 应用(后面简称为 WordCount)。由于 Flink 概念很多,对初学者会造成极大困扰,所以本文不会涉及太多概念,即使没有 Flink 基础也可以完成本文的示例应用。

Apache Flink 是一个流计算引擎。本文主要介绍如何从零编写一个统计单词出现次数的 Flink 应用(后面简称为 WordCount)。由于 Flink 概念很多,对初学者会造成极大困扰,所以本文不会涉及太多概念,即使没有 Flink 基础也可以完成本文的示例应用。


开发环境准备


要编写 Flink 应用,需要安装 Java (Java 8 或 Java 11) 和 Maven。


你可以使用下面的命令查看当前已安装的 Java 版本信息:


$ java-versionjava version "1.8.0_181"Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)


使用下面的命令查看当前已安装的 Maven 版本信息:


$ aliyun mvn -versionApache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /usr/local/Cellar/maven/3.6.3_1/libexec
Java version: 1.8.0_181, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home/jre
Default locale: en_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.16", arch: "x86_64", family: "mac"

此外,我也建议你使用 IntelliJ IDEA 来编写 Java 应用,我当前的版本是 2020.3.2。


项目初始化


我们可以使用 Flink  Maven Archetype 快速创建一个项目。该模板中包含了 Flink 应用需要的 flink-streaming-java、flink-clients 等依赖。


$ mvn archetype:generate  \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.12.0 \
-DgroupId=com.flink.demo \
-DartifactId=first-flink-project \
-Dversion=0.0.1-SNAPSHOT \
-Dpackage=com.flink.demo \
-DinteractiveMode=false


你可以将上面的 groupId、artifactId、package 等参数改为你喜欢的值。


使用上面参数生成的项目结构如下:


$ tree first-flink-project
first-flink-project
├── first-flink-project.iml
├── pom.xml
└── src
    └── main
        ├── java
        │   └── com
        │       └── flink
        │           └── demo
        │               ├── BatchJob.java
        │               └── StreamingJob.java
        └── resources
            └── log4j2.properties


其中 StreamingJob.java 和  BatchJob.java 是 Flink 模板中默认的流处理任务和批处理任务代码示例,我们先不管它。


接下来我们使用 IntelliJ IDEA 导入项目,你也也可以使用 idea 命令直接打开项目。


$ idea first-flink-project


然后我们简单修改一下 pom.xml,删除其中的 lifecycle-mapping 插件,这是为了解决 Eclipse 报错而添加的,在  IntelliJ IDEA 中不需要。


编写 Flink 代码


src/main/java/com/flink/demo 中添加一个 WindowWordCount.java 类,输入下面的这段代码。这就是 WordCount 应用的所有代码了,现在看不懂没有关系,后面我会一一讲解。


packagecom.flink.demo;
importorg.apache.flink.api.common.functions.FlatMapFunction;
importorg.apache.flink.api.java.tuple.Tuple2;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.datastream.DataStreamSource;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
importorg.apache.flink.streaming.api.windowing.time.Time;
importorg.apache.flink.util.Collector;
publicclassWindowWordCount {
publicstaticvoidmain(String[] args) throwsException {
StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String>source=env.socketTextStream("localhost", 9999);
DataStream<Tuple2<String, Integer>>dataStream=source                .flatMap(newSplitter())
                .keyBy(value->value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1);
dataStream.print();
env.execute("Window WordCount");
    }
publicstaticclassSplitterimplementsFlatMapFunction<String, Tuple2<String, Integer>> {
@OverridepublicvoidflatMap(Stringsentence, Collector<Tuple2<String, Integer>>out) throwsException {
for (Stringword : sentence.split(" ")) {
out.collect(newTuple2<String, Integer>(word, 1));
            }
        }
    }
}


这段代码的主要功能就是从 Socket 中读取数据(即单词),然后每 5s 统计一次所有单词出现的次数,然后输出。


WordCount 应用的完整代码可以在这里看到 first-flink-project


在讲解代码前,我们可以先运行看看效果。


运行 Flink 应用


首先我们使用 netcat 启动一个 Socket 输入流:


$ nc-lk9999


然后通过 IntelliJ IDEA  来运行 WindowWordCount 类:


image.png


第一次运行可能有报错 NoClassDefFoundError :


Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/common/functions/FlatMapFunction
 at java.lang.Class.getDeclaredMethods0(Native Method)
 at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
 at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
 at java.lang.Class.getMethod0(Class.java:3018)
 at java.lang.Class.getMethod(Class.java:1784)
 at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
 at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)

原因在 pom.xml 中, flink-streaming-javaflink-clients 这两个依赖的 scope 是 provided,所以它们不会被打包到 jar 中。这些依赖是 flink 的核心依赖,当把 jar 包部署到 flink 上运行时,运行环境中已经内置这些依赖了,所以我们也不需要把它们打包到 jar 中。


解决这个报错的办法就是在  IntelliJ IDEA  运行配置中勾选 Include dependencies with "Provided" scope


image.png


成功运行后,就可以输入数据进入测试了。


image.png


如图所示,你可以看到输出了 helloworld 等单词出现的次数。


代码分析


创建执行环境


main 方法中的第一行代码用于创建执行环境。执行环境可以用来定义任务属性(如并发度)、创建数据源以及启动任务。


StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();


创建数据源


接下来就是创建数据源,数据源的作用是从外部系统如 Kafka、Rabbit MQ 或日志服务等系统中接收数据,然后将数据传输到 Flink 任务中。在 WordCount 应用中,我们从本地端口号为 9999 的 socket 中读取数据。env.socketTextStream 数据源默认是按行读取输入的数据,也就是说在 netcat 启动的输入流中输入一行数据,按下回撤(即 \n 字符),env.socketTextStream 就会读取这一行数据。当然,你也可以自定义数据的分隔符。


DataStreamSource<String>source=env.socketTextStream("localhost", 9999);

有了数据后,我们就可以对数据进行处理。Flink 提供了大量的 算子(operators) 用来处理数据,比如 Map、FlatMap、KeyBy、Reduce、Window 等等。


处理数据 - 转换


数据处理第一步就是使用 Flink 的 flatMap 算子处理输入的数据。我们使用了自定义的 Splitter 类将输入的一行数据按空格拆分为多个数据。因为我们可能在一行中输入多个单词。

publicstaticclassSplitterimplementsFlatMapFunction<String, Tuple2<String, Integer>> {
@OverridepublicvoidflatMap(Stringsentence, Collector<Tuple2<String, Integer>>out) throwsException {
for (Stringword : sentence.split(" ")) {
out.collect(newTuple2<String, Integer>(word, 1));
            }
        }
    }

Splitter 类继承了 FlatMapFunction 类,然后重载了  flatMap 方法。flatMap 有两个参数,第一个参数 sentence 就是输入的一行数据,out 是用来保存处理结果的变量 。out 中的数据类型是 Tuple2<String, Integer> ,最终我们输入的所有数据都会保存在 out 中。


举个例子,假设输入的第一行数据(也就是 sentence)为 hello hello world,则当前 out 为:

(hello, 1)
(hello, 1)
(world, 1)


接下来再输入一行新的数据 java flink,则最新的 out 为:

(hello, 1)
(hello, 1)
(world, 1)
(java, 1)
(flink, 1)


处理数据 - 分组


使用 flatMap 处理了输入的数据后,得到是输入的所有单词。要统计单词出现的次数,接下来我们还需要对单词进行分组,这时就可以使用 flink 的 keyBy 算子。


.keyBy(value->value.f0)


上面是 Lambda 的写法, 可能不方便理解。上面这行代码就等同于下面的代码:

.keyBy(newKeySelector<Tuple2<String, Integer>, Object>() {
@OverridepublicStringgetKey(Tuple2<String, Integer>value) throwsException {
returnvalue.f0;
                    }
})


这下看起来应该就清晰很多了,在 keyBy 中,我们主要是实现了 KeySelector 类并重载了 getKey,最终返回分组的 key,也就是输入的单词,这样 keyBy 算子就会根据单词去分组。


处理数据 - 窗口


在 WordCount 应用中,我们的需求是 5s 统计一次单词次数,所以需要用到 Flink 的窗口。这里我们使用的 TumblingProcessingTimeWindows ,代码如下:


.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))


其作用是每 5s 对数据流做一次切分。


可能理解起来比较抽象,举个例子,假设输入的单词及时间如下:

2021-01-2400:00:00hello2021-01-2400:00:00hello2021-01-2400:00:01word2021-01-2400:00:06java2021-01-2400:00:07flink2021-01-2400:00:07hello2021-01-2400:00:13hello

则会生成 3 个窗口(window),分别如下:

window1:   [ (hello, 1), (hello, 1), (wold, 1) ]
window2:   [ (java, 1), (flink, 1), (hello, 1) ]
window3:   [ (hello, 1) ]

处理数据 - 聚合


对数据按照时间窗口进行分组后,接下来就可以对每个窗口中每个分组的单词进行聚合了。在 WordCount 应用中,聚合就是对单词出现次数求和,求和也很简单,使用 sum 就可以:


.sum(1)


sum 会作用域于每个 window,这样就可以分别求出每个 window 中的单词出现次数,我们就可以计算出 5s 内每个单词出现的次数了。


输出结果


数据处理完毕后,我们就可以将数据输出。通常我们会将数据输出到另一个外部系统,比如 Kafka、Rabbit MQ 或日志服务等。


这里我们直接使用 print 将数据打印出来。


dataStream.print();


运行 Flink 任务


最后,我们通过 env.execute() 方法来运行任务。


env.execute("Window WordCount");


其参数是任务名称。


Flink 任务只有在 execute() 被调用后,才会提交到集群或本地计算机上执行。execute() 调用前,Flink 只是生成了数据处理流图。


在本地 Flink 集群运行任务


现在我们已经可以通过 IntelliJ IDEA 来运行应用,你肯定也很想知道如何在本地 Flink 集群中运行 WordCount 应用。


其实也很简单。


编译代码


编译代码前,需要先修改  pom.xml 中的 mainClass。生成项目时默认是 com.flink.demo.StreamingJob,我们需要将其修改为 com.flink.demo.WindowWordCount ,这样通过 java -jar <name>.jar 命令执行 jar 包时,就会运行 WindowWordCount 类了。


<transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.flink.demo.WindowWordCount</mainClass></transformer></transformers>

修改后,使用 maven 编译项目:


$ mvn  clean package


编译成功后,会在 target 目录下生成 first-flink-project-0.0.1-SNAPSHOT.jar 文件。


启动 Flink 集群


接下来我们在本地启动 Flink 集群。如果你之前已经启动了,则不用执行该步骤了。


启动集群步骤可以参考 本地模式安装 ,下面我简单讲解一下。


首先需要先下载 Flink 集群的代码,你可以在这个页面 Flink Downloads 找到所有版本的 Flink。


下载后解压:


$ tar-xzf flink-1.12.1-bin-scala_2.11.tgz
$ cd flink-1.12.1


然后启动集群。flink 代码中附带了一个 bash 脚本,可以用来启动集群。


$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host.
Starting taskexecutor daemon on host.


提交任务


在本地启动 Flink 集群后,然后我们就可以通过 flink run 命令将 WordCount 任务提交到本地集群了:


$ ./bin/flink run jobs/first-flink-project-0.0.1-SNAPSHOT.jar


jobs/first-flink-project-0.0.1-SNAPSHOT.jar 是前面编译后的 jar 包,我将其移到了 jobs 目录下。


任务启动后,可以在 log/ 目录下查看对应日志。


image.png


此外,你也可以通过 Web UI 来管理集群,在浏览器打开 localhost:8081 就可以看到管理界面。


image.png


停止集群


任务运行结束后,你可以通过 stop-cluster.sh 这个脚本来停止 flink 集群。


$ ./bin/stop-cluster.sh


总结


至此,我们的 WordCount 应用就开发并部署完成了。Flink 本身的概念非常多,本文基本没有涉及概念介绍,只是讲解了程序运行过程,希望通过本文的介绍, 能让你对 Flink 有个初步的印象。这样接下来再去接触 Flink 的概念,应该就会容易一些了。


参考


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
5月前
|
SQL 分布式计算 流计算
Flink在各大互联网公司的应用
Flink在各大互联网公司的应用
34 0
|
4月前
|
机器学习/深度学习 搜索推荐 算法
优秀的推荐系统架构与应用:从YouTube到Pinterest、Flink和阿里巴巴
优秀的推荐系统架构与应用:从YouTube到Pinterest、Flink和阿里巴巴
106 0
|
21天前
|
SQL 运维 DataWorks
Flink CDC在阿里云DataWorks数据集成应用实践
本文整理自阿里云 DataWorks 数据集成团队的高级技术专家 王明亚(云时)老师在 Flink Forward Asia 2023 中数据集成专场的分享。
492 2
Flink CDC在阿里云DataWorks数据集成应用实践
|
24天前
|
机器学习/深度学习 分布式计算 BI
Flink实时流处理框架原理与应用:面试经验与必备知识点解析
【4月更文挑战第9天】本文详尽探讨了Flink实时流处理框架的原理,包括运行时架构、数据流模型、状态管理和容错机制、资源调度与优化以及与外部系统的集成。此外,还介绍了Flink在实时数据管道、分析、数仓与BI、机器学习等领域的应用实践。同时,文章提供了面试经验与常见问题解析,如Flink与其他系统的对比、实际项目挑战及解决方案,并展望了Flink的未来发展趋势。附带Java DataStream API代码样例,为学习和面试准备提供了实用素材。
81 0
|
3月前
|
资源调度 Kubernetes Java
Flink--day02、Flink部署(Yarn集群搭建下的会话模式部署、单作业模式部署、应用模式部署)
Flink--day022、Flink部署(Yarn集群搭建下的会话模式部署、单作业模式部署、应用模式部署)
145 5
|
3月前
|
机器学习/深度学习 消息中间件 算法
Flink ML的新特性解析与应用
本文整理自阿里巴巴算法专家赵伟波,在 Flink Forward Asia 2023 AI特征工程专场的分享。
129269 5
Flink ML的新特性解析与应用
|
7月前
|
资源调度 Kubernetes Java
Flink--2、Flink部署(Yarn集群搭建下的会话模式部署、单作业模式部署、应用模式部署)
Flink--2、Flink部署(Yarn集群搭建下的会话模式部署、单作业模式部署、应用模式部署)
|
3月前
|
SQL 存储 人工智能
Flink 在蚂蚁实时特征平台的深度应用
本文整理自蚂蚁集团高级技术专家赵亮星云,在 Flink Forward Asia 2023 AI 特征工程专场的分享。
530 3
Flink 在蚂蚁实时特征平台的深度应用
|
4月前
|
存储 NoSQL MongoDB
阿里云 Flink 原理分析与应用:深入探索 MongoDB Schema Inference
本文整理自阿里云 Flink 团队归源老师关于阿里云 Flink 原理分析与应用:深入探索 MongoDB Schema Inference 的研究。
46945 2
阿里云 Flink 原理分析与应用:深入探索 MongoDB Schema Inference
|
4月前
|
供应链 算法 新能源
基于 Flink 的实时数仓在曹操出行运营中的应用
本文整理自曹操出行基础研发部负责人史何富,在 Flink Forward Asia 2023 主会场的分享。
90438 2
基于 Flink 的实时数仓在曹操出行运营中的应用