生产实践 | 基于 Flink 的短视频生产消费监控-阿里云开发者社区

开发者社区> 凌云时刻> 正文

生产实践 | 基于 Flink 的短视频生产消费监控

简介: 本文详细介绍了实时监控类指标的数据流转链路以及技术方案,大多数的实时监控类指标都可按照本文中的几种方案实现。

本文详细介绍了实时监控类指标的数据流转链路以及技术方案,大多数的实时监控类指标都可按照本文中的几种方案实现。

短视频生产消费监控

短视频带来了全新的传播场域和节目形态,小屏幕、快节奏成为行业潮流的同时,也催生了新的用户消费习惯,为创作者和商户带来收益。而多元化的短视频也可以为品牌方提供营销机遇。

其中对于垂类生态短视频的生产消费热点的监控分析目前成为了实时数据处理很常见的一个应用场景,比如对某个圈定的垂类生态下的视频生产或者视频消费进行监控,对热点视频生成对应的优化推荐策略,促进热点视频的生产或者消费,构建整个生产消费数据链路的闭环,从而提高创作者收益以及消费者留存。

本文将完整分析垂类生态短视频生产消费数据的整条链路流转方式,并基于 Flink 提供几种对于垂类视频生产消费监控的方案设计。通过本文,你可以了解到:

垂类生态短视频生产消费数据链路闭环

实时监控短视频生产消费的方案设计

不同监控量级场景下的代码实现

flink 学习资料

项目简介

垂类生态短视频生产消费数据链路流转架构图如下,此数据流转图也适用于其他场景:

image.png

链路

在上述场景中,用户生产和消费短视频,从而客户端、服务端以及数据库会产生相应的行为操作日志,这些日志会通过日志抽取中间件抽取到消息队列中,我们目前的场景中是使用 Kafka 作为消息队列;然后使用 flink 对垂类生态中的视频进行生产或消费监控(内容生产通常是圈定垂类作者 id 池,内容消费通常是圈定垂类视频 id 池),最后将实时聚合数据产出到下游;下游可以以数据服务,实时看板的方式展现,运营同学或者自动化工具最终会帮助我们分析当前垂类下的生产或者消费热点,从而生成推荐策略。

方案设计

image.png

架构

其中数据源如下:

Kafka 为全量内容生产和内容消费的日志。

Rpc/Http/Mysql/配置中心/Redis/HBase 为需要监控的垂类生态内容 id 池(内容生产则为作者 id 池,内容消费则为视频 id 池),其主要是提供给运营同学动态配置需要监控的 id 范围,其可以在 flink 中进行实时查询,解析运营同学想要的监控指标范围,以及监控的指标和计算方式,然后加工数据产出,可以支持随时配置,实时数据随时计算产出。

其中数据汇为聚类好的内容生产或者消费热点话题或者事件指标:

Redis/HBase 主要是以低延迟(Redis 5ms p99,HBase 100ms p99,不同公司的服务能力不同)并且高 QPS 提供数据服务,给 Server 端或者线上用户提供低延迟的数据查询。

Druid/Mysql 可以做为 OLAP 引擎为 BI 分析提供灵活的上卷下钻聚合分析能力,供运营同学配置可视化图表使用。

Kafka 可以以流式数据产出,从而提供给下游继续消费或者进行特征提取。

废话不多说,我们直接上方案和代码,下述几种方案按照监控 id 范围量级区分,不同的量级对应着不同的方案,其中的代码示例为 ProcessWindowFunction,也可以使用 AggregateFunction 代替,其中主要监控逻辑都相同。

方案 1

适合监控 id 数据量小的场景(几千 id),其实现方式是在 flink 任务初始化时将需要监控的 id 池或动态配置中心的 id 池加载到内存当中,之后只需要在内存中判断内容生产或者消费数据是否在这个监控池当中。

ProcessWindowFunction p = new ProcessWindowFunction<CommonModel, CommonModel, Long, TimeWindow>() {
    
    // 配置中心动态 id 池
    private Config<Set<Long>> needMonitoredIdsConfig;

    @Override
    public void open(Configuration parameters) throws Exception {
        this.needMonitoredIdsConfig = ConfigBuilder
                .buildSet("needMonitoredIds", Long.class);
    }

    @Override
    public void process(Long bucket, Context context, Iterable<CommonModel> iterable, Collector<CommonModel> collector) throws Exception {
        Set<Long> needMonitoredIds = needMonitoredIdsConfig.get();
        /**
         * 判断 commonModel 中的 id 是否在 needMonitoredIds 池中
         */
    }
}

