1.概述
需求:读取Kafka数据源,引入py模型,输出预测结果
实时计算Flink环境:已预装了Python 3.7.9,预装Pandas、NumPy、PyArrow等常用的Python库
脚本:
1.pyPredict.py:读取Kafka,写入Kafka
2.xg-model.pkl:xgboost模型,输入特征因子,输出结果
思路:将模型文件以pandas udf形式进行调用
2.部署过程
2.1上传依赖及脚本
1.在资源上传界面点击上传资源将pyPredict.py和xg-model.pkl上传上去
2.因要读取kafka,需要kafka-connect依赖,在maven上下载flink-connector-kafka依赖jar包flink-connector-kafka_2.12-1.13.6.jar
https://repo.maven.apache.org/maven2/org/apache/flink/
2.2创建作业
1.将依赖及作业填入下方,提交作业点击运行
2.在作业启动日志里出现错误:ERROR org.apache.flink.client.python.PythonDriver [] - Run python process failed
java.lang.RuntimeException:Python process exits with code:1
3.怀疑是使用了错误版本的jar包,后来注意到阿里云FLink有使用限制:仅支持开源Scala V2.11版本,如果Python作业中依赖第三方JAR包,请确保使用Scala V2.11对应的JAR包依赖
4.将jar包换成flink-connector-kafka_2.11-1.13.6.jar版本,运行作业后又分别出现如下错误:ModuleNotFoundError:No module named 'joblib';ModuleNotFoundError:No module named 'xgboost';ModuleNotFoundError:No module named 'scipy';
上述报错应该是模型文件xg-model里需要的,参考阿里云文档
将对应的model包下载下来并上传至资源列表中scipy-1.1.1-cp37-cp37m-manylinux1_x86_64.whl;xgboost-1.6.2-py3-none-manylinux2014_x86_64.whl;joblib-1.2.0-py3-none-any.whl;
5.将上述作业提交运行之后报:ImportError:cannot import name '_ccallback_c' from 'scipy._lib',经过各种尝试解决问题都不行还是报这个错误,参考StackOverFlow上的建议也是没解决https://stackoverflow.com/questions/64658954/importerror-cannot-import-name-ccallback-c-from-scipy-lib
后咨询一位阿里大佬,让我用如下方式编译包进行使用
https://help.aliyun.com/document_detail/207351.html
6.参考阿里的文档在一台装有docker的服务器上(仅需docker就行,不需要服务器有py)
a.编译第三方Python包。
i.在本地准备requirements.txt文件,其内容如下。
scipy joblib xgboost
ii.在本地准备build.sh脚本,其内容如下。
#!/bin/bash set -e -x yum install -y zip PYBIN=/opt/python/cp37-cp37m/bin "${PYBIN}/pip" install --target __pypackages__ -r requirements.txt cd __pypackages__ && zip -r deps.zip . && mv deps.zip ../ && cd .. rm -rf __pypackages__
iii.执行如下命令。
docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux2014_x86_64 /bin/bash build.sh
该命令执行完后,会生成一个名字为deps.zip的文件,该文件为编译之后的第三方Python包。
b.将deps.zip打包上传至资源列表,资源过大则采用OSS上传
7.重新运行作业日志中又出现如下错误:AttributeError:'super' object has no attribute 'get_params'。经与大佬沟通怀疑是py版本兼容问题,接下来采用自定义Python虚拟环境把py版本换成3.8版本,因为模型文件xg-model.pkl是3.8版本生成的
2.3自定义虚拟Python环境
自定义Python虚拟环境需要使用vvr-6.x版本引擎,目前实时计算Flink最高版本引擎为vvr-6.0.2-flink-1.15
- 准备Python 3.8的虚拟环境。
- 在本地准备setup-pyflink-virtual-env.sh脚本,其内容如下。
set -e # 下载Python 3.8 miniconda.sh脚本。 wget "https://repo.continuum.io/miniconda/Miniconda3-py38_4.12.0-Linux-x86_64.sh" -O "miniconda.sh" # 为Python 3.8 miniconda.sh脚本添加执行权限。 chmod +x miniconda.sh # 创建Python的虚拟环境。 ./miniconda.sh -b -p venv # 激活Conda Python虚拟环境。 source venv/bin/activate "" # 安装PyFlink依赖。须使用1.15版本的flink # update the PyFlink version if needed pip install "apache-flink==1.15.2" # 安装模型文件依赖保持与本地环境版本一致 pip install "scipy==1.6.2" pip install "joblib==1.0.1" pip install "xgboost==1.6.1" # 关闭Conda Python虚拟环境。 conda deactivate # 删除缓存的包。 rm -rf venv/pkgs # 将准备好的Conda Python虚拟环境打包。 zip -r venv.zip venv
b. 在本地准备build.sh脚本,其内容如下。
#!/bin/bash set -e -x yum install -y zip wget cd /root/ bash /build/setup-pyflink-virtual-env.sh mv venv.zip /build/
c. 在命令行,执行如下命令,完成python 3.8虚拟环境的安装。
docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux2014_x86_64 ./build.sh
执行完该命令后,会生成一个名字为venv.zip的文件,即为Python 3.8的虚拟环境。
2.在Python作业中使用Python 3.8虚拟环境。
a.将venv.zip文件上传至资源列表,文件过大采用OSS上传,因虚拟环境已经将模型文件所需的包打进去,这里只需要venv.zip文件即可;因使用flink1.15版本,则需要1.15版本的kafka-connector包flink-sql-connector-kafka-1.15.2.jar(1.15去scala化了,没有2.11的后缀了)
jar下载路径https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/kafka/
b.单击右侧的高级配置,在更多Flink配置项,添加配置信息
python.executable venv.zip/venv/bin/python python.client.executable venv.zip/venv/bin/python
3.运行作业,运行日志出现:java.lang.NoSuchMethodError:org.apache.flink.metrics.groups.SinkWriterMetricGroup.GetNumBytesSendCounter()Lorg/apache/flink/metrics/Counter;
经咨询阿里kakfa大佬,这是一个已知问题。用vvr内部的kafka可以先解决。社区的得到1.15.3才能解决;
4.vvr内部可以直接这么用:
在右侧高级配置里添加
pipeline.classpaths'file:///flink/usrlib/ververica-connector-kafka-1.15-vvr-6.0.2-3-SNAPSHOT-jar-with-dependencies.jar;file:///flink/usrlib/ververica-connector-common-1.15-vvr-6.0.2-3-SNAPSHOT-jar-with-dependencies.jar'
或者代码里添加
table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///flink/usrlib/ververica-connector-kafka-1.15-vvr-6.0.2-3-SNAPSHOT-jar-with-dependencies.jar;file:///flink/usrlib/ververica-connector-common-1.15-vvr-6.0.2-3-SNAPSHOT-jar-with-dependencies.jar")
5.重新运行作业出现AttributeError:'super' object has no attribute 'get_params'错误,和上面直接使用阿里云环境出现同样的问题。现在可以排除是Py版本的问题了,查看报错原因,是因为缺少cikit-learn包
在上述setup-pyflink-virtual-env.sh脚本中添加,重新打包
pip install "scikit-learn"
6.重新运行,作业正常启动,也有结果输出
2.4使用阿里云环境直接运行
1.参考2.2中编译Python包,在requirements.txt文件中添加scikit-learn,重新打包
2.使用vvr-6.x引擎直接运行,发现作业正常,有数据输出
3.总结
部署py作业实属不易,期间出现很多问题,也走了不少弯路,前前后后折腾了3天,才把问题解决,期间还得多谢阿里大佬的支持
打铁还需自身硬啊,兄弟们,得多多学习啦
这期就到这里了,拜了个拜