Flink版本:1.10.2
使用RichAsyncFunction 异步IO 操作,结果DataStream 不能注册为table。
本地测试的结果是一直重复输出数据。
请问一下DataStream 处理之后,怎么才能注册为 Table。
代码如下:
// 异步redis处理 RedisAsyncFunction asyncFunction = new RedisAsyncFunction(node, aggProcessorArgs);
// 获取异步处理流 DataStream result = AsyncDataStream.orderedWait( dataStream, asyncFunction, 60L, TimeUnit.SECONDS, 100).returns(outRowTypeInfo);
// 注册为临时 table tabEnv.createTemporaryView("test_table", result, outRowFields.stream().collect(Collectors.joining(",")));
// result.print("out_table>>"); Table test_table = tabEnv.sqlQuery("select * from test_table");
// 查询临时table tabEnv.toAppendStream(test_table, Row.class).print("test_table");
--
tili
***************************************来自志愿者整理的flink邮件归档
我看你这不是注册进去了么? 有报什么错么?
最后提交作业执行记得调用 StreamExecutionEnvironment.execute()*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。