在使用Flink1.13.6 jdbc-connector版本是2.11-1.13.6,我们是把Oracle作为Sink表,但是业务比较稀疏,长时间可能没有写入操作,等到十几个小时再要写入的时候,就会报错 closed connection,然后任务失败了,有什么保活机制吗?求大佬们解答
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在使用Flink JDBC Connector时,如果将Oracle作为Sink表,并且业务写入操作较为稀疏,可能会遇到长时间无写入后连接被关闭的问题(closed connection
)。这是由于数据库或网络中间件(如防火墙、代理)在空闲一段时间后会主动关闭连接。为了解决这一问题,可以通过以下几种方式实现保活机制:
Flink JDBC Connector支持通过连接池管理数据库连接。可以通过调整连接池的相关参数来避免连接因长时间空闲而被关闭。
关键参数:
connectionMaxActive
:设置连接池的最大连接数。connectionTimeout
:设置获取连接的超时时间。idleTimeout
:设置连接在连接池中空闲的最大时间。maxLifetime
:设置连接的最大生命周期。示例配置: 在创建JDBC Sink表时,可以通过WITH
参数添加连接池配置:
CREATE TABLE oracle_sink (
id BIGINT,
name VARCHAR
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:oracle:thin:@//<host>:<port>/<service>',
'table-name' = '<your_table>',
'username' = '<your_username>',
'password' = '<your_password>',
'connection.max-active' = '5', -- 最大连接数
'connection.idle-timeout' = '300000', -- 空闲超时时间(毫秒)
'connection.max-lifetime' = '600000' -- 连接最大生命周期(毫秒)
);
如果数据库支持保活机制,可以通过数据库端的配置来避免连接被关闭。
SQLNET.EXPIRE_TIME
参数来定期发送探测包,保持连接活跃。
sqlnet.ora
文件,添加或修改以下内容:SQLNET.EXPIRE_TIME = 10
上述配置表示每10分钟发送一次探测包,确保连接不被关闭。
Flink JDBC Connector支持写入失败后的重试机制。可以通过配置max-retries
和retry-wait-time
参数来处理连接断开的情况。
关键参数:
sink.max-retries
:写入失败后的最大重试次数,默认值为3。sink.retry-wait-time
:每次重试之间的等待时间(毫秒),默认值为100。示例配置:
CREATE TABLE oracle_sink (
id BIGINT,
name VARCHAR
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:oracle:thin:@//<host>:<port>/<service>',
'table-name' = '<your_table>',
'username' = '<your_username>',
'password' = '<your_password>',
'sink.max-retries' = '5', -- 最大重试次数
'sink.retry-wait-time' = '5000' -- 每次重试间隔(毫秒)
);
如果上述方法无法完全解决问题,可以通过定期发送心跳查询来保持连接活跃。可以在Flink任务中添加一个定时任务,定期执行简单的SQL查询(如SELECT 1 FROM DUAL
)。
实现方式:
RichSinkFunction
自定义Sink,在open
方法中启动一个定时线程,定期执行心跳查询。public class OracleSinkWithHeartbeat extends RichSinkFunction<Row> {
private Connection connection;
private ScheduledExecutorService scheduler;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = DriverManager.getConnection("<jdbc_url>", "<username>", "<password>");
scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
try (Statement stmt = connection.createStatement()) {
stmt.execute("SELECT 1 FROM DUAL");
} catch (SQLException e) {
e.printStackTrace();
}
}, 0, 5, TimeUnit.MINUTES); // 每5分钟发送一次心跳
}
@Override
public void invoke(Row value, Context context) throws Exception {
// 正常写入逻辑
}
@Override
public void close() throws Exception {
if (scheduler != null) {
scheduler.shutdown();
}
if (connection != null) {
connection.close();
}
}
}
Flink 1.13.6是一个较旧的版本,可能存在一些已知问题。建议升级到更高版本(如Flink 1.15及以上),以获得更好的稳定性和性能优化。
针对您的问题,推荐按照以下步骤解决: 1. 优先调整连接池参数,例如connection.idle-timeout
和connection.max-lifetime
。 2. 启用Oracle数据库的保活机制,通过SQLNET.EXPIRE_TIME
参数定期发送探测包。 3. 配置Flink的重试机制,确保在连接断开时能够自动重试。 4. 如果问题仍然存在,可以考虑定期发送心跳查询或升级Flink版本。
以上方法均基于知识库中的相关资料,您可以根据实际业务需求选择合适的解决方案。您可以复制页面截图提供更多信息,我可以进一步帮您分析问题原因。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。