Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(七)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
实时计算 Flink 版,5000CU*H 3个月
云数据库 Tair(兼容Redis),内存型 2GB
简介: Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面

一个常见的应用场景就是,如果我们希望连接到一个外部数据库进行读写操作,那么将连接操作放在map()中显然不是个好选择——因为每来一条数据就会重新连接一次数据库;所以我们可以在open()中建立连接,在map()中读写数据,而在close()中关闭连接。

publicclass MyFlatMap extends RichFlatMapFunction<IN, OUT>> {
@Override
public void open(Configuration configuration) {
  // 做一些初始化工作
  // 例如建立一个和MySQL的连接
}
@Override
public void flatMap(IN in, Collector<OUT out) {
  // 对数据库进行读写
}
@Override
public void close() {
  // 清理工作,关闭和MySQL数据库的连接。
}
}

9.3.4 物理分区算子(Physical Partitioning)

常见的物理分区策略有随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast)。

  1. 随机分区(shuffle)

最简单的重分区方式就是直接“洗牌”。通过调用DataStream的.shuffle()方法,将数据随机地分配到下游算子的并行任务中去。

随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区,如图5-9所示。因为是完全随机的,所以对于同样的输入数据, 每次执行得到的结果也不会相同。

640.png

经过随机分区之后,得到的依然是一个DataStream。

我们可以做个简单测试:将数据读入之后直接打印到控制台,将输出的并行度设置为2,中间经历一次shuffle。执行多次,观察结果是否相同。

publicclass ShuffleExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<Integer> stream = env.fromElements(1, 2, 3, 4).setParallelism(1);
    stream.shuffle().print().setParallelism(2);
    env.execute();
  }
}
  1. 轮询分区(Round-Robin)

轮询也是一种常见的重分区方式。简单来说就是“发牌”,按照先后顺序将数据做依次分发,如图5-10所示。通过调用DataStream的.rebalance()方法,就可以实现轮询重分区。rebalance使用的是Round-Robin负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。

640.png

  1. 重缩放分区(rescale)

重缩放分区和轮询分区非常相似。当调用rescale()方法时,其实底层也是使用Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中,如图5-11所示。也就是说,“发牌人”如果有多个,那么rebalance的方式是每个发牌人都面向所有人发牌;而rescale的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌。

aa3c1d4d581a119f7363574498f2f1b2.png

从底层实现上看,rebalance和rescale的根本区别在于任务之间的连接机制不同。rebalance将会针对所有上游任务(发送数据方)和所有下游任务(接收数据方)之间建立通信通道,这是一个笛卡尔积的关系;而rescale仅仅针对每一个任务和下游对应的部分任务之间建立通信通道,节省了很多资源。

  1. 广播(broadcast)

这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用DataStream的broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。

  1. 全局分区(global)

全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了1,所以使用这个操作需要非常谨慎,可能对程序造成很大的。

  1. 自定义分区(Custom)

当Flink提供的所有分区策略都不能满足用户的需求时,我们可以通过使用partitionCustom()方法来自定义分区策略。

9.4  输出算子(Sink)

5a458a5b3957d229e300b2d72d7aafcf.png

Flink作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。

9.4.1 连接到外部系统

Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。

与Source算子非常类似,除去一些Flink预实现的Sink,一般情况下Sink算子的创建是通过调用DataStream的.addSink()方法实现的。

stream.addSink(newSinkFunction(…));

addSource的参数需要实现一个SourceFunction接口;类似地,addSink方法同样需要传入一个参数,实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。

当然,SinkFuntion多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如图5-13所示,列出了Flink官方目前支持的第三方系统连接器:

d443729366fea9d42f50be8d71c1b2ed.png

我们可以看到,像Kafka之类流式系统,Flink提供了完美对接,source/sink两端都能连接,可读可写;而对于Elasticsearch、文件系统(FileSystem)、JDBC等数据存储系统,则只提供了输出写入的sink连接器。

除Flink官方之外,Apache Bahir作为给Spark和Flink提供扩展支持的项目,也实现了一些其他第三方系统与Flink的连接器,如图所示。

