flink cdc采集的时候,我自己写了mybatis的查询,查询一些配置数据。 cancel job之后,myabtis的数据库连接没有释放,为什么?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在您使用Flink CDC进行数据采集时,同时自定义了MyBatis查询来获取配置数据,并遇到取消作业(cancel job)后MyBatis的数据库连接未被释放的问题。这可能是由于以下几个原因导致的:
资源管理不当:Flink作业在取消时,其内部的资源通常会进行清理,但自定义的MyBatis查询部分可能没有正确集成到Flink的资源生命周期管理中。这意味着当Flink作业结束时,MyBatis创建的数据库连接可能没有被显式关闭。
连接池配置问题:如果您的MyBatis配置使用了连接池(如上述示例中的POOLED
类型),连接池可能没有配置合适的连接回收策略。即使作业结束,连接池可能仍然持有这些连接,直到达到预设的超时时间才释放。
异常处理缺失:在执行MyBatis查询的过程中,如果遇到未被捕获的异常,可能导致连接没有通过finally块或try-with-resources语句正确关闭。
为了确保MyBatis的数据库连接在Flink作业取消后能够被正确释放,您可以采取以下措施:
集成资源释放逻辑:在Flink作业的生命周期管理中明确地添加代码来管理MyBatis的数据库连接。例如,在open()
方法中初始化数据库连接,在close()
方法中确保所有资源被正确关闭。对于DataStream API,这对应于RichFunction
的生命周期方法;对于Table API,则可能需要自定义函数并实现相应的生命周期接口。
优化连接池配置:
dataSource
配置中的closeConnection
相关参数,确保连接在不再需要时能被及时关闭。增强异常处理:确保所有数据库操作都包裹在适当的异常处理逻辑中,确保即使发生异常也能释放资源。可以使用try-with-resources语句来自动管理资源生命周期。
在Flink作业中使用自定义函数时,确保资源管理得当,如下所示(伪代码示例):
public class MyDatabaseLookupFunction extends RichFlatMapFunction<...> {
private DataSource dataSource;
private MyMapper myMapper;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 初始化数据源和MyBatis Mapper
dataSource = ...; // 根据实际情况初始化数据源
SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(Resources.getResourceAsStream("mybatis-config.xml"));
SqlSession sqlSession = sqlSessionFactory.openSession(dataSource);
myMapper = sqlSession.getMapper(MyMapper.class);
}
@Override
public void flatMap(..., Collector<...> out) throws Exception {
// 执行MyBatis查询逻辑
...
}
@Override
public void close() throws Exception {
if (myMapper != null && myMapper.sqlSession != null) {
myMapper.sqlSession.close();
}
if (dataSource != null) {
// 根据数据源类型,执行适当的关闭逻辑
...
}
super.close();
}
}
通过上述措施,您可以有效解决Flink CDC作业取消后MyBatis数据库连接未释放的问题。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。