生产实践 | 基于 Flink 的短视频生产消费监控

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 本文详细介绍了实时监控类指标的数据流转链路以及技术方案,大多数的实时监控类指标都可按照本文中的几种方案实现。

本文详细介绍了实时监控类指标的数据流转链路以及技术方案,大多数的实时监控类指标都可按照本文中的几种方案实现。

短视频生产消费监控

短视频带来了全新的传播场域和节目形态,小屏幕、快节奏成为行业潮流的同时,也催生了新的用户消费习惯,为创作者和商户带来收益。而多元化的短视频也可以为品牌方提供营销机遇。

其中对于垂类生态短视频的生产消费热点的监控分析目前成为了实时数据处理很常见的一个应用场景,比如对某个圈定的垂类生态下的视频生产或者视频消费进行监控,对热点视频生成对应的优化推荐策略,促进热点视频的生产或者消费,构建整个生产消费数据链路的闭环,从而提高创作者收益以及消费者留存。

本文将完整分析垂类生态短视频生产消费数据的整条链路流转方式,并基于 Flink 提供几种对于垂类视频生产消费监控的方案设计。通过本文,你可以了解到:

垂类生态短视频生产消费数据链路闭环

实时监控短视频生产消费的方案设计

不同监控量级场景下的代码实现

flink 学习资料

项目简介

垂类生态短视频生产消费数据链路流转架构图如下,此数据流转图也适用于其他场景:

image.png

链路

在上述场景中,用户生产和消费短视频,从而客户端、服务端以及数据库会产生相应的行为操作日志,这些日志会通过日志抽取中间件抽取到消息队列中,我们目前的场景中是使用 Kafka 作为消息队列;然后使用 flink 对垂类生态中的视频进行生产或消费监控(内容生产通常是圈定垂类作者 id 池,内容消费通常是圈定垂类视频 id 池),最后将实时聚合数据产出到下游;下游可以以数据服务,实时看板的方式展现,运营同学或者自动化工具最终会帮助我们分析当前垂类下的生产或者消费热点,从而生成推荐策略。

方案设计

image.png

架构

其中数据源如下:

Kafka 为全量内容生产和内容消费的日志。

Rpc/Http/Mysql/配置中心/Redis/HBase 为需要监控的垂类生态内容 id 池(内容生产则为作者 id 池,内容消费则为视频 id 池),其主要是提供给运营同学动态配置需要监控的 id 范围,其可以在 flink 中进行实时查询,解析运营同学想要的监控指标范围,以及监控的指标和计算方式,然后加工数据产出,可以支持随时配置,实时数据随时计算产出。

其中数据汇为聚类好的内容生产或者消费热点话题或者事件指标:

Redis/HBase 主要是以低延迟(Redis 5ms p99,HBase 100ms p99,不同公司的服务能力不同)并且高 QPS 提供数据服务,给 Server 端或者线上用户提供低延迟的数据查询。

Druid/Mysql 可以做为 OLAP 引擎为 BI 分析提供灵活的上卷下钻聚合分析能力,供运营同学配置可视化图表使用。

Kafka 可以以流式数据产出,从而提供给下游继续消费或者进行特征提取。

废话不多说,我们直接上方案和代码,下述几种方案按照监控 id 范围量级区分,不同的量级对应着不同的方案,其中的代码示例为 ProcessWindowFunction,也可以使用 AggregateFunction 代替,其中主要监控逻辑都相同。

方案 1

适合监控 id 数据量小的场景(几千 id),其实现方式是在 flink 任务初始化时将需要监控的 id 池或动态配置中心的 id 池加载到内存当中,之后只需要在内存中判断内容生产或者消费数据是否在这个监控池当中。

