Flink教程(23)- Flink高级特性(Streaming File Sink)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink教程(23)- Flink高级特性(Streaming File Sink)

01 引言

在前面的博客,我们学习了FlinkEnd-to-End Exactly-Once了,有兴趣的同学可以参阅下:

本文主要讲解Flink的高级特性其中之一的Streaming File Sink。

02 Streaming File Sink 概述

参考: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/streamfile_sink.html

2.1 场景描述

StreamingFileSink是Flink1.7中推出的新特性,是为了解决如下的问题:

  • 大数据业务场景中,经常有一种场景:外部数据发送到kafka中,flink作为中间件消费kafka数据并进行业务处理;处理完成之后的数据可能还需要写入到数据库或者文件系统中,比如写入hdfs中。
  • StreamingFileSink就可以用来将分区文件写入到支持 Flink FileSystem 接口的文件系统中,支持Exactly-Once语义。
  • 这种sink实现的Exactly-Once都是基于Flink checkpoint来实现的两阶段提交模式来保证的,主要应用在实时数仓、topic拆分、基于小时分析处理等场景下。

2.2 Bucket和SubTask、PartFile

Bucket: StreamingFileSink可向由Flink FileSystem抽象支持的文件系统写入分区文件(因为是流式写入,数据被视为无界)。该分区行为可配,默认按时间,具体来说每小时写入一个Bucket,该Bucket包括若干文件,内容是这一小时间隔内流中收到的所有record。

PartFile: 每个Bukcket内部分为多个PartFile来存储输出数据,该Bucket生命周期内接收到数据的sink的每个子任务至少有一个PartFile。

而额外文件滚动由可配的滚动策略决定,默认策略是根据文件大小和打开超时(文件可以被打开的最大持续时间)以及文件最大不活动超时等决定是否滚动。

Bucket和SubTask、PartFile关系如图所示:

03 案例演示

需求:编写Flink程序,接收socket的字符串数据,然后将接收到的数据流式方式存储到hdfs

开发步骤:

1.初始化流计算运行环境
2.设置Checkpoint(10s)周期性启动
3.指定并行度为1
4.接入socket数据源,获取数据
5.指定文件编码格式为行编码格式
6.设置桶分配策略
7.设置文件滚动策略
8.指定文件输出配置
9.将streamingfilesink对象添加到环境
10.执行任务

实现代码:

/**
 * StreamFileSink
 *
 * @author : YangLinWei
 * @createTime: 2022/3/9 9:00 上午
 */
public class StreamFileSinkDemo {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(10));
        env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
        //2.source
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);
        //3.sink
        //设置sink的前缀和后缀
        //文件的头和文件扩展名
        //prefix-xxx-.txt
        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix("prefix")
                .withPartSuffix(".txt")
                .build();
        //设置sink的路径
        String outputPath = "hdfs://node1:8020/FlinkStreamFileSink/parquet";
        //创建StreamingFileSink
        final StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(
                        new Path(outputPath),
                        new SimpleStringEncoder<String>("UTF-8"))
                /**
                 * 设置桶分配政策
                 * DateTimeBucketAssigner --默认的桶分配政策,默认基于时间的分配器,每小时产生一个桶,格式如下yyyy-MM-dd--HH
                 * BasePathBucketAssigner :将所有部分文件(part file)存储在基本路径中的分配器(单个全局桶)
                 */
                .withBucketAssigner(new DateTimeBucketAssigner<>())
                /**
                 * 有三种滚动政策
                 *  CheckpointRollingPolicy
                 *  DefaultRollingPolicy
                 *  OnCheckpointRollingPolicy
                 */
                .withRollingPolicy(
                        /**
                         * 滚动策略决定了写出文件的状态变化过程
                         * 1. In-progress :当前文件正在写入中
                         * 2. Pending :当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态
                         * 3. Finished :在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态
                         *
                         * 观察到的现象
                         * 1.会根据本地时间和时区,先创建桶目录
                         * 2.文件名称规则:part-<subtaskIndex>-<partFileIndex>
                         * 3.在macos中默认不显示隐藏文件,需要显示隐藏文件才能看到处于In-progress和Pending状态的文件,因为文件是按照.开头命名的
                         *
                         */
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.SECONDS.toMillis(2)) //设置滚动间隔
                                .withInactivityInterval(TimeUnit.SECONDS.toMillis(1)) //设置不活动时间间隔
                                .withMaxPartSize(1024 * 1024 * 1024) // 最大尺寸
                                .build())
                .withOutputFileConfig(config)
                .build();
        lines.addSink(sink).setParallelism(1);
        env.execute();
    }
}

04 配置详解

4.1 PartFile

