Flink SQL支持在一个程序中写多个insert语句,可以将一个查询的结果插入到多个表中,也可以将多个查询的结果插入到不同的表中。有两种方式可以执行多个insert语句:
以下是一个使用StatementSet对象执行多个insert语句的示例:
// 注册一个 "Orders" 源表,和 "RubberOrders" 和 "GlassOrders" 结果表
tEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
tEnv.executeSql("CREATE TABLE RubberOrders (product STRING, amount INT) WITH (...)");
tEnv.executeSql("CREATE TABLE GlassOrders (product STRING, amount INT) WITH (...)");
// 运行多个 INSERT 语句,将原表数据输出到多个结果表中
StatementSet stmtSet = tEnv.createStatementSet();
// `addInsertSql` 方法每次只接收单条 INSERT 语句
stmtSet.addInsertSql(
"INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
stmtSet.addInsertSql(
"INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'");
// 执行刚刚添加的所有 INSERT 语句
TableResult tableResult = stmtSet.execute();
"tableEnv.createStatementSet()批量insert 类似这种 tableEnv.createStatementSet() .addInsert(""aaa"", table1) .addInsert(""bbb"", table2) .addInsert(""ccc"", table3) .execute();此回答整理自钉群“【③群】Apache Flink China社区”"
在Flink中,您可以使用多个“insert”语句将数据插入到不同的目标中。例如,您可以将一部分数据插入到MySQL数据库中,将另一部分数据插入到Elasticsearch中。
下面是一个示例,演示如何使用多个“insert”语句将数据插入到不同的目标中:
DataStream<Tuple2<String, Integer>> dataStream = ... DataStreamSink<Tuple2<String, Integer>> mysqlSink = dataStream .addSink(new StreamingFileSink
可以,使用 Flink 的 Table API 或 SQL API 可以实现多个 insert 语句,使用BatchTableEnvironment 对象类。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。