如何在 Apache Flink 1.10 中使用 Python UDF?

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 在刚刚发布的 ApacheFlink 1.10 中,PyFlink 添加了对 Python UDFs 的支持。这意味着您可以从现在开始用 Python 编写 UDF 并扩展系统的功能。此外,本版本还支持 Python UDF 环境和依赖管理,因此您可以在 UDF 中使用第三方库,从而利用 Python 生态丰富的第三方库资源。

作者:孙金城(金竹)

在 Apache Flink 1.9 版中,我们引入了 PyFlink 模块,支持了 Python Table API。Python 用户可以完成数据转换和数据分析的作业。但是,您可能会发现在 PyFlink 1.9 中还不支持定义 Python UDFs,对于想要扩展系统内置功能的 Python 用户来说,这可能有诸多不便。

在刚刚发布的 ApacheFlink 1.10 中,PyFlink 添加了对 Python UDFs 的支持。这意味着您可以从现在开始用 Python 编写 UDF 并扩展系统的功能。此外,本版本还支持 Python UDF 环境和依赖管理,因此您可以在 UDF 中使用第三方库,从而利用 Python 生态丰富的第三方库资源。

PyFlink 支持 Python UDFs 的架构

在深入了解如何定义和使用 Python UDFs 之前,我们将解释 UDFs 在 PyFlink 中工作的架构和背景,并提供一些有关我们底层实现的细节介绍。

Beam on Flink

Apache Beam 是一个统一编程模型框架,实现了可使用任何语言开发可以运行在任何执行引擎上的批处理和流处理作业,这得益于 Beam 的 Portability Framework,如下图所示:

1.jpg

Portability Framework

上图是 Beam 的 Portability Framework 的体系结构。它描述了 Beam 如何支持多种语言和多种引擎的方式。关于 Flink Runner 部分,我们可以说是 Beam on Flink。那么,这与 PyFlink 支持 Python UDF 有什么关系呢?这将接下来“Flink on Beam”中介绍。

Flink on Beam

Apache Flink 是一个开源项目,因此,它的社区也更多地使用开源。例如,PyFlink 中对 Python UDF 的支持选择了基于 Apache Beam 这辆豪华跑车之上进行构建。:)

2.jpg

Flink on Beam

PyFlink 对 Python UDFs 的支持上,Python 的运行环境管理以及 Python 运行环境 Python VM 和 Java 运行环境 JVM 的通讯至关重要。幸运的是,Apache Beam 的 Portability Framework 完美解决了这个问题。所以才有了如下 PyFlink on Beam Portability Framework 的架构如下:

3.jpg

PyFlink on Beam Portability Framework

Beam Portability Framework 是一个成熟的多语言支持框架,框架高度抽象了语言之间的通信协议(gRPC),定义了数据的传输格式(Protobuf),并且根据通用流计算框架所需要的组件,抽象个各种服务,比如, DataService,StateService,MetricsService 等。

在这样一个成熟的框架下,PyFlink 可以快速的构建自己的 Python 算子,同时重用 Apache Beam Portability Framework 中现有 SDK harness 组件,可以支持多种 Python 运行模式,如:Process,Docker,etc.,这使得 PyFlink 对 Python UDF 的支持变得非常容易,在 Apache Flink 1.10 中的功能也非常的稳定和完整。那么为啥说是 Apache Flink 和 Apache Beam 共同打造呢,是因为我发现目前 Apache Beam Portability Framework 的框架也存在很多优化的空间,所以我在 Beam 社区进行了优化讨论,并且在 Beam 社区也贡献了 30+ 的优化补丁。

JVM 和 Python VM 的通讯

由于 Python UDF 无法直接在 JVM 中运行,因此需要由 Apache Flink 算子在初始化时启动的 Python 进程来准备 Python 执行环境。Python ENV 服务负责启动,管理和终止 Python 进程。如下图 4 所示,Apache Flink 算子和 Python 执行环境之间的通信和涉及多个组件:

4.jpg

Communication between JVM and Python VM

  • 环境管理服务: 负责启动和终止 Python 执行环境。
  • 数据服务: 负责在 Apache Flink 算子和 Python 执行环境之间传输输入数据和接收用户 UDF 的执行结果。
  • 日志服务: 是记录对用户 UDF 日志输出支持的机制。它可以将用户 UDF 产生的日志传输到 Apache Flink 算子,并与 Apache Flink 的日志系统集成。

说明: 其中 metrics 服务计划在 Apache Flink 1.11 进行支持。

