I want to run a simple query in Flink SQL that contains a GROUP BY clause. But in the result, there are duplicate rows for the column specified in the GROUP BY. Is that because I am using a streaming environment and it does not remember state?
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// configure Kafka consumer
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // Broker default host:port
props.setProperty("group.id", "flink-consumer"); // Consumer group ID
FlinkKafkaConsumer011 flinkBlocksTransactionsConsumer = new FlinkKafkaConsumer011<>(args[0], new BlocksTransactionsSchema(), props);
flinkBlocksTransactionsConsumer.setStartFromEarliest();
DataStream blocksTransactions = env.addSource(flinkBlocksTransactionsConsumer);
tableEnv.registerDataStream("blocksTransactionsTable", blocksTransactions);
Table sqlResult = tableEnv.sqlQuery(
        "SELECT block_hash, count(tx_hash) " +
        "FROM blocksTransactionsTable " +
        "GROUP BY block_hash");
DataStream<Test> resultStream = tableEnv
        .toRetractStream(sqlResult, Row.class)
        .map(t -> {
            Row r = t.f1;
            String field2 = r.getField(0).toString();
            long count = Long.valueOf(r.getField(1).toString());
            return new Test(field2, count);
        })
        .returns(Test.class);
resultStream.print();
resultStream.addSink(new FlinkKafkaProducer011<>("localhost:9092", "TargetTopic", new TestSchema()));
env.execute();
I group by the block_hash column, but I get the same block_hash several times. This is the output of print():
Test{field2='0x2c4a021d514e4f8f0beb8f0ce711652304928528487dc7811d06fa77c375b5e1', count=1}
Test{field2='0x2c4a021d514e4f8f0beb8f0ce711652304928528487dc7811d06fa77c375b5e1', count=1}
Test{field2='0x2c4a021d514e4f8f0beb8f0ce711652304928528487dc7811d06fa77c375b5e1', count=2}
Test{field2='0x780aadc08c294da46e174fa287172038bba7afacf2dff41fdf0f6def03906e60', count=1}
Test{field2='0x182d31bd491527e1e93c4e44686057207ee90c6a8428308a2bd7b6a4d2e10e53', count=1}
Test{field2='0x182d31bd491527e1e93c4e44686057207ee90c6a8428308a2bd7b6a4d2e10e53', count=1}
How can I fix this without switching to a batch environment?
A query with a GROUP BY clause that runs on a stream must produce updates. Consider the following example:
SELECT user, COUNT(*) FROM clicks GROUP BY user;
Every time the clicks table receives a new row, the count for the corresponding user needs to be incremented and updated.
When a Table is converted into a DataStream, these updates must be encoded in the stream. Flink does this with retract and add messages. By calling tEnv.toRetractStream(table, Row.class), you convert the Table table into a DataStream<Tuple2<Boolean, Row>>. The Boolean flag is important: it indicates whether the Row is added to or retracted from the result table.
Given the example query above and the input table clicks as
Bob | ...
Liz | ...
Bob | ...
you will receive the following retraction stream:
(+, (Bob, 1)) // add first result for Bob
(+, (Liz, 1)) // add first result for Liz
(-, (Bob, 1)) // remove outdated result for Bob
(+, (Bob, 2)) // add updated result for Bob
You need to actively maintain the result yourself, adding and removing rows as indicated by the Boolean flag of the retraction stream.
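For illustration, here is a minimal sketch of one way to follow that advice, reusing the sqlResult table and the Test class from the question (an assumption-laden sketch, not the answer author's code): retractions are dropped and only add messages (flag == true) are forwarded, so each emitted element is the current count for its block_hash and the consumer only has to keep the latest value per key.

// Keep only add messages; each one carries the current count for its block_hash.
DataStream<Test> resultStream = tableEnv
        .toRetractStream(sqlResult, Row.class)
        .filter(t -> t.f0)                       // true = add message, false = retraction
        .map(t -> {
            Row r = t.f1;
            String blockHash = r.getField(0).toString();            // grouping key
            long count = Long.valueOf(r.getField(1).toString());    // current count
            return new Test(blockHash, count);
        })
        .returns(Test.class);

If the downstream system has to hold the fully materialized result, keep for example a Map<String, Test> there, putting entries on add messages and removing them on retractions, exactly as described above; writing the stream to a log-compacted Kafka topic keyed by block_hash achieves a similar effect on the Kafka side.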