开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

请教下大家Flink CDC,flinksql的这个代码要如何修改才能实现动态传参?

请教下大家Flink CDC,flinksql的这个代码要如何修改才能实现动态传参formatted-date这个参数的动态传参?实现临时表 rltest的formatted_date字段动态输出; Table rltest = tableEnv.sqlQuery(
"SELECT " +
", " +
"DATE_FORMAT(create_date, 'yyyy-MM-dd') as formatted_date " +
"FROM temp_table"
);
tableEnv.createTemporaryView("newb_temp_view", rltest);
String formattedDate ="SELECT formatted_date FROM " + rltest ; // 从 rltest 检索
tableEnv.createTemporaryView("selected_create_date",tableEnv.sqlQuery(formattedDate));
Table drrt = tableEnv.sqlQuery(
"SELECT bo.station_id , " +
"ss.station_name, " +
"COUNT(bo.id)/2 AS switch_count, " +
"ROUND(SUM(bov.switch_electric_quantity), 3)/2 AS total_switch_electric_quantity, " +
"ROUND(SUM(bo.pay_amount), 3)/2 AS total_pay_amount, " +
"'(SELECT formatted_date FROM selected_create_date)' AS create_date " +
"FROM border bo " +
"INNER JOIN \n" +
" bs_order_vehicle bov ON bo.id = bov.order_id " +
"INNER JOIN \n" +
" sys_station ss ON bo.station_id = ss.id " +
"WHERE \n" +
" bo.station_id IN (SELECT station_id FROM selected_station_id)" +
" AND DATE_FORMAT(bo.create_date, 'yyyy-MM-dd') IN (SELECT formatted_date FROM selected_create_date) " +
" AND bo.pay_type <> 0 " +
" AND bo.pay_type IS NOT NULL " +
" AND bo.order_status = 3 " +
" AND bo.order_type = 0 " +
" AND bo.del_flag = 0 " +
" GROUP BY bo.station_id, ss.station_name "
);
tableEnv.createTemporaryView("temp_table4", drrt);
Table drt = tableEnv.sqlQuery("SELECT
FROM temp_table4");
tableEnv.toChangelogStream(drt).print("SQL");
env.execute();

展开
收起
cuicuicuic 2024-01-09 11:52:41 35 0
1 条回答
写回答
取消 提交回答
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    要实现动态传参formatted-date,可以在查询字符串中使用占位符,然后在执行查询时传入参数。以下是修改后的代码:

    String formattedDatePlaceholder = "'(SELECT formatted_date FROM selected_create_date)'";
    String query = "SELECT bo.station_id, ss.station_name, COUNT(bo.id)/2 AS switch_count, ROUND(SUM(bov.switch_electric_quantity), 3)/2 AS total_switch_electric_quantity, ROUND(SUM(bo.pay_amount), 3)/2 AS total_pay_amount, " + formattedDatePlaceholder + " AS create_date FROM border bo INNER JOIN bs_order_vehicle bov ON bo.id = bov.order_id INNER JOIN sys_station ss ON bo.station_id = ss.id WHERE bo.station_id IN (SELECT station_id FROM selected_station_id) AND DATE_FORMAT(bo.create_date, 'yyyy-MM-dd') IN (SELECT formatted_date FROM selected_create_date) AND bo.pay_type <> 0 AND bo.pay_type IS NOT NULL AND bo.order_status = 3 AND bo.order_type = 0 AND bo.del_flag = 0 GROUP BY bo.station_id, ss.station_name";
    Table rltest = tableEnv.sqlQuery(query);
    tableEnv.createTemporaryView("newb_temp_view", rltest);
    tableEnv.toChangelogStream(rltest).print("SQL");
    env.execute();
    

    这样,formattedDatePlaceholder变量中的占位符将被替换为实际的日期格式查询结果。

    2024-01-09 14:56:22
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载