Flink是什么
Flink 是 Apache 基金会旗下的一个开源大数据处理框架。目前,Flink 已经成为各大公司
大数据实时处理的发力重点,特别是国内以阿里为代表的一众互联网大厂都在全力投入,为
Flink 社区贡献了大量源码。如今 Flink 已被很多人认为是大数据实时处理的方向和未来,许多
公司也都在招聘和储备掌握 Flink 技术的人才。
Flink 的主要应用场景,就是处理大规模的数据流。那为什么一定要用 Flink
呢?数据处理还有没有其他的方式?
批处理和流处理
数据处理有不同的方式。
对于具体应用来说,有些场景数据是一个一个来的,是一组有序的数据序列,我们把它叫
作“数据流”;而有些场景的数据,本身就是一批同时到来,是一个有限的数据集,这就是批
量数据(有时也直接叫数据集)。
容易想到,处理数据流,当然应该“来一个就处理一个”,这种数据处理模式就叫作流处理;因为这种处理是即时的,所以也叫实时处理。与之对应,处理批量数据自然就应该一批读
入、一起计算,这种方式就叫作批处理,也叫作离线处理。
Flink 的核心特性
Flink 区别与传统数据处理框架的特性如下。
- 高吞吐和低延迟。每秒处理数百万个事件,毫秒级延迟。
- 结果的准确性。Flink 提供了事件时间(event-time)和处理时间(processing-time)语义。对于乱序事件流,事件时间语义仍然能提供一致且准确的结果。
- 精确一次(exactly-once)的状态一致性保证。
- 可以连接到最常用的存储系统,如 Apache Kafka、Apache Cassandra、Elasticsearch、JDBC、Kinesis 和(分布式)文件系统,如 HDFS 和 S3。
- 高可用。本身高可用的设置,加上与 K8s,YARN 和Mesos 的紧密集成,再加上从故 障中快速恢复和动态扩展任务的能力,Flink 能做到以极少的停机时间 7×24 全天候 运行。
- 能够更新应用程序代码并将作业(jobs)迁移到不同的 Flink 集群,而不会丢失应用 程序的状态。
分层 API
最底层级的抽象仅仅提供了有状态流,它将处理函数(Process Function)嵌入到了
DataStream API 中。底层处理函数(Process Function)与 DataStream API 相集成,可以对某
些操作进行抽象,它允许用户可以使用自定义状态处理来自一个或多个数据流的事件,且状态
具有一致性和容错保证。除此之外,用户可以注册事件时间并处理时间回调,从而使程序可以
处理复杂的计算。
实际上,大多数应用并不需要上述的底层抽象,而是直接针对核心 API(Core APIs) 进行编程,比如 DataStream API(用于处理有界或无界流数据)以及 DataSet API(用于处理有界数据集)。这些 API 为数据处理提供了通用的构建模块,比如由用户定义的多种形式的转换(transformations)、连接(joins)、聚合(aggregations)、窗口(windows)操作等。DataSet API 为有界数据集提供了额外的支持,例如循环与迭代。这些 API 处理的数据类型以类(classes)的形式由各自的编程语言所表示。
Table API 是以表为中心的声明式编程,其中表在表达流数据时会动态变化。Table API 遵
循关系模型:表有二维数据结构(schema)(类似于关系数据库中的表),同时 API 提供可比
较的操作,例如 select、join、group-by、aggregate 等。
尽管 Table API 可以通过多种类型的用户自定义函数(UDF)进行扩展,仍不如核心 API
更具表达能力,但是使用起来代码量更少,更加简洁。除此之外,Table API 程序在执行之前
会使用内置优化器进行优化。
我们可以在表与 DataStream/DataSet 之间无缝切换,以允许程序将 Table API 与
DataStream 以及 DataSet 混合使用。
Flink 提供的最高层级的抽象是 SQL。这一层抽象在语法与表达能力上与 Table API 类似,
但是是以 SQL 查询表达式的形式表现程序。SQL 抽象与 Table API 交互密切,同时 SQL 查询
可以直接在 Table API 定义的表上执行。
目前 Flink SQL 和 Table API 还在开发完善的过程中,很多大厂都会二次开发符合自己需
要的工具包。而 DataSet 作为批处理 API 实际应用较少,2020 年 12 月 8 日发布的新版本 1.12.0,
已经完全实现了真正的流批一体,DataSet API 已处于软性弃用(soft deprecated)的状态。用
Data Stream API 写好的一套代码, 即可以处理流数据, 也可以处理批数据,只需要设置不同的
执行模式。这与之前版本处理有界流的方式是不一样的,Flink 已专门对批处理数据做了优化
处理。本书中以介绍 DataStream API 为主,采用的是目前最新版本 Flink 1.13.0。
HelloWorld快速上手
创建maven项目添加依赖
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <flink.version>1.13.0</flink.version> <java.version>1.8</java.version> <scala.binary.version>2.12</scala.binary.version> <slf4j.version>1.7.30</slf4j.version> </properties> <dependencies> <!-- 引入 Flink 相关依赖--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <!-- 引入日志管理相关依赖--> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>${slf4j.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>${slf4j.version}</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-to-slf4j</artifactId> <version>2.14.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.0.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
在resources下添加log4j.properties
log4j.rootLogger=error, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
在当前项目下创建input目录,新建文件words.txt
hello world hello flink hello java
最原始的批处理,不建议使用
import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.AggregateOperator; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.FlatMapOperator; import org.apache.flink.api.java.operators.UnsortedGrouping; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; public class BatchWordCount { public static void main(String[] args) throws Exception{ //创建环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //读取文件 DataSource<String> words = env.readTextFile("input/words.txt"); //转换为二元组 FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = words.flatMap((String line, Collector<Tuple2<String, Long>> out) -> { for (String word : line.split(" ")) { out.collect(Tuple2.of(word, 1l)); } }).returns(Types.TUPLE(Types.STRING, Types.LONG)); //按照key进行分组 UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0); //聚合统计 AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1); //打印结果 sum.print(); } }
最新api接口的批处理
import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import java.util.Arrays; public class BoundedStreamWordCount { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> lineDSS = env.readTextFile("input/words.txt"); SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS .flatMap((String line, Collector<String> words) -> { Arrays.stream(line.split(" ")).forEach(words::collect); }) .returns(Types.STRING) .map(word -> Tuple2.of(word, 1L)) .returns(Types.TUPLE(Types.STRING, Types.LONG)); KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne.keyBy(t -> t.f0); SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS.sum(1); result.print(); env.execute(); } }
实时流式处理,由于是试试,我们通过给端口发数据模拟实时数据
nc -lk 7777
import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import java.util.Arrays; public class StreamWordCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> lineDSS = env.socketTextStream("localhost", 7777); SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS .flatMap((String line, Collector<String> words) -> { Arrays.stream(line.split(" ")).forEach(words::collect); }) .returns(Types.STRING) .map(word -> Tuple2.of(word, 1L)) .returns(Types.TUPLE(Types.STRING, Types.LONG)); KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne .keyBy(t -> t.f0); SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS .sum(1); System.out.println("-----------"); result.print(); env.execute(); } }
结语
感谢尚硅谷