实时计算在「阿里影业实时报表业务」技术解读

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 阿里影业实时报表开始做法也是按照传统型报表做法一样,直接从阿里云rds写sql查询,随着数据量越来越大,这种做法已经没有办法满足业务扩张,带来的问题响应时间变慢,吞吐量低,我们急需要一种技术方案能满足未来2-3年随着影院增加,数据增长,而报表功能还能很好的满足客户需求技术方案。

需求背景

影业实时报表开始做法也是按照传统型报表做法一样,直接从阿里云rds写sql查询,随着数据量越来越大,这种做法已经没有办法满足业务扩张,带来的问题响应时间变慢,吞吐量低,我们急需要一种技术方案能满足未来2-3年随着影院增加,数据增长,而报表功能还能很好的满足客户需求技术方案。

业务目标

时间:平均1分钟,像销售报表最差5分钟返回结果。 数据准确性:数据不丢失并且跟业务库保持一直,数据正确性要100%。 数据校对和回溯:数据不准确的时候,能够手动修复报表。

技术挑战

数据幂等:ETL(精卫/Blink) 如何保障业务明细数据变更时间顺序,即:对同一条记录进行 update,如何保障 update 的顺序是正确的;同样对 delete/insert 一样要求。 数据校对:目前试运行阶段,一方面采用新旧报表对比,另外一方面,采用“数据回溯”,如每隔 5 分钟从业务库同步最近 5 分钟的变动数据。 数据查询性能:HybirdDB for Mysql存储中如何保障根据不同条件查询的性能,包括单表查询和多表关联查询。

技术方案

数据架构图

通过上图可以看出我们新旧报表方案,根据报表目标梳理出要处理的细节点:
  • 集团内部有很多种blink的数据源,如tt、datahub、metaq、notify,而我们在弹外使用了很久metaq,并且实时性也没什么问题,所以就用了metaq作为blink的数据源。
  • 精卫发送数据到Metaq,而Blink订阅Metaq数据,整个解析模型现在Blink不支持,这个需要自己Blink UDX函数去转换,这个也需要去了解精卫发送到Metaq的协议,UDX自定义函数。
  • 精卫发送数据到Metaq,如下图,目前只支持增量方式,并不支持全量到Metaq,而全量支持只有RDS到RDS,这个也需要去了解精卫发送到Metaq的协议。

整体配置流程

选择自主消费模式
选择TDDL数据源和配置表
选择链路类型
精卫服务配置完成
订阅metaq
编写作业代码
上线运维作业

精卫和metaq消息传输协议

为什么要去了解metaq和精卫之间传输的协议,是由于目前精卫官方提供的功能,没办法满足我们的业务需求,他提供了全量功能是针对db对db,没有办法db对metaq,并且我们还要兼容blink脚本共用,要满足这个功能就要查看精卫现在是怎么做的,通过查看文档和代码3.0.12之前的版本是dbsync协议而这个协议本身基于对象是DBMSRowChange,而之后的版本是EventMessage,现在容器代码也是返回DataMessage其结构跟EventMessage是一样,但是通过读取代码发现核心的还是DBMSRowChange通过这里面的数据组装到thirft协议中去,核心代码如下:

精卫发送metaq关键代码

com.alibaba.middleware.jingwei.core.applier.BatchMetaq3Applier
MessageBuilder messageBuilder = new PartitionMessageBuilder();
ThriftHeader header = null;
List<Message> messageFlush = new ArrayList<Message>(messages.size());
for (com.alibaba.middleware.jingwei.externalApi.message.Message message : messages) {
    DbSyncMessage dbSyncMessage = (DbSyncMessage) message;
    DBMSRowChange dbmsRowChange = (DBMSRowChange) dbSyncMessage.getDbMessage();
    try {
        header = buildThriftHeader(dbSyncMessage);
        // 省去部分代码
        for (Message thiftMessage : messageList) {
            messageFlush.add(thiftMessage);
        }
    } catch (Throwable e) {
        throw new JingWeiException(builder.toString(), e);
    }
}

try {
    for (Message thiftMessage : messageBuilder.flush(header)) {
        messageFlush.add(thiftMessage);
    }
} catch (TException e) {
    String err = "Thrift serialization error: " + e.getMessage();
    logger.error(err, e);
    throw new JingWeiException(err, e);
}
msgSender.send(messageFlush);
代码主要是讲经过binlog的数据转换成Message对象,Message把响应的数据丢到Thrift协议中去,最核心就是把数据转成Thrift然后丢到metaq。

