开发者社区> 问答> 正文

[flink-1.10.2] 异步IO结果DataStream 该如何注册为table??

Flink版本:1.10.2

使用RichAsyncFunction 异步IO 操作,结果DataStream 不能注册为table。

本地测试的结果是一直重复输出数据。

请问一下DataStream 处理之后,怎么才能注册为 Table。


代码如下:

// 异步redis处理 RedisAsyncFunction asyncFunction = new RedisAsyncFunction(node, aggProcessorArgs);

// 获取异步处理流 DataStream result = AsyncDataStream.orderedWait( dataStream, asyncFunction, 60L, TimeUnit.SECONDS, 100).returns(outRowTypeInfo);

// 注册为临时 table tabEnv.createTemporaryView("test_table", result, outRowFields.stream().collect(Collectors.joining(",")));

// result.print("out_table>>"); Table test_table = tabEnv.sqlQuery("select * from test_table");

// 查询临时table tabEnv.toAppendStream(test_table, Row.class).print("test_table");

--


tili

***************************************来自志愿者整理的flink邮件归档

展开
收起
小阿怪 2021-12-06 12:15:06 721 0
1 条回答
写回答
取消 提交回答
  • 我看你这不是注册进去了么? 有报什么错么?

    最后提交作业执行记得调用 StreamExecutionEnvironment.execute()*来自志愿者整理的flink邮件归档

    2021-12-06 13:18:51
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载