0dd8e18db222e13bde78c2045bef4e35.png

除此以外,就需要用户自定义实现sink连接器了。

9.4.2 输出到文件

Flink专门提供了一个流式文件系统的连接器:StreamingFileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。

StreamingFileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),调用方法也非常简单,可以直接调用StreamingFileSink的静态方法:

行编码:StreamingFileSink.forRowFormat(basePath,rowEncoder)。
 批量编码:StreamingFileSink.forBulkFormat(basePath,bulkWriterFactory)。

下面我们就以行编码为例,将一些测试数据直接写入文件:

publicclass SinkFile {
 public static void main(String[] args) throws Exception {
   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   env.setParallelism(1);
   DataStream<String> input = env.fromElements("hello world", "hello flink");
   final StreamingFileSink<String> sink = StreamingFileSink
       .forRowFormat(new Path("./output"), 
new SimpleStringEncoder<String>("UTF-8")).withRollingPolicy(
           DefaultRollingPolicy.builder()
             .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
             .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
             .withMaxPartSize(1024 * 1024 * 1024)
             .build())
       .build();
   input.addSink(sink);
   env.execute();
 }
}

这里我们创建了一个简单的文件Sink,通过.withRollingPolicy()方法指定了一个“滚动策略”。因为文件会有内容持续不断地写入,所以我们应该给一个标准,到什么时候就开启新的文件,将之前的内容归档保存。也就是说,上面的代码设置了在以下3种情况下,我们就会滚动分区文件:

至少包含15分钟的数据
最近5分钟没有收到新的数据
文件大小已达到1 GB

9.4.3 输出到Kafka

  • 添加Kafka 连接器依赖

由于我们已经测试过从Kafka数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。

  • 启动Kafka集群
  • 编写输出到Kafka的示例代码

我们可以直接将用户行为数据保存为文件clicks.csv,读取后不做转换直接写入Kafka,主题(topic)命名为“clicks”。

publicclass SinkKafka {
 public static void main(String[] args) throws Exception {
   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   env.setParallelism(1);
   Properties properties = new Properties();
   properties.put("bootstrap.servers", "hadoop102:9092");
   DataStreamSource<String> stream = env.readTextFile("input/clicks.csv");
   stream
       .addSink(new FlinkKafkaProducer<String>(
           "clicks",
           new SimpleStringSchema(),
           properties
       ));
   env.execute();
 }
}
  • 运行代码,在Linux主机启动一个消费者, 查看是否收到数据
bin/kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic clicks

9.4.4 输出到Redis

Flink没有直接提供官方的Redis连接器,不过Bahir项目还是担任了合格的辅助角色,为我们提供了Flink-Redis的连接工具。但版本升级略显滞后,目前连接器版本为1.0,支持的Scala版本最新到2.11。由于我们的测试不涉及到Scala的相关版本变化,所以并不影响使用。在实际项目应用中,应该以匹配的组件版本运行。

具体测试步骤如下:

  • 导入的Redis连接器依赖
<dependency>
 <groupId>org.apache.bahir</groupId>
 <artifactId>flink-connector-redis_2.11</artifactId>
 <version>1.0</version>
</dependency>
  • 启动Redis集群

这里我们为方便测试,只启动了单节点Redis。

  • 编写输出到Redis的示例代码

连接器为我们提供了一个RedisSink,它继承了抽象类RichSinkFunction,这就是已经实现好的向Redis写入数据的SinkFunction。我们可以直接将Event数据输出到Redis:

publicclass SinkRedis {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop200").build();
    env.addSource(new ClickSource())
        .addSink(new RedisSink<Event>(conf, new MyRedisMapper()));
    env.execute();
  }
}

这里RedisSink的构造方法需要传入两个参数:

JFlinkJedisConfigBase:Jedis的连接配置
 RedisMapper:Redis映射类接口,说明怎样将数据转换成可以写入Redis的类型

接下来主要就是定义一个Redis的映射类,实现RedisMapper接口。

