Flink 1.11:更好用的流批一体 SQL 引擎

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 在保证优秀性能的同时,易用性是 1.11 版本 Flink SQL 的重头戏。易用性的提升主要体现在以下几个方面:更方便的追加或修改表定义、灵活的声明动态的查询参数、加强和统一了原有 TableEnv 上的 SQL 接口、简化了 connector 的属性定义、对 Hive 的 DDL 做了原生支持、加强了对 python UDF 的支持。

许多的数据科学家,分析师和 BI 用户依赖交互式 SQL 查询分析数据。Flink SQL 是 Flink 的核心模块之一。作为一个分布式的 SQL 查询引擎。Flink SQL 提供了各种异构数据源的联合查询。开发者可以很方便地在一个程序中通过 SQL 编写复杂的分析查询。通过 CBO 优化器、列式存储、和代码生成技术,Flink SQL 拥有非常高的查询效率。同时借助于 Flink runtime 良好的容错和扩展性,Flink SQL 可以轻松处理海量数据。

在保证优秀性能的同时,易用性是 1.11 版本 Flink SQL 的重头戏。易用性的提升主要体现在以下几个方面:

  • 更方便的追加或修改表定义
  • 灵活的声明动态的查询参数
  • 加强和统一了原有 TableEnv 上的 SQL 接口
  • 简化了 connector 的属性定义
  • 对 Hive 的 DDL 做了原生支持
  • 加强了对 python UDF 的支持

下面逐一为大家介绍 ~

Create Table Like

在生产中,用户常常有调整现有表定义的需求。例如用户想在一些外部的表定义(例如 Hive metastore)基础上追加 Flink 特有的一些定义比如 watermark。在 ETL 场景中,将多张表的数据合并到一张表,目标表的 schema 定义其实是上游表的合集,需要一种方便合并表定义的方式。

从 1.11 版本开始,Flink 提供了 LIKE 语法,用户可以很方便的在已有的表定义上追加新的定义。

例如我们可以使用下面的语法给已有表 base_table 追加 watermark 定义:

CREATE [TEMPORARY] TABLE base_table (
    id BIGINT,
    name STRING,
    tstmp TIMESTAMP,
    PRIMARY KEY(id)
) WITH (
    'connector': 'kafka'
)
 
CREATE [TEMPORARY] TABLE derived_table (
    WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND
)
LIKE base_table;

这里 derived_table 表定义等价于如下定义:

CREATE [TEMPORARY] TABLE derived_table (
    id BIGINT,
    name STRING,
    tstmp TIMESTAMP,
    PRIMARY KEY(id),
    WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND
) WITH (
    ‘connector’: ‘kafka’
)

对比之下,新的语法省去了重复的 schema 定义,用户只需要定义追加属性,非常方便简洁。

多属性策略

有的小伙伴会问,原表和新表的属性只是新增或追加吗?如果我想覆盖或者排除某些属性该如何操作?这是一个好问题,Flink LIKE 语法提供了非常灵活的表属性操作策略。

LIKE 语法支持使用不同的 keyword 对表属性分类:

  • ALL:完整的表定义
  • CONSTRAINTS: primary keys, unique key 等约束
  • GENERATED: 主要指计算列和 watermark
  • OPTIONS: WITH (...) 语句内定义的 table options
  • PARTITIONS: 表分区信息

在不同的属性分类上可以追加不同的属性行为:

  • INCLUDING:包含(默认行为)
  • EXCLUDING:排除
  • OVERWRITING:覆盖

下面这张表格说明了不同的分类属性允许的行为:

image.png

例如下面的语句:

CREATE [TEMPORARY] TABLE base_table (
    id BIGINT,
    name STRING,
    tstmp TIMESTAMP,
    PRIMARY KEY(id)
) WITH (
    'connector': 'kafka',
    'scan.startup.specific-offsets': 'partition:0,offset:42;partition:1,offset:300',
    'format': 'json'
)
 
CREATE [TEMPORARY] TABLE derived_table (
    WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND
)
WITH (
    'connector.starting-offset': '0'
)
LIKE base_table (OVERWRITING OPTIONS, EXCLUDING CONSTRAINTS);

等价的表属性定义为:

CREATE [TEMPORARY] TABLE derived_table (
    id BIGINT,
    name STRING,
    tstmp TIMESTAMP,
    WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND
) WITH (
    'connector': 'kafka',
    'scan.startup.specific-offsets': 'partition:0,offset:42;partition:1,offset:300',
    'format': 'json'
)

细节参见:https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table

Dynamic Table Options

在生产中,调整参数是一个常见需求,很多的时候是临时修改(比如通过终端查询和展示),比如下面这张 Kafka 表:

create table kafka_table (
  id bigint,
  age int,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'employees',
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '123456',
  'format' = 'csv',
  'csv.ignore-parse-errors' = 'false'
)

在之前的版本,如果用户有如下需求:

  • 用户需要指定特性的消费时间戳,即修改 scan.startup.timestamp-millis 属性
  • 用户想忽略掉解析错误,需要将 format.ignore-parse-errors 改为 true

