说明: flink版本:flink 1.15 运行环境:flink on yarn的应用模式 代码模式:在java中写的flink sql
处理逻辑描述: 1.从kafka读取数据 2.经过flink sql汇总处理后sink到mysql,代码如下: String insertSinkDDL ="insert into eol_test_report_xt_test"+ " ("+ " report_no," + " report_count" + " )" + " select " + " t01.report_no, "+ " count() as report_count" + " from source_kafka_table_report_info t01" + " where t01.collect_time<LOCALTIMESTAMP" + " group by t01.report_no " + " having count()<3"+ ""; tableEnv.executeSql(insertSinkDDL);
问题描述: 逻辑中有一段 where t01.collect_time<LOCALTIMESTAMP判断,意思是当kafka来的数据的时间小于当前日期才插入mysql。 现在kafka来一条数据就触发一下这个sink的逻辑,我想实现这种种效果:kafka没有数据来的时候,晚上零点的时候触发一次这个写入mysql的逻辑,这种可以吗?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
可以通过设置 Flink 的定时任务来实现定时触发写入 MySQL 的逻辑。
具体实现方法如下:
首先,需要在代码中添加一个定时任务,例如:
env.addSource(new RichSourceFunction<String>() {
   private volatile boolean isRunning = true;
   @Override
   public void run(SourceContext<String> sourceContext) throws Exception {
       while (isRunning) {
           // 每隔一段时间触发一次写入 MySQL 的逻辑
           Thread.sleep(24 * 60 * 60 * 1000); // 单位为毫秒,这里设置为 24 小时
           sourceContext.collect("trigger"); // 发送一个触发信号
       }
   }
   @Override
   public void cancel() {
       isRunning = false;
   }
})
.setParallelism(1)
.name("TimerSource");
这里使用了一个 RichSourceFunction,每隔一段时间发送一个触发信号到下游。
然后,在 SQL 中添加一个 join,将触发信号和数据流连接在一起,例如:
String insertSinkDDL = "insert into eol_test_report_xt_test" +
       " (" +
       " report_no," +
       " report_count" +
       " )" +
       " select " +
       " t01.report_no, "+
       " count() as report_count" +
       " from source_kafka_table_report_info t01" +
       " join TimerSource t02 on t02.f0 = 'trigger'" + // 这里连接触发信号
       " where t01.collect_time<LOCALTIMESTAMP" +
       " group by t01.report_no " +
       " having count()<3";
tableEnv.executeSql(insertSinkDDL);
这里使用了一个 TimerSource,将触发信号和数据流连接在一起。当触发信号到达时,join 将触发写入 MySQL 的逻辑。
这样,就可以实现每隔一段时间触发一次写入 MySQL 的逻辑,而不需要等待 Kafka 有数据到来。