
PyFlink throws an error when run: what is the actual cause?

Flink version: 1.11.2, Python version: 3.6, apache-flink==1.11.2. Running Flink on YARN in per-job mode.

The program is written with PyFlink. It reads data from Kafka, uses a UDF (with the third-party package IPy) to check whether an IP address falls within an IP range, and then writes the result back to Kafka.

Main code:

t_env.get_config().get_configuration().set_string('taskmanager.memory.task.off-heap.size', '128m')
t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar; ")
t_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar; ")
t_env.add_python_archive("venv.zip")
t_env.get_config().set_python_executable("venv.zip/venv/bin/python")

@udf(input_types=[DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.INT())
def judge_ip(src_ip, dst_ip):
    import IPy
    .....

t_env.register_function('judge_ip', judge_ip)
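
(For reference only, not part of the original post: the judge_ip body is elided above with ".....". The sketch below is one hypothetical way the IPy-based range check described in the question could look; the 10.0.0.0/8 range and the 0/1 return convention are made-up assumptions for illustration.)

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.INT())
def judge_ip(src_ip, dst_ip):
    # Hypothetical sketch: return 1 when src_ip lies inside an assumed internal
    # range and dst_ip lies outside it, otherwise 0. The real logic is elided above.
    from IPy import IP  # imported inside the UDF, as in the original snippet
    internal_net = IP('10.0.0.0/8')  # made-up CIDR range, for illustration only
    try:
        return 1 if (IP(src_ip) in internal_net and IP(dst_ip) not in internal_net) else 0
    except ValueError:
        return 0  # IPy raises ValueError for malformed addresses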

The main error output is below:

Traceback (most recent call last):
  File "traffic-tuple-sf.py", line 59, in <module>
    t_env.register_function('judge_ip', judge_ip)
  File "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 876, in register_function
  File "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o5.registerFunction.
: org.apache.flink.table.api.ValidationException: Function class 'class org.apache.flink.table.functions.python.PythonScalarFunction' is not serializable. Make sure that the class is self-contained (i.e. no references to outer classes) and all inner fields are serializable as well.
    at org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:349)
    at org.apache.flink.table.functions.UserDefinedFunctionHelper.prepareInstance(UserDefinedFunctionHelper.java:204)
    at org.apache.flink.table.catalog.FunctionCatalog.registerTempSystemScalarFunction(FunctionCatalog.java:383)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerFunction(TableEnvironmentImpl.java:357)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:567)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module @1311d9fb
    at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:357)
    at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
    at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:177)
    at java.base/java.lang.reflect.Field.setAccessible(Field.java:171)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:104)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
    at org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:346)
    ... 14 more

Could someone please take a look at where the problem is and how it should be fixed? Thanks! *From the FLINK mailing list archive, compiled by volunteers

又出bug了 -- 2021-12-03 16:57:27
1 answer
  • How about running it in local mode first, to narrow things down? Judging from the error, it fails at compile time, so it should be reproducible on the machine you are currently submitting the job from (a minimal local-mode sketch follows below). *From the FLINK mailing list archive, compiled by volunteers

    2021-12-03 17:53:18
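
(Illustrative note, not part of the original reply: one way to follow the suggestion above is to run the same script directly with "python traffic-tuple-sf.py" against a plain local TableEnvironment, with no YARN submission involved. The sketch below uses the standard PyFlink 1.11 blink-planner setup; everything else in the script, including the UDF registration that fails, would stay unchanged.)

# Minimal local-mode sketch (assumption: same script body, only the environment
# creation differs). The job runs on the built-in local mini-cluster, not YARN.
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

t_env = StreamTableEnvironment.create(
    environment_settings=EnvironmentSettings.new_instance()
        .in_streaming_mode()
        .use_blink_planner()
        .build())

# register_function should fail (or succeed) the same way here as at submit time,
# which narrows the problem to the local Python/Java setup rather than to YARN.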
