执行环境、数据源(source)、转换操作(transformation)、输出(sink)四大部分
一、执行环境(Execution Environment)
1、创建执行环境
// 批处理环境 ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment(); // 流处理环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- 智能执行环境:
getExecutionEnvironment( )
最简单的方式,就是直接调用 getExecutionEnvironment 方法。它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了 jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境。也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
这种“智能”的方式不需要我们额外做判断,用起来简单高效,是最常用的一种创建执行环境的方式。
- 本地执行环境:
createLocalEnvironment()
这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的 CPU 核心数。
LocalEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- 集群执行环境:
createRemoteEnvironment( )
这个方法返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,并指定
要在集群中运行的 Jar 包。
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( "host", 1234, "path/to/jarFile.jar" );
2、执行模式(Execution Mode)
处理环境获取
- 流执行模式(STREAMING)
这是 DataStream API 最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是 STREAMING 执行模式。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 流执行模式(STREAMING) env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
- 批执行模式(BATCH)
专门用于批处理的执行模式, 这种模式下,Flink 处理作业的方式类似于 MapReduce 框架。对于不会持续计算的有界数据,我们用这种模式处理会更方便。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 批执行模式(BATCH env.setRuntimeMode(RuntimeExecutionMode.BATCH);
- 自动模式(AUTOMATIC)
在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 自动模式(AUTOMATIC) env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
Batch模式其它配置
- 通过命令行配置
bin/flink run -Dexecution.runtime-mode=BATCH ...
在提交作业时,增加 execution.runtime-mode 参数,指定值为 BATCH。
- 通过代码配置
// 获取流式处理环境 StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment(); // 采用批执行模式 env.setRuntimeMode(RuntimeExecutionMode.BATCH);
3、触发程序执行
需要注意的是,写完输出(sink)操作并不代表程序已经结束。因为当 main()方法被调用时,其实只是定义了作业的每个执行操作,然后添加到数据流图中;这时并没有真正处理数据——因为数据可能还没来。Flink 是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”(lazy execution)。
所以我们需要显式地调用执行环境的 execute()方法,来触发程序执行。execute()方法将一直等待作业完成,然后返回一个执行结果(JobExecutionResult)。
env.execute();
二、源算子(Source)
一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。
Flink 代码中通用的添加 source 的方式,是调用执行环境的 addSource()方法:
DataStream<String> stream = env.addSource(...);
1、数据源类准备
Event类字段设计(抽象化数据源)
字段名 | 数据类型 | 说明 |
user | String | 用户名 |
url | String | 用户访问的url |
timestamp | Long | 用户访问的url的时间戳 |
代码:
public class Event { public String user; public String url; public Long timstamp; public Event(String user, String url, Long timstamp) { this.user = user; this.url = url; this.timstamp = timstamp; } public Event() { } @Override public String toString() { return "Event{" + "user='" + user + '\'' + ", url='" + url + '\'' + ", timstamp=" + timstamp + '}'; } }
2、从集合中读取数据
// 1、从集合中读取数据 StreamExecutionEnvironment env01 = StreamExecutionEnvironment.getExecutionEnvironment(); env01.setParallelism(1); // 创建集合 ArrayList<Event> clicks = new ArrayList<>(); clicks.add(new Event("Mary", "./home", 1000L)); clicks.add(new Event("Bob", "./cart", 2000L)); DataStreamSource<ArrayList<Event>> stream01 = env01.fromElements(clicks); stream01.print(); env01.execute();
直接列举元素
DataStreamSource<Event> steam01 = env01.fromElements( new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L) );
3、从文件中读取数据
从存储介质中读取数据
StreamExecutionEnvironment env02 = StreamExecutionEnvironment.getExecutionEnvironment(); // 2、从文件中读取数据 DataStreamSource<String> stream02 = env02.readTextFile("input/word.txt");
4、从Socket中读取数据
行程流式数据读取,读取 socket 文本流。这种方式由于吞吐量小、稳定性较差,一般也是用于测试。
StreamExecutionEnvironment env03 = StreamExecutionEnvironment.getExecutionEnvironment(); // 3、从Socket中读取数据 DataStreamSource<String> stream03 = env03.socketTextStream("localhost", 7777);
5、从Kafka中读取数据
Kafka 作为分布式消息传输队列,是一个高吞吐、易于扩展的消息系统。而消息队列的传输方式,恰恰和流处理是完全一致的。所以可以说 Kafka 和 Flink 天生一对,是当前处理流式数据的双子星。在如今的实时流处理应用中,由 Kafka 进行数据的收集和传输,Flink 进行分析计算,这样的架构已经成为众多企业的首选。
略微遗憾的是,与 Kafka 的连接比较复杂,Flink 内部并没有提供预实现的方法。所以我们只能采用通用的 addSource 方式、实现一个 SourceFunction 了。
以Flink官方提供了连接工具flink-connector-kafka,直接帮我们实现了一个消费者 FlinkKafkaConsumer,它就是用来读取 Kafka 数据的SourceFunction。
pom依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency>
代码
// 4、从Kafka中读取数据 StreamExecutionEnvironment env04 = StreamExecutionEnvironment.getExecutionEnvironment(); env04.setParallelism(1); // Kafka配置信息 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "hadoop102:9092"); properties.setProperty("group.id", "consumer-group"); properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("auto.offset.reset", "latest"); // 添加数据源(addSource) DataStreamSource<String> stream04 = env04.addSource(new FlinkKafkaConsumer<String>( "clicks", new SimpleStringSchema(), properties )); stream04.print("kafka"); env04.execute();
6、自定义源算子(source)
创建一个自定义的数据源,实现 SourceFunction 接口。主要重写两个关键方法:run()
和 cancel()
。
run()
方法:使用运行时上下文对象(SourceContext)向下游发送数据;cancel()
方法:通过标识位控制退出循环,来达到中断数据源的效果。
数据源类
import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.Calendar; import java.util.Random; public class ClickSource implements SourceFunction<Event> { // 声明一个布尔常量,作为控制数据生成的标识位 private Boolean running = true; @Override public void run(SourceContext<Event> ctx) throws Exception { Random random = new Random(); String[] users = {"Mary", "Alice", "Bob"}; String[] urls = {"/data/en", "/data/en1", "/data/en2"}; while (running) { ctx.collect(new Event( users[random.nextInt(users.length)], users[random.nextInt(urls.length)], Calendar.getInstance().getTimeInMillis() )); // 间隔1s生成一个点击事件,方便观看 Thread.sleep(1000); } } @Override public void cancel() { running = false; } }
执行代码:
// 5、自定义源算子 StreamExecutionEnvironment env05 = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置并行度 env05.setParallelism(1); DataStreamSource<Event> stream05 = env05.addSource(new ClickSource()); stream05.print("sourceCustom"); env05.execute();
设置2个并行度
// 设置2个并行度(大于1) StreamExecutionEnvironment env06 = StreamExecutionEnvironment.getExecutionEnvironment(); env06.addSource(new CustomerSource()).setParallelism(2).print(); env06.execute();
三、转换算子(Transformation)
数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个 DataStream 转换为新的 DataStream。
我们可以针对一条流进行转换处理,也可以进行分流、合流等多流转换操作,从而组合成复杂的数据流拓扑。
1、基本转换算子
1.1 映射(map)
map 是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。
我们只需要基于 DataStrema 调用 map()方法就可以进行转换处理。方法需要传入的参数是接口 MapFunction 的实现;返回值类型还是 DataStream,不过泛型(流中的元素类型)可能改变。
// 1、创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2、从文件中读取数据:按行读取(存储的元素就是每行的文本) env.setParallelism(1); DataStreamSource<Event> stream = env.fromElements(); // -Map-1 传入匿名类,实现MapFunction stream.map(new MapFunction<Event, String>() { @Override public String map(Event e) throws Exception { return e.user; } }); // -Map-2 传入MapFunction的实现类 stream.map(new UserExtractor()).print();
1.2 过滤(filter)
filter 转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为 true 则元素正常输出,若为 false 则元素被过滤掉。
进行 filter 转换之后的新数据流的数据类型与原数据流是相同的。filter 转换需要传入的参数需要实现 FilterFunction 接口,而FilterFunction 内要实现 filter()方法,就相当于一个返回布尔类型的条件表达式。
// -Filter-1 传入匿名类实现FilterFunction接口 stream.filter(new FilterFunction<Event>() { @Override public boolean filter(Event e) throws Exception { return e.user.equals("Mary"); } }); // -Filter-2 传入FilterFunction实现类 stream.filter(new UserFilter()).print(); env.execute();
1.3 扁平映射(flatMap)
flatMap 操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生 0 到多个元素。flatMap 可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。
同 map 一样,flatMap 也可以使用 Lambda 表达式或者 FlatMapFunction 接口实现类的方式来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流相同,也可以不同。
flatMap 操作会应用在每一个输入事件上面,FlatMapFunction 接口中定义了 flatMap 方法,用户可以重写这个方法,在这个方法中对输入数据进行处理,并决定是返回 0 个、1 个或多个结果数据。因此 flatMap 并没有直接定义返回值类型,而是通过一个“收集器”(Collector)来指定输出。希望输出结果时,只要调用收集器的.collect()方法就可以了;这个方法可以多次调用,也可以不调用。所以flatMap 方法也可以实现 map 方法和 filter 方法的功能,当返回结果是 0 个的时候,就相当于对数据进行了过滤,当返回结果是 1 个的时候,相当于对数据进行了简单的转换操作。
// 1、创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2、从文件中读取数据:按行读取(存储的元素就是每行的文本) env.setParallelism(1); // -FlatMap-1 传入FlatMapFunction实现类 stream.flatMap(new MyFlatMap()).print(); env.execute();
实现类(MyFlatMap)
class MyFlatMap implements FlatMapFunction<Event, String> { @Override public void flatMap(Event value, Collector<String> out) throws Exception { if (value.user.equals("Mary")) { out.collect(value.user); } else if (value.user.equals("Bob")) { out.collect(value.user); out.collect(value.url); } } }
2、聚合算子(Aggregation)
要对每个词出现的频次进行叠加统计,这种操作不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并这就是所谓的“聚合”(Aggregation),也对应着 MapReduce 中的 reduce 操作。
2.1 按键分区(keyBy)
对于 Flink 而言,DataStream 是没有直接进行聚合的 API 的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在 Flink 中,要做聚合,需要先进行分区;这个操作就是通过 keyBy 来完成的。
keyBy 是聚合前必须要用到的一个算子。keyBy 通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务,也就对应着任务槽(task slot)。
基于不同的 key,流中的数据将被分配到不同的分区中去,所有相同的key都会聚集到同一个分区中。
在内部,是通过计算 key 的哈希值(hash code),对分区数进行取模运算来实现的。所以这里 key 如果是 POJO 的话,必须要重写 hashCode()方法。
keyBy()方法需要传入一个参数,这个参数指定了一个或一组 key。有很多不同的方法来指定 key:比如对于 Tuple 数据类型,可以指定字段的位置或者多个位置的组合;对于 POJO 类型,可以指定字段的名称(String);另外,还可以传入 Lambda 表达式或者实现一个键选择器(KeySelector),用于说明从数据中提取 key 的逻辑。
// 1、创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2、从文件中读取数据:按行读取(存储的元素就是每行的文本) env.setParallelism(1); DataStreamSource<Event> stream = env.fromElements(); // - 1= 按键分区-keyBy-1:Lamda表示 KeyedStream<Event, String> keyenStream1 = stream.keyBy(e -> e.user); // - 按键分区-keyBy-2:使用匿名类实现KeySelector KeyedStream<Event, String> keyedStream2 = stream.keyBy(new KeySelector<Event, String>() { @Override public String getKey(Event e) throws Exception { return e.user; } });
需要注意的是:keyBy 得到的结果将不再是 DataStream,而是会将 DataStream 转换为KeyedStream。KeyedStream 可以认为是“分区流”或者“键控流”,它是对 DataStream 按照key 的一个逻辑分区,所以泛型有两个类型:除去当前流中的元素类型外,还需要指定 key 的类型。
2.2 简单聚合(keyBy)
有了按键分区的数据流 KeyedStream,我们就可以基于它进行聚合操作了。Flink 为我们内置实现了一些最基本、最简单的聚合 API,主要有以下几种:
sum()
:在输入流上,对指定的字段做叠加求和的操作。min()
:在输入流上,对指定的字段求最小值。max()
:在输入流上,对指定的字段求最大值。minBy()
:与 min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而 minBy()则会返回包含字段最小值的整条数据。maxBy()
:与 max()类似,在输入流上针对指定字段求最大值。两者区别与min()/minBy()完全一致。
简单聚合算子使用非常方便,语义也非常明确。这些聚合方法调用时,也需要传入参数;但并不像基本转换算子那样需要实现自定义函数,只要说明聚合指定的字段就可以了。指定字段的方式有两种:指定位置,和指定名称。
对于元组类型的数据,同样也可以使用这两种方式来指定字段。需要注意的是,元组中字段的名称,是以 f0、f1、f2、…来命名的。
DataStreamSource<Tuple2<String, Integer>> stream2 = env.fromElements( Tuple2.of("a", 1), Tuple2.of("b", 2) ); // -2=简单聚合:集合 stream2.keyBy(r -> r.f0).sum(1).print(); stream2.keyBy(r -> r.f0).sum("f1").print(); stream2.keyBy(r -> r.f0).max(1).print(); stream2.keyBy(r -> r.f0).max("f1").print(); stream2.keyBy(r -> r.f0).min(1).print(); stream2.keyBy(r -> r.f0).min("f1").print(); stream2.keyBy(r -> r.f0).maxBy(1).print(); stream2.keyBy(r -> r.f0).maxBy("f1").print(); stream2.keyBy(r -> r.f0).minBy(1).print(); stream2.keyBy(r -> r.f0).minBy("f1").print(); // -2=简单聚合:POJO类 DataStreamSource<Event> stream3 = env.fromElements( new Event("Mary", "./home", 100L), new Event("Mary", "./home", 100L) ); stream3.keyBy(s -> s.user).max("timstamp");
Event类:
public class Event { public String user; public String url; public Long timstamp; }
2.3 归约聚合(reduce)
它可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。
与简单聚合类似,reduce 操作也会将 KeyedStream 转换为 DataStream。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。
案例:
我们将数据流按照用户 id 进行分区,然后用一个 reduce 算子实现 sum 的功能,统计每个用户访问的频次;进而将所有统计结果分到一组,用另一个 reduce 算子实现 maxBy 的功能,记录所有用户中访问频次最高的那个,也就是当前访问量最大的用户是谁。
// -3规约聚合(reduce) SingleOutputStreamOperator<Tuple2<String, Long>> stream03 = env.addSource(new MyClickSource()) .map(new MapFunction<Event, Tuple2<String, Long>>() { @Override public Tuple2<String, Long> map(Event value) throws Exception { return Tuple2.of(value.user, 1L); } }); // 使用用户名,进行分流 stream03.keyBy(s -> s.f0) .reduce(new ReduceFunction<Tuple2<String, Long>>() { @Override public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception { // 每到1条数据,用户PV的统计值+1 return Tuple2.of(value1.f0, value1.f1 + value2.f1); } }) .map(s->{ Tuple2<String, Long> s1 = s;}) // 为每一条数据分配同一个key,将聚合结果发送到1条数据流中 .keyBy(r -> true) .reduce(new ReduceFunction<Tuple2<String, Long>>() { @Override public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception { // 将累加器更新为当前最大的PV统计值,然后向下游发送累加器的值 return value1.f1 > value2.f1 ? value1 : value2; } }) .print();
3、用户自定义函数(UDF)
概念:可以通过自定义函数类或者匿名类来实现接口,也可以直接传入 Lambda 表达式。这就是所谓的用户自定义函数(user-defined function,UDF)
3.1 函数类(Function Classes)
对于大部分操作而言,都需要传入一个用户自定义函数(UDF),实现相关操作的接口,来完成处理逻辑的定义。Flink 暴露了所有 UDF 函数的接口,具体实现方式为接口或者抽象类,最简单直接的方式,就是自定义一个函数类,实现对应的接口。
例如: MapFunction、FilterFunction、ReduceFunction 等。
实现了 FilterFunction 接口,用来筛选 url 中包含“home”的事件:
// 0、创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // -1 函数类型 DataStreamSource<Event> stream01 = env.fromElements( new Event("Mary", "/hom", 100L), new Event("Mary", "/hom", 100L) ); stream01.flatMap(new MyFlatMap2()).print();
自定义筛选类:
class MyFlatMap2 implements FlatMapFunction<Event, String> { @Override public void flatMap(Event value, Collector<String> out) throws Exception { if (value.user.equals("Mary")) { out.collect(value.user); } else if (value.user.equals("Bob")) { out.collect(value.user); out.collect(value.url); } } }
匿名函数实现
// -1 函数类型 -匿名函数 SingleOutputStreamOperator<Event> data = stream01.filter(new FilterFunction<Event>() { @Override public boolean filter(Event value) throws Exception { return value.url.contains("home"); } }); data.print();
3.2 Lamdba表达式(匿名函数)
Flink 的所有算子都可以使用 Lambda 表达式的方式来进行编码,但是,当 Lambda 表达式使用 Java 的泛型时,我们需要显式的声明类型信息。
下例演示了如何使用 Lambda 表达式来实现一个简单的 map() 函数,我们使用 Lambda 表达式来计算输入的平方。在这里,我们不需要声明 map() 函数的输入 i 和输出参数的数据类型,因为 Java 编译器会对它们做出类型推断。
// -2 lambda表达式 DataStreamSource<Event> stream2 = env.fromElements( new Event("Mary", "/hom", 100L), new Event("Mary", "/hom", 100L) ); stream2.map(s -> s.url).print();
指定返回类型为String
// 指定返回值类型(string) DataStream<String> returns = stream2.flatMap((Event a, Collector<String> out) -> { out.collect(a.url); }).returns(Types.STRING);
3.3 复函数类(Rich Function Classes)
“富函数类”也是 DataStream API 提供的一个函数类的接口,所有的 Flink 函数类都有其Rich 版本。富函数类一般是以抽象类的形式出现的。富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
例如:RichMapFunction、RichFilterFunction、RichReduceFunction 等。
Rich Function 有生命周期的概念。典型的生命周期方法有:
- open()方法,是 Rich Function 的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如 map()或者 filter()方法被调用之前,open()会首先被调用。所以像文件 IO 的创建,数据库连接的创建,配置文件的读取等等这样一次性的工作,都适合在 open()方法中完成。
- close()方法,是生命周期中的最后一个调用的方法,类似于解构方法。一般用来做一些清理工作。
// 复函数类(Rich Function Classes) StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment(); env2.setParallelism(2); DataStreamSource<Event> stream3 = env2.fromElements( new Event("Mary", "/hom", 1000L), new Event("Mary", "/hom", 2000L), new Event("Mary", "/hom", 3000L), new Event("Mary", "/hom", 4000L) ); // 将点击事件转换成长整型的时间戳输出 stream3.map(new RichMapFunction<Event, Long> () { @Override public void open(Configuration parameters) throws Exception { // 初始化一些工作(建立MySQL连接) super.open(parameters); System.out.println("索引为:" + getRuntimeContext().getIndexOfThisSubtask()+"的任务开始"); } @Override public Long map(Event value) throws Exception { // 对数据库进行读/写 return value.timstamp; } @Override public void close() throws Exception { // 清理工作(关闭MySQL连接) super.close(); System.out.println("索引为:" + getRuntimeContext().getIndexOfThisSubtask()+"执行任务结束"); } }).print();
输出结果:
输出结果: 索引为:0 的任务开始 索引为:1 的任务开始 1> 1000 2> 2000 2> 4000 1> 3000 索引为:0 执行任务结束 索引为:1 执行任务结束
常用的操作步骤:
public class MyFlatMap extends RichMapFunction<IN, OUT> { @Override public void open(Configuration configuration) { // 做一些初始化工作 // 例如建立一个和 MySQL 的连接 } @Override public void flatMap(IN in, Collector<OUT out) { // 对数据库进行读写 } @Override public void close() { // 清理工作,关闭和 MySQL 数据库的连接。 } }
4、物理分区(Physical Partitioning)
keyBy()
:按照键的哈希值来进行重新分区的操作。只不过这种分区操作只能保证把数据按key“分开”,至于分得均不均匀、每个 key 的数据具体会分到哪一区去,这些是完全无从控制的——所以我们有时也说,keyBy 是一种逻辑分区(logical partitioning)操作。
物理分区:物理分区与 keyBy 另一大区别在于,keyBy 之后得到的是一个 KeyedStream,而物理分区之后结果仍是 DataStream,且流中元素数据类型保持不变。从这一点也可以看出,分区算子并不对数据进行转换处理,只是定义了数据的传输方式。
常见的物理分区策略:
- 随机分配(Random)
- 轮询分配(Round-Robin)
- 重缩放(Rescale)
- 广播(Broadcast)
4.1 随机分区(shuffle)
随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区,因为是完全随机的,所以对于同样的输入数据, 每次执行得到的结果也不会相同。
经过随机分区之后,得到的依然是一个 DataStream。
案例:将数据读入之后直接打印到控制台,将输出的并行度设置为 4,中间经历一次 shuffle。执行多次,观察结果是否相同。
// 1、创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // - 1、随机分区(.shuffle()) env.setParallelism(1);//并行度为1 DataStreamSource<Event> stream01 = env.addSource(new MyClickSource()); // 经洗牌后打印输出,并行度为4(分4个线程进行数据打印-处理) stream01.shuffle().print("shuffle").setParallelism(4); env.execute();
执行结果:
shuffle:1> Event{user='Bob', url='./cart', timestamp=...} shuffle:4> Event{user='Cary', url='./home', timestamp=...} shuffle:3> Event{user='Alice', url='./fav', timestamp=...} shuffle:4> Event{user='Cary', url='./cart', timestamp=...} shuffle:3> Event{user='Cary', url='./fav', timestamp=...} shuffle:1> Event{user='Cary', url='./home', timestamp=...} shuffle:2> Event{user='Mary', url='./home', timestamp=...} shuffle:1> Event{user='Bob', url='./fav', timestamp=...} shuffle:2> Event{user='Mary', url='./home', timestamp=...}
4.2 轮询分区(Round-Robin)
轮询也是一种常见的重分区方式。简单来说就是“发牌”,按照先后顺序将数据做依次分发,通过调用 DataStream 的.rebalance()方法,就可以实现轮询重分区。rebalance使用的是 Round-Robin 负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。
注:Round-Robin 算法用在了很多地方,例如 Kafka 和 Nginx。
// - 2、轮询分区(.rebalance()) env.setParallelism(1); // 读取数据源,并行度为1 DataStreamSource<Event> stream02 = env.addSource(new MyClickSource()); // 经轮询重分组后打印输出,并行度为4 stream02.rebalance().print("rebalance").setParallelism(4); env.execute();
结果:数据被平均分配到所有并行任务中去了。
rebalance:2> Event{user='Cary', url='./fav', timestamp=...} rebalance:3> Event{user='Mary', url='./cart', timestamp=...} rebalance:4> Event{user='Mary', url='./fav', timestamp=...} rebalance:1> Event{user='Cary', url='./home', timestamp=...} rebalance:2> Event{user='Cary', url='./cart', timestamp=...} rebalance:3> Event{user='Alice', url='./prod?id=1', timestamp=...} rebalance:4> Event{user='Cary', url='./prod?id=2', timestamp=...} rebalance:1> Event{user='Bob', url='./prod?id=2', timestamp=...} rebalance:2> Event{user='Alice', url='./prod?id=1', timestamp=...}
4.3 重缩放分区(rescale)
重缩放分区和轮询分区非常相似。当调用 rescale()方法时,其实底层也是使用 Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中,也就是说,“发牌人”如果有多个,那么 rebalance 的方式是每个发牌人都面向所有人发牌;而 rescale的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌。
底层原理:
从底层实现上看,rebalance 和 rescale 的根本区别在于任务之间的连接机制不同。rebalance将会针对所有上游任务(发送数据方)和所有下游任务(接收数据方)之间建立通信通道,这是一个笛卡尔积的关系;而 rescale 仅仅针对每一个任务和下游对应的部分任务之间建立通信通道,节省了很多资源。
// - 3、重缩放分区(.rescale()) env.setParallelism(1); // DataStreamSource<Integer> stream03 = env.addSource(new RichParallelSourceFunction<Integer>() { @Override public void run(SourceContext<Integer> ctx) throws Exception { for (int i = 0; i < 8; i++) { // 将奇数发送到索引为1的并行子任务上 // 将偶数发送到索引为0的并行子任务上 // 这里使用了并行数据源的富函数版本 // 这样可以调用 getRuntimeContext 方法来获取运行时上下文的一些信息 if ((i + 1) % 2 == getRuntimeContext().getIndexOfThisSubtask()) { ctx.collect(i + 1); } } } @Override public void cancel() { } }).setParallelism(2); // 执行重缩放分区 stream03.rescale().print().setParallelism(4); env.execute();
执行结果:
4> 3 3> 1 1> 2 1> 6 3> 5 4> 7 2> 4 2> 8
4.4 广播(broadcast)
这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用 DataStream 的 broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。
// - 4、广播() env.setParallelism(1); // 读取数据源,并行度为1 DataStreamSource<Event> stream04 = env.addSource(new MyClickSource()); // 经广播后打印输出,并行度为4 stream04.broadcast().print("broadcast").setParallelism(4); env.execute();
输出结果:(数据被复制然后广播到了下游的所有并行任务中去了)
broadcast:3> Event{user='Mary', url='./cart', timestamp=...} broadcast:1> Event{user='Mary', url='./cart', timestamp=...} broadcast:4> Event{user='Mary', url='./cart', timestamp=...} broadcast:2> Event{user='Mary', url='./cart', timestamp=...} broadcast:2> Event{user='Alice', url='./fav', timestamp=...} broadcast:1> Event{user='Alice', url='./fav', timestamp=...} broadcast:3> Event{user='Alice', url='./fav', timestamp=...} broadcast:4> Event{user='Alice', url='./fav', timestamp=...}
4.5 全局分区(global)
全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了 1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力。
// - 5、全局分区 stream04.global().print();
4.6 自定义分区(Custom)
当 Flink 提 供 的 所 有 分 区 策 略 都 不 能 满 足 用 户 的 需 求 时 , 我 们 可 以 通 过 使 用partitionCustom()方法来自定义分区策略。
在调用时,方法需要传入两个参数,第一个是自定义分区器(Partitioner)对象,第二个是应用分区器的字段,它的指定方式与 keyBy 指定 key 基本一样:可以通过字段名称指定,也可以通过字段位置索引来指定,还可以实现一个 KeySelector。
案例:我们可以对一组自然数按照奇偶性进行重分区。
// 将自然数按奇偶进行分区 DataStreamSource<Integer> stream06 = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8); stream06.partitionCustom(new Partitioner<Integer>() { // 自定义分区规则 @Override public int partition(Integer key, int numPartitions) { return key % 2; } }, new KeySelector<Integer, Integer>() { @Override public Integer getKey(Integer value) throws Exception { return value; } }) .print().setParallelism(2); env.execute();
四、输出算子(Sink)
Flink 作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持,本节将主要讲解 Flink 中的 Sink 操作。我们已经了解了 Flink 程序如何对数据进行读取、转换等操作,最后一步当然就应该将结果数据保存或输出到外部系统了。
1、连接到外部系统
为了避免这样的问题,Flink 的 DataStream API 专门提供了向外部写入数据的方法:addSink。与 addSource 类似,addSink 方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink 程序中所有对外的输出操作,一般都是利用 Sink 算 子完成的。
Sink 一词有“下沉”的意思,有些资料会相对于“数据源”把它翻译为“数据汇”。不论怎样理解,Sink 在 Flink 中代表了将结果数据收集起来、输出到外部的意思,所以我们这里统一把它直观地叫作“输出算子”。
之前我们一直在使用的 print 方法其实就是一种 Sink,它表示将数据流写入标准控制台打印输出。查看源码可以发现,print 方法返回的就是一个 DataStreamSink。
@PublicEvolving public DataStreamSink<T> print() { PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(); return addSink(printFunction).name("Print to Std. Out"); }
与 Source 算子非常类似,除去一些 Flink 预实现的 Sink,一般情况下 Sink 算子的创建是通过调用 DataStream 的.addSink()方法实现的。
stream.addSink(new SinkFunction(…));
Flink 官方目前支持的第三方系统连接器:
2、输出到文件
Flink 为此专门提供了一个流式文件系统的连接器:StreamingFileSink,它继承自抽象类RichSinkFunction,而且集成了 Flink 的检查点(checkpoint)机制,用来保证精确一次(exactly once)的一致性语义。
StreamingFileSink 为批处理和流处理提供了一个统一的 Sink,它可以将分区文件写入 Flink支持的文件系统。它可以保证精确一次的状态一致性,大大改进了之前流式文件 Sink 的方式。它的主要操作是将数据写入桶(buckets),每个桶中的数据都可以分割成一个个大小有限的分区文件,这样一来就实现真正意义上的分布式文件存储。我们可以通过各种配置来控制“分桶”的操作;默认的分桶方式是基于时间的,我们每小时写入一个新的桶。换句话说,每个桶内保存的文件,记录的都是 1 小时的输出数据。
StreamingFileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded,比如 Parquet)格式。这两种不同的方式都有各自的构建器(builder),调用方法也非常简单,可以直接调用StreamingFileSink 的静态方法:
- 行编码:StreamingFileSink.forRowFormat(basePath,rowEncoder)。
- 批量编码:StreamingFileSink.forBulkFormat(basePath,bulkWriterFactory)。
在创建行或批量编码 Sink 时,我们需要传入两个参数,用来指定存储桶的基本路径(basePath)和数据的编码逻辑(rowEncoder 或 bulkWriterFactory)。
案例:测试数据直接写入文件:
// -1、输出到文件 // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); // 数据源 DataStreamSource<Event> stream01 = env.fromElements( new Event("Mary", "./home", 1000L), new Event("Alice", "./home", 2000L), new Event("Bob", "./home", 3000L), new Event("Mary", "./prod", 4000L) ); // 写出到文件相关配置 StreamingFileSink<String> fileSink = StreamingFileSink .<String>forRowFormat(new Path("./output"), new SimpleStringEncoder<>("UTF-8")) .withRollingPolicy( DefaultRollingPolicy.builder() // 至少包含15分钟数据 .withRolloverInterval(TimeUnit.MINUTES.toMillis(15)) // 最近5分钟没有收到新的数据 .withInactivityInterval(TimeUnit.MILLISECONDS.toMillis(5)) // 文件大小已达到1GB。 .withMaxPartSize(1024 * 1024 * 1024) .build()) .build(); // 将event转为String写入文件 stream01.map(Event::toString).addSink(fileSink);
这里我们创建了一个简单的文件 Sink,通过.withRollingPolicy()方法指定了一个“滚动策
略”。“滚动”的概念在日志文件的写入中经常遇到:因为文件会有内容持续不断地写入,所以
我们应该给一个标准,到什么时候就开启新的文件,将之前的内容归档保存。也就是说,上面
的代码设置了在以下 3 种情况下,我们就会滚动分区文件:
- 至少包含 15 分钟的数据。
- 最近 5 分钟没有收到新的数据。
- 文件大小已达到 1 GB。
3、连接到Kafka
Kafka 是一个分布式的基于发布/订阅的消息系统,本身处理的也是流式数据,所以跟Flink“天生一对”,经常会作为 Flink 的输入数据源和输出系统。Flink 官方为 Kafka 提供了 Source和 Sink 的连接器,我们可以用它方便地从 Kafka 读写数据。
Flink 从 Kakfa 的一个 topic 读取消费数据,然后进行处理转换,最终将结果数据写入 Kafka 的另一个 topic——数据从 Kafka 流入、经Flink处理后又流回到 Kafka 去,这就是所谓的“数据管道”应用。
真正让它们密不可分的是,Flink 与 Kafka 的连接器提供了端到端的精确一次(exactly once)语义保证,这在实际项目中是最高级别的一致性保证。
案例:将用户行为数据保存为文件 clicks.csv,读取后不做转换直接写入 Kafka,主题(topic)命名为clicks
。
// -2、输出到Kafka env.setParallelism(1); // 加载数据文件 DataStreamSource<String> stream02 = env.readTextFile("input/clkicks.csv"); // 加载Kafka配置 Properties properties = new Properties(); properties.put("bootstrap.servers", "hadoop102:9092"); // 执行输出源 stream02.addSink(new FlinkKafkaProducer<String>( "clicks", new SimpleStringSchema(), properties )); env.execute();
可以看到,addSink 传入的参数是一个 FlinkKafkaProducer。这也很好理解,因为需要向 Kafka 写入数据,自然应该创建一个生产者。FlinkKafkaProducer 继承了抽象类TwoPhaseCommitSinkFunction,这是一个实现了“两阶段提交”的 RichSinkFunction。两阶段提交提供了 Flink 向 Kafka 写入数据的事务性保证,能够真正做到精确一次(exactly once)的状态一致性。
4、输出到Redis
Flink 没有直接提供官方的 Redis 连接器,不过 Bahir 项目还是担任了合格的辅助角色,为我们提供了 Flink-Redis 的连接工具。但版本升级略显滞后,目前连接器版本为 1.0,支持的Scala 版本最新到 2.11。
导入的 Redis 连接器依赖
<!-- Redis--> <dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.0</version> </dependency>
连接器为我们提供了一个 RedisSink,它继承了抽象类 RichSinkFunction,这就是已经实现好的向 Redis 写入数据的 SinkFunction。我们可以直接将 Event 数据输出到 Redis:
// -3、输出到Redis env.setParallelism(1); // 获取数据 DataStreamSource<Event> stream03 = env.addSource(new MyClickSource()); // 配置Redis连接信息 FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop102").build(); stream03.addSink(new RedisSink<>(conf, new MyRedisMapper())); env.execute();
是定义一个 Redis 的映射类,实现 RedisMapper 接口。
class MyRedisMapper implements RedisMapper<Event> { @Override public RedisCommandDescription getCommandDescription() { // RedisCommand.HSET:存储类型为Hash类型 return new RedisCommandDescription(RedisCommand.HSET, "clicks"); } @Override public String getKeyFromData(Event event) { return event.user; } @Override public String getValueFromData(Event event) { return event.url; } }
这里 RedisSink 的构造方法需要传入两个参数:
- JFlinkJedisConfigBase:Jedis 的连接配置。
- RedisMapper:Redis 映射类接口,说明怎样将数据转换成可以写入 Redis 的类型。
5、输出到ES
ElasticSearch 是一个分布式的开源搜索和分析引擎,适用于所有类型的数据。ElasticSearch有着简洁的 REST 风格的 API,以良好的分布式特性、速度和可扩展性而闻名,在大数据领域应用非常广泛。
Flink 为 ElasticSearch 专门提供了官方的 Sink 连接器。
pom依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency>
输出到ES中
// -4、输出到ES DataStreamSource<Event> stream04 = env.fromElements( new Event("Bob", "./cart", 100L), new Event("Alice", "./home", 200L), new Event("Bob", "./cart", 300L), new Event("Alice", "./home", 400L) ); ArrayList<HttpHost> httpHosts = new ArrayList<>(); httpHosts.add(new HttpHost("hadopp102", 9200, "http")); // 创建一个ES的Function ElasticsearchSinkFunction<Event> elasticsearchSinkFunction = new ElasticsearchSinkFunction<Event>() { @Override public void process(Event event, RuntimeContext runtimeContext, RequestIndexer requestIndexer) { HashMap<String, String> data = new HashMap<>(); data.put(event.user, event.url); IndexRequest request = Requests.indexRequest().index("clicks") .type("type") .source(data); requestIndexer.add(request); } }; // 绑定输出数据源(ES) stream04.addSink(new ElasticsearchSink.Builder<Event>(httpHosts, elasticsearchSinkFunction).build()); env.execute();
与 RedisSink 类 似 , 连 接 器 也 为 我 们 实 现 了 写 入 到 Elasticsearch 的SinkFunction——ElasticsearchSink。区别在于,这个类的构造方法是私有(private)的,我们需要使用 ElasticsearchSink 的 Builder 内部静态类,调用它的 build()方法才能创建出真正的SinkFunction。
而 Builder 的构造方法中又有两个参数:
- httpHosts:连接到的 Elasticsearch 集群主机列表
- elasticsearchSinkFunction:这并不是我们所说的 SinkFunction,而是用来说明具体处理逻辑、准备数据向 Elasticsearch 发送请求的函数。
具体的操作需要重写中 elasticsearchSinkFunction 中的 process 方法,我们可以将要发送的数据放在一个 HashMap 中,包装成 IndexRequest 向外部发送 HTTP 请求。
6、输出到 MySQL(JDBC)
关系型数据库有着非常好的结构化数据设计、方便的 SQL 查询,是很多企业中业务数据存储的主要形式。MySQL 就是其中的典型代表。
<!-- Mysql--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.47</version> </dependency>
代码:
// -5、输出到MySql env.setParallelism(1); // 数据源 DataStreamSource<Event> stream05 = env.fromElements( new Event("Mary", "./home", 100L), new Event("Alice", "./var", 200L) ); // sql/statementBuilder/executionOptions/connectionOptions stream05.addSink( JdbcSink.sink("INSERT INTO clicks (user, url) VALUES (?, ?)", (statement, r) -> { statement.setString(1, r.user); statement.setString(2, r.url); }, JdbcExecutionOptions.builder(). withBatchSize(1000) .withBatchIntervalMs(200) .withMaxRetries(5) .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withDriverName("com.mysql.cj.jdbc.Driver") .withUsername("username") .withPassword("passwod") .build() ) ); env.execute();
7、自定义 Sink 输出
如果我们想将数据存储到我们自己的存储设备中,而 Flink 并没有提供可以直接使用的连接器,又该怎么办呢?
与 Source 类似,Flink 为我们提供了通用的 SinkFunction 接口和对应的 RichSinkDunction抽象类,只要实现它,通过简单地调用 DataStream 的.addSink()方法就可以自定义写入任何外部存储。之前与外部系统的连接,其实都是连接器帮我们实现了 SinkFunction。
我们这里使用了 SinkFunction 的富函数版本,因为这里我们又使用到了生命周期的概念,创建 HBase 的连接以及关闭 HBase 的连接需要分别放在 open()方法和 close()方法中。
open()
:开启链接close()
:关闭连接invoke()
:执行逻辑
pom依赖
<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>${hbase.version}</version> </dependency>
代码:
// -6、自定义sink输出 env.setParallelism(1); // 绑定数据源 DataStreamSource<String> stream06 = env.fromElements("hello", "word"); // 添加自定义输出 stream06.addSink( new RichSinkFunction<String>() { // 管理Hbase的配置信息,这里因为是Configuration的重名问题,将类以完成路径导入 public org.apache.hadoop.conf.Configuration configuration; // 管理Hbase连接 public Connection connection; // 打开连接 @Override public void open(Configuration parameters) throws Exception { super.open(parameters); configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.quorum", "hadoop102:2181"); connection = ConnectionFactory.createConnection(configuration); } // 执行的步骤 @Override public void invoke(String value, Context context) throws Exception { 表名为test Table table = connection.getTable(TableName.valueOf("test")); // 指定rowKey Put put = new Put("row".getBytes(StandardCharsets.UTF_8)); put.addColumn("info".getBytes(StandardCharsets.UTF_8), 写入的数据 value.getBytes(StandardCharsets.UTF_8), 写入的数据 "1".getBytes(StandardCharsets.UTF_8)); 执行put操作 table.put(put); 将表关闭 table.close(); invoke(value); } // 关闭连接操作 @Override public void close() throws Exception { super.close(); // 关闭连接 connection.close(); } } ); env.execute();