版本:python3.8,pyflink1.16.1 集群基于standalone搭建,在上边跑批任务跑了几天,频率比较高,发现taskmanager挂了,之后查询原因定位到是使用了udf的情况就会导致Metaspace内存增加。 然后写了个简单的测试代码印证了下,
from pyflink.table.udf import udf
@udf(result_type=DataTypes.BIGINT())
def convert_to_timestamp(cur_date: datetime):
return cur_date.timestamp() * 1000
s_env = self.getStreamEnv()
s_env.set_parallelism(1)
st_env = self.getTableEnv()
mysql_source = """
CREATE TABLE if not exists marketplace_rank_sale_daily_mysql_src (
`id` INT,
`category_id` BIGINT comment '分类ID',
`begin_rank_scale` INT comment '排名起始区间',
`rank` FLOAT comment '排名',
`quantity` FLOAT comment '销量',
`date` DATE comment '日期',
`created_time` TIMESTAMP(0) comment '创建时间',
PRIMARY KEY (`category_id`,`begin_rank_scale`,`date`) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'xxx',
'table-name' = 'xxx',
'username' = 'xxx',
'password' = 'xxx',
'driver' = 'com.mysql.cj.jdbc.Driver'
)
"""
st_env.execute_sql(mysql_source)
st_env.create_temporary_function("convert_to_timestamp", convert_to_timestamp)
st_env.execute_sql("SELECT category_id, `rank`, quantity,`date`, `created_time`,convert_to_timestamp(created_time) as time_number FROM marketplace_rank_sale_daily_mysql_src").print()
没引入其他jar包,都放在flink_home/lib下,类加载器用的ParentFirstClassLoader,不知道该怎么解决了?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。