
apache-flink - Aggregate function produces no sink output in PyFlink 1.14

Hi everyone,

I'm a PyFlink beginner and I've run into a problem where a Flink SQL aggregation produces no sink output. I followed the official documentation exactly, but I still couldn't solve it, so I'm turning to you for help.

My source DDL:

CREATE TABLE random_source (
    amount INT,
    course_code STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'ding_broadcast_vip_order_test',
    'properties.bootstrap.servers' = '192.168.100.135:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
)

The sink:

CREATE TABLE print_sink (
    amount BIGINT,
    course_code STRING,
    event_time_start TIMESTAMP(3),
    event_time_end TIMESTAMP(3)
) WITH (
    'connector' = 'print'
)

I have tried the following two ways of aggregating, and both hit the same problem:

INSERT INTO print_sink
SELECT
    COUNT(amount) AS amount,
    course_code,
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS event_time_start,
    TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS event_time_end
FROM random_source
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE), course_code

as well as:

INSERT INTO print_sink
SELECT SUM(amount), course_code, window_start, window_end
FROM TABLE(
    TUMBLE(TABLE random_source, DESCRIPTOR(event_time), INTERVAL '1' MINUTES)
)
GROUP BY window_start, window_end, course_code

My Kafka records look like this:

{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:44:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:45:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:46:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:47:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:48:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:49:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:50:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:51:40'}

My expectation was that the data would be aggregated by course_code in one-minute tumbling windows, but the sink produces no output at all, as if it never received any data. Yet with a plain SELECT * FROM random_source, the sink prints normally. I'm at my wits' end, so any advice would be appreciated. My problematic code is attached below. Many thanks!

from pyflink.table import EnvironmentSettings, TableEnvironment


def hello_world():
    """Read data from the Kafka source and write the windowed aggregation to the print sink."""
    settings = EnvironmentSettings.in_streaming_mode()
    t_env = TableEnvironment.create(settings)
    t_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///Users/duanzebing/Downloads/flink-sql-connector-kafka_2.11-1.14.0.jar")

    source_ddl = """
        CREATE TABLE random_source (
            amount INT,
            course_code STRING,
            event_time TIMESTAMP(3),
            WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'ding_broadcast_vip_order_test',
            'properties.bootstrap.servers' = '192.168.100.135:9092',
            'properties.group.id' = 'testGroup',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """

    sink_ddl = """
        CREATE TABLE print_sink (
            amount BIGINT,
            course_code STRING,
            event_time_start TIMESTAMP(3),
            event_time_end TIMESTAMP(3)
        ) WITH (
            'connector' = 'print'
        )
    """

    # Register the source and the sink.
    t_env.execute_sql(source_ddl)
    t_env.execute_sql(sink_ddl)

    # Submit the windowed aggregation; wait() blocks until the job terminates.
    t_env.execute_sql("""
        INSERT INTO print_sink
        SELECT
            COUNT(amount) AS amount,
            course_code,
            TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS event_time_start,
            TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS event_time_end
        FROM random_source
        GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE), course_code
    """).wait()


if __name__ == '__main__':
    hello_world()
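For anyone wanting to reproduce this setup, here is a minimal producer sketch for publishing the sample records above; it assumes the third-party kafka-python package, with the broker address and topic name copied from the DDL:

import json
from kafka import KafkaProducer  # assumes: pip install kafka-python

producer = KafkaProducer(
    bootstrap_servers='192.168.100.135:9092',
    # json.dumps emits standard double-quoted JSON, which the
    # Kafka connector's 'json' format expects.
    value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# Publish the eight sample records (17:44:40 through 17:51:40).
for minute in range(44, 52):
    producer.send('ding_broadcast_vip_order_test', value={
        'amount': 500,
        'course_code': '97iscn4g8k',
        'event_time': '2021-12-01 17:%d:40' % minute,
    })
producer.flush()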

雪哥哥 2021-12-08 19:36:27
1 Answer
  • This is likely a watermark problem: with a high parallelism and only a little test data, some parallel source subtasks receive no data at all, so the overall watermark never advances. An event-time tumbling window only fires once the watermark passes the window's end, so a stalled watermark means no output is ever emitted.

    If that is the cause, there are two fixes (a minimal sketch applying both settings follows below):

    1) t_env.get_config().get_configuration().set_string("parallelism.default", "1")

    2) t_env.get_config().get_configuration().set_string("table.exec.source.idle-timeout", "5000 ms")
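    For reference, a minimal sketch of where those two settings would go in the asker's script; the two option keys are the real Flink configuration options named above, and the surrounding setup mirrors the code in the question:

    from pyflink.table import EnvironmentSettings, TableEnvironment

    settings = EnvironmentSettings.in_streaming_mode()
    t_env = TableEnvironment.create(settings)

    # Fix 1: run with a single subtask, so the lone test partition
    # is always consumed by a subtask that advances the watermark.
    t_env.get_config().get_configuration().set_string(
        "parallelism.default", "1")

    # Fix 2: mark source subtasks idle after 5 s without data, so
    # empty subtasks stop holding back the overall watermark.
    t_env.get_config().get_configuration().set_string(
        "table.exec.source.idle-timeout", "5000 ms")

    Either option alone should unblock the watermark; the idle-timeout is the more general fix when the real topic has more partitions than active producers.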

    2021-12-08 19:43:02