开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Pyflink1.16.1使用UDF会导致JVM Metaspace内存泄漏

版本:python3.8,pyflink1.16.1 集群基于standalone搭建,在上边跑批任务跑了几天,频率比较高,发现taskmanager挂了,之后查询原因定位到是使用了udf的情况就会导致Metaspace内存增加。 然后写了个简单的测试代码印证了下,

        from pyflink.table.udf import udf

        @udf(result_type=DataTypes.BIGINT())
        def convert_to_timestamp(cur_date: datetime):
              return cur_date.timestamp() * 1000

        s_env = self.getStreamEnv()
        s_env.set_parallelism(1)
        st_env = self.getTableEnv()


        mysql_source = """
        CREATE TABLE if not exists marketplace_rank_sale_daily_mysql_src (
        `id` INT,
        `category_id` BIGINT  comment '分类ID',
        `begin_rank_scale` INT comment '排名起始区间',
        `rank` FLOAT comment '排名',  
        `quantity` FLOAT  comment '销量',
        `date` DATE  comment '日期',
        `created_time` TIMESTAMP(0) comment '创建时间',
        PRIMARY KEY (`category_id`,`begin_rank_scale`,`date`) NOT ENFORCED
        ) WITH (
        'connector' = 'jdbc',
        'url' = 'xxx',
        'table-name' = 'xxx',
        'username' = 'xxx',
        'password' = 'xxx',
        'driver' = 'com.mysql.cj.jdbc.Driver'
        )
        """

        st_env.execute_sql(mysql_source)


        st_env.create_temporary_function("convert_to_timestamp", convert_to_timestamp)
        st_env.execute_sql("SELECT category_id, `rank`, quantity,`date`,         `created_time`,convert_to_timestamp(created_time) as time_number FROM marketplace_rank_sale_daily_mysql_src").print()

没引入其他jar包,都放在flink_home/lib下,类加载器用的ParentFirstClassLoader,不知道该怎么解决了?

展开
收起
vkjr7g4l6phai 2023-05-23 17:25:16 152 0
2 条回答
写回答
取消 提交回答
  • 该版本session集群存在此问题,只能更换成yarn application模式,具体修复版本未知。

    2023-06-15 09:41:26
    赞同 展开评论 打赏
  • 存在即是合理

    猜测可能存在版本不兼容的问题。建议升级或降级相应的依赖库版本再试试。

    2023-05-23 17:36:52
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
云服务器ECS内存增强型实例re6全新发布 立即下载
JVM的GC 立即下载
基于JVM的脚本语言开发、运用实践 立即下载