精卫解析metaq核心代码

com.alibaba.middleware.jingwei.client.util.MetaqToJingweiMsgParser
ThriftHeader thriftHeader;
try {
    thriftHeader = ThriftHelper.loadThrift(msg.getBody(), eventSet);
} catch (Exception e) {
    if (DynamicConfig.isSKIP_ALL_EXCEPTION(taskName)) {
        logger.warn("SKIP_ALL_EXCEPTION is true, will skip the msg : " + msg, e);
        continue;
    } else {
        throw new JingWeiException("ThriftHelper.loadThrift occur error" + ", msg : " + msg, e);
    }
}

for (ThriftEvent event : eventSet) {
   //省去部分代码
}
上面代码是精简出来的,解析Thrift处理过的metaq的数据,如果针对上面ThirftEvent感兴趣可以仔细看看他们内部怎么处理,通过测试这种协议反解析出来的速度很快,都是毫秒级别。

blink解析metaq模型

解析metaq发送过来的核心代码

public static<T> List<T> parseMetaQMessage(byte[] bytes, Class c) throws Exception {
    List<T> sourceList = new ArrayList<>();
    List<ThriftEvent> eventSet = new ArrayList<>();
    ThriftHelper.loadThrift(bytes, eventSet);
    //协议解包
    for (ThriftEvent event : eventSet) {
        // 省去部分代码
    }
    return sourceList;
}

配置流程

可以参考上面整体配置流程。

精卫回溯方案

为什么不自己开发一套代码,直接把消息丢到metaq不就行了吗? 是因为精卫是按照任务纬度来划分,一个任务有多个表,如果自己再开发一套就需要把现有的任务跟表的关系关联在一起,这样造成很大的工作量,并且通过查看官方文档和vone咨询那边的开发,他们现在没有办法支持全量同步到metaq,只能通过精卫容器模式自己编写代码来实现。

全量发送metaq核心代码

Metaq3ApplierVO metaq3ApplierVO = new Metaq3ApplierVO();
metaq3ApplierVO.setCompressionType("NONE");
BatchMetaq3Applier outerClass = new BatchMetaq3Applier();
outerClass.setMetaq3ApplierVO(metaq3ApplierVO);
BatchMetaq3Applier.PartitionMessageBuilder messageBuilder = outerClass.new PartitionMessageBuilder();
ThriftHeader header = null;
String topic = entryMessageData.getKey();
List<Message> messageFlush = new ArrayList<Message>(messages.size());
for (DataMessage message : entryMessageData.getValue()) {
	DBMSRowChange dbmsRowChange = null;
	List<DBMSColumn> columns = new ArrayList<>();
	int columnIndex = 0;
	// 省去部分代码
}
基于上面分析的协议,而现在返回DataMessage对象是精卫封装后的,为了兼容一套Blink协议转换,需要把转换后的对象再组装成Thrift协议,在组装过程有很多依赖,有些依赖没有考虑全面报了很多错误,然后去适配,还有个细节由于每个任务对应不同的表,而不同的表是有不同的topic,所以在代码上把接受到数据做了一层分组,这样按照精卫生成的topic,就能一套代码运行N个服务。

回朔配置流程

选择精卫自主消费

选择据链路类型

上传代码到容器

petadata技术

  • petadata百万查询都是毫秒级别,不过不能分页过多,因为页数过多多从多个分区拿数据,然后再组装很耗时。

总结

  • 精卫发送metaq的数据带毫秒的,格式如:2018-03-10 12:12:12.0,而回溯全量方案获取的dataMessage是直接从数据库拿的值,没有带毫秒。格式如:2018-03-10 12:12:12。
  • petadata不支持group by后再order by。
  • 幂等顺序问题通过LAST_VALUE和group by来解决。
  • blink.state.ttl.ms(默认一天) 和 state.backend.rocksdb.ttl.ms(默认1一天半), 一般是按照这个规则设置两个参数blink.state.ttl.ms <= state.backend.rocksdb.ttl.ms,由于我们报表业务特殊性,是凌晨6点到第二天凌晨6点为一个周日,所以这两个参数最少要设置为2天,才能保证准确性。
  • 有join操作,先过滤、去重,再join。
  • 如果in/outcpu过高,则从core、memory、parallelism,cu几方面扩大调优。
  • 传输数据过多,可以增大blink.miniBatch.size,以减少在计算过程中对IO的操作,但只能对group by生效。
  • 精卫bug:当带上自定义查询条件,如果id是主键,且是varchar类型,精卫拼装sql就会报错,这个已经反馈给精卫开发,正常id都是long,由于我们在弹外的业务是字符串,所以复现这个bug,后续的表一定要按照集团规范,不然使用其他中间件估计也会有问题。

