Flink 必知必会经典课程6:PyFlink 快速上手

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文介绍了PyFlink项目的目标和发展历程,以及PyFlink目前的核心功能,包括Python Table API、Python UDF、向量化Python UDF、Python UDF Metrics、PyFlink依赖管理和Python UDF执行优化,同时也针对功能展示了相关demo。

作者|付典

本文介绍了PyFlink项目的目标和发展历程,以及PyFlink目前的核心功能,包括Python Table API、Python UDF、向量化Python UDF、Python UDF Metrics、PyFlink依赖管理和Python UDF执行优化,同时也针对功能展示了相关demo。本文主要分为4个部分:

  1. PyFlink介绍
  2. PyFlink相关功能
  3. PyFlink功能演示
  4. PyFlink下一步规划

PyFlink介绍

PyFlink是Flink的一个子模块,也是整个Flink项目的一部分,主要目的是提供Flink的Python语言支持。因为在机器学习和数据分析等领域,Python语言非常重要,甚至是最主要的开发语言。所以,为了满足更多用户需求,拓宽Flink的生态,我们启动了PyFlink项目。

image.png

PyFlink项目的目标主要有两点,第一点是将Flink的计算能力输出给Python用户,也就是我们会在Flink中提供一系列的Python API,方便对Python语言比较熟悉的用户开发Flink作业。

第二点,就是将Python生态基于Flink进行分布式化。虽然我们会在Flink中提供一系列的Python API来给Python用户来使用,但这对用户来说是有学习成本的,因为用户要学习怎么使用Flink的Python API,了解每一个API的用途。所以我们希望用户能在API层使用他们比较熟悉的 Python库的API,但是底层的计算引擎使用Flink,从而降低他们的学习成本。这是我们未来要做的事情,目前处于启动阶段。

image.png

下图是PyFlink项目的发展情况,目前发布了3个版本,支持的内容也越来越丰富。

image.png

PyFlink相关功能介绍

我们主要介绍PyFlink以下功能,Python Table API、Python UDF、向量化Python UDF、Python UDF Metrics、PyFlink依赖管理和Python UDF执行优化。

image.png

Python Table API

Python Table API的目的是为了让用户可以使用Python语言来开发Flink作业。Flink里面有三种类型的API,Process、Function和Table API,前两者是较为底层的API,基于Process和Function开发的作业,其逻辑会严格按照用户定义的行为进行执行,而Table API是较为高层的API,基于Table API开发的作业,其逻辑会经过一系列的优化之后进行执行。

Python Table API,顾名思义就是提供 Table API的Python语言支持。

image.png

以下是Python Table API开发的一个Flink作业,作业逻辑是读取文件,计算word count,然后再把计算结果写到文件中去。这个例子虽然简单,但包括了开发一个Python Table API作业的所有基本流程。

首先我们需要定义作业的执行模式,比如说是批模式还是流模式,作业的并发度是多少?作业的配置是什么。接下来我们需要定义source表和sink表,source表定义了作业的数据源来源于哪里,数据的格式是什么;sink表定义了作业的执行结果写到哪里去,数据格式是什么。最后我们需要定义作业的执行逻辑,在这个例子中是计算写过来的count。

image.png

以下是Python Table API的部分截图,可以看到它的数量和功能都比较齐全。

image.png

Python UDF

Python Table API是一种关系型的API,其功能可以类比成SQL,而SQL里自定义函数是非常重要的功能,可以极大地扩展SQL的使用范围。Python UDF的主要目的就是允许用户使用Python语言来开发自定义函数,从而扩展Python Table API的使用场景。同时,Python UDF除了可以用在Python Table API作业中之外,还可以用在Java Table API作业以及SQL作业中。

image.png

在PyFlink中我们支持多种方式来定义Python UDF。用户可以定义一个Python类,继承ScalarFunction,也可以定义一个普通的Python函数或者Lambda函数,实现自定义函数的逻辑。除此之外,我们还支持通过Callable Function和Partial Function定义Python UDF。用户可以根据自己的需要选择最适合自己的方式。

