PyFlink 开发环境利器:Zeppelin Notebook

简介: 在 Zeppelin notebook 里利用 Conda 来创建 Python env 自动部署到 Yarn 集群中。

PyFlink 作为 Flink 的 Python 语言入口,其 Python 语言的确很简单易学,但是 PyFlink 的开发环境却不容易搭建,稍有不慎,PyFlink 环境就会乱掉,而且很难排查原因。今天给大家介绍一款能够帮你解决这些问题的 PyFlink 开发环境利器:Zeppelin Notebook。主要内容为:

  1. 准备工作
  2. 搭建 PyFlink 环境
  3. 总结与未来

GitHub 地址
https://github.com/apache/flink
欢迎大家给 Flink 点赞送 star~

也许你早就听说过 Zeppelin,但是之前的文章都偏重讲述如何在 Zeppelin 里开发 Flink SQL,今天则来介绍下如何在 Zeppelin 里高效的开发 PyFlink Job,特别是解决 PyFlink 的环境问题。

一句来总结这篇文章的主题,就是在 Zeppelin notebook 里利用 Conda 来创建 Python env 自动部署到 Yarn 集群中,你无需手动在集群上去安装任何 PyFlink 的包,并且你可以在一个 Yarn 集群里同时使用互相隔离的多个版本的 PyFlink。最后你能看到的效果就是这样:

1. 能够在 PyFlink 客户端使用第三方 Python 库,比如 matplotlib:

img

2. 可以在 PyFlink UDF 里使用第三方 Python 库,如:

img

接下来看看如何来实现。

一、准备工作

Step 1.

准备好最新版本的 Zeppelin 的搭建,这个就不在这边展开了,如果有问题可以加入 Flink on Zeppelin 钉钉群 (34517043) 咨询。另外需要注意的是,Zeppelin 部署集群需要是 Linux,如果是 Mac 的话,会导致在 Mac 机器上打的 Conda 环境无法在 Yarn 集群里使用 (因为 Conda 包在不同系统间是不兼容的)。

Step 2.

下载 Flink 1.13, 需要注意的是,本文的功能只能用在 Flink 1.13 以上版本,然后:

  • flink-Python-*.jar 这个 jar 包 copy 到 Flink 的 lib 文件夹下;
  • opt/Python 这个文件夹 copy 到 Flink 的 lib 文件夹下。

Step 3.

安装以下软件 (这些软件是用于创建 Conda env 的):

二、搭建 PyFlink 环境

接下来就可以在 Zeppelin 里搭建并且使用 PyFlink 了。

Step 1. 制作 JobManager 上的 PyFlink Conda 环境

因为 Zeppelin 天生支持 Shell,所以可以在 Zeppelin 里用 Shell 来制作 PyFlink 环境。注意这里的 Python 第三方包是在 PyFlink 客户端 (JobManager) 需要的包,比如 Matplotlib 这些,并且确保至少安装了下面这些包:

  • 某个版本的 Python (这里用的是 3.7)
  • apache-flink (这里用的是 1.13.1)
  • jupyter,grpcio,protobuf (这三个包是 Zeppelin 需要的)

剩下的包可以根据需要来指定:

%sh

# make sure you have conda and momba installed.
# install miniconda: https://docs.conda.io/en/latest/miniconda.html
# install mamba: https://github.com/mamba-org/mamba

echo "name: pyflink_env
channels:
  - conda-forge
  - defaults
dependencies:
  - Python=3.7
  - pip
  - pip:
    - apache-flink==1.13.1
  - jupyter
  - grpcio
  - protobuf
  - matplotlib
  - pandasql
  - pandas
  - scipy
  - seaborn
  - plotnine
 " > pyflink_env.yml
    
mamba env remove -n pyflink_env
mamba env create -f pyflink_env.yml

运行下面的代码打包 PyFlink 的 Conda 环境并且上传到 HDFS (注意这里打包出来的文件格式是 tar.gz):

%sh

rm -rf pyflink_env.tar.gz
conda pack --ignore-missing-files -n pyflink_env -o pyflink_env.tar.gz

hadoop fs -rmr /tmp/pyflink_env.tar.gz
hadoop fs -put pyflink_env.tar.gz /tmp
# The Python conda tar should be public accessible, so need to change permission here.
hadoop fs -chmod 644 /tmp/pyflink_env.tar.gz

Step 2. 制作 TaskManager 上的 PyFlink Conda 环境

