大家好: 我是一个pyflink初学者,遇到一个flinksql中聚合后无法sink的问题,但是我的代码完全按照官方文档进行,到最后依然无法解决,只能像各位求助。 我的 source语句为: CREATE TABLE random_source ( amount int, course_code string, event_time
TIMESTAMP(3) , WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'ding_broadcast_vip_order_test', 'properties.bootstrap.servers' = '192.168.100.135:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ) sink为: CREATE TABLE print_sink ( amount BIGINT, course_code string, event_time_start
TIMESTAMP(3), event_time_end
TIMESTAMP(3) ) WITH ( 'connector' = 'print' ) 而我尝试过如下两种方式聚合,可是都遇到同样的问题: insert into print_sink select count(amount) as amount ,course_code ,tumble_start(event_time, interval '1' minute) as event_time_start ,tumble_end(event_time, interval '1' minute) as event_time_end from random_source group by tumble(event_time, interval '1' minute),course_code 以及: insert into print_sink select sum(amount),course_code,window_start,window_end from table( tumble( table random_source, descriptor(event_time), interval '1' minutes ) ) group by window_start, window_end,,course_code 我的kafka数据结构为 {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:44:40’} {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:45:40’} {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:46:40’} {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:47:40’} {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:48:40’} {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:49:40’} {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:50:40’} {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:51:40’} 在我的预期中他应该是按照每分钟作为窗口,然后按照course_code进行聚合,可结果是sink没有任何输出,仿佛没有接受到任何数据。但是在为仅仅是普通的select * from random_source时,sink又能够正常输出, 我实在无能为力,请各位不吝赐教:下面附上我的出问题的代码。 非常感谢各位
from pyflink.table import EnvironmentSettings, StreamTableEnvironment, TableEnvironment
def hello_world(): """ 从随机Source读取数据,然后直接利用PrintSink输出。 """ settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(settings) t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///Users/duanzebing/Downloads/flink-sql-connector-kafka_2.11-1.14.0.jar")
source_ddl = """ CREATE TABLE random_source ( amount int, course_code string, event_time
TIMESTAMP(3) , WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'ding_broadcast_vip_order_test', 'properties.bootstrap.servers' = '192.168.100.135:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ) """
t_env.execute_sql(source_ddl)
""" sink_ddl = """ CREATE TABLE print_sink ( amount BIGINT, course_code string, event_time_start
TIMESTAMP(3), event_time_end
TIMESTAMP(3) ) WITH ( 'connector' = 'print' ) """
t_env.execute_sql(sink_ddl)
t_env.execute_sql('''insert into print_sink select count(amount) as amount ,course_code ,tumble_start(event_time, interval '1' minute) as event_time_start ,tumble_end(event_time, interval '1' minute) as event_time_end from random_source group by tumble(event_time, interval '1' minute),course_code ''').wait()
if name == 'main': hello_world()*来自志愿者整理的flink
可能是 watermark 问题:并发多,测试数据少,有些并发没有数据,导致watermark没有增长。
如果是这个原因的话,有两种解决办法: 1)t_env.get_config().get_configuration().set_string("parallelism.default", "1") 2)t_env.get_config().get_configuration().set_string("table.exec.source.idle-timeout", "5000 ms")*来自志愿者整理的flink
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。