image.png

PyFlink里面提供了多种Python UDF的使用方式,包括Python Table API、Java table API和SQL,我们一一介绍。

在Python Table API中使用Python UDF,在定义完Python UDF之后,用户首先需要注册Python UDF,可以调用table environment register来注册,然后命名,然后就可以在作业中通过这个名字来使用 Python UDF了。

image.png

在Java Table API中它的使用方式也比较相似,但是注册方式不一样,Java Table API作业中需要通过DDL语句来进行注册。

image.png

除此之外,用户也可以在SQL的作业中使用Python UDF。与前面两种方式类似,用户首先需要注册Python UDF,可以在SQL脚本中通过DDL语句来注册,也可以在SQL Client的环境配置文件里面注册。

image.png

Python UDF架构

简单介绍下Python UDF的执行架构。Flink是用Java语言编写的,运行在Java虚拟机中,而Python UDF运行在 Python虚拟机中,所以Java进程和Python进程需要进行数据通信。 除此之外,两者间还需要传输state、log、metrics,它们的传输协议需要支持4种类型。

image.png

向量化Python UDF

向量化Python UDF的主要目的是使 Python用户可以利用Pandas或者Numpy等数据分析领域常用的Python库,开发高性能的Python UDF。

image.png

向量化Python UDF是相对于普通Python UDF而言的,我们可以在下图看到两者的区别。

image.png

下图显示了向量化Python UDF的执行过程。首先在Java端,Java在攒完多条数据之后会转换成Arrow格式,然后发送给Python进程。Python进程在收到数据之后,将其转换成Pandas的数据结构,然后调用用户自定义的向量化Python UDF。同时向量化Python UDF的执行结果会再转化成Arrow格式的数据,再发送给 Java进程。

image.png

在使用方式上,向量化Python UDF与普通Python UDF是类似的,只有以下几个地方稍有不同。首先向量化Python UDF的声明方式需要加一个UDF type,声明这是一个向量化Python UDF,同时UDF的输入输出类型是Pandas Series。

image.png

Python UDF Metrics

前面我们提到 Python UDF有多种定义方式,但是如果需要在Python UDF中使用Metrics,那么Python UDF必须继承ScalarFunction来进行定义。在Python UDF的 open方法里面提供了一个Function Context参数,用户可以通过Function Context参数来注册Metrics,然后就可以通过注册的 Metrics对象来汇报了。

image.png

PyFlink依赖管理

从类型来说,PyFlink依赖主要包括以下几种类型,普通的PyFlink文件、存档文件,第三方的库、PyFlink解释器,或者Java的Jar包等等。从解决方案来看,针对每种类型的依赖,PyFlink提供了两种解决方案,一种是API的解决方案,一种是命令行选项的方式,大家选择其一即可。

image.png

Python UDF执行优化

Python UDF的执行优化主要包括两个方面,执行计划优化和运行时优化。它与SQL非常像,一个包含Python UDF的作业,首先会经过预先定义的规则,生成一个最优的执行计划。在执行计划已经确定的情况下,在实际执行的时候,又可以运用一些其他的优化手段来达到尽可能高的执行效率。

image.png

Python UDF执行计划优化

执行计划的优化主要有以下几个优化思路。一个是不同类型的 UDF的拆分,由于在一个节点中可能同时包含多种类型的UDF,而不同的类型的UDF是不能放在一块执行的;第二个方面是Filter下推,其主要目的是尽可能降低含有Python UDF节点的输入数据量,从而提升整个作业的执行性能;第三个优化思路是Python UDF Chaining,Java进程与Python进程之间的通信开销以及序列化反序列化开销比较大,而Python UDF Chaining可以尽量减少Java进程和Python进程之间的通信开销。

image.png

不同类型UDF的拆分

假如有这样一个作业,它包含了两个UDF,其中add是Python UDF, subtract是向量化Python UDF。默认情况下,这个作业的执行计划会有一个project节点,这两个 UDF同时位于这一project的节点里面。这个执行计划的主要问题是,普通Python UDF每次处理一条数据,而向量化Python UDF,每次处理多条数据,所以这样的一个执行计划是没有办法执行的。

