版本:python3.8,pyflink1.16.1 集群基于standalone搭建,在上边跑批任务跑了几天,频率比较高,发现taskmanager挂了,之后查询原因定位到是使用了udf的情况就会导致Metaspace内存增加。 然后写了个简单的测试代码印证了下,
        from pyflink.table.udf import udf
        @udf(result_type=DataTypes.BIGINT())
        def convert_to_timestamp(cur_date: datetime):
              return cur_date.timestamp() * 1000
        s_env = self.getStreamEnv()
        s_env.set_parallelism(1)
        st_env = self.getTableEnv()
        mysql_source = """
        CREATE TABLE if not exists marketplace_rank_sale_daily_mysql_src (
        `id` INT,
        `category_id` BIGINT  comment '分类ID',
        `begin_rank_scale` INT comment '排名起始区间',
        `rank` FLOAT comment '排名',  
        `quantity` FLOAT  comment '销量',
        `date` DATE  comment '日期',
        `created_time` TIMESTAMP(0) comment '创建时间',
        PRIMARY KEY (`category_id`,`begin_rank_scale`,`date`) NOT ENFORCED
        ) WITH (
        'connector' = 'jdbc',
        'url' = 'xxx',
        'table-name' = 'xxx',
        'username' = 'xxx',
        'password' = 'xxx',
        'driver' = 'com.mysql.cj.jdbc.Driver'
        )
        """
        st_env.execute_sql(mysql_source)
        st_env.create_temporary_function("convert_to_timestamp", convert_to_timestamp)
        st_env.execute_sql("SELECT category_id, `rank`, quantity,`date`,         `created_time`,convert_to_timestamp(created_time) as time_number FROM marketplace_rank_sale_daily_mysql_src").print()
没引入其他jar包,都放在flink_home/lib下,类加载器用的ParentFirstClassLoader,不知道该怎么解决了?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。