监控的 id 池可以按照固定或者可配置从而分出两种获取方式:第一种是在 flink 任务开始时就全部加载进内存中,这种方式适合监控 id 池不变的情况;第二种是使用动态配置中心,每次都从配置中心访问到最新的监控 id 池,其可以满足动态配置或者更改 id 池的需求,并且这种实现方式通常可以实时感知到配置更改,几乎无延迟。

方案 2

适合监控 id 数据量适中(几十万 id),监控数据范围会不定时发生变动的场景。其实现方式是在 flink 算子中定时访问接口获取最新的监控 id 池,以获取最新监控数据范围。

ProcessWindowFunction p = new ProcessWindowFunction<CommonModel, CommonModel, Long, TimeWindow>() {

    private long lastRefreshTimestamp;

    private Set<Long> needMonitoredIds;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.refreshNeedMonitoredIds(System.currentTimeMillis());
    }

    @Override
    public void process(Long bucket, Context context, Iterable<CommonModel> iterable, Collector<CommonModel> collector) throws Exception {
        long windowStart = context.window().getStart();
        this.refreshNeedMonitoredIds(windowStart);
        /**
         * 判断 commonModel 中的 id 是否在 needMonitoredIds 池中
         */
    }

    public void refreshNeedMonitoredIds(long windowStart) {
        // 每隔 10 秒访问一次
        if (windowStart - this.lastRefreshTimestamp >= 10000L) {
            this.lastRefreshTimestamp = windowStart;
            this.needMonitoredIds = Rpc.get(...)
        }
    }
}

根据上述代码实现方式,按照时间间隔的方式刷新 id 池,其缺点在于不能实时感知监控 id 池的变化,所以刷新时间可能会和需求场景强耦合(如果 id 池会频繁更新,那么就需要缩小刷新时间间隔)。也可根据需求场景在每个窗口开始前刷新 id 池,这样可保证每个窗口中的 id 池中的数据一直保持更新。

方案 3

方案 3 对方案 2 的一个优化(几十万 id,我们生产环境中最常用的)。其实现方式是在 flink 中使用 broadcast 算子定时访问监控 id 池,并将 id 池以广播的形式下发给下游参与计算的各个算子。其优化点在于:比如任务的并行度为 500,每 1s 访问一次,采用方案 2 则访问监控 id 池接口的 QPS 为 500,在使用 broadcast 算子之后,其访问 QPS 可以减少到 1,可以大大减少对接口的访问量,减轻接口压力。

public class Example {

    @Slf4j
    static class NeedMonitorIdsSource implements SourceFunction<Map<Long, Set<Long>>> {

        private volatile boolean isCancel;

        @Override
        public void run(SourceContext<Map<Long, Set<Long>>> sourceContext) throws Exception {
            while (!this.isCancel) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    Set<Long> needMonitorIds = Rpc.get(...);
                    // 可以和上一次访问的数据做比较查看是否有变化,如果有变化,才发送出去
                    if (CollectionUtils.isNotEmpty(needMonitorIds)) {
                        sourceContext.collect(new HashMap<Long, Set<Long>>() {{
                            put(0L, needMonitorIds);
                        }});
                    }
                } catch (Throwable e) {
                    // 防止接口访问失败导致的错误导致 flink job 挂掉
                    log.error("need monitor ids error", e);
                }
            }
        }

        @Override
        public void cancel() {
            this.isCancel = true;
        }
    }

    public static void main(String[] args) {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        InputParams inputParams = new InputParams(parameterTool);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        final MapStateDescriptor<Long, Set<Long>> broadcastMapStateDescriptor = new MapStateDescriptor<>(
                "config-keywords",
                BasicTypeInfo.LONG_TYPE_INFO,
                TypeInformation.of(new TypeHint<Set<Long>>() {
                }));

        /********************* kafka source *********************/
        BroadcastStream<Map<Long, Set<Long>>> broadcastStream = env
                .addSource(new NeedMonitorIdsSource()) // redis photoId 数据广播
                .setParallelism(1)
                .broadcast(broadcastMapStateDescriptor);

        DataStream<CommonModel> logSourceDataStream = SourceFactory.getSourceDataStream(...);

        /********************* dag *********************/
        DataStream<CommonModel> resultDataStream = logSourceDataStream
                .keyBy(KeySelectorFactory.getStringKeySelector(CommonModel::getKeyField))
                .connect(broadcastStream)
                .process(new KeyedBroadcastProcessFunction<String, CommonModel, Map<Long, Set<Long>>, CommonModel>() {

                    private Set<Long> needMonitoredIds;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        this.needMonitoredIds = Rpc.get(...)
                    }

                    @Override
                    public void processElement(CommonModel commonModel, ReadOnlyContext readOnlyContext, Collector<CommonModel> collector) throws Exception {
                        // 判断 commonModel 中的 id 是否在 needMonitoredIds 池中
                    }

                    @Override
                    public void processBroadcastElement(Map<Long, Set<Long>> longSetMap, Context context, Collector<CommonModel> collector) throws Exception {
                        // 需要监控的字段
                        Set<Long> needMonitorIds = longSetMap.get(0L);
                        if (CollectionUtils.isNotEmpty(needMonitorIds)) {
                            this.needMonitoredIds = needMonitorIds;
                        }
                    }
                });

        /********************* kafka sink *********************/
        SinkFactory.setSinkDataStream(...);
        
        env.execute(inputParams.jobName);
    }

}