image.png

但是通过拆分,我们可以把这一个project的节点拆分成了两个project的节点,其中第一个project的节点只包含普通Python UDF,而第二个节点只包含向量化Python UDF。不同类型的Python UDF拆分到不同的节点之后,每一个节点都只包含了一种类型的UDF,所以算子就可以根据它所包含的UDF的类型选择最合适的执行方式。

image.png

Filter下推到Python UDF之前

Filter下推的主要目的是将过滤算子下推到Python UDF节点之前,尽量减少Python UDF节点的数据量。

假如我们有这样一个作业,作业原始执行计划里面包括了两个Project的节点,一个是add、 subtract,同时还包括一个Filter节点。这个执行计划是可以运行的,但需要更优化。可以看到,因为Python的节点位于Filter节点之前,所以在Filter节点之前Python UDF已经计算完了,但是如果把Filter过滤下,推到Python UDF之前,那么就可以大大降低Python UDF节点的输入数据量。

image.png

Python UDF Chaining

假如我们有这样一个作业,里面包含两种类型的UDF,一个是add,一个是subtract,它们都是普通的Python UDF。在一个执行计划里面包含两个project的节点,其中第一个project的节点先算subtract,然后再传输给第二个project节点进行执行。

它的主要问题是,由于subtract和add位于两个不同的节点,其计算结果需要从Python发送回Java,然后再由Java进程发送给第二个节点的Python进行执行。相当于数据在Java进程和Python进程之间转了一圈,所以它带来了完全没有必要的通信开销和序列化反序列化开销。因此,我们可以将执行计划优化成右图,就是将add节点和subtract节点放在一个节点中运行,subtract节点的结果计算出来之后直接去调用add节点。

image.png

Python UDF运行时优化

目前提高Python UDF运营时的执行效率有三种:一是Cython优化,用它来提高Python代码的执行效率;二是自定义Java进程和Python进程之间的序列化器和反序列化器,提高序列化和反序列化效率;三是提供向量化Python UDF功能。

image.png

PyFlink相关功能演示

首先大家打开这个页面,里面提供了PyFlink的一些demo,这些demo是运行在docker里面的,所以大家如果要运行这些demo就需要在本机安装docker环境。

image.png

image.png

随后,我们可以运行命令,命令会启动一个PyFlink的集群,后面我们运行的PyFlink的例子都会提交到集群去执行。

image.png

第一个例子是word count,我们首先在里面定义了环境、source、sink等,我们可以运行一下这个作业。

image.png

这是作业的执行结果,可以看到Flink这个单词出现了两次,PyFlink这个单词出现了一次。

image.png

接下来再运行一个Python UDF的例子。这个例子和前面有一些类似,首先我们定义它使用PyFlink,运行在批这种模式下,同时作业的并发度是1。不一样的地方是我们在作业里定义了一个UDF,它的输入包括两个列,都是Bigint类型,而且它输出类型也是对应的。这个UDF的逻辑是把这两个列的相加作为一个结果输出。

image.png

我们执行一下作业,执行结果是3。

image.png

接下来我们再运行一个带有依赖的Python UDF。前面作业的UDF是不包含任何依赖的,直接就把两个输入列相加起来。而在这个例子里,UDF引用了一个第三方的依赖,我们可以通过API set python requirement来执行。

image.png

接下来我们运行作业,它的执行结果和前面是一样的,因为这两个作业的逻辑是类似的。

image.png

接下来我们再看一个向量化Python UDF的例子。在 UDF定义的时候,我们加了一个UDF的type字段,说明说我们是一个向量化的Python UDF,其他的逻辑和普通Python UDF的逻辑类似。最后它的执行结果也是3,因为它的逻辑和前面是一样的,计算两页的之和。

image.png

我们再来看一个例子,在Java的Table作业里面使用Python。在这个作业里面我们又会用到一个Python UDF,它通过DDL语句进行注册,然后在execute SQL语句里面进行使用。