ProcessWindowFunction p = new ProcessWindowFunction<CommonModel, CommonModel, Long, TimeWindow>() {
    
    // 配置中心动态 id 池
    private Config<Set<Long>> needMonitoredIdsConfig;

    @Override
    public void open(Configuration parameters) throws Exception {
        this.needMonitoredIdsConfig = ConfigBuilder
                .buildSet("needMonitoredIds", Long.class);
    }

    @Override
    public void process(Long bucket, Context context, Iterable<CommonModel> iterable, Collector<CommonModel> collector) throws Exception {
        Set<Long> needMonitoredIds = needMonitoredIdsConfig.get();
        /**
         * 判断 commonModel 中的 id 是否在 needMonitoredIds 池中
         */
    }
}

监控的 id 池可以按照固定或者可配置从而分出两种获取方式:第一种是在 flink 任务开始时就全部加载进内存中,这种方式适合监控 id 池不变的情况;第二种是使用动态配置中心,每次都从配置中心访问到最新的监控 id 池,其可以满足动态配置或者更改 id 池的需求,并且这种实现方式通常可以实时感知到配置更改,几乎无延迟。

方案 2

适合监控 id 数据量适中(几十万 id),监控数据范围会不定时发生变动的场景。其实现方式是在 flink 算子中定时访问接口获取最新的监控 id 池,以获取最新监控数据范围。

ProcessWindowFunction p = new ProcessWindowFunction<CommonModel, CommonModel, Long, TimeWindow>() {

    private long lastRefreshTimestamp;

    private Set<Long> needMonitoredIds;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.refreshNeedMonitoredIds(System.currentTimeMillis());
    }

    @Override
    public void process(Long bucket, Context context, Iterable<CommonModel> iterable, Collector<CommonModel> collector) throws Exception {
        long windowStart = context.window().getStart();
        this.refreshNeedMonitoredIds(windowStart);
        /**
         * 判断 commonModel 中的 id 是否在 needMonitoredIds 池中
         */
    }

    public void refreshNeedMonitoredIds(long windowStart) {
        // 每隔 10 秒访问一次
        if (windowStart - this.lastRefreshTimestamp >= 10000L) {
            this.lastRefreshTimestamp = windowStart;
            this.needMonitoredIds = Rpc.get(...)
        }
    }
}

根据上述代码实现方式,按照时间间隔的方式刷新 id 池,其缺点在于不能实时感知监控 id 池的变化,所以刷新时间可能会和需求场景强耦合(如果 id 池会频繁更新,那么就需要缩小刷新时间间隔)。也可根据需求场景在每个窗口开始前刷新 id 池,这样可保证每个窗口中的 id 池中的数据一直保持更新。

方案 3

方案 3 对方案 2 的一个优化(几十万 id,我们生产环境中最常用的)。其实现方式是在 flink 中使用 broadcast 算子定时访问监控 id 池,并将 id 池以广播的形式下发给下游参与计算的各个算子。其优化点在于:比如任务的并行度为 500,每 1s 访问一次,采用方案 2 则访问监控 id 池接口的 QPS 为 500,在使用 broadcast 算子之后,其访问 QPS 可以减少到 1,可以大大减少对接口的访问量,减轻接口压力。

public class Example {

    @Slf4j
    static class NeedMonitorIdsSource implements SourceFunction<Map<Long, Set<Long>>> {

        private volatile boolean isCancel;