publicstaticclass MyRedisMapper implements RedisMapper<Event> {
  @Override
  public String getKeyFromData(Event e) {
    return e.user;
  }
  @Override
  public String getValueFromData(Event e) {
    return e.url;
  }
  @Override
  public RedisCommandDescription getCommandDescription() {
    returnnew RedisCommandDescription(RedisCommand.HSET, "clicks");
  }
}

在这里我们可以看到,保存到Redis时调用的命令是HSET,所以是保存为哈希表(hash),表名为“clicks”;保存的数据以user为key,以url为value,每来一条数据就会做一次转换。

  • 运行代码,Redis查看是否收到数据。

9.4.5 输出到Elasticsearch

Flink为ElasticSearch专门提供了官方的Sink 连接器。

写入数据的ElasticSearch的测试步骤如下。

  • 添加Elasticsearch 连接器依赖
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
  • 启动Elasticsearch集群
  • 编写输出到Elasticsearch的示例代码
publicclass SinkToEs {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    ArrayList<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(new HttpHost("hadoop102", 9200, "http"));
    ElasticsearchSink.Builder<Event> esBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        new ElasticsearchSinkFunction<Event>() {
          @Override
          public void process(Event event, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
            HashMap<String, String> data = new HashMap<>();
            data.put(event.user, event.url);
            IndexRequest indexRequest = Requests
                .indexRequest()
                .index("clicks")
               .type("type")
                .source(data);
            requestIndexer.add(indexRequest);
          }
        }
    );
    DataStream<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L));
    stream.addSink(esBuilder.build());
    env.execute();
  }
}

与RedisSink类似,连接器也为我们实现了写入到Elasticsearch的SinkFunction——ElasticsearchSink。区别在于,这个类的构造方法是私有(private)的,我们需要使用ElasticsearchSink的Builder内部静态类,调用它的build()方法才能创建出真正的SinkFunction。

而Builder的构造方法中又有两个参数:

httpHosts:连接到的Elasticsearch集群主机列表
 elasticsearchSinkFunction:这并不是我们所说的SinkFunction,而是用来说明具体处理逻辑、准备数据向Elasticsearch发送请求的函数

具体的操作需要重写中elasticsearchSinkFunction中的process方法,我们可以将要发送的数据放在一个HashMap中,包装成IndexRequest向外部发送HTTP请求。

  • 运行代码,访问Elasticsearch查看是否收到数据。

9.4.6 输出到MySQL(JDBC)

写入数据的MySQL的测试步骤如下。

  • 添加依赖
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
</dependency>
<dependency>
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <version>5.1.47</version>
</dependency>
  • 启动MySQL,在test库下建表clicks
mysql> createtable clicks(
  -> uservarchar(20) notnull,
  -> urlvarchar(100) notnull);
  • 编写输出到MySQL的示例代码
publicclass SinkToMySQL {
public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(1);
  env
      .fromElements(
          new Event("Mary", "./home", 1000L),
          new Event("Bob", "./cart", 2000L)
      )
      .addSink(
          JdbcSink.sink(
              "INSERT INTO clicks (user, url) VALUES (?, ?)",
              (statement, r) -> {
                statement.setString(1, r.user);
                statement.setString(2, r.url);
              },
              JdbcExecutionOptions.builder()
                  .withBatchSize(1000)
                  .withBatchIntervalMs(200)
                  .withMaxRetries(5)
                  .build(),
              new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                  .withUrl("jdbc:mysql://localhost:3306/test")
                  // 使用MySQL 5.7的话,没有cj
                  .withDriverName("com.mysql.cj.jdbc.Driver")
                  .withUsername("username")
                  .withPassword("password")
                  .build()
          )
      );
  env.execute();
}
}
  • 运行代码,用客户端连接MySQL,查看是否成功写入数据。

9.4.7 自定义Sink输出

如果我们想将数据存储到我们自己的存储设备中,而Flink并没有提供可以直接使用的连接器,就只能自定义Sink进行输出了。与Source类似,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。

stream.addSink(new MySinkFunction<String>());

在实现SinkFunction的时候,需要重写的一个关键方法invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。

这种方式比较通用,对于任何外部存储系统都有效;不过自定义Sink想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。

十、Flink中的时间和窗口

