开发者社区> 问答> 正文

pyflink资源优化问题,请教如何解决?

你好: 业务场景是:数据源是kafka,10分钟总数据量在10G左右,里面包括200个城市的数据,期望使用滚动窗口,按城市分组,每2分钟触发一次将所有城市的过去十分钟数据放到各自的list里,然后转换成pandas,针对该城市做一次整体计算,每次每个城市的计算耗时平均在60s左右。

现在的使用方式: 1、slide_window = Slide.over(f"10.minutes").every(f"2.minutes").on('ts').alias("w") 2、使用sql语句注册kafka connector, 3、result table使用普通的print: CREATE TABLE sink ( city_id STRING ,

start_time TIMESTAMP ,

end_time TIMESTAMP ,

flag STRING

) with ( 'connector' = 'print' ) 4、通过udaf函数,把source的数据写入csv文件,source.select("write_csv(xxxx)"),然后调用计算函数,读取csv文件内容 5、触发计算逻辑通过select里调用自定义函数strategy_flow_entry的方式:source.window(slide_window).group_by("w, city_id").select("strategy_flow_entry(concat_result)").execute_insert("sink").wait()

这种方式在运行过程中,总是出各种各样问题,比如数据延迟等。

所以想请教一下: 1、针对这个场景,推荐的使用方式是什么?目前的使用方式是不是不太对 2、推荐的任务提交参数要怎么设置?cpu core、内存、并发数、slot等

多谢*来自志愿者整理的flink

展开
收起
雪哥哥 2021-12-05 12:20:40 629 0
1 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载