(1)Flink处理的数据集类型
任何类型的数据都可以形成一种事件流。信用卡交易、传感器测量、机器日志、网站或移动应用程序上的用户交互记录,所有这些数据都形成一种流。
数据可以被作为 无界 或者 有界 流来处理。
有界数据集:
有界数据集具有时间边界,在处理过程中数据一定会在某个时间范围内起始和结束,有可能是一分钟,也有可能是一天内的交易数据。对有界数据集的数据处理方式被称为批计算。
无界数据集:
数据从一开始生成就一直持续不断地产生新的数据,因此数据是没有边界的,例如服务器的日志、传感器信号数据等。和批处理数据方式对应,对无界数据集的数据处理方式被称为流式数据处理。
统一数据处理:
Spark和Flink
(2)Flink核心编程接口
Flink 为流式/批式处理应用程序的开发提供了不同级别的抽象。
Flink根据数据集类型的不同将核心数据处理接口分为两大类型,一类是支持批计算的接口DataSet API,另外一类是支持流计算的接口DataStream API。同时Flink将数据处理接口抽象成四层,由上向下分别为SQL API、Table API、DataStream/DataSet API以及Stateful StreamProcessing API。用户可以根据需要选择任意一层抽象接口来开发Flink应用。
Flink SQL:
Flink提供了统一的SQL API的完成对批计算和流计算的处理,对SQL API还在逐步完善中。
Table API:
Table API将内存中的DataStream和DataSet数据集在原有的基础上增加Schema信息,将数据类型统一抽象成表的结构,然后通过Table API提供的接口处理对应的数据集。SQL API可以直接查询Table API中注册表的数据表。Table API构建在DataStream和DataSet之上的同时,提供了大量面向领域语言的编程接口。例如GroupByKey、 Join等操作符,提供给用户一种更加友好的处理数据集的方式。除此之外,Table API在转换为DataStream和DataSet的数据处理过程中,也应用了大量的优化规则对数据逻辑进行了优化。同时Table API中的table可以和DataStream和DataSet之间进行相互转换。
DataStream API 和DataSet API:
DataStream API和DataSet API主要面向具有开发经验的用户,用户可以使用DataStream API处理无界流数据,使用DataSet API处理批量数据。
Stateful Stream Process API:
Stateful Stream Process APIFlinkStatefu是中处理 Stream最底层的接口,用户可以使用 Stateful Stream Process接口操作状态、时间等底层数据。
(3)Flink编程结构
(3.1)ExecutionEnvironment
运行Flink程序的第一步就是获取相应的执行环境,执行环境决定了程序是在本地环境执行还是集群环境运行。批量处理和流处理分别使用不同的ExecutionEnvironment。有三种方式获取程序的执行环境
以官方提供的流处理案例:
获取ExecutionEnvironment方式(一):
默认的执行环境创建方式,它会根据上下文去创建正确的ExecutionEnvironment,如果你在IDE中执行程序或者将程序作为一个常规的Java/Scala程序执行,那么它将为你创建一个本地的环境,你的程序将在本地执行。如果你将你的程序打成jar包,并通过命令行调用它,那么Flink集群管理器将执行你的main方法并且getExecutionEnvironment()方法将为你的程序在集群中执行生成一个执行环境。所以getExecutionEnvironment方法在本地执行或者集群执行都可用这个方法。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
获取ExecutionEnvironment方式(二):
本地执行环境创建方式,当程序在IDE中运行时候,可以通过createLocalEnvironment创建基于本地的执行环境,可以指定并行度
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2);
获取ExecutionEnvironment方式(三):
创建远程执行环境。远程环境将程序发送到集群执行。适用于本地直接发送程序到集群上执行测试。
StreamExecutionEnvironment env = StreamExecutionEnvironment .createRemoteEnvironment("bigdata-pro-m07", 8081,"WordCountJava.jar");
(3.2)初始化数据
readTextFile
socketTextStream
通过读取文件并转化为DataStream[String]数据集,这样就完成了从本地文件到分布式数据集的转换,同时在Flink中提供了多种从外部读取数据的连接器,包括批量和实时的数据连接器,能够将Flink系统和其他第三方系统连接,直接获取外部数据。
(3.3)执行转换操作
Flink中的Transformation操作都是通过不同的Operator来实现的,每个Operator内部通过实现Function接口完成数据处理逻辑的定义。在DataStream和DataSet中提供了大量的算子。如Map、FlatMap、Filter、KeyBy等。
Flink中定义Function的计算逻辑可以通过三种方式完成:
方法一:通过创建Class实现Function接口
方法二:通过创建匿名内部类实现Function接口
方法三:通过实现RichFunction
(3.4)分区key指定
分区的目的是将相同key的value放在同一个partition中
根据字段位置指定(Flink1.12不推荐)
官方1.12版本中使用的这种方式:
根据字段名称指定(Flink1.12不推荐)
使用字段名称需要DataStream中的数据结构类型必须是Tuple类或者P0J0类
通过key选择器指定
基于POJO类型通过key选择器指定key分区代码示例:
package com.aikfk.flink; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; /** * @author :caizhengjie * @description:TODO * @date :2021/3/5 3:07 下午 */ public class WordCountJava3 { public static void main(String[] args) throws Exception { // 准备环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<WordCount> dataStream = env.socketTextStream("bigdata-pro-m07",9999) .flatMap(new FlatMapFunction<String, WordCount>() { @Override public void flatMap(String line, Collector<WordCount> collector) throws Exception { String[] words = line.split(" "); for (String word : words){ collector.collect(new WordCount(word,1)); } } }) // 将相同key的value放在同一个partition(按照key选择器指定) .keyBy(new KeySelector<WordCount, Object>() { @Override public Object getKey(WordCount wordCount) throws Exception { return wordCount.word; } }) .reduce(new ReduceFunction<WordCount>() { @Override public WordCount reduce(WordCount t1, WordCount t2) throws Exception { return new WordCount(t1.word , t1.count + t2.count); } }); dataStream.print(); env.execute("Window WordCount"); } /** * POJO类 */ public static class WordCount{ public String word; public int count; public WordCount() { } public WordCount(String word, int count) { this.word = word; this.count = count; } @Override public String toString() { return "WordCount{" + "word='" + word + '\'' + ", count=" + count + '}'; } } }
运行结果:
2> WordCount{word='hive', count=1} 3> WordCount{word='java', count=1} 2> WordCount{word='hive', count=2} 2> WordCount{word='hive', count=3} 3> WordCount{word='java', count=2} 2> WordCount{word='hive', count=4} 15> WordCount{word='hadoop', count=1}
(3.5)输出结果
输出到控制台
输出到文件
输出到外部存储
(3.6)程序触发
应用的执行,需要调用ExecutionEnvironment的Execute()方法来触发应用程序的执行,DataStream流式应用需要显性的指定execute()方法来运行程序,如果不调用程序则不会执行;对于DataSet API输出算子中已经包含了对execute()方法的调用,则不需要显性调用execute()方法;
(4)Flink支持的数据类型
原生数据类型
env.fromElements(3,1,3,4); env.fromElements("spark", "hbase", "java");
- Java Tuples类型
env.fromElements(new Tuple2<String, Integer>("spark", 1), new Tuple2<String, Integer>("java", 1));
- Scala Case Class类型
case class WordCount(word : String, count : Int)
- P0J0类型(具有默认构造函数)
- 特殊数据类型
env.fromElements(Map("name" -> "beo"), Map("name" -> "henry"))