在流数据处理应用中,一个很重要、也很常见的操作就是窗口计算。所谓的“窗口”,一般就是划定的一段时间范围,也就是“时间窗”;对在这范围内的数据进行处理,就是所谓的窗口计算。所以窗口和时间往往是分不开的。接下来我们就深入了解一下Flink中的时间语义和窗口的应用。

10.1 时间语义

10.1.1 Flink中的时间语义

da511c701734b7f91cc378fb3ccd2625.png

如图所示,在事件发生之后,生成的数据被收集起来,首先进入分布式消息队列,然后被Flink系统中的Source算子读取消费,进而向下游的转换算子(窗口算子)传递,最终由窗口算子进行计算处理。

很明显,这里有两个非常重要的时间点:一个是数据产生的时间,我们把它叫作“事件时间”(Event Time);另一个是数据真正被处理的时刻,叫作“处理时间”(Processing Time)。我们所定义的窗口操作,到底是以那种时间作为衡量标准,就是所谓的“时间语义”(Notions of Time)。由于分布式系统中网络传输的延迟和时钟漂移,处理时间相对事件发生的时间会有所滞后。

  1. 处理时间(Processing Time)

处理时间的概念非常简单,就是指执行处理操作的机器的系统时间。处理时间是最简单的时间语义。

  1. 事件时间(Event Time)
事件时间,是指每个事件在对应的设备上发生的时间,也就是数据生成的时间。
数据一旦产生,这个时间自然就确定了,所以它可以作为一个属性嵌入到数据中。这其实就是这条数据记录的“时间戳”(Timestamp)。
在事件时间语义下,我们对于时间的衡量,就不看任何机器的系统时间了,而是依赖于数据本身。由于分布式系统中网络传输延迟的不确定性,实际应用中我们要面对的数据流往往是乱序的。在这种情况下,就不能简单地把数据自带的时间戳当作时钟了,而需要用另外的标志来表示事件时间进展,在Flink中把它叫作事件时间的“水位线”(Watermarks)。

10.1.2 哪种时间语义更重要

  1. 从《星球大战》说起为了更加清晰地说明两种语义的区别,我们来举一个非常经典的例子:电影《星球大战》。

a63415710a52edf283e7b0c0d78f1e05.png

如图所示,我们会发现,看电影其实就是处理影片中数据的过程,所以影片的上映时间就相当于“处理时间”;而影片的数据就是所描述的故事,它所发生的背景时间就相当于“事件时间”。两种时间语义都有各自的用途,适用于不同的场景。

  1. 数据处理系统中的时间语义

在计算机系统中,考虑数据处理的“时代变化”是没什么意义的,我们更关心的,显然是数据本身产生的时间。

所以在实际应用中,事件时间语义会更为常见。一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。

在Flink中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从1.12版本开始,Flink已经将事件时间作为了默认的时间语义。

10.2 水位线(Watermark)

10.2.1 事件时间和窗口

ded9add8a71fe974cd2e52257b9caf6d.png

在窗口的处理过程中,我们可以基于数据的时间戳,自定义一个“逻辑时钟”。这个时钟的时间不会自动流逝;它的时间进展,就是靠着新到数据的时间戳来推动的。

这样的好处在于,计算的过程可以完全不依赖处理时间(系统时间),不论什么时候进行统计处理,得到的结果都是正确的。而一般实时流处理的场景中,事件时间可以基本与处理时间保持同步,只是略微有一点延迟,同时保证了窗口计算的正确性。

10.2.2 什么是水位线

在Flink中,用来衡量事件时间(Event Time)进展的标记,就被称作“水位线”(Watermark)。

具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

d377bb95e85407e387edbd11584edd6e.png

  1. 有序流中的水位线

在理想状态下,数据应该按照它们生成的先后顺序、排好队进入流中;也就是说,它们处理的过程会保持原先的顺序不变,遵守先来后到的原则。这样的话我们从每个数据中提取时间戳,就可以保证总是从小到大增长的,从而插入的水位线也会不断增长、事件时钟不断向前推进。