        @Override
        public void run(SourceContext<Map<Long, Set<Long>>> sourceContext) throws Exception {
            while (!this.isCancel) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    Set<Long> needMonitorIds = Rpc.get(...);
                    // 可以和上一次访问的数据做比较查看是否有变化,如果有变化,才发送出去
                    if (CollectionUtils.isNotEmpty(needMonitorIds)) {
                        sourceContext.collect(new HashMap<Long, Set<Long>>() {{
                            put(0L, needMonitorIds);
                        }});
                    }
                } catch (Throwable e) {
                    // 防止接口访问失败导致的错误导致 flink job 挂掉
                    log.error("need monitor ids error", e);
                }
            }
        }

        @Override
        public void cancel() {
            this.isCancel = true;
        }
    }

    public static void main(String[] args) {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        InputParams inputParams = new InputParams(parameterTool);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        final MapStateDescriptor<Long, Set<Long>> broadcastMapStateDescriptor = new MapStateDescriptor<>(
                "config-keywords",
                BasicTypeInfo.LONG_TYPE_INFO,
                TypeInformation.of(new TypeHint<Set<Long>>() {
                }));

        /********************* kafka source *********************/
        BroadcastStream<Map<Long, Set<Long>>> broadcastStream = env
                .addSource(new NeedMonitorIdsSource()) // redis photoId 数据广播
                .setParallelism(1)
                .broadcast(broadcastMapStateDescriptor);

        DataStream<CommonModel> logSourceDataStream = SourceFactory.getSourceDataStream(...);

        /********************* dag *********************/
        DataStream<CommonModel> resultDataStream = logSourceDataStream
                .keyBy(KeySelectorFactory.getStringKeySelector(CommonModel::getKeyField))
                .connect(broadcastStream)
                .process(new KeyedBroadcastProcessFunction<String, CommonModel, Map<Long, Set<Long>>, CommonModel>() {

                    private Set<Long> needMonitoredIds;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        this.needMonitoredIds = Rpc.get(...)
                    }

                    @Override
                    public void processElement(CommonModel commonModel, ReadOnlyContext readOnlyContext, Collector<CommonModel> collector) throws Exception {
                        // 判断 commonModel 中的 id 是否在 needMonitoredIds 池中
                    }

                    @Override
                    public void processBroadcastElement(Map<Long, Set<Long>> longSetMap, Context context, Collector<CommonModel> collector) throws Exception {
                        // 需要监控的字段
                        Set<Long> needMonitorIds = longSetMap.get(0L);
                        if (CollectionUtils.isNotEmpty(needMonitorIds)) {
                            this.needMonitoredIds = needMonitorIds;
                        }
                    }
                });

        /********************* kafka sink *********************/
        SinkFactory.setSinkDataStream(...);
        
        env.execute(inputParams.jobName);
    }

}

方案 4

适合于超大监控范围的数据(几百万,我们自己的生产实践中使用扩量到 500 万)。其原理是将监控范围接口按照 id 按照一定规则分桶。flink 消费到日志数据后将 id 按照 监控范围接口 id 相同的分桶方法进行分桶 keyBy,这样在下游算子中每个算子中就可以按照桶变量值,从接口中拿到对应桶的监控 id 数据,这样 flink 中并行的每个算子只需要获取到自己对应的桶的数据,可以大大减少请求的压力。

public class Example {

    public static void main(String[] args) {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        InputParams inputParams = new InputParams(parameterTool);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        final MapStateDescriptor<Long, Set<Long>> broadcastMapStateDescriptor = new MapStateDescriptor<>(
                "config-keywords",
                BasicTypeInfo.LONG_TYPE_INFO,
                TypeInformation.of(new TypeHint<Set<Long>>() {
                }));

        /********************* kafka source *********************/

        DataStream<CommonModel> logSourceDataStream = SourceFactory.getSourceDataStream(...);

        /********************* dag *********************/
        DataStream<CommonModel> resultDataStream = logSourceDataStream
                .keyBy(KeySelectorFactory.getLongKeySelector(CommonModel::getKeyField))
                .timeWindow(Time.seconds(inputParams.accTimeWindowSeconds))
                .process(new ProcessWindowFunction<CommonModel, CommonModel, Long, TimeWindow>() {

                    private long lastRefreshTimestamp;

                    private Set<Long> oneBucketNeedMonitoredIds;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                    }

                    @Override
                    public void process(Long bucket, Context context, Iterable<CommonModel> iterable, Collector<CommonModel> collector) throws Exception {
                        long windowStart = context.window().getStart();
                        this.refreshNeedMonitoredIds(windowStart, bucket);
                        /**
                         * 判断 commonModel 中的 id 是否在 needMonitoredIds 池中
                         */
                    }

                    public void refreshNeedMonitoredIds(long windowStart, long bucket) {
                        // 每隔 10 秒访问一次
                        if (windowStart - this.lastRefreshTimestamp >= 10000L) {
                            this.lastRefreshTimestamp = windowStart;
                            this.oneBucketNeedMonitoredIds = Rpc.get(bucket, ...)
                        }
                    }
                });