image.png

接下来我们再看在纯SQL作业中使用Python UDF的例子。在资源文件里面我们声明了一个UDF,名字叫add1,它的类型是Python,同时我们也能看到它的UDF位置。

image.png

接下来我们运行它,执行结果是234。

image.png

PyFlink下一步规划

目前PyFlink只支持了Python Table API,我们计划在下一个版本中支持DataStream API,同时也会支持Python UDAF以及Pandas UDAF,另外,在执行层也会持续优化PyFlink的执行效率。

image.png

这是一些资源的链接,包括PyFlink的文档地址。

  • Python Table API文档

https://ci.apache.org/projects/flink/flink-docs-master/api/python/

  • PyFlink文档

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/python/

  • PyFlink playground

https://github.com/pyflink/playgrounds/tree/1.11

好的,我们今天的分享就到这里了,欢迎大家继续关注我们的课程。

活动推荐:

仅需99元即可体验阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版!点击下方链接了解活动详情:https://www.aliyun.com/product/bigdata/sc?utm_content=g_1000250506

社区二维码.jpg

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
存储 消息中间件 SQL
Flink 必知必会经典课程8:Flink Connector 详解
关于Flink Connector的详解,本文将通过四部分展开介绍:1. 连接器;2. Source API;3. Sink API;4. Collector的未来发展。
Flink 必知必会经典课程8:Flink Connector 详解
|
SQL 机器学习/深度学习 Java
用Python进行实时计算——PyFlink快速入门
Flink 1.9.0及更高版本支持Python,也就是PyFlink。 在最新版本的Flink 1.10中,PyFlink支持Python用户定义的函数,使您能够在Table API和SQL中注册和使用这些函数。但是,听完所有这些后,您可能仍然想知道PyFlink的架构到底是什么?作为PyFlink的快速指南,本文将回答这些问题。
2472 0
用Python进行实时计算——PyFlink快速入门
|
1月前
|
SQL 数据处理 开发工具
实时计算 Flink版产品使用合集之PyFlink的优势是什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL 消息中间件 JSON
实时计算 Flink版产品使用合集之FlinkSQL和PyFlink是否支持整库同步功能
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
消息中间件 Java Kafka
阿里云实时计算Flink部署运行pyflink脚本
最近有个需求需要使用py模型输出数据,本地已完成测试,需要完成在实时计算Flink上完成部署及运行
2291 2
阿里云实时计算Flink部署运行pyflink脚本
|
SQL 搜索推荐 流计算
王炸组合-实时计算 Flink 版 + Hologres,《实时数仓入门训练营》课程配套电子书来啦!
王炸组合-实时计算 Flink 版 + Hologres,《实时数仓入门训练营》课程配套电子书来啦!
3199 0
王炸组合-实时计算 Flink 版 + Hologres,《实时数仓入门训练营》课程配套电子书来啦!
|
SQL 消息中间件 Kafka
Flink 必知必会经典课程2:Stream Processing with Apache Flink
本篇内容包含三部分展开介绍Stream Processing with Apache Flink:1、并行处理和编程范式;2、DataStream API概览及简单应用;3、 Flink 中的状态和时间。
Flink 必知必会经典课程2:Stream Processing with Apache Flink
|
SQL 大数据 Apache
Apache Flink 2021 最新入门课程 | 图谱精选课程
轻松收获 Flink 生产环境开发技能
Apache Flink 2021 最新入门课程 | 图谱精选课程
|
存储 机器学习/深度学习 算法
Flink 必知必会经典课程4:Fault-tolerance in Flink
本文由 Apache Flink PMC , 阿里巴巴高级技术专家李钰分享,主要从有状态的流计算、全局一致性快照 、Flink的容错机制、Flink的状态管理 四个方面介绍 Flink 的容错机制原理。
Flink 必知必会经典课程4:Fault-tolerance in Flink
|
7天前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
578 0

热门文章

最新文章

相关产品

  • 实时计算 Flink版