运行下面的代码来创建 TaskManager 上的 PyFlink Conda 环境,TaskManager 上的 PyFlink 环境至少包含以下 2 个包:

  • 某个版本的 Python (这里用的是 3.7)
  • apache-flink (这里用的是 1.13.1)

剩下的包是 Python UDF 需要依赖的包,比如这里指定了 pandas:

echo "name: pyflink_tm_env
channels:
  - conda-forge
  - defaults
dependencies:
  - Python=3.7
  - pip
  - pip:
    - apache-flink==1.13.1
  - pandas
 " > pyflink_tm_env.yml
    
mamba env remove -n pyflink_tm_env
mamba env create -f pyflink_tm_env.yml

运行下面的代码打包 PyFlink 的 conda 环境并且上传到 HDFS (注意这里使用的是 zip 格式)

%sh

rm -rf pyflink_tm_env.zip
conda pack --ignore-missing-files --zip-symlinks -n pyflink_tm_env -o pyflink_tm_env.zip

hadoop fs -rmr /tmp/pyflink_tm_env.zip
hadoop fs -put pyflink_tm_env.zip /tmp
# The Python conda tar should be public accessible, so need to change permission here.
hadoop fs -chmod 644 /tmp/pyflink_tm_env.zip

Step 3. 在 PyFlink 中使用 Conda 环境

接下来就可以在 Zeppelin 中使用上面创建的 Conda 环境了,首先需要在 Zeppelin 里配置 Flink,主要配置的选项有:

  • flink.execution.mode 为 yarn-application, 本文所讲的方法只适用于 yarn-application 模式;
  • 指定 yarn.ship-archives,zeppelin.pyflink.Python 以及 zeppelin.interpreter.conda.env.name 来配置 JobManager 侧的 PyFlink Conda 环境;
  • 指定 Python.archives 以及 Python.executable 来指定 TaskManager 侧的 PyFlink Conda 环境;
  • 指定其他可选的 Flink 配置,比如这里的 flink.jm.memory 和 flink.tm.memory。
%flink.conf


flink.execution.mode yarn-application

yarn.ship-archives /mnt/disk1/jzhang/zeppelin/pyflink_env.tar.gz
zeppelin.pyflink.Python pyflink_env.tar.gz/bin/Python
zeppelin.interpreter.conda.env.name pyflink_env.tar.gz

Python.archives hdfs:///tmp/pyflink_tm_env.zip
Python.executable  pyflink_tm_env.zip/bin/Python3.7

flink.jm.memory 2048
flink.tm.memory 2048

接下来就可以如一开始所说的那样在 Zeppelin 里使用 PyFlink 以及指定的 Conda 环境了。有 2 种场景:

  • 下面的例子里,可以在 PyFlink 客户端 (JobManager 侧) 使用上面创建的 JobManager 侧的 Conda 环境,比如下边使用了 Matplotlib。

    img

  • 下面的例子是在 PyFlink UDF 里使用上面创建的 TaskManager 侧 Conda 环境里的库,比如下面在 UDF 里使用 Pandas。

    img

三、总结与未来

本文内容就是在 Zeppelin notebook 里利用 Conda 来创建 Python env 自动部署到 Yarn 集群中,无需手动在集群上去安装任何 Pyflink 的包,并且可以在一个 Yarn 集群里同时使用多个版本的 PyFlink。

每个 PyFlink 的环境都是隔离的,而且可以随时定制更改 Conda 环境。可以下载下面这个 note 并导入到 Zeppelin,就可以复现今天讲的内容:http://23.254.161.240/#/notebook/2G8N1WTTS

此外还有很多可以改进的地方:

  • 目前我们需要创建 2 个 conda env ,原因是 Zeppelin 支持 tar.gz 格式,而 Flink 只支持 zip 格式。等后期两边统一之后,只要创建一个 conda env 就可以;
  • apache-flink 现在包含了 Flink 的 jar 包,这就导致打出来的 conda env 特别大,yarn container 在初始化的时候耗时会比较长,这个需要 Flink 社区提供一个轻量级的 Python 包 (不包含 Flink jar 包),就可以大大减小 conda env 的大小。

希望了解更多 Flink on Zeppelin 使用的同学可以加入下面的钉钉群来讨论。

img


第三届 Apache Flink 极客挑战赛报名开始!
30 万奖金等你来!

伴随着海量数据的冲击,数据处理分析能力在业务中的价值与日俱增,各行各业对于数据处理时效性的探索也在不断深入,作为主打实时计算的计算引擎 - Apache Flink 应运而生。