前面提到过,每个Bukcket内部分为多个部分文件,该Bucket内接收到数据的sink的每个子任务至少有一个PartFile。而额外文件滚动由可配的滚动策略决定。

关于顺序性:

对于任何给定的Flink子任务,PartFile索引都严格增加(按创建顺序),但是,这些索引并不总是顺序的。当作业重新启动时,所有子任务的下一个PartFile索引将是max PartFile索引+ 1,其中max是指在所有子任务中对所有计算的索引最大值。

4.1.1 PartFile生命周期

输出文件的命名规则和生命周期。由上图可知,部分文件(part file)可以处于以下三种状态之一:

  • In-progress :当前文件正在写入中
  • Pending :当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态
  • Finished :在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态,处于 Finished 状态的文件不会再被修改,可以被下游系统安全地读取。

注意: 使用 StreamingFileSink 时需要启用 Checkpoint ,每次做 Checkpoint 时写入完成。如果 Checkpoint 被禁用,部分文件(part file)将永远处于 ‘in-progress’ 或 ‘pending’ 状态,下游系统无法安全地读取。

4.1.2 PartFile生成规则

在每个活跃的Bucket期间,每个Writer的子任务在任何时候都只会有一个单独的In-progress PartFile,但可有多个Peding和Finished状态文件。

一个Sink的两个Subtask的PartFile分布情况实例如下:

初始状态,两个inprogress文件正在被两个subtask分别写入

└── 2020-03-25--12
    ├── part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
    └── part-1-0.inprogress.ea65a428-a1d0-4a0b-bbc5-7a436a75e575

当part-1-0因文件大小超过阈值等原因发生滚动时,变为Pending状态等待完成,但此时不会被重命名。注意此时Sink会创建一个新的PartFile即part-1-1:

└── 2020-03-25--12
    ├── part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
    ├── part-1-0.inprogress.ea65a428-a1d0-4a0b-bbc5-7a436a75e575
    └── part-1-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11

待下次checkpoint成功后,part-1-0完成变为Finished状态,被重命名:

└── 2020-03-25--12
    ├── part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
    ├── part-1-0
    └── part-1-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11

下一个Bucket周期到了,创建新的Bucket目录,不影响之前Bucket内的的in-progress文件,依然要等待文件RollingPolicy以及checkpoint来改变状态:

└── 2020-03-25--12
    ├── part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
    ├── part-1-0
    └── part-1-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
└── 2020-03-25--13
    └── part-0-2.inprogress.2b475fec-1482-4dea-9946-eb4353b475f1

4.1.3 PartFile命名配置

默认,PartFile命名规则如下:

  • In-progress / Pending
    part–.inprogress.uid
  • Finished
    part–

比如part-1-20表示1号子任务已完成的20号文件。

可以使用OutputFileConfig来改变前缀和后缀,代码示例如下

OutputFileConfig config = OutputFileConfig
 .builder()
 .withPartPrefix("prefix")
 .withPartSuffix(".ext")
 .build()
StreamingFileSink sink = StreamingFileSink
 .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
 .withBucketAssigner(new KeyBucketAssigner())
 .withRollingPolicy(OnCheckpointRollingPolicy.build())
 .withOutputFileConfig(config)
 .build()

得到的PartFile示例如下:

└── 2019-08-25--12
    ├── prefix-0-0.ext
    ├── prefix-0-1.ext.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
    ├── prefix-1-0.ext
    └── prefix-1-1.ext.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11

4.2 PartFile序列化编码

StreamingFileSink 支持行编码格式和批量编码格式,比如 Apache Parquet 。这两种变体可以使用以下静态方法创建:

Row-encoded sink:

  • StreamingFileSink.forRowFormat(basePath, rowEncoder)
//行
StreamingFileSink.forRowFormat(new Path(path), new SimpleStringEncoder<T>())
        .withBucketAssigner(new PaulAssigner<>()) //分桶策略
        .withRollingPolicy(new PaulRollingPolicy<>()) //滚动策略
        .withBucketCheckInterval(CHECK_INTERVAL) //检查周期
        .build();

Bulk-encoded sink:

StreamingFileSink.forBulkFormat(basePath, bulkWriterFactory)

//列 parquet
StreamingFileSink.forBulkFormat(new Path(path), ParquetAvroWriters.forReflectRecord(clazz))
        .withBucketAssigner(new PaulBucketAssigner<>())
        .withBucketCheckInterval(CHECK_INTERVAL)
        .build();

创建行或批量编码的 Sink 时,我们需要指定存储桶的基本路径和数据的编码