方案 4

适合于超大监控范围的数据(几百万,我们自己的生产实践中使用扩量到 500 万)。其原理是将监控范围接口按照 id 按照一定规则分桶。flink 消费到日志数据后将 id 按照 监控范围接口 id 相同的分桶方法进行分桶 keyBy,这样在下游算子中每个算子中就可以按照桶变量值,从接口中拿到对应桶的监控 id 数据,这样 flink 中并行的每个算子只需要获取到自己对应的桶的数据,可以大大减少请求的压力。

public class Example {

    public static void main(String[] args) {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        InputParams inputParams = new InputParams(parameterTool);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        final MapStateDescriptor<Long, Set<Long>> broadcastMapStateDescriptor = new MapStateDescriptor<>(
                "config-keywords",
                BasicTypeInfo.LONG_TYPE_INFO,
                TypeInformation.of(new TypeHint<Set<Long>>() {
                }));

        /********************* kafka source *********************/

        DataStream<CommonModel> logSourceDataStream = SourceFactory.getSourceDataStream(...);

        /********************* dag *********************/
        DataStream<CommonModel> resultDataStream = logSourceDataStream
                .keyBy(KeySelectorFactory.getLongKeySelector(CommonModel::getKeyField))
                .timeWindow(Time.seconds(inputParams.accTimeWindowSeconds))
                .process(new ProcessWindowFunction<CommonModel, CommonModel, Long, TimeWindow>() {

                    private long lastRefreshTimestamp;

                    private Set<Long> oneBucketNeedMonitoredIds;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                    }

                    @Override
                    public void process(Long bucket, Context context, Iterable<CommonModel> iterable, Collector<CommonModel> collector) throws Exception {
                        long windowStart = context.window().getStart();
                        this.refreshNeedMonitoredIds(windowStart, bucket);
                        /**
                         * 判断 commonModel 中的 id 是否在 needMonitoredIds 池中
                         */
                    }

                    public void refreshNeedMonitoredIds(long windowStart, long bucket) {
                        // 每隔 10 秒访问一次
                        if (windowStart - this.lastRefreshTimestamp >= 10000L) {
                            this.lastRefreshTimestamp = windowStart;
                            this.oneBucketNeedMonitoredIds = Rpc.get(bucket, ...)
                        }
                    }
                });

        /********************* kafka sink *********************/
        SinkFactory.setSinkDataStream(...);

        env.execute(inputParams.jobName);
    }
}

总结

本文首先介绍了,在短视频领域中,短视频生产消费数据链路的整个闭环,并且其数据链路闭环一般情况下也适用于其他场景;以及对应的实时监控方案的设计和不同场景下的代码实现,包括:

垂类生态短视频生产消费数据链路闭环:用户操作行为日志的流转,日志上传,实时计算,以及流转到 BI,数据服务,最后数据赋能的整个流程

实时监控方案设计:监控类实时计算流程中各类数据源,数据汇的选型

监控 id 池在不同量级场景下具体代码实现

学习资料

https://github.com/flink-china/flink-training-course/blob/master/README.md
https://ververica.cn/developers-resources/
https://space.bilibili.com/33807709

作者:YangYichao
来源:Flink 微信公众号
原文链接:https://mp.weixin.qq.com/s/t_hbmx_xHly9y0nBcZmtnQ

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
+ 订阅

凌云时刻签约作者是为阿里经济体内云布道师在技术文章、图书出版、视频作品等培养优秀人才的通道,通过报名和审核的方式,加入凌云时刻成为签约作者,持续为布道的道路上答疑解惑、技术传承而提供好内容。

官方博客
官网链接