为给行业带来更多实时计算赋能实践的思路,鼓励广大热爱技术的开发者加深对 Flink 的掌握,Apache Flink 社区联手阿里云、英特尔、阿里巴巴人工智能治理与可持续发展实验室 (AAIG)、Occlum 联合举办 "第三届 Apache Flink 极客挑战赛暨 AAIG CUP" 活动,即日起正式启动。

👉 点击了解更多赛事信息 👈

img


更多 Flink 相关技术问题,可扫码加入社区钉钉交流群
第一时间获取最新技术文章和社区动态,请关注公众号~

image.png

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99 元试用 实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制T恤;另包 3 个月及以上还有 85 折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
SQL 机器学习/深度学习 Java
用Python进行实时计算——PyFlink快速入门
Flink 1.9.0及更高版本支持Python,也就是PyFlink。 在最新版本的Flink 1.10中,PyFlink支持Python用户定义的函数,使您能够在Table API和SQL中注册和使用这些函数。但是,听完所有这些后,您可能仍然想知道PyFlink的架构到底是什么?作为PyFlink的快速指南,本文将回答这些问题。
2882 0
用Python进行实时计算——PyFlink快速入门
|
存储 Ubuntu 数据库
Dockerfile(14) - VOLUME 指令详解
Dockerfile(14) - VOLUME 指令详解
5555 0
|
Python 流计算 API
PyFlink 教程(三):PyFlink DataStream API - state & timer
介绍如何在 Python DataStream API 中使用 state & timer 功能。
PyFlink 教程(三):PyFlink DataStream API - state & timer
|
网络架构 Docker 容器
Docker的网络模式和如何跨主机通信
Docker有四种网络模式:Bridge、Host、Container、None,一般常用的是前面两种,默认的是第一种,不安全的是第二种,以下介绍一下四种网络模式,并重点介绍一下如何配置自己的跨主机通信网络。
4534 0
|
11月前
|
传感器 算法 Java
基于 pyflink 的算法工作流设计和改造
本文分享了硕橙科技大数据工程师程兴源在Flink Forward Asia 2024上的演讲内容,围绕工业互联网场景下的Flink应用展开。主要内容包括:为何选择Flink、算法工作流设计、性能优化实践、上下游链路协作思考及未来展望。团队通过Flink处理工业设备数据(如温度、振动等),实现故障预测与分析。文章详细探讨了性能优化路径(如批处理、并行度提升)、KeyBy均衡化、内存管理等技术细节,并介绍了数据补全方法和告警规则的设计。最后,对未来基于Flink的编码强化、CEP模式改进及工业数据归因目标进行了展望。
434 7
基于 pyflink 的算法工作流设计和改造
|
机器学习/深度学习 人工智能 Cloud Native
福利「Flink Forward Asia 2023 」视频合集!
2023 年 12 月 9 日,Flink Forward Asia 2023 在北京圆满结束。本届大会共有 70+ 演讲议题、30+ 一线大厂技术与实践分享。现所有专场回放视频已经出炉,并在开发者社区上线。
6610 2
福利「Flink Forward Asia 2023 」视频合集!
|
分布式计算 资源调度 Hadoop
Hadoop YARN资源管理-容量调度器(Yahoo!的Capacity Scheduler)
详细讲解了Hadoop YARN资源管理中的容量调度器(Yahoo!的Capacity Scheduler),包括队列和子队列的概念、Apache Hadoop的容量调度器默认队列、队列的命名规则、分层队列、容量保证、队列弹性、容量调度器的元素、集群如何分配资源、限制用户容量、限制应用程序数量、抢占申请、启用容量调度器以及队列状态管理等方面的内容。
582 3
|
Cloud Native
云原生架构之X无限延伸:跨AZ、跨Region、跨Cloud,一文让你彻底解锁!
【8月更文挑战第25天】在云原生架构中,可扩展性至关重要,它确保了应用能按需高效调整资源。本文聚焦于三种扩展策略:跨AZ、跨Region及跨云扩展。跨AZ扩展通过在同一云内部不同可用区间部署应用副本增强容错性;跨Region扩展则通过不同地理区域的应用副本部署提升全球访问性能与可靠性;而跨云扩展则利用多云环境进一步加强应用的弹性和覆盖范围。文中提供了基于AWS CloudFormation的具体实践示例,帮助读者深入理解这些扩展机制的实际应用。
870 2
|
SQL 测试技术 API
SqlAlchemy 2.0 中文文档(二十)(1)
SqlAlchemy 2.0 中文文档(二十)
397 0

热门文章

最新文章