这两种写入格式除了文件格式的不同,另外一个很重要的区别就是回滚策略的不同:

  • forRowFormat行写可基于文件大小、滚动时间、不活跃时间进行滚动,
  • forBulkFormat列写方式只能基于checkpoint机制进行文件滚动,即在执行snapshotState方法时滚动文件,如果基于大小或者时间滚动文件,那么在任务失败恢复时就必须对处于in-processing状态的文件按照指定的offset进行truncate,由于列式存储是无法针对文件offset进行truncate的,因此就必须在每次checkpoint使文件滚动,其使用的滚动策略实现是OnCheckpointRollingPolicy。

forBulkFormat只能和 OnCheckpointRollingPolicy 结合使用,每次做 checkpoint 时滚动文件。

4.2.1 Row Encoding

此时,StreamingFileSink会以每条记录为单位进行编码和序列化。

必须配置项:

  • 输出数据的BasePath
  • 序列化每行数据写入PartFile的Encoder

使用RowFormatBuilder可选配置项:

  • 自定义RollingPolicy:默认使用DefaultRollingPolicy来滚动文件,可自定义
    bucketCheckInterval
  • 默认1分钟。该值单位为毫秒,指定按时间滚动文件间隔时间

例子如下:

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
// 1. 构建DataStream
DataStream input  = ...
// 2. 构建StreamingFileSink,指定BasePath、Encoder、RollingPolicy
StreamingFileSink sink  = StreamingFileSink
    .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
            .withMaxPartSize(1024 * 1024 * 1024)
            .build())
    .build()
// 3. 添加Sink到InputDataSteam即可
input.addSink(sink)

以上例子构建了一个简单的拥有默认Bucket构建行为(继承自BucketAssigner的DateTimeBucketAssigner)的StreamingFileSink,每小时构建一个Bucket,内部使用继承自RollingPolicy的DefaultRollingPolicy,以下三种情况任一发生会滚动PartFile:

  • PartFile包含至少15分钟的数据
  • 在过去5分钟内没有接收到新数据
  • 在最后一条记录写入后,文件大小已经达到1GB

除了使用DefaultRollingPolicy,也可以自己实现RollingPolicy接口来实现自定义滚动策略。

4.2.2 Bulk Encoding

要使用批量编码,请将StreamingFileSink.forRowFormat()替换为StreamingFileSink.forBulkFormat(),注意此时必须指定一个BulkWriter.Factory而不是行模式的Encoder。BulkWriter在逻辑上定义了如何添加、fllush新记录以及如何最终确定记录的bulk以用于进一步编码。

需要注意的是,使用Bulk Encoding时,Filnk1.9版本的文件滚动就只能使用OnCheckpointRollingPolicy的策略,该策略在每次checkpoint时滚动part-file。

Flink有三个内嵌的BulkWriter:

  • ParquetAvroWriters
    有一些静态方法来创建ParquetWriterFactory。
  • SequenceFileWriterFactory
  • CompressWriterFactory

Flink有内置方法可用于为Avro数据创建Parquet writer factory。

要使用ParquetBulkEncoder,需要添加以下Maven依赖:

<!-- streaming File Sink所需要的jar包-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.12.0</version>
</dependency>

4.3 桶分配策略

桶分配策略定义了将数据结构化后写入基本输出目录中的子目录,行格式和批量格式都需要使用。

具体来说,StreamingFileSink使用BucketAssigner来确定每条输入的数据应该被放入哪个Bucket,默认情况下,DateTimeBucketAssigner 基于系统默认时区每小时创建一个桶:

  • 格式如下:yyyy-MM-dd–HH。日期格式(即桶的大小)和时区都可以手动配置。
  • 我们可以在格式构建器上调用 .withBucketAssigner(assigner) 来自定义 BucketAssigner。

Flink 有两个内置的 BucketAssigners :

  • DateTimeBucketAssigner:默认基于时间的分配器
  • BasePathBucketAssigner:将所有部分文件(part file)存储在基本路径中的分配器(单个全局桶)

4.3.1 DateTimeBucketAssigner

