开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

请教个问题,有个需求,要定时从api中获取数据,插入中间表,在中间表数据插入完毕后再从中间表查询数据

请教个问题,有个需求,要定时从api中获取数据,插入中间表,在中间表数据插入完毕后再从中间表查询数据进行指标计算,用flink 改怎么进行设计呢?

展开
收起
十一0204 2023-04-11 09:33:05 404 0
3 条回答
写回答
取消 提交回答
  • 公众号:网络技术联盟站,InfoQ签约作者,阿里云社区签约作者,华为云 云享专家,BOSS直聘 创作王者,腾讯课堂创作领航员,博客+论坛:https://www.wljslmz.cn,工程师导航:https://www.wljslmz.com

    在阿里云Flink中,您可以使用 Flink 的 DataStream API 来实现定时从 API 中获取数据、插入中间表、从中间表查询数据进行指标计算的功能。

    具体来说,您可以按照以下步骤进行设计:

    1. 定义数据源:使用 Flink 的 DataStream API 中的 addSource 方法定义一个数据源,从 API 中定时拉取数据。您可以使用 Flink 的 TimerService 来实现定时拉取数据的功能。

    2. 插入中间表:将从 API 中拉取的数据插入到中间表中。您可以使用 Flink 的 JDBCOutputFormat 或其他适合的输出格式来实现将数据插入到中间表中的功能。

    3. 查询中间表:使用 Flink 的 DataStream API 中的 JDBCInputFormat 或其他适合的输入格式从中间表中查询数据。如果需要计算的数据较多,您可以考虑使用 Flink 的 Table APISQL 来实现数据查询和计算。

    4. 计算指标:使用 Flink 的 DataStream API 或 Table API 对查询的数据进行指标计算。您可以使用 Flink 的 WindowKeyedStream 等功能来对数据进行分组和处理。

    2023-04-26 22:27:10
    赞同 展开评论 打赏
  • 值得去的地方都没有捷径

    针对这个需求,你可以考虑使用Apache Flink中的DataStream和Table/SQL API,将输入的数据流转换为Table,并在Table上进行查询和计算操作。整个流程可以分为以下几个步骤进行设计:

    数据提取:使用Flink的DataStream API从API接口中实时获取数据,并将数据流转换为Table。

    中间表插入:使用Flink的Table API或SQL API向中间表中插入数据。你可以使用Flink的流批一体支持,将批处理中的数据直接写入中间表的方式实现。

    中间表查询:使用Flink的Table API或SQL API从中间表中查询数据,并将其转换为Table。

    指标计算:使用Flink的Table API或SQL API对表数据进行计算,并将计算结果输出到指定的位置,比如写入消息队列等。

    在具体实现的过程中,你需要确定具体的数据提取方式和中间表的结构设计。同时,还需要考虑到数据的实时性和容错性等问题。如果要实现更高效的处理,可以考虑使用Flink的stateful Stream Processing,对中间表进行增量计算。

    2023-04-17 16:41:27
    赞同 展开评论 打赏
  • 坚持这件事孤独又漫长。

    可以使用Flink中的AsyncDataStreamAsyncFunction实现异步的数据查询和获取。

    • 首先可以使用AsyncDataStream将输入数据流转换为异步数据流,然后使用AsyncFunction实现异步查询和获取中间表数据。具体步骤如下:

      1. 使用AsyncDataStream将输入数据流转换为异步数据流
    AsyncDataStream.unorderedWait(inputDataStream, new AsyncFunction<输入类型, 输出类型>() {
        @Override
        public void asyncInvoke(输入类型 input, ResultFuture<输出类型> resultFuture) {
            // 异步查询中间表数据,并将结果放入resultFuture中
        }
    }, 1000, TimeUnit.MILLISECONDS, 100);
    

    其中,第二个参数为AsyncFunction,用于实现中间表的异步查询和获取;第三个参数为最大等待时间,用于指定查询中间表的超时时间;第四个参数为最大并发请求数,用于限制查询中间表的并发数。

    1. AsyncFunction中实现异步的中间表查询和数据插入
    public class QueryFromMiddleTableFunction extends AsyncFunction<输入类型, 输出类型> {
        @Override
        public void asyncInvoke(输入类型 input, ResultFuture<输出类型> resultFuture) {
            // 异步查询中间表数据,并将结果放入resultFuture中
            FutureCallback<中间表数据类型> callback = new FutureCallback<中间表数据类型>() {
                @Override
                public void onSuccess(@Nullable 中间表数据类型 result) {
                    // 将中间表数据插入到目标数据流中,并emit出去
                    resultFuture.complete(Collections.singleton(result));
                }
                @Override
                public void onFailure(Throwable t) {
                    resultFuture.complete(Collections.emptyList());
                }
            };
            
            // 使用异步的方式查询中间表数据
            ListenableFuture<中间表数据类型> future = // 异步查询中间表数据
            Futures.addCallback(future, callback, MoreExecutors.directExecutor());
        }
    }
    

    asyncInvoke方法中,可以使用异步方式查询中间表数据。如果查询成功,则将查询结果插入到ResultFuture中,然后再emit出去;如果查询失败,则插入空结果并emit出去。在插入数据时,需要注意将中间表数据转换为目标数据流的数据类型。

    1. 在主函数中使用异步数据流并设置输出
    DataStream<输出类型> outputDataStream = AsyncDataStream.unorderedWait(inputDataStream, new QueryFromMiddleTableFunction(), 1000, TimeUnit.MILLISECONDS, 100);
    

    在主函数中,使用AsyncDataStream将输入数据流转换为异步数据流,并使用QueryFromMiddleTableFunction实现异步中间表查询和数据插入。最后将输出数据流设置为异步数据流即可。

    2023-04-11 11:55:51
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

热门讨论

热门文章

相关电子书

更多
Spring Boot2.0实战Redis分布式缓存 立即下载
CUDA MATH API 立即下载
API PLAYBOOK 立即下载