        /********************* kafka sink *********************/
        SinkFactory.setSinkDataStream(...);

        env.execute(inputParams.jobName);
    }
}

总结

本文首先介绍了,在短视频领域中,短视频生产消费数据链路的整个闭环,并且其数据链路闭环一般情况下也适用于其他场景;以及对应的实时监控方案的设计和不同场景下的代码实现,包括:

垂类生态短视频生产消费数据链路闭环:用户操作行为日志的流转,日志上传,实时计算,以及流转到 BI,数据服务,最后数据赋能的整个流程

实时监控方案设计:监控类实时计算流程中各类数据源,数据汇的选型

监控 id 池在不同量级场景下具体代码实现

学习资料

https://github.com/flink-china/flink-training-course/blob/master/README.md
https://ververica.cn/developers-resources/
https://space.bilibili.com/33807709

作者:YangYichao
来源:Flink 微信公众号
原文链接:https://mp.weixin.qq.com/s/t_hbmx_xHly9y0nBcZmtnQ

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
163 15
|
22天前
|
消息中间件 监控 数据可视化
实时计算Flink场景实践和核心功能体验
本文详细评测了阿里云实时计算Flink版,从产品引导、文档帮助、功能满足度等方面进行了全面分析。产品界面设计友好,文档丰富实用,数据开发和运维体验优秀,具备出色的实时性和动态扩展性。同时,提出了针对业务场景的改进建议,包括功能定制化增强、高级分析功能拓展及可视化功能提升。文章还探讨了产品与阿里云内部产品及第三方工具的联动潜力,展示了其在多云架构和跨平台应用中的广阔前景。
54 9
|
23天前
|
运维 监控 安全
实时计算Flink场景实践和核心功能体验
实时计算Flink场景实践和核心功能体验
|
24天前
|
运维 数据可视化 数据处理
实时计算Flink场景实践和核心功能体验 评测
实时计算Flink场景实践和核心功能体验 评测
51 4
|
13天前
|
数据采集 运维 搜索推荐
实时计算Flink场景实践
在数字化时代,实时数据处理愈发重要。本文分享了作者使用阿里云实时计算Flink版和流式数据湖仓Paimon的体验,展示了其在电商场景中的应用,包括数据抽取、清洗、关联和聚合,突出了系统的高效、稳定和低延迟特点。
43 0
|
2月前
|
消息中间件 canal 数据采集
Flink CDC 在货拉拉的落地与实践
陈政羽在Apache Asia Community Over Code 2024上分享了《货拉拉在Flink CDC生产实践落地》。文章介绍了货拉拉业务背景、技术选型及其在实时数据采集中的挑战与解决方案,详细阐述了Flink CDC的技术优势及在稳定性、兼容性等方面的应用成果。通过实际案例展示了Flink CDC在提升数据采集效率、降低延迟等方面的显著成效,并展望了未来发展方向。
536 14
Flink CDC 在货拉拉的落地与实践
|
3月前
|
Oracle 关系型数据库 新能源
Flink CDC 在新能源制造业的实践
本文撰写自某新能源企业的研发工程师 单葛尧 老师。本文详细介绍该新能源企业的大数据平台中 CDC 技术架构选型和 Flink CDC 的最佳实践。
445 13
Flink CDC 在新能源制造业的实践
|
1月前
|
SQL 消息中间件 分布式计算
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
40 0
|
3月前
|
消息中间件 监控 关系型数据库
实时计算 Flink版产品使用问题之运行后,怎么进行监控和报警
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
机器学习/深度学习 监控 Serverless
Serverless 应用的监控与调试问题之Flink在内部使用的未来规划,以及接下来有什么打算贡献社区的创新技术
Serverless 应用的监控与调试问题之Flink在内部使用的未来规划,以及接下来有什么打算贡献社区的创新技术