下图描述了从 Java 算子到 Python 进程之间初始化和执行 UDF 的概要流程。

4.jpg

High-level flow between Python VM and JVM

整体流程可以概括为如下两部分:

  • 初始化 Python 执行环境。

    • Python UDF Runner 启动所需的 gRPC 服务,如数据服务、日志服务等。
    • Python UDF Runner 另起进程并启动 Python 执行环境。
    • Python worker 向 PythonUserDefinedFunctionRunner 进行注册。
    • Python UDF Runner 向 Python worker 发送需要在 Python 进程中执行的用户定义函数。
    • Python worker 将用户定义的函数转换为 Beam 执行算子(注意:目前,PyFlink 利用 Beam 的可移植性框架[1]来执行 Python UDF)。
    • Python worker 和 Flink Operator 之间建立 gRPC 连接,如数据连接、日志连接等。
  • 处理输入元素。

    • Python UDF Runner 通过 gRPC 数据服务将输入元素发送给 Python worker 执行。
    • Python 用户定义函数还可以在执行期间通过 gRPC 日志服务和 metrics 服务将日志和 metrics 收集到 Python UDF Runner。
    • 执行结果可以通过 gRPC 数据服务发送到 Python UDF Runner。

如何在 Apache Flink 1.10 的 PyFlink 中使用 UDFs

本节将介绍用户如何定义 UDF,并完整展示了如何安装 PyFlink,如何在 PyFlink 中定义/注册/调用 UDF,以及如何执行作业。

安装 PyFlink

我们需要先安装 PyFlink,可以通过 PyPI 获得,并且可以使用 pip install 进行便捷安装。

注意: 安装和运行 PyFlink 需要 Python 3.5 或更高版本。

$ python -m pip install apache-Apache Flink

定义一个 UDF

除了扩展基类 ScalarFunction 之外,定义 Python UDF 的方法有很多。下面的示例显示了定义 Python UDF 的不同方法,该函数以 BIGINT 类型的两列作为输入参数,并返回它们的和作为结果。

  • Option 1: extending the base class ScalarFunction
class Add(ScalarFunction):
  def eval(self, i, j):
    return i + j

