Flink SQL:在GROUP BY查询结果中重复分组键-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

Flink SQL:在GROUP BY查询结果中重复分组键

2018-12-11 15:46:25 8133 2

我想在一个包含group by语句的表中在Flink SQL中进行简单查询。但是在结果中,group by语句中指定的列存在重复的行。那是因为我使用的是流媒体环境并且它不记得状态吗?

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// configure Kafka consumer
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // Broker default host:port
props.setProperty("group.id", "flink-consumer"); // Consumer group ID

FlinkKafkaConsumer011 flinkBlocksTransactionsConsumer = new FlinkKafkaConsumer011<>(args[0], new BlocksTransactionsSchema(), props);
flinkBlocksTransactionsConsumer.setStartFromEarliest();

DataStream blocksTransactions = env.addSource(flinkBlocksTransactionsConsumer);

tableEnv.registerDataStream("blocksTransactionsTable", blocksTransactions);

Table sqlResult

    = tableEnv.sqlQuery(
            "SELECT block_hash, count(tx_hash) " +
            "FROM blocksTransactionsTable " +
            "GROUP BY block_hash");

DataStream resultStream = tableEnv

    .toRetractStream(sqlResult, Row.class)
    .map(t -> {
        Row r = t.f1;
        String field2 = r.getField(0).toString();
        long count = Long.valueOf(r.getField(1).toString());
        return new Test(field2, count);
    })
    .returns(Test.class);

resultStream.print();

resultStream.addSink(new FlinkKafkaProducer011<>("localhost:9092", "TargetTopic", new TestSchema()));

env.execute();
我对block_hash列使用group by语句,但我有几次相同的block_hash。这是print()的结果:

测试{FIELD2 = '0x2c4a021d514e4f8f0beb8f0ce711652304928528487dc7811d06fa77c375b5e1',计数= 1}测试{FIELD2 = '0x2c4a021d514e4f8f0beb8f0ce711652304928528487dc7811d06fa77c375b5e1',计数= 1}测试{FIELD2 = '0x2c4a021d514e4f8f0beb8f0ce711652304928528487dc7811d06fa77c375b5e1',计数= 2}测试{FIELD2 = '0x780aadc08c294da46e174fa287172038bba7afacf2dff41fdf0f6def03906e60',计数= 1}测试{ field2 ='0x182d31bd491527e1e93c4e44686057207ee90c6a8428308a2bd7b6a4d2e10e53',count = 1}测试{field2 ='0x182d31bd491527e1e93c4e44686057207ee90c6a8428308a2bd7b6a4d2e10e53',count = 1}

如何在不使用BatchEnvironment的情况下解决此问题?

取消 提交回答
全部回答(2)
  • 游客gpmud5qvg7z52
    2020-08-04 15:31:09

    我也遇到这个问题了,group by之后分组字段还能重复, 楼主怎么解决的?

    1 0
  • 社区小助手
    2019-07-17 23:19:47

    GROUP BY在流上运行的查询必须生成更新。请考虑以下示例:

    SELECT user, COUNT(*) FROM clicks GROUP BY user;
    每次clicks表格都会收到一个新行,相应的计数user需要递增和更新。

    将a转换Table为a时DataStream,必须在流中对这些更新进行编码。Flink使用撤消并添加消息来执行此操作。通过调用tEnv.toRetractStream(table, Row.class),您将转换Table table为DataStream。该Boolean标志很重要,表示Row是从结果表中添加还是撤消。

    给定上述示例查询和输入表clicks作为

    user | ...

    Bob | ...
    Liz | ...
    Bob | ...
    您将收到以下撤消流

    (+, (Bob, 1)) // add first result for Bob
    (+, (Liz, 1)) // add first result for Liz
    (-, (Bob, 1)) // remove outdated result for Bob
    (+, (Bob, 2)) // add updated result for Bob
    您需要自己主动维护结果,并Boolean按照撤消流的标志的指示添加和删除行。

    0 2
添加回答
相关问答

0

回答

Django与SQL Server数据库连接时出错

2019-12-03 22:30:52 463浏览量 回答数 0

1

回答

SQL Server-批量插入-FIELDQUOTE无法识别双引号

2019-12-03 22:21:37 1016浏览量 回答数 1

1

回答

如何在SQL Server添加一个列的值,但只有在艺术家是相同的?

2019-12-02 23:25:08 152浏览量 回答数 1

1

回答

从PL/SQL Developer连接到MS SQL server

2019-12-02 23:08:25 266浏览量 回答数 1

0

回答

SQL Server:删除4列匹配的重复项,但将具有特定值的重复项保留在另一列中

2019-12-02 22:51:50 327浏览量 回答数 0

0

回答

怎样从SQL Server表自动发送电子邮件给多个用户?(关闭)

2019-12-02 22:50:05 397浏览量 回答数 0

0

回答

SQL Server实例管理页面简介

2017-10-17 17:13:45 1411浏览量 回答数 0

1

回答

SQLServer性能数据解析

2017-06-23 11:00:16 3830浏览量 回答数 1

2

回答

SQL SERVER 查询都有哪些权限

2016-06-08 15:37:06 2904浏览量 回答数 2

1

回答

SQL Server转 MySQL的方法或工具

2016-02-14 15:21:07 2084浏览量 回答数 1
+关注
社区小助手
社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。
文章
问答
问答排行榜
最热
最新
相关电子书
更多
时序数据库TSDB新功能 - 如何用SQL进行时序查询
立即下载
Flink SQL in 2020
立即下载
MaxCompute SQL计算成本调优以及优化方法
立即下载