实际应用中,如果当前数据量非常大,可能会有很多数据的时间戳是相同的,这时每来一条数据就提取时间戳、插入水位线就做了大量的无用功。而且即使时间戳不同,同时涌来的数据时间差会非常小(比如几毫秒),往往对处理计算也没什么影响。所以为了提高效率,一般会每隔一段时间生成一个水位线,这个水位线的时间戳,就是当前最新数据的时间戳,如图6-6所示。所以这时的水位线,其实就是有序流中的一个周期性出现的时间标记。

02e68bc5aa7b46128754406dae9df7f7.png

  1. 乱序流中的水位线

在分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性,导致顺序发生改变,这就是所谓的“乱序数据”。

这里所说的“乱序”(out-of-order),是指数据的先后顺序不一致,主要就是基于数据的产生时间而言的。如图6-7所示,一个7秒时产生的数据,生成时间自然要比9秒的数据早;但是经过数据缓存和传输之后,处理任务可能先收到了9秒的数据,之后7秒的数据才姗姗来迟。这时如果我们希望插入水位线,来指示当前的事件时间进展,又该怎么做呢?

266119b99882674eb606a255bb091062.png

解决思路也很简单:我们还是靠数据来驱动,每来一个数据就提取它的时间戳、插入一个水位线。不过现在的情况是数据乱序,所以插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线,如图6-8所示。也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。

9ad861f4b39da0849fd1184f3b0519a8.png

如果考虑到大量数据同时到来的处理效率,我们同样可以周期性地生成水位线。这时只需要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新的水位线,如下图所示。

cb2a2617e1e5f2dd66c10c88948202ec.png

但是这样做会带来一个非常大的问题:我们无法正确处理“迟到”的数据。为了让窗口能够正确收集到迟到的数据,我们也可以等上一段时间,比如2秒;也就是用当前已有数据的最大时间戳减去2秒,就是要插入的水位线的时间戳,如下图所示。这样的话,9秒的数据到来之后,事件时钟不会直接推进到9秒,而是进展到了7秒;必须等到11秒的数据到来之后,事件时钟才会进展到9秒,这时迟到数据也都已收集齐,0~9秒的窗口就可以正确计算结果了。

979c535937d22f2c786fff687888d5cb.png

  1. 水位线的特性

现在我们可以知道,水位线就代表了当前的事件时间时钟,而且可以在数据的时间戳基础上加一些延迟来保证不丢数据,这一点对于乱序流的正确处理非常重要。

总结一下水位线的特性:

水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
水位线是基于数据的时间戳生成的
水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
水位线可以通过设置延迟,来保证正确处理乱序数据
一个水位线Watermark(t),表示在当前流中事件时间已经达到了时间戳t, 这代表t之前的所有数据都到齐了,之后流中不会出现时间戳t’ ≤ t的数据
水位线是Flink流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理。

10.2.3 如何生成水位线

水位线是用来保证窗口处理结果的正确性的,如果不能正确处理所有乱序数据,可以尝试调大延迟的时间。

  1. 生成水位线的总体原则

完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。

如果我们希望计算结果能更加准确,那可以将水位线的延迟设置得更高一些,等待的时间越长,自然也就越不容易漏掉数据。不过这样做的代价是处理的实时性降低了,我们可能为极少数的迟到数据增加了很多不必要的延迟。

如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。当然,如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟。

所以Flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。

  1. 水位线生成策略(Watermark Strategies)

在Flink的DataStream API中,有一个单独用于生成水位线的方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间:

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
WatermarkStrategy<T> watermarkStrategy)

具体使用时,直接用DataStream调用该方法即可,与普通的transform方法完全一样。

DataStream<Event> stream = env.addSource(new ClickSource());
DataStream<Event> withTimestampsAndWatermarks = 
stream.assignTimestampsAndWatermarks(*<watermark strategy>*);

.assignTimestampsAndWatermarks()方法需要传入一个WatermarkStrategy作为参数,这就是所谓的“水位线生成策略”。WatermarkStrategy中包含了一个“时间戳分配器” TimestampAssigner和一个“水位线生成器” WatermarkGenerator。