只能使用 ALTER TABLE 这样的语句修改表的定义,从 1.11 开始,用户可以通过动态参数的形式灵活地设置表的属性参数,覆盖或者追加原表的 WITH (...) 语句内定义的 table options。

基本语法为:

table_name /*+ OPTIONS('k1'='v1', 'aa.bb.cc'='v2') */

OPTIONS 内的键值对会覆盖原表的 table options,用户可以在各种 SQL 语境中使用这样的语法,例如:

CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...);
CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...);

-- override table options in query source
select id, name from kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;

-- override table options in join
select * from
    kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t1
    join
    kafka_table2 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t2
    on t1.id = t2.id;

-- override table options for INSERT target table
insert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select * from kafka_table2;

动态参数的使用没有语境限制,只要是引用表的地方都可以追加定义。在指定的表后面追加的动态参数会自动追加到原表定义中,是不是很方便呢 :)

由于可能对查询结果有影响,动态参数功能默认是关闭的, 使用下面的方式开启该功能:

// instantiate table environment
TableEnvironment tEnv = ...
// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.dynamic-table-options.enabled", "true");

细节参见:https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/hints.html

SQL API 改进

随着 Flink SQL 支持的语句越来越丰富,老的 API 容易引起一些困惑:

  • 原先的 sqlUpdate() 方法传递 DDL 语句会立即执行,而 INSERT INTO 语句在调用 execute 方法时才会执行
  • Table 程序的执行入口不够清晰,像 TableEnvironment.execute() 和 StreamExecutionEnvironment.execute() 都可以触发 table 程序执行
  • execute 方法没有返回值。像 SHOW TABLES 这样的语句没有很好地方式返回结果。另外,sqlUpdate 方法加入了越来越多的语句导致接口定义不清晰,sqlUpdate 可以执行 SHOW TABLES 就是一个反例
  • 在 Blink planner 一直提供多 sink 优化执行的能力,但是在 API 层没有体现出来

1.11 重新梳理了 TableEnv 上的 sql 相关接口,提供了更清晰的执行语义,同时执行任意 sql 语句现在都有返回值,用户可以通过新的 API 灵活的组织多行 sql 语句一起执行。

更清晰的执行语义

新的接口 TableEnvironment#executeSql 统一返回抽象 TableResult,用户可以迭代 TableResult 拿到执行结果。根据执行语句的不同,返回结果的数据结构也有变化,比如 SELECT 语句会返回查询结果,而 INSERT 语句会异步提交作业到集群。

组织多条语句一起执行

新的接口 TableEnvironment#createStatementSet 允许用户添加多条 INSERT 语句并一起执行,在多 sink 场景,Blink planner 会针对性地对执行计划做优化。

新旧 API 对比

一张表格感受新老 API 的变化:

image.png

详情参见:https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878

Hive 语法兼容加强

从 1.11 开始,Flink SQL 将 Hive parser 模块独立出来,用以兼容 Hive 的语法,目前 DDL 层面,DB、Table、View、Function 相关的语法均已支持。搭配 HiveCatalog,Hive 的同学可以直接使用 Hive 的语法来进行相关的操作。

在使用 hive 语句之前需要设置正确的 Dialect:

EnvironmentSettings settings = EnvironmentSettings.newInstance()...build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// to use hive dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// use the hive catalog
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tableEnv.useCatalog(hiveCatalog.getName());

之后我们便可以使用 Hive 的语法来执行一些 DDL,例如最常见的建表操作:

create external table tbl1 (
  d decimal(10,0),
  ts timestamp)
partitioned by (p string)
location '%s'
tblproperties('k1'='v1');
  
create table tbl2 (s struct<ts:timestamp,bin:binary>) stored as orc;

create table tbl3 (
  m map<timestamp,binary>
)
partitioned by (p1 bigint, p2 tinyint)
row format serde 'org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe';

create table tbl4 (
  x int,
  y smallint)
row format delimited fields terminated by '|' lines terminated by '\n';

对于 DQL 的 Hive 语法兼容已经在规划中,1.12 版本会兼容更多 query 语法 ~

详情参见:https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_dialect.html

更简洁的 connector 属性

1.11 重新规范了 connector 的属性定义,新的属性 key 更加直观简洁,和原有的属性 key 相比主要做了如下改动:

  • 使用 connector 作为 connector 的类型 key,connector 版本信息直接放到 value 中,比如 0.11 的 kafka 为 kafka-0.11
  • 去掉了其余属性中多余的 connector 前缀
  • 使用 scan 和 sink 前缀标记 source 和 sink 专有属性
  • format.type 精简为 format ,同时 format 自身属性使用 format 的值作为前缀,比如 csv format 的自身属性使用 csv 统一作前缀

例如,1.11 Kafka 表的定义如下:

CREATE TABLE kafkaTable (
 user_id BIGINT,
 item_id BIGINT,
 category_id BIGINT,
 behavior STRING,
 ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'csv',
 'scan.startup.mode' = 'earliest-offset'
)

详情参见:https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory

