开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

大佬们,请教,pyflink pandas udf如何提高并行度

已解决

我目前尝试使用pyflink做离线处理,flink版本是1.14.6。逻辑里用到了pyflink特有的函数pandas udf,了解到这是个批处理调用的函数,通过python.fn-execution.arrow.batch.size可以控制进入函数pd.Dataframe的数据量,这个我已经通过本地应用验证的确生效,但是我设置了这些参数,将作业提交到集群上执行,通过yarn per job的形式提交。

可以在执行图上看到pandas的调用作业并行度是1而后续的作业并行度是1000,所以-p 1000是生效的。提交指令如下

flink run -yd -m yarn-cluster \
-ys 8 -p 1000 - yjm 4G - ytm 8G \
-ynm app_pf \
-yD yarn.taskmanager.env.JAVA_HOME=/usr/java8/jdk \
-yD containerized.master.env.JAVA_HOME=/usr/java8/jdk \
-yD containerized.taskmanager.env.JAVA_HOME=/usr/java8/jdk \
-pyarch venv.zip \
-pyexec venv.zip/venv/bin/python \
-pyclientexec venv.zip/venv/bin/python \
-py /opt/modules/packages/scripts/app_pf.py

而且由于是用的table API,用的map算子调用pandas udf,并不能直接设置并行度。只能这样全局设置或者在代码里写死,这个效果应该是一样的。所以请教各位开发者,我该怎么提高pandas udf处理逻辑的并行度。

此外,我使用的是Hive的catalog数据源,使用这个pandas udf前面的逻辑几乎不能有任何其他处理,包括取数的SQL逻辑,SQL嵌套结构、查询字段语句有函数操作(if语句),只要有逻辑,都会报两个奇葩的错误。takes 1 positional argument but x were given,这个x是map算子输入数据table对象的字段个数。或者某一列不存在,这一列通常是做了函数操作,当数据量小的时候(1w以下)as(a as b) 的逻辑可以运行,数据量大的时候,as的操作也会报错某一列不存在。

展开
收起
AceMars9527 2024-04-09 19:39:41 48 0
1 条回答
写回答
取消 提交回答
  • 采纳回答

    这个问题,我自问自答吧,这两个问题,在使用table的row函数和pandas函数,都会遇到。估计这个是当前版本的问题,我通过将table对象转成datastream对象,然后使用stream的map算子调用row函数,可以指定并行度,处理完成后再转回table对象,这样处理后,这两个问题(并行度问题和map函数前有数据处理问题)都能得到解决

    2024-04-25 15:57:24
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
中文:即学即用的Pandas入门与时间序列分析 立即下载
即学即用的Pandas入门与时间序列分析 立即下载
低代码开发师(初级)实战教程 立即下载