add = udf(Add(), [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
  • Option 2: Python function
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
  return i + j
  • Option 3: lambda function
add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
  • Option 4: callable function
class CallableAdd(object):
  def __call__(self, i, j):
    return i + j

add = udf(CallableAdd(), [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
  • Option 5: partial function
return i + j + k

add = udf(functools.partial(partial_add, k=1), [DataTypes.BIGINT(), DataTypes.BIGINT()],
          DataTypes.BIGINT())

注册一个UDF

  • register the Python function
table_env.register_function("add", add)
  • Invoke a Python UDF
my_table.select(```js
"add(a, b)")
  • Example Code

下面是一个使用 Python UDF 的完整示例。

from PyFlink.table import StreamTableEnvironment, DataTypes
from PyFlink.table.descriptors import Schema, OldCsv, FileSystem
from PyFlink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)

t_env.register_function("add", udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT()))

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field('sum', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('sum', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

t_env.from_path('mySource')\
    .select("add(a, b)") \
    .insert_into('mySink')

t_env.execute("tutorial_job")
  • 提交作业

首先,您需要在“ / tmp / input”文件中准备输入数据。例如,

$ echo "1,2" > /tmp/input

接下来,您可以在命令行上运行此示例:

$ python python_udf_sum.py

通过该命令可在本地小集群中构建并运行 Python Table API 程序。您还可以使用不同的命令行将 Python Table API 程序提交到远程集群。

最后,您可以在命令行上查看执行结果:

$ cat /tmp/output
3

Python UDF 的依赖管理

在许多情况下,您可能希望在 Python UDF 中导入第三方依赖。下面的示例将指导您如何管理依赖项。

假设您想使用 mpmath 来执行上述示例中两数的和。Python UDF 逻辑可能如下:

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
    from mpmath import fadd # add third-party dependency
    return int(fadd(1, 2))

要使其在不包含依赖项的工作节点上运行,可以使用以下 API 指定依赖项:

# echo mpmath==1.1.0 > requirements.txt
# pip download -d cached_dir -r requirements.txt --no-binary :all:
t_env.set_python_requirements("/path/of/requirements.txt", "/path/of/cached_dir")

用户需要提供一个 requirements.txt 文件,并且在里面申明使用的第三方依赖。如果无法在群集中安装依赖项(网络问题),则可以使用参数“requirements_cached_dir”,指定包含这些依赖项的安装包的目录,如上面的示例所示。依赖项将上传到群集并脱机安装。

下面是一个使用依赖管理的完整示例:

from PyFlink.datastream import StreamExecutionEnvironment
from PyFlink.table import StreamTableEnvironment, DataTypes
from PyFlink.table.descriptors import Schema, OldCsv, FileSystem
from PyFlink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
    from mpmath import fadd
    return int(fadd(1, 2))

t_env.set_python_requirements("/tmp/requirements.txt", "/tmp/cached_dir")
t_env.register_function("add", add)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field('sum', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('sum', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

t_env.from_path('mySource')\
    .select("add(a, b)") \
    .insert_into('mySink')

t_env.execute("tutorial_job")
  • 提交作业

首先,您需要在“/ tmp / input”文件中准备输入数据。例如,

echo "1,2" > /tmp/input
1
2

其次,您可以准备依赖项需求文件和缓存目录:

$ echo "mpmath==1.1.0" > /tmp/requirements.txt
$ pip download -d /tmp/cached_dir -r /tmp/requirements.txt --no-binary :all:

接下来,您可以在命令行上运行此示例:

$ python python_udf_sum.py

最后,您可以在命令行上查看执行结果:

$ cat /tmp/output
3

快速上手

PyFlink 为大家提供了一种非常方便的开发体验方式 - PyFlink Shell。当成功执行 python -m pip install apache-flink 之后,你可以直接以 pyflink-shell.sh local 来启动一个 PyFlink Shell 进行开发体验,如下所示:

图片.gif

更多场景

不仅仅是简单的 ETL 场景支持,PyFlink 可以完成很多复杂场的业务场景需求,比如我们最熟悉的双 11 大屏的场景,如下:

6.jpg

关于上面示例的更多详细请查阅:

https://enjoyment.cool/2019/12/05/Apache-Flink-说道系列-如何在PyFlink-1-10中自定义Python-UDF/

总结和未来规划

在本博客中,我们介绍了 PyFlink 中 Python UDF 的架构,并给出了如何定义、注册、调用和运行 UDF 的示例。随着 1.10 的发布,它将为 Python 用户提供更多的可能来编写 Python 作业逻辑。同时,我们一直积极与社区合作,不断改进 PyFlink 的功能和性能。今后,我们计划在标量和聚合函数中引入对 Pandas 的支持;通过 SQL 客户端增加对 Python UDF 使用的支持,以扩展 Python UDF 的使用范围;并做更多的性能改进。近期,邮件列表上有一个关于新功能支持的讨论,您可以查看并找到更多详细信息。

在社区贡献者的不断努力之下,PyFlink 的功能可以如上图一样可以迅速从幼苗变成大树:

640.gif

PyFlink 需要你的加入

PyFlink 是一个新组件,仍然需要做很多工作。因此,热诚欢迎每个人加入对 PyFlink 的贡献,包括提出问题,提交错误报告,提出新功能,加入讨论,贡献代码或文档。期望在 PyFlink 见到你!

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
5月前
|
存储 消息中间件 Java
Apache Flink 实践问题之原生TM UI日志问题如何解决
Apache Flink 实践问题之原生TM UI日志问题如何解决
52 1
|
20天前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
308 33
The Past, Present and Future of Apache Flink
|
3月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
874 13
Apache Flink 2.0-preview released
|
3月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
104 3
|
4月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
5月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
304 2
|
5月前
|
消息中间件 分布式计算 Hadoop
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
66 3
|
5月前
|
消息中间件 运维 Kafka
Apache Flink 实践问题之达到网卡的最大速度如何解决
Apache Flink 实践问题之达到网卡的最大速度如何解决
60 2
|
5月前
|
消息中间件 前端开发 Kafka
【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls
【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls
|
4月前
|
消息中间件 资源调度 API
Apache Flink 流批融合技术介绍
本文源自阿里云高级研发工程师周云峰在Apache Asia Community OverCode 2024的分享,内容涵盖从“流批一体”到“流批融合”的演进、技术解决方案及社区进展。流批一体已在API、算子和引擎层面实现统一,但用户仍需手动配置作业模式。流批融合旨在通过动态调整优化策略,自动适应不同场景需求。文章详细介绍了如何通过量化指标(如isProcessingBacklog和isInsertOnly)实现这一目标,并展示了针对不同场景的具体优化措施。此外,还概述了社区当前进展及未来规划,包括将优化方案推向Flink社区、动态调整算子流程结构等。
440 31
Apache Flink 流批融合技术介绍

相关产品

  • 实时计算 Flink版