开发者社区> 问答> 正文

flink 1.11 executeSql查询kafka表print没有输出

本人当前使用flink版本1.11.0,但是在执行executeSql后,print时没有在console打印出结果(查看kafka是一直有数据产生的), sql如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

Catalog catalog = new HiveCatalog("x", "default", "D:\conf", "1.1.0");

tEnv.registerCatalog("x", catalog);

TableResult execute = tEnv.executeSql("select * from x.ods.ods_binlog_test_trip_create_t_order_1");

execute.print();

建表语句如下:

CREATE TABLE x.ods.ods_binlog_test_trip_create_t_order_1 (

_type STRING,

order_no STRING,

order_time STRING,

dt as TO_TIMESTAMP(order_time),

proctime as PROCTIME(),

WATERMARK FOR dt AS dt - INTERVAL '5' SECOND

) WITH (

'connector.type' = 'kafka',

'connector.properties.bootstrap.servers' = '***',

'connector.properties.zookeeper.connect' = '****',

'connector.version' = 'universal',

'format.type' = 'json',

'connector.properties.group.id' = 'testGroup',

'connector.startup-mode' = 'group-offsets',

'connector.topic' = 'test'

)

*来自志愿者整理的flink邮件归档

展开
收起
小阿矿 2021-12-06 15:11:41 1224 0
1 条回答
写回答
取消 提交回答
  • 1.11的 TableResult.collect() 和 TableResult.print() 方法在流模式下, 都是exactly once语义,需要配置checkpoint才能得到结果。

    *来自志愿者整理的flink邮件归档

    2021-12-06 16:56:53
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载