Row格式和Bulk格式编码都使用DateTimeBucketAssigner作为默认BucketAssigner。 默认情况下,DateTimeBucketAssigner 基于系统默认时区每小时以格式yyyy-MM-dd–HH来创建一个Bucket,Bucket路径为/{basePath}/{dateTimePath}/。

  • basePath是指StreamingFileSink.forRowFormat(new Path(outputPath)时的路径
  • dateTimePath中的日期格式和时区都可在初始化DateTimeBucketAssigner时配置
public class DateTimeBucketAssigner<IN> implements BucketAssigner<IN, String> {
private static final long serialVersionUID = 1L;
  // 默认的时间格式字符串
  private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
  // 时间格式字符串
  private final String formatString;
  // 时区
  private final ZoneId zoneId;
  // DateTimeFormatter被用来通过当前系统时间和DateTimeFormat来生成时间字符串
  private transient DateTimeFormatter dateTimeFormatter;
  /**
   * 使用默认的`yyyy-MM-dd--HH`和系统时区构建DateTimeBucketAssigner
   */
  public DateTimeBucketAssigner() {
    this(DEFAULT_FORMAT_STRING);
  }
  /**
   * 通过能被SimpleDateFormat解析的时间字符串和系统时区
   * 来构建DateTimeBucketAssigner
   */
  public DateTimeBucketAssigner(String formatString) {
    this(formatString, ZoneId.systemDefault());
  }
  /**
   * 通过默认的`yyyy-MM-dd--HH`和指定的时区
   * 来构建DateTimeBucketAssigner
   */
  public DateTimeBucketAssigner(ZoneId zoneId) {
    this(DEFAULT_FORMAT_STRING, zoneId);
  }
  /**
   * 通过能被SimpleDateFormat解析的时间字符串和指定的时区
   * 来构建DateTimeBucketAssigner
   */
  public DateTimeBucketAssigner(String formatString, ZoneId zoneId) {
    this.formatString = Preconditions.checkNotNull(formatString);
    this.zoneId = Preconditions.checkNotNull(zoneId);
  }
  /**
   * 使用指定的时间格式和时区来格式化当前ProcessingTime,以获取BucketId
   */
  @Override
  public String getBucketId(IN element, BucketAssigner.Context context) {
    if (dateTimeFormatter == null) {
      dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
    }
    return dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));
  }
  @Override
  public SimpleVersionedSerializer<String> getSerializer() {
    return SimpleVersionedStringSerializer.INSTANCE;
  }
  @Override
  public String toString() {
    return "DateTimeBucketAssigner{" +
      "formatString='" + formatString + '\'' +
      ", zoneId=" + zoneId +
      '}';
  }
}

4.3.3 BasePathBucketAssigner

将所有PartFile存储在BasePath中(此时只有单个全局Bucket)。

先看看BasePathBucketAssigner的源码,方便继续学习DateTimeBucketAssigner:

@PublicEvolving
public class BasePathBucketAssigner<T> implements BucketAssigner<T, String> {
  private static final long serialVersionUID = -6033643155550226022L;
  /**
   * BucketId永远为"",即Bucket全路径为用户指定的BasePath
   */
  @Override
  public String getBucketId(T element, BucketAssigner.Context context) {
    return "";
  }
  /**
   * 用SimpleVersionedStringSerializer来序列化BucketId
   */
  @Override
  public SimpleVersionedSerializer<String> getSerializer() {
    // in the future this could be optimized as it is the empty string.
    return SimpleVersionedStringSerializer.INSTANCE;
  }
  @Override
  public String toString() {
    return "BasePathBucketAssigner";
  }
}

4.4 滚动策略

滚动策略 RollingPolicy 定义了指定的文件在何时关闭(closed)并将其变为 Pending 状态,随后变为 Finished 状态。处于 Pending 状态的文件会在下一次 Checkpoint 时变为 Finished 状态,通过设置 Checkpoint 间隔时间,可以控制部分文件(part file)对下游读取者可用的速度、大小和数量。

Flink 有两个内置的滚动策略:

  • DefaultRollingPolicy
  • OnCheckpointRollingPolicy

需要注意的是,使用Bulk Encoding时,文件滚动就只能使用OnCheckpointRollingPolicy的策略,该策略在每次checkpoint时滚动part-file。

05 文末

本文主要讲解Flink的高级特性其中之一的Streaming File Sink,谢谢大家的阅读,本文完!

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
SQL 大数据 API
大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等
大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等
53 0
|
1月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
131 0
|
3天前
|
SQL 分布式计算 数据处理
Structured Streaming和Flink实时计算框架的对比
本文对比了Structured Streaming和Flink两大流处理框架。Structured Streaming基于Spark SQL,具有良好的可扩展性和容错性,支持多种数据源和输出格式。Flink则以低延迟、高吞吐和一致性著称,适合毫秒级的流处理任务。文章详细分析了两者在编程模型、窗口操作、写入模式、时间语义、API和库、状态管理和生态系统等方面的优劣势。
|
1月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
129 0
|
3月前
|
SQL Java Apache
实时计算 Flink版操作报错合集之本地启动时,如何处理报错:The file STDOUT does not exist on the TaskExecutor
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
3月前
|
SQL 关系型数据库 测试技术
实时数仓 Hologres操作报错合集之执行Flink的sink操作时出现报错,是什么原因
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
3月前
|
存储 SQL Java
实时数仓 Hologres产品使用合集之如何使用Flink的sink连接
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
2月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
15天前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
679 10
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
|
3月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。