JDBC catalog

在之前的版本中,用户只能通过显示建表的方式创建关系型数据库的镜像表。用户需要手动追踪 Flink SQL 的表 schema 和数据库的 schema 变更。在 1.11,Flink SQL 提供了一个 JDBC catalog 接口对接各种外部的数据库系统,例如 Postgres、MySQL、MariaDB、AWS Aurora、etc。

当前 Flink 内置了 Postgres 的 catalog 实现,使用下面的代码配置 JDBC catalog:

CREATE CATALOG mypg WITH(
    'type' = 'jdbc',
    'default-database' = '...',
    'username' = '...',
    'password' = '...',
    'base-url' = '...'
);

USE CATALOG mypg;

用户也可以实现 JDBCCatalog 接口定制其他数据库的 catalog ~

详情参见:https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#postgres-database-as-a-catalog

Python UDF 增强

1.11 版本的 py-flink 在 python UDF 方面提供了很多增强,包括 DDL 的定义方式、支持了标量的向量化 python UDF,支持全套的 python UDF metrics 定义,以及在 SQL-CLI 中定义 python UDF。

DDL 定义 python UDF

1.10.0 版本引入了对 python UDF 的支持。但是仅仅支持 python table api 的方式。1.11 提供了 SQL DDL 的方式定义 python UDF, 用户可以在 Java/Scala table API 以及 SQL-CLI 场景下使用。

例如,现在用户可以使用如下方式定义 Java table API 程序使用 python UDF:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

tEnv.getConfig().getConfiguration().setString("python.files", "/home/my/test1.py");
tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");
tEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python");
Table table = tEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str").select("func1(str)");

tEnv.toDataSet(table, String.class).collect();

向量化支持

向量化 Python UDF 相较于普通函数大大提升了性能。用户可以使用流行的 python 库例如 Pandas、Numpy 来实现向量化的 python UDF。用户只需在装饰器 udf 中添加额外的参数 udf_type="pandas" 即可。

例如,下面的样例展示了如何定义向量化的 Python 标量函数以及在 python table api 中的应用:

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT(), udf_type="pandas")
def add(i, j):
  return i + j

table_env = BatchTableEnvironment.create(env)

# register the vectorized Python scalar function
table_env.register_function("add", add)

# use the vectorized Python scalar function in Python Table API
my_table.select("add(bigint, bigint)")

# use the vectorized Python scalar function in SQL API
table_env.sql_query("SELECT add(bigint, bigint) FROM MyTable")

详情参见:https://ci.apache.org/projects/flink/flink-docs-master/dev/table/python/vectorized_python_udfs.html

另外,1.11 对 python UDF 的 metrics 做了全面支持,现在用户可以在 UDF 中方便地定义各种类型的 metrics,由于篇幅关系,这里不作详细描述,见 python UDF metrics。

详情参见:https://ci.apache.org/projects/flink/flink-docs-master/dev/table/python/metrics.html

展望后续

在后续版本,易用性仍然是 Flink SQL 的核心主题,比如 schema 的易用性增强,Descriptor API 简化以及更丰富的流 DDL 将会是努力的方向,让我们拭目以待 ~

了解更多 Flink 1.11 重大变更与新增功能特性可点击「阅读原文」~

相关文章
|
27天前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
91 15
|
29天前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
56 2
|
29天前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
31 1
|
2月前
|
Java Spring 安全
Spring 框架邂逅 OAuth2:解锁现代应用安全认证的秘密武器,你准备好迎接变革了吗?
【8月更文挑战第31天】现代化应用的安全性至关重要,OAuth2 作为实现认证和授权的标准协议之一,被广泛采用。Spring 框架通过 Spring Security 提供了强大的 OAuth2 支持,简化了集成过程。本文将通过问答形式详细介绍如何在 Spring 应用中集成 OAuth2,包括 OAuth2 的基本概念、集成步骤及资源服务器保护方法。首先,需要在项目中添加 `spring-security-oauth2-client` 和 `spring-security-oauth2-resource-server` 依赖。
42 0
|
2月前
|
消息中间件 数据挖掘 Kafka
揭秘大数据时代的极速王者!Flink:颠覆性流处理引擎,让实时数据分析燃爆你的想象力!
【8月更文挑战第29天】Apache Flink 是一个高性能的分布式流处理框架,适用于高吞吐量和低延迟的实时数据处理。它采用统一执行引擎处理有界和无界数据流,具备精确状态管理和灵活窗口操作等特性。Flink 支持毫秒级处理和广泛生态集成,但学习曲线较陡峭,社区相对较小。通过实时日志分析示例,我们展示了如何利用 Flink 从 Kafka 中读取数据并进行词频统计,体现了其强大功能和灵活性。
33 0
|
11天前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
3月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
731 7
阿里云实时计算Flink在多行业的应用和实践
|
2月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
13天前
|
存储 运维 监控
阿里云实时计算Flink版的评测
阿里云实时计算Flink版的评测
44 15

相关产品

  • 实时计算 Flink版
  • 下一篇
    无影云桌面