publicinterface WatermarkStrategy<T> 
  extends TimestampAssignerSupplier<T>,
      WatermarkGeneratorSupplier<T>{
  @Override
  TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
  @Override
  WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。
WatermarkGenerator:主要负责按照既定的方式,基于时间戳生成水位线。
在WatermarkGenerator接口中,主要又有两个方法:onEvent()和onPeriodicEmit()。
onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳,以及允许发出水位线的一个WatermarkOutput,可以基于事件做各种操作
onPeriodicEmit:周期性调用的方法,可以由WatermarkOutput发出水位线。周期时间为处理时间,可以调用环境配置的.setAutoWatermarkInterval()方法来设置,默认为200ms。
env.getConfig().setAutoWatermarkInterval(60 * 1000L);
  1. Flink内置水位线生成器

WatermarkStrategy这个接口是一个生成水位线策略的抽象,而Flink提供了内置的水位线生成器(WatermarkGenerator),不仅开箱即用简化了编程,而且也为我们自定义水位线策略提供了模板。

这两个生成器可以通过调用WatermarkStrategy的静态辅助方法来创建。它们都是周期性生成水位线的,分别对应着处理有序流和乱序流的场景。

  • 有序流

对于有序流,主要特点就是时间戳单调增长(Monotonously Increasing Timestamps),所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。简单来说,就是直接拿当前最大的时间戳作为水位线就可以了。

stream.assignTimestampsAndWatermarks(
    WatermarkStrategy.<Event>forMonotonousTimestamps()
        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
          @Override
          public long extractTimestamp(Event element, long recordTimestamp) {
            return element.timestamp;
          }
        })
);

上面代码中我们调用.withTimestampAssigner()方法,将数据中的timestamp字段提取出来,作为时间戳分配给数据元素;然后用内置的有序流水位线生成器构造出了生成策略。这样,提取出的数据时间戳,就是我们处理计算的事件时间。

这里需要注意的是,时间戳和水位线的单位,必须都是毫秒。

  • 乱序流

由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间(Fixed Amount of Lateness)。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个maxOutOfOrderness参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。

代码示例如下:

publicclass WatermarkExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env
        .addSource(new ClickSource())
        // 插入水位线的逻辑
        .assignTimestampsAndWatermarks(
             // 针对乱序流插入水位线,延迟时间设置为5s
              WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
       .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                  // 抽取时间戳的逻辑
                  @Override
                  public long extractTimestamp(Event element, long recordTimestamp) {
                    return element.timestamp;
                  }
                })
        )
        .print();
    env.execute();
  }
}

上面代码中,我们同样提取了timestamp字段作为时间戳,并且以5秒的延迟时间创建了处理乱序流的水位线生成器。

  1. 自定义水位线策略

一般来说,Flink内置的水位线生成器就可以满足应用需求了。不过有时我们的业务逻辑可能非常复杂,这时对水位线生成的逻辑也有更高的要求,就必须自定义实现水位线策略WatermarkStrategy了。

在WatermarkStrategy中,时间戳分配器TimestampAssigner都是大同小异的,指定字段提取时间戳就可以了;而不同策略的关键就在于WatermarkGenerator的实现。整体说来,Flink有两种不同的生成水位线的方式:一种是周期性的(Periodic),另一种是断点式的(Punctuated)。

WatermarkGenerator接口中有两个方法,onEvent()和onPeriodicEmit(),前者是在每个事件到来时调用,而后者由框架周期性调用。周期性调用的方法中发出水位线,自然就是周期性生成水位线;而在事件触发的方法中发出水位线,自然就是断点式生成了。两种方式的不同就集中体现在这两个方法的实现上。

  • 周期性水位线生成器(Periodic Generator)

周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。

下面是一段自定义周期性生成水位线的代码:

import com.atguigu.bean.Event;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// 自定义水位线的产生
publicclass CustomPeriodicWatermarkExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env
        .addSource(new ClickSource())
        .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
        .print();
    env.execute();
  }
  publicstaticclass CustomWatermarkStrategy implements WatermarkStrategy<Event> {
    @Override
    public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
      returnnew SerializableTimestampAssigner<Event>() {
        @Override
        public long extractTimestamp(Event element, long recordTimestamp) {
          return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段
        }
      };
    }
    @Override
    public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
      returnnew CustomBoundedOutOfOrdernessGenerator();
    }
  }
  publicstaticclass CustomBoundedOutOfOrdernessGenerator implements WatermarkGenerator<Event> {
    private Long delayTime = 5000L; // 延迟时间
    private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳
    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
      // 每来一条数据就调用一次
      maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳
    }
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
      // 发射水位线,默认200ms调用一次
      output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
    }
  }
}

我们在onPeriodicEmit()里调用output.emitWatermark(),就可以发出水位线了;这个方法由系统框架周期性地调用,默认200ms一次。

  • 断点式水位线生成器(Punctuated Generator)

断点式生成器会不停地检测onEvent()中的事件,当发现带有水位线信息的特殊事件时,就立即发出水位线。一般来说,断点式生成器不会通过onPeriodicEmit()发出水位线。

自定义的断点式水位线生成器代码如下:

publicclass PunctuatedGenerator implements WatermarkGenerator<Event> {
  @Override
  public void onEvent(Event r, long eventTimestamp, WatermarkOutput output) {
// 只有在遇到特定的itemId时,才发出水位线
    if (r.user.equals("Mary")) {
      output.emitWatermark(new Watermark(r.timestamp - 1));
    }
  }
  @Override
  public void onPeriodicEmit(WatermarkOutput output) {
    // 不需要做任何事情,因为我们在onEvent方法中发射了水位线
  }
}

我们在onEvent()中判断当前事件的user字段,只有遇到“Mary”这个特殊的值时,才调用output.emitWatermark()发出水位线。这个过程是完全依靠事件来触发的,所以水位线的生成一定在某个数据到来之后。

  1. 在自定义数据源中发送水位线

我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。示例程序如下:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import java.sql.Timestamp;
import java.util.Calendar;
import java.util.Random;
publicclass EmitWatermarkInSourceFunction {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.addSource(new ClickSource()).print();
    env.execute();
  }
  // 泛型是数据源中的类型
  publicstaticclass ClickSource implements SourceFunction<Event> {
    privateboolean running = true;
    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
      Random random = new Random();
      String[] userArr = {"Mary", "Bob", "Alice"};
      String[] urlArr  = {"./home", "./cart", "./prod?id=1"};
      while (running) {
        long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒时间戳
        String username = userArr[random.nextInt(userArr.length)];
        String url    = urlArr[random.nextInt(urlArr.length)];
        Event event = new Event(username, url, currTs);
        // 使用collectWithTimestamp方法将数据发送出去,并指明数据中的时间戳的字段
        sourceContext.collectWithTimestamp(event, event.timestamp);
        // 发送水位线
        sourceContext.emitWatermark(new Watermark(event.timestamp - 1L));
        Thread.sleep(1000L);
      }
    }
    @Override
    public void cancel() {
      running = false;
    }
  }
}

在自定义水位线中生成水位线相比assignTimestampsAndWatermarks方法更加灵活,可以任意的产生周期性的、非周期性的水位线,以及水位线的大小也完全由我们自定义。所以非常适合用来编写Flink的测试程序,测试Flink的各种各样的特性。

10.2.4 水位线的传递

55544acffd1a5d5563310ab6cfae3896.png

在流处理中,上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。而当一个任务接收刀多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。

如图所示,当前任务的上游,有四个并行子任务,所以会接收到来自四个分区的水位线;而下游有三个并行子任务,所以会向三个分区发出水位线。具体过程如下:

(1)上游并行子任务发来不同的水位线,当前任务会为每一个分区设置一个“分区水位线”(Partition Watermark),这是一个分区时钟;而当前任务自己的时钟,就是所有分区时钟里最小的那个。
(2)当有一个新的水位线(第一分区的4)从上游传来时,当前任务会首先更新对应的分区时钟;然后再次判断所有分区时钟中的最小值,如果比之前大,说明事件时间有了进展,当前任务的时钟也就可以更新了。这里要注意,更新后的任务时钟,并不一定是新来的那个分区水位线,比如这里改变的是第一分区的时钟,但最小的分区时钟是第三分区的3,于是当前任务时钟就推进到了3。当时钟有进展时,当前任务就会将自己的时钟以水位线的形式,广播给下游所有子任务。
(3)再次收到新的水位线(第二分区的7)后,执行同样的处理流程。首先将第二个分区时钟更新为7,然后比较所有分区时钟;发现最小值没有变化,那么当前任务的时钟也不变,也不会向下游任务发出水位线。
(4)同样道理,当又一次收到新的水位线(第三分区的6)之后,第三个分区时钟更新为6,同时所有分区时钟最小值变成了第一分区的4,所以当前任务的时钟推进到4,并发出时间戳为4的水位线,广播到下游各个分区任务。
水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
26天前
|
存储 分布式计算 API
大数据-107 Flink 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构
大数据-107 Flink 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构
72 0
|
26天前
|
消息中间件 监控 Java
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
53 1
|
3月前
|
存储 监控 Cloud Native
Serverless 应用的监控与调试问题之Flink流批一体在架构层面有什么演进
Serverless 应用的监控与调试问题之Flink流批一体在架构层面有什么演进
|
2天前
|
弹性计算 Kubernetes Cloud Native
云原生架构下的微服务设计原则与实践####
本文深入探讨了在云原生环境中,微服务架构的设计原则、关键技术及实践案例。通过剖析传统单体架构面临的挑战,引出微服务作为解决方案的优势,并详细阐述了微服务设计的几大核心原则:单一职责、独立部署、弹性伸缩和服务自治。文章还介绍了容器化技术、Kubernetes等云原生工具如何助力微服务的高效实施,并通过一个实际项目案例,展示了从服务拆分到持续集成/持续部署(CI/CD)流程的完整实现路径,为读者提供了宝贵的实践经验和启发。 ####
|
25天前
|
缓存 监控 API
探索微服务架构中的API网关模式
【10月更文挑战第5天】随着微服务架构的兴起,企业纷纷采用这一模式构建复杂应用。在这种架构下,应用被拆分成若干小型、独立的服务,每个服务围绕特定业务功能构建并通过HTTP协议协作。随着服务数量增加,统一管理这些服务间的交互变得至关重要。API网关作为微服务架构的关键组件,承担起路由请求、聚合数据、处理认证与授权等功能。本文通过一个在线零售平台的具体案例,探讨API网关的优势及其实现细节,展示其在简化客户端集成、提升安全性和性能方面的关键作用。
70 2
|
29天前
|
存储 缓存 监控
探索微服务架构中的API网关模式
【10月更文挑战第1天】探索微服务架构中的API网关模式
80 2
|
1天前
|
缓存 监控 API
探索微服务架构中的API网关模式
随着微服务架构的兴起,API网关成为管理和服务间交互的关键组件。本文通过在线零售公司的案例,探讨了API网关在路由管理、认证授权、限流缓存、日志监控和协议转换等方面的优势,并详细介绍了使用Kong实现API网关的具体步骤。
11 3
|
2天前
|
运维 NoSQL Java
后端架构演进:微服务架构的优缺点与实战案例分析
【10月更文挑战第28天】本文探讨了微服务架构与单体架构的优缺点,并通过实战案例分析了微服务架构在实际应用中的表现。微服务架构具有高内聚、低耦合、独立部署等优势,但也面临分布式系统的复杂性和较高的运维成本。通过某电商平台的实际案例,展示了微服务架构在提升系统性能和团队协作效率方面的显著效果,同时也指出了其带来的挑战。
25 4
|
1天前
|
存储 缓存 监控
探索微服务架构中的API网关模式
探索微服务架构中的API网关模式
13 2
|
1天前
|
JavaScript 持续交付 Docker
解锁新技能:Docker容器化部署在微服务架构中的应用
【10月更文挑战第29天】在数字化转型中,微服务架构因灵活性和可扩展性成为企业首选。Docker容器化技术为微服务的部署和管理带来革命性变化。本文探讨Docker在微服务架构中的应用,包括隔离性、可移植性、扩展性、版本控制等方面,并提供代码示例。
18 1

热门文章

最新文章