说明: flink版本:flink 1.15 运行环境:flink on yarn的应用模式 代码模式:在java中写的flink sql
处理逻辑描述: 1.从kafka读取数据 2.经过flink sql汇总处理后sink到mysql,代码如下: String insertSinkDDL ="insert into eol_test_report_xt_test"+ " ("+ " report_no," + " report_count" + " )" + " select " + " t01.report_no, "+ " count() as report_count" + " from source_kafka_table_report_info t01" + " where t01.collect_time<LOCALTIMESTAMP" + " group by t01.report_no " + " having count()<3"+ ""; tableEnv.executeSql(insertSinkDDL);
问题描述: 逻辑中有一段 where t01.collect_time<LOCALTIMESTAMP判断,意思是当kafka来的数据的时间小于当前日期才插入mysql。 现在kafka来一条数据就触发一下这个sink的逻辑,我想实现这种种效果:kafka没有数据来的时候,晚上零点的时候触发一次这个写入mysql的逻辑,这种可以吗?
可以通过设置 Flink 的定时任务来实现定时触发写入 MySQL 的逻辑。
具体实现方法如下:
首先,需要在代码中添加一个定时任务,例如:
env.addSource(new RichSourceFunction<String>() {
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<String> sourceContext) throws Exception {
while (isRunning) {
// 每隔一段时间触发一次写入 MySQL 的逻辑
Thread.sleep(24 * 60 * 60 * 1000); // 单位为毫秒,这里设置为 24 小时
sourceContext.collect("trigger"); // 发送一个触发信号
}
}
@Override
public void cancel() {
isRunning = false;
}
})
.setParallelism(1)
.name("TimerSource");
这里使用了一个 RichSourceFunction
,每隔一段时间发送一个触发信号到下游。
然后,在 SQL 中添加一个 join
,将触发信号和数据流连接在一起,例如:
String insertSinkDDL = "insert into eol_test_report_xt_test" +
" (" +
" report_no," +
" report_count" +
" )" +
" select " +
" t01.report_no, "+
" count() as report_count" +
" from source_kafka_table_report_info t01" +
" join TimerSource t02 on t02.f0 = 'trigger'" + // 这里连接触发信号
" where t01.collect_time<LOCALTIMESTAMP" +
" group by t01.report_no " +
" having count()<3";
tableEnv.executeSql(insertSinkDDL);
这里使用了一个 TimerSource
,将触发信号和数据流连接在一起。当触发信号到达时,join
将触发写入 MySQL 的逻辑。
这样,就可以实现每隔一段时间触发一次写入 MySQL 的逻辑,而不需要等待 Kafka 有数据到来。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。