开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

flinksql select id ,count(*) from a group by id .

flinksql select id ,count(*) from a group by id . 重启都是从0开始,是不是要加什么参数

展开
收起
游客3oewgrzrf6o5c 2022-06-29 16:52:48 665 0
1 条回答
写回答
取消 提交回答
  • 十分耕耘,一定会有一分收获!

    楼主你好,在Flink SQL中,使用类似 SELECT id, COUNT(*) FROM a GROUP BY id 的语句进行聚合操作时,如果Flink作业重启,可能会导致聚合结果的丢失。因为Flink作业重启后,内存中的状态会被清空,之前的聚合结果也会被清空。

    为了避免这种情况,可以考虑在Flink SQL中使用状态后端,将聚合结果存储到分布式的状态后端中。Flink支持多种状态后端,包括内存、文件系统、HDFS、RocksDB等。其中,RocksDB是一种性能较好、可靠性较高的状态后端,可以满足大规模聚合操作的需求。

    如果您使用的是RocksDB状态后端,在Flink SQL中进行聚合操作时,需要在作业启动时,指定RocksDB状态后端的相关参数。例如,您可以通过以下方式启动Flink SQL作业:

    ./bin/flink run -s hdfs:///state/checkpoints/chk-42 -d -c org.apache.flink.yarn.YarnApplicationMasterRunner -yid application_123456789_0123 -ynm flink-job -ys 3 -yqu default -yn 5 -yjm 1024m -ytm 4096m -m yarn-cluster -p 2 -yD state.backend=rocksdb -yD state.backend.rocksdb.dir=/tmp/rocksdb -yD state.checkpoints.dir=hdfs:///state/checkpoints -yD state.checkpoints.interval=10s -yD state.checkpoints.min-pause=5s /path/to/your/flink-job.jar arg1 arg2
    

    其中,-yD state.backend=rocksdb -yD state.backend.rocksdb.dir=/tmp/rocksdb 参数指定了RocksDB作为状态后端,-yD state.checkpoints.dir=hdfs:///state/checkpoints -yD state.checkpoints.interval=10s -yD state.checkpoints.min-pause=5s 参数指定了使用HDFS作为检查点存储位置,并设置了检查点的周期和最小暂停时间。

    需要注意的是,使用状态后端会增加系统的复杂度和资源消耗,因此需要根据实际需求进行权衡和调整。如果您的聚合操作比较简单,可以考虑使用内存或文件系统作为状态后端;如果聚合操作比较复杂,或者需要处理大量数据时,可以考虑使用RocksDB作为状态后端,以提高系统性能和可靠性。

    2023-07-23 13:01:42
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
对 2000 多亿条数据做一次 group by 需要多久? 立即下载
对2000多亿条数据做一次Group By 需要多久 立即下载
Show Me The Money! Cost & Reso 立即下载