开发者社区> 问答> 正文

flink sql 能否手动触发动态查询?

说明: flink版本:flink 1.15 运行环境:flink on yarn的应用模式 代码模式:在java中写的flink sql

处理逻辑描述: 1.从kafka读取数据 2.经过flink sql汇总处理后sink到mysql,代码如下: String insertSinkDDL ="insert into eol_test_report_xt_test"+ " ("+ " report_no," + " report_count" + " )" + " select " + " t01.report_no, "+ " count() as report_count" + " from source_kafka_table_report_info t01" + " where t01.collect_time<LOCALTIMESTAMP" + " group by t01.report_no " + " having count()<3"+ ""; tableEnv.executeSql(insertSinkDDL);

问题描述: 逻辑中有一段 where t01.collect_time<LOCALTIMESTAMP判断,意思是当kafka来的数据的时间小于当前日期才插入mysql。 现在kafka来一条数据就触发一下这个sink的逻辑,我想实现这种种效果:kafka没有数据来的时候,晚上零点的时候触发一次这个写入mysql的逻辑,这种可以吗?

展开
收起
游客fuzojzpl5x2bu 2023-06-19 16:02:20 93 0
1 条回答
写回答
取消 提交回答
  • 可以通过设置 Flink 的定时任务来实现定时触发写入 MySQL 的逻辑。

    具体实现方法如下:

    1. 首先,需要在代码中添加一个定时任务,例如:

      env.addSource(new RichSourceFunction<String>() {
         private volatile boolean isRunning = true;
      
         @Override
         public void run(SourceContext<String> sourceContext) throws Exception {
             while (isRunning) {
                 // 每隔一段时间触发一次写入 MySQL 的逻辑
                 Thread.sleep(24 * 60 * 60 * 1000); // 单位为毫秒,这里设置为 24 小时
                 sourceContext.collect("trigger"); // 发送一个触发信号
             }
         }
      
         @Override
         public void cancel() {
             isRunning = false;
         }
      })
      .setParallelism(1)
      .name("TimerSource");
      

      这里使用了一个 RichSourceFunction,每隔一段时间发送一个触发信号到下游。

    2. 然后,在 SQL 中添加一个 join,将触发信号和数据流连接在一起,例如:

      String insertSinkDDL = "insert into eol_test_report_xt_test" +
             " (" +
             " report_no," +
             " report_count" +
             " )" +
             " select " +
             " t01.report_no, "+
             " count() as report_count" +
             " from source_kafka_table_report_info t01" +
             " join TimerSource t02 on t02.f0 = 'trigger'" + // 这里连接触发信号
             " where t01.collect_time<LOCALTIMESTAMP" +
             " group by t01.report_no " +
             " having count()<3";
      tableEnv.executeSql(insertSinkDDL);
      

      这里使用了一个 TimerSource,将触发信号和数据流连接在一起。当触发信号到达时,join 将触发写入 MySQL 的逻辑。

    这样,就可以实现每隔一段时间触发一次写入 MySQL 的逻辑,而不需要等待 Kafka 有数据到来。

    2023-06-19 16:49:26
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
SQL Server在电子商务中的应用与实践 立即下载
GeoMesa on Spark SQL 立即下载
原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载