作者:向飞(飞大

简介:技术专家,资深java程序员,喜欢折腾各种技术。


如果您有实时报表/实时数据大屏/实时金融风控/实时电商推荐等相关实时化数据处理需求,可以加入如下钉钉交流群!

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
8月前
|
监控 Oracle 关系型数据库
Flink CDC(Change Data Capture)是一种用于捕获数据库变更的技术
Flink CDC(Change Data Capture)是一种用于捕获数据库变更的技术
99 8
|
4月前
|
消息中间件 资源调度 API
Apache Flink 流批融合技术介绍
本文源自阿里云高级研发工程师周云峰在Apache Asia Community OverCode 2024的分享,内容涵盖从“流批一体”到“流批融合”的演进、技术解决方案及社区进展。流批一体已在API、算子和引擎层面实现统一,但用户仍需手动配置作业模式。流批融合旨在通过动态调整优化策略,自动适应不同场景需求。文章详细介绍了如何通过量化指标(如isProcessingBacklog和isInsertOnly)实现这一目标,并展示了针对不同场景的具体优化措施。此外,还概述了社区当前进展及未来规划,包括将优化方案推向Flink社区、动态调整算子流程结构等。
450 31
Apache Flink 流批融合技术介绍
|
5月前
|
Cloud Native 安全 调度
Flink 新一代流计算和容错问题之Flink 通过云原生技术改进容错设计要如何操作
Flink 新一代流计算和容错问题之Flink 通过云原生技术改进容错设计要如何操作
|
6月前
|
存储 算法 物联网
海量数据实时计算利器:深入探索Tec(一个假设性技术框架)
总之,Tec作为海量数据实时计算利器,在推动数字化转型、提升业务效率、保障数据安全等方面发挥着重要作用。随着技术的不断进步和应用场景的不断拓展,Tec的未来发展前景将更加广阔。
|
5月前
|
机器学习/深度学习 监控 Serverless
Serverless 应用的监控与调试问题之Flink在内部使用的未来规划,以及接下来有什么打算贡献社区的创新技术
Serverless 应用的监控与调试问题之Flink在内部使用的未来规划,以及接下来有什么打算贡献社区的创新技术
|
5月前
|
机器学习/深度学习 人工智能 运维
美团 Flink 大作业部署问题之Flink在生态技术演进上有什么主要方向
美团 Flink 大作业部署问题之Flink在生态技术演进上有什么主要方向
|
6月前
|
SQL 分布式计算 BI
实时计算 Flink版产品使用问题之基于宽表数据展示实时报表,该如何实现
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
8月前
|
存储 测试技术 数据处理
阿里云实时计算企业级状态存储引擎 Gemini 技术解读
阿里云实时计算企业级状态存储引擎 Gemini 技术解读
311 57
|
8月前
|
SQL 并行计算 大数据
【大数据技术攻关专题】「Apache-Flink零基础入门」手把手+零基础带你玩转大数据流式处理引擎Flink(基础加强+运行原理)
关于Flink服务的搭建与部署,由于其涉及诸多实战操作而理论部分相对较少,小编打算采用一个独立的版本和环境来进行详尽的实战讲解。考虑到文字描述可能无法充分展现操作的细节和流程,我们决定以视频的形式进行分析和介绍。因此,在本文中,我们将暂时不涉及具体的搭建和部署步骤。
569 3
【大数据技术攻关专题】「Apache-Flink零基础入门」手把手+零基础带你玩转大数据流式处理引擎Flink(基础加强+运行原理)
|
8月前
|
存储 SQL Java
阿里Flink云服务提供了CDC(Change Data Capture)功能
【2月更文挑战第10天】阿里Flink云服务提供了CDC(Change Data Capture)功能
118 1

相关产品

  • 实时计算 Flink版