一个常见的应用场景就是,如果我们希望连接到一个外部数据库进行读写操作,那么将连接操作放在map()中显然不是个好选择——因为每来一条数据就会重新连接一次数据库;所以我们可以在open()中建立连接,在map()中读写数据,而在close()中关闭连接。
publicclass MyFlatMap extends RichFlatMapFunction<IN, OUT>> { @Override public void open(Configuration configuration) { // 做一些初始化工作 // 例如建立一个和MySQL的连接 } @Override public void flatMap(IN in, Collector<OUT out) { // 对数据库进行读写 } @Override public void close() { // 清理工作,关闭和MySQL数据库的连接。 } }
9.3.4 物理分区算子(Physical Partitioning)
常见的物理分区策略有随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast)。
- 随机分区(shuffle)
最简单的重分区方式就是直接“洗牌”。通过调用DataStream的.shuffle()方法,将数据随机地分配到下游算子的并行任务中去。
随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区,如图5-9所示。因为是完全随机的,所以对于同样的输入数据, 每次执行得到的结果也不会相同。
经过随机分区之后,得到的依然是一个DataStream。
我们可以做个简单测试:将数据读入之后直接打印到控制台,将输出的并行度设置为2,中间经历一次shuffle。执行多次,观察结果是否相同。
publicclass ShuffleExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Integer> stream = env.fromElements(1, 2, 3, 4).setParallelism(1); stream.shuffle().print().setParallelism(2); env.execute(); } }
- 轮询分区(Round-Robin)
轮询也是一种常见的重分区方式。简单来说就是“发牌”,按照先后顺序将数据做依次分发,如图5-10所示。通过调用DataStream的.rebalance()方法,就可以实现轮询重分区。rebalance使用的是Round-Robin负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。
- 重缩放分区(rescale)
重缩放分区和轮询分区非常相似。当调用rescale()方法时,其实底层也是使用Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中,如图5-11所示。也就是说,“发牌人”如果有多个,那么rebalance的方式是每个发牌人都面向所有人发牌;而rescale的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌。
从底层实现上看,rebalance和rescale的根本区别在于任务之间的连接机制不同。rebalance将会针对所有上游任务(发送数据方)和所有下游任务(接收数据方)之间建立通信通道,这是一个笛卡尔积的关系;而rescale仅仅针对每一个任务和下游对应的部分任务之间建立通信通道,节省了很多资源。
- 广播(broadcast)
这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用DataStream的broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。
- 全局分区(global)
全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了1,所以使用这个操作需要非常谨慎,可能对程序造成很大的。
- 自定义分区(Custom)
当Flink提供的所有分区策略都不能满足用户的需求时,我们可以通过使用partitionCustom()方法来自定义分区策略。
9.4 输出算子(Sink)
“Flink作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。
”
9.4.1 连接到外部系统
Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。
与Source算子非常类似,除去一些Flink预实现的Sink,一般情况下Sink算子的创建是通过调用DataStream的.addSink()方法实现的。
stream.addSink(newSinkFunction(…));
addSource的参数需要实现一个SourceFunction接口;类似地,addSink方法同样需要传入一个参数,实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。
当然,SinkFuntion多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如图5-13所示,列出了Flink官方目前支持的第三方系统连接器:
我们可以看到,像Kafka之类流式系统,Flink提供了完美对接,source/sink两端都能连接,可读可写;而对于Elasticsearch、文件系统(FileSystem)、JDBC等数据存储系统,则只提供了输出写入的sink连接器。
除Flink官方之外,Apache Bahir作为给Spark和Flink提供扩展支持的项目,也实现了一些其他第三方系统与Flink的连接器,如图所示。
除此以外,就需要用户自定义实现sink连接器了。
9.4.2 输出到文件
Flink专门提供了一个流式文件系统的连接器:StreamingFileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。
StreamingFileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),调用方法也非常简单,可以直接调用StreamingFileSink的静态方法:
行编码:StreamingFileSink.forRowFormat(basePath,rowEncoder)。 批量编码:StreamingFileSink.forBulkFormat(basePath,bulkWriterFactory)。
下面我们就以行编码为例,将一些测试数据直接写入文件:
publicclass SinkFile { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream<String> input = env.fromElements("hello world", "hello flink"); final StreamingFileSink<String> sink = StreamingFileSink .forRowFormat(new Path("./output"), new SimpleStringEncoder<String>("UTF-8")).withRollingPolicy( DefaultRollingPolicy.builder() .withRolloverInterval(TimeUnit.MINUTES.toMillis(15)) .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) .withMaxPartSize(1024 * 1024 * 1024) .build()) .build(); input.addSink(sink); env.execute(); } }
这里我们创建了一个简单的文件Sink,通过.withRollingPolicy()方法指定了一个“滚动策略”。因为文件会有内容持续不断地写入,所以我们应该给一个标准,到什么时候就开启新的文件,将之前的内容归档保存。也就是说,上面的代码设置了在以下3种情况下,我们就会滚动分区文件:
至少包含15分钟的数据 最近5分钟没有收到新的数据 文件大小已达到1 GB
9.4.3 输出到Kafka
- 添加Kafka 连接器依赖
由于我们已经测试过从Kafka数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。
- 启动Kafka集群
- 编写输出到Kafka的示例代码
我们可以直接将用户行为数据保存为文件clicks.csv,读取后不做转换直接写入Kafka,主题(topic)命名为“clicks”。
publicclass SinkKafka { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); Properties properties = new Properties(); properties.put("bootstrap.servers", "hadoop102:9092"); DataStreamSource<String> stream = env.readTextFile("input/clicks.csv"); stream .addSink(new FlinkKafkaProducer<String>( "clicks", new SimpleStringSchema(), properties )); env.execute(); } }
- 运行代码,在Linux主机启动一个消费者, 查看是否收到数据
bin/kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic clicks
9.4.4 输出到Redis
Flink没有直接提供官方的Redis连接器,不过Bahir项目还是担任了合格的辅助角色,为我们提供了Flink-Redis的连接工具。但版本升级略显滞后,目前连接器版本为1.0,支持的Scala版本最新到2.11。由于我们的测试不涉及到Scala的相关版本变化,所以并不影响使用。在实际项目应用中,应该以匹配的组件版本运行。
具体测试步骤如下:
- 导入的Redis连接器依赖
<dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.0</version> </dependency>
- 启动Redis集群
这里我们为方便测试,只启动了单节点Redis。
- 编写输出到Redis的示例代码
连接器为我们提供了一个RedisSink,它继承了抽象类RichSinkFunction,这就是已经实现好的向Redis写入数据的SinkFunction。我们可以直接将Event数据输出到Redis:
publicclass SinkRedis { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop200").build(); env.addSource(new ClickSource()) .addSink(new RedisSink<Event>(conf, new MyRedisMapper())); env.execute(); } }
这里RedisSink的构造方法需要传入两个参数:
JFlinkJedisConfigBase:Jedis的连接配置 RedisMapper:Redis映射类接口,说明怎样将数据转换成可以写入Redis的类型
接下来主要就是定义一个Redis的映射类,实现RedisMapper接口。
publicstaticclass MyRedisMapper implements RedisMapper<Event> { @Override public String getKeyFromData(Event e) { return e.user; } @Override public String getValueFromData(Event e) { return e.url; } @Override public RedisCommandDescription getCommandDescription() { returnnew RedisCommandDescription(RedisCommand.HSET, "clicks"); } }
在这里我们可以看到,保存到Redis时调用的命令是HSET,所以是保存为哈希表(hash),表名为“clicks”;保存的数据以user为key,以url为value,每来一条数据就会做一次转换。
- 运行代码,Redis查看是否收到数据。
9.4.5 输出到Elasticsearch
Flink为ElasticSearch专门提供了官方的Sink 连接器。
写入数据的ElasticSearch的测试步骤如下。
- 添加Elasticsearch 连接器依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency>
- 启动Elasticsearch集群
- 编写输出到Elasticsearch的示例代码
publicclass SinkToEs { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); ArrayList<HttpHost> httpHosts = new ArrayList<>(); httpHosts.add(new HttpHost("hadoop102", 9200, "http")); ElasticsearchSink.Builder<Event> esBuilder = new ElasticsearchSink.Builder<>( httpHosts, new ElasticsearchSinkFunction<Event>() { @Override public void process(Event event, RuntimeContext runtimeContext, RequestIndexer requestIndexer) { HashMap<String, String> data = new HashMap<>(); data.put(event.user, event.url); IndexRequest indexRequest = Requests .indexRequest() .index("clicks") .type("type") .source(data); requestIndexer.add(indexRequest); } } ); DataStream<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L)); stream.addSink(esBuilder.build()); env.execute(); } }
与RedisSink类似,连接器也为我们实现了写入到Elasticsearch的SinkFunction——ElasticsearchSink。区别在于,这个类的构造方法是私有(private)的,我们需要使用ElasticsearchSink的Builder内部静态类,调用它的build()方法才能创建出真正的SinkFunction。
而Builder的构造方法中又有两个参数:
httpHosts:连接到的Elasticsearch集群主机列表 elasticsearchSinkFunction:这并不是我们所说的SinkFunction,而是用来说明具体处理逻辑、准备数据向Elasticsearch发送请求的函数
具体的操作需要重写中elasticsearchSinkFunction中的process方法,我们可以将要发送的数据放在一个HashMap中,包装成IndexRequest向外部发送HTTP请求。
- 运行代码,访问Elasticsearch查看是否收到数据。
9.4.6 输出到MySQL(JDBC)
写入数据的MySQL的测试步骤如下。
- 添加依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.47</version> </dependency>
- 启动MySQL,在test库下建表clicks
mysql> createtable clicks( -> uservarchar(20) notnull, -> urlvarchar(100) notnull);
- 编写输出到MySQL的示例代码
publicclass SinkToMySQL { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env .fromElements( new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L) ) .addSink( JdbcSink.sink( "INSERT INTO clicks (user, url) VALUES (?, ?)", (statement, r) -> { statement.setString(1, r.user); statement.setString(2, r.url); }, JdbcExecutionOptions.builder() .withBatchSize(1000) .withBatchIntervalMs(200) .withMaxRetries(5) .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://localhost:3306/test") // 使用MySQL 5.7的话,没有cj .withDriverName("com.mysql.cj.jdbc.Driver") .withUsername("username") .withPassword("password") .build() ) ); env.execute(); } }
- 运行代码,用客户端连接MySQL,查看是否成功写入数据。
9.4.7 自定义Sink输出
如果我们想将数据存储到我们自己的存储设备中,而Flink并没有提供可以直接使用的连接器,就只能自定义Sink进行输出了。与Source类似,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。
stream.addSink(new MySinkFunction<String>());
在实现SinkFunction的时候,需要重写的一个关键方法invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。
这种方式比较通用,对于任何外部存储系统都有效;不过自定义Sink想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。
十、Flink中的时间和窗口
在流数据处理应用中,一个很重要、也很常见的操作就是窗口计算。所谓的“窗口”,一般就是划定的一段时间范围,也就是“时间窗”;对在这范围内的数据进行处理,就是所谓的窗口计算。所以窗口和时间往往是分不开的。接下来我们就深入了解一下Flink中的时间语义和窗口的应用。
10.1 时间语义
10.1.1 Flink中的时间语义
如图所示,在事件发生之后,生成的数据被收集起来,首先进入分布式消息队列,然后被Flink系统中的Source算子读取消费,进而向下游的转换算子(窗口算子)传递,最终由窗口算子进行计算处理。
很明显,这里有两个非常重要的时间点:一个是数据产生的时间,我们把它叫作“事件时间”(Event Time);另一个是数据真正被处理的时刻,叫作“处理时间”(Processing Time)。我们所定义的窗口操作,到底是以那种时间作为衡量标准,就是所谓的“时间语义”(Notions of Time)。由于分布式系统中网络传输的延迟和时钟漂移,处理时间相对事件发生的时间会有所滞后。
- 处理时间(Processing Time)
“处理时间的概念非常简单,就是指执行处理操作的机器的系统时间。处理时间是最简单的时间语义。
”
- 事件时间(Event Time)
事件时间,是指每个事件在对应的设备上发生的时间,也就是数据生成的时间。 数据一旦产生,这个时间自然就确定了,所以它可以作为一个属性嵌入到数据中。这其实就是这条数据记录的“时间戳”(Timestamp)。 在事件时间语义下,我们对于时间的衡量,就不看任何机器的系统时间了,而是依赖于数据本身。由于分布式系统中网络传输延迟的不确定性,实际应用中我们要面对的数据流往往是乱序的。在这种情况下,就不能简单地把数据自带的时间戳当作时钟了,而需要用另外的标志来表示事件时间进展,在Flink中把它叫作事件时间的“水位线”(Watermarks)。
10.1.2 哪种时间语义更重要
- 从《星球大战》说起为了更加清晰地说明两种语义的区别,我们来举一个非常经典的例子:电影《星球大战》。
如图所示,我们会发现,看电影其实就是处理影片中数据的过程,所以影片的上映时间就相当于“处理时间”;而影片的数据就是所描述的故事,它所发生的背景时间就相当于“事件时间”。两种时间语义都有各自的用途,适用于不同的场景。
- 数据处理系统中的时间语义
在计算机系统中,考虑数据处理的“时代变化”是没什么意义的,我们更关心的,显然是数据本身产生的时间。
所以在实际应用中,事件时间语义会更为常见。一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。
在Flink中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从1.12版本开始,Flink已经将事件时间作为了默认的时间语义。
10.2 水位线(Watermark)
10.2.1 事件时间和窗口
在窗口的处理过程中,我们可以基于数据的时间戳,自定义一个“逻辑时钟”。这个时钟的时间不会自动流逝;它的时间进展,就是靠着新到数据的时间戳来推动的。
这样的好处在于,计算的过程可以完全不依赖处理时间(系统时间),不论什么时候进行统计处理,得到的结果都是正确的。而一般实时流处理的场景中,事件时间可以基本与处理时间保持同步,只是略微有一点延迟,同时保证了窗口计算的正确性。
10.2.2 什么是水位线
在Flink中,用来衡量事件时间(Event Time)进展的标记,就被称作“水位线”(Watermark)。
具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。
- 有序流中的水位线
在理想状态下,数据应该按照它们生成的先后顺序、排好队进入流中;也就是说,它们处理的过程会保持原先的顺序不变,遵守先来后到的原则。这样的话我们从每个数据中提取时间戳,就可以保证总是从小到大增长的,从而插入的水位线也会不断增长、事件时钟不断向前推进。
实际应用中,如果当前数据量非常大,可能会有很多数据的时间戳是相同的,这时每来一条数据就提取时间戳、插入水位线就做了大量的无用功。而且即使时间戳不同,同时涌来的数据时间差会非常小(比如几毫秒),往往对处理计算也没什么影响。所以为了提高效率,一般会每隔一段时间生成一个水位线,这个水位线的时间戳,就是当前最新数据的时间戳,如图6-6所示。所以这时的水位线,其实就是有序流中的一个周期性出现的时间标记。
- 乱序流中的水位线
在分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性,导致顺序发生改变,这就是所谓的“乱序数据”。
这里所说的“乱序”(out-of-order),是指数据的先后顺序不一致,主要就是基于数据的产生时间而言的。如图6-7所示,一个7秒时产生的数据,生成时间自然要比9秒的数据早;但是经过数据缓存和传输之后,处理任务可能先收到了9秒的数据,之后7秒的数据才姗姗来迟。这时如果我们希望插入水位线,来指示当前的事件时间进展,又该怎么做呢?
解决思路也很简单:我们还是靠数据来驱动,每来一个数据就提取它的时间戳、插入一个水位线。不过现在的情况是数据乱序,所以插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线,如图6-8所示。也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。
如果考虑到大量数据同时到来的处理效率,我们同样可以周期性地生成水位线。这时只需要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新的水位线,如下图所示。
但是这样做会带来一个非常大的问题:我们无法正确处理“迟到”的数据。为了让窗口能够正确收集到迟到的数据,我们也可以等上一段时间,比如2秒;也就是用当前已有数据的最大时间戳减去2秒,就是要插入的水位线的时间戳,如下图所示。这样的话,9秒的数据到来之后,事件时钟不会直接推进到9秒,而是进展到了7秒;必须等到11秒的数据到来之后,事件时钟才会进展到9秒,这时迟到数据也都已收集齐,0~9秒的窗口就可以正确计算结果了。
- 水位线的特性
现在我们可以知道,水位线就代表了当前的事件时间时钟,而且可以在数据的时间戳基础上加一些延迟来保证不丢数据,这一点对于乱序流的正确处理非常重要。
总结一下水位线的特性:
水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展 水位线是基于数据的时间戳生成的 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进 水位线可以通过设置延迟,来保证正确处理乱序数据 一个水位线Watermark(t),表示在当前流中事件时间已经达到了时间戳t, 这代表t之前的所有数据都到齐了,之后流中不会出现时间戳t’ ≤ t的数据 水位线是Flink流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理。
10.2.3 如何生成水位线
水位线是用来保证窗口处理结果的正确性的,如果不能正确处理所有乱序数据,可以尝试调大延迟的时间。
- 生成水位线的总体原则
完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。
如果我们希望计算结果能更加准确,那可以将水位线的延迟设置得更高一些,等待的时间越长,自然也就越不容易漏掉数据。不过这样做的代价是处理的实时性降低了,我们可能为极少数的迟到数据增加了很多不必要的延迟。
如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。当然,如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟。
所以Flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。
- 水位线生成策略(Watermark Strategies)
在Flink的DataStream API中,有一个单独用于生成水位线的方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间:
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks( WatermarkStrategy<T> watermarkStrategy)
具体使用时,直接用DataStream调用该方法即可,与普通的transform方法完全一样。
DataStream<Event> stream = env.addSource(new ClickSource()); DataStream<Event> withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(*<watermark strategy>*);
.assignTimestampsAndWatermarks()方法需要传入一个WatermarkStrategy作为参数,这就是所谓的“水位线生成策略”。WatermarkStrategy中包含了一个“时间戳分配器” TimestampAssigner和一个“水位线生成器” WatermarkGenerator。
publicinterface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{ @Override TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context); @Override WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context); }
TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。 WatermarkGenerator:主要负责按照既定的方式,基于时间戳生成水位线。 在WatermarkGenerator接口中,主要又有两个方法:onEvent()和onPeriodicEmit()。 onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳,以及允许发出水位线的一个WatermarkOutput,可以基于事件做各种操作 onPeriodicEmit:周期性调用的方法,可以由WatermarkOutput发出水位线。周期时间为处理时间,可以调用环境配置的.setAutoWatermarkInterval()方法来设置,默认为200ms。 env.getConfig().setAutoWatermarkInterval(60 * 1000L);
- Flink内置水位线生成器
WatermarkStrategy这个接口是一个生成水位线策略的抽象,而Flink提供了内置的水位线生成器(WatermarkGenerator),不仅开箱即用简化了编程,而且也为我们自定义水位线策略提供了模板。
这两个生成器可以通过调用WatermarkStrategy的静态辅助方法来创建。它们都是周期性生成水位线的,分别对应着处理有序流和乱序流的场景。
- 有序流
对于有序流,主要特点就是时间戳单调增长(Monotonously Increasing Timestamps),所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。简单来说,就是直接拿当前最大的时间戳作为水位线就可以了。
stream.assignTimestampsAndWatermarks( WatermarkStrategy.<Event>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<Event>() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timestamp; } }) );
上面代码中我们调用.withTimestampAssigner()方法,将数据中的timestamp字段提取出来,作为时间戳分配给数据元素;然后用内置的有序流水位线生成器构造出了生成策略。这样,提取出的数据时间戳,就是我们处理计算的事件时间。
这里需要注意的是,时间戳和水位线的单位,必须都是毫秒。
- 乱序流
由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间(Fixed Amount of Lateness)。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个maxOutOfOrderness参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。
代码示例如下:
publicclass WatermarkExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env .addSource(new ClickSource()) // 插入水位线的逻辑 .assignTimestampsAndWatermarks( // 针对乱序流插入水位线,延迟时间设置为5s WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner(new SerializableTimestampAssigner<Event>() { // 抽取时间戳的逻辑 @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timestamp; } }) ) .print(); env.execute(); } }
上面代码中,我们同样提取了timestamp字段作为时间戳,并且以5秒的延迟时间创建了处理乱序流的水位线生成器。
- 自定义水位线策略
一般来说,Flink内置的水位线生成器就可以满足应用需求了。不过有时我们的业务逻辑可能非常复杂,这时对水位线生成的逻辑也有更高的要求,就必须自定义实现水位线策略WatermarkStrategy了。
在WatermarkStrategy中,时间戳分配器TimestampAssigner都是大同小异的,指定字段提取时间戳就可以了;而不同策略的关键就在于WatermarkGenerator的实现。整体说来,Flink有两种不同的生成水位线的方式:一种是周期性的(Periodic),另一种是断点式的(Punctuated)。
WatermarkGenerator接口中有两个方法,onEvent()和onPeriodicEmit(),前者是在每个事件到来时调用,而后者由框架周期性调用。周期性调用的方法中发出水位线,自然就是周期性生成水位线;而在事件触发的方法中发出水位线,自然就是断点式生成了。两种方式的不同就集中体现在这两个方法的实现上。
- 周期性水位线生成器(Periodic Generator)
周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。
下面是一段自定义周期性生成水位线的代码:
import com.atguigu.bean.Event; import org.apache.flink.api.common.eventtime.*; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; // 自定义水位线的产生 publicclass CustomPeriodicWatermarkExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env .addSource(new ClickSource()) .assignTimestampsAndWatermarks(new CustomWatermarkStrategy()) .print(); env.execute(); } publicstaticclass CustomWatermarkStrategy implements WatermarkStrategy<Event> { @Override public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) { returnnew SerializableTimestampAssigner<Event>() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段 } }; } @Override public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { returnnew CustomBoundedOutOfOrdernessGenerator(); } } publicstaticclass CustomBoundedOutOfOrdernessGenerator implements WatermarkGenerator<Event> { private Long delayTime = 5000L; // 延迟时间 private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳 @Override public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) { // 每来一条数据就调用一次 maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳 } @Override public void onPeriodicEmit(WatermarkOutput output) { // 发射水位线,默认200ms调用一次 output.emitWatermark(new Watermark(maxTs - delayTime - 1L)); } } }
我们在onPeriodicEmit()里调用output.emitWatermark(),就可以发出水位线了;这个方法由系统框架周期性地调用,默认200ms一次。
- 断点式水位线生成器(Punctuated Generator)
断点式生成器会不停地检测onEvent()中的事件,当发现带有水位线信息的特殊事件时,就立即发出水位线。一般来说,断点式生成器不会通过onPeriodicEmit()发出水位线。
自定义的断点式水位线生成器代码如下:
publicclass PunctuatedGenerator implements WatermarkGenerator<Event> { @Override public void onEvent(Event r, long eventTimestamp, WatermarkOutput output) { // 只有在遇到特定的itemId时,才发出水位线 if (r.user.equals("Mary")) { output.emitWatermark(new Watermark(r.timestamp - 1)); } } @Override public void onPeriodicEmit(WatermarkOutput output) { // 不需要做任何事情,因为我们在onEvent方法中发射了水位线 } }
我们在onEvent()中判断当前事件的user字段,只有遇到“Mary”这个特殊的值时,才调用output.emitWatermark()发出水位线。这个过程是完全依靠事件来触发的,所以水位线的生成一定在某个数据到来之后。
- 在自定义数据源中发送水位线
我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。示例程序如下:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; import java.sql.Timestamp; import java.util.Calendar; import java.util.Random; publicclass EmitWatermarkInSourceFunction { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.addSource(new ClickSource()).print(); env.execute(); } // 泛型是数据源中的类型 publicstaticclass ClickSource implements SourceFunction<Event> { privateboolean running = true; @Override public void run(SourceContext<Event> sourceContext) throws Exception { Random random = new Random(); String[] userArr = {"Mary", "Bob", "Alice"}; String[] urlArr = {"./home", "./cart", "./prod?id=1"}; while (running) { long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒时间戳 String username = userArr[random.nextInt(userArr.length)]; String url = urlArr[random.nextInt(urlArr.length)]; Event event = new Event(username, url, currTs); // 使用collectWithTimestamp方法将数据发送出去,并指明数据中的时间戳的字段 sourceContext.collectWithTimestamp(event, event.timestamp); // 发送水位线 sourceContext.emitWatermark(new Watermark(event.timestamp - 1L)); Thread.sleep(1000L); } } @Override public void cancel() { running = false; } } }
在自定义水位线中生成水位线相比assignTimestampsAndWatermarks方法更加灵活,可以任意的产生周期性的、非周期性的水位线,以及水位线的大小也完全由我们自定义。所以非常适合用来编写Flink的测试程序,测试Flink的各种各样的特性。
10.2.4 水位线的传递
在流处理中,上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。而当一个任务接收刀多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。
如图所示,当前任务的上游,有四个并行子任务,所以会接收到来自四个分区的水位线;而下游有三个并行子任务,所以会向三个分区发出水位线。具体过程如下:
(1)上游并行子任务发来不同的水位线,当前任务会为每一个分区设置一个“分区水位线”(Partition Watermark),这是一个分区时钟;而当前任务自己的时钟,就是所有分区时钟里最小的那个。 (2)当有一个新的水位线(第一分区的4)从上游传来时,当前任务会首先更新对应的分区时钟;然后再次判断所有分区时钟中的最小值,如果比之前大,说明事件时间有了进展,当前任务的时钟也就可以更新了。这里要注意,更新后的任务时钟,并不一定是新来的那个分区水位线,比如这里改变的是第一分区的时钟,但最小的分区时钟是第三分区的3,于是当前任务时钟就推进到了3。当时钟有进展时,当前任务就会将自己的时钟以水位线的形式,广播给下游所有子任务。 (3)再次收到新的水位线(第二分区的7)后,执行同样的处理流程。首先将第二个分区时钟更新为7,然后比较所有分区时钟;发现最小值没有变化,那么当前任务的时钟也不变,也不会向下游任务发出水位线。 (4)同样道理,当又一次收到新的水位线(第三分区的6)之后,第三个分区时钟更新为6,同时所有分区时钟最小值变成了第一分区的4,所以当前任务的时钟推进到4,并发出时间戳为4的水位线,广播到下游各个分区任务。 水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完