我目前尝试使用pyflink做离线处理,flink版本是1.14.6。逻辑里用到了pyflink特有的函数pandas udf
,了解到这是个批处理调用的函数,通过python.fn-execution.arrow.batch.size
可以控制进入函数pd.Dataframe
的数据量,这个我已经通过本地应用验证的确生效,但是我设置了这些参数,将作业提交到集群上执行,通过yarn per job的形式提交。
可以在执行图上看到pandas的调用作业并行度是1而后续的作业并行度是1000,所以-p 1000
是生效的。提交指令如下
flink run -yd -m yarn-cluster \
-ys 8 -p 1000 - yjm 4G - ytm 8G \
-ynm app_pf \
-yD yarn.taskmanager.env.JAVA_HOME=/usr/java8/jdk \
-yD containerized.master.env.JAVA_HOME=/usr/java8/jdk \
-yD containerized.taskmanager.env.JAVA_HOME=/usr/java8/jdk \
-pyarch venv.zip \
-pyexec venv.zip/venv/bin/python \
-pyclientexec venv.zip/venv/bin/python \
-py /opt/modules/packages/scripts/app_pf.py
而且由于是用的table API,用的map算子调用pandas udf,并不能直接设置并行度。只能这样全局设置或者在代码里写死,这个效果应该是一样的。所以请教各位开发者,我该怎么提高pandas udf处理逻辑的并行度。
此外,我使用的是Hive的catalog数据源,使用这个pandas udf前面的逻辑几乎不能有任何其他处理,包括取数的SQL逻辑,SQL嵌套结构、查询字段语句有函数操作(if语句),只要有逻辑,都会报两个奇葩的错误。takes 1 positional argument but x were given
,这个x是map算子输入数据table对象的字段个数。或者某一列不存在,这一列通常是做了函数操作,当数据量小的时候(1w以下)as(a as b) 的逻辑可以运行,数据量大的时候,as的操作也会报错某一列不存在。
这个问题,我自问自答吧,这两个问题,在使用table的row函数和pandas函数,都会遇到。估计这个是当前版本的问题,我通过将table对象转成datastream对象,然后使用stream的map算子调用row函数,可以指定并行度,处理完成后再转回table对象,这样处理后,这两个问题(并行度问题和map函数前有数据处理问题)都能得到解决
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。