Flink 实时数仓(二)【ODS 层开发】

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
实时数仓Hologres,5000CU*H 100GB 3个月
云原生数据仓库AnalyticDB MySQL版,基础版 8ACU 100GB 1个月
简介: Flink 实时数仓(二)【ODS 层开发】

前言

       最近投了不少的实习,也收到不错的反馈,虽然是中小公司偏多,但是毕竟现在这个环境双非进大厂实习可不同当年了。可惜的是学院不放人,无奈啊,遍身罗绮者,不是养蚕人。我累死累活肝了两年了,好不容易找到个不错的实习,可是学院...

       不骂了,没那功夫扯淡。回归主题,实时数仓将来真要是有机会从事的话那真不错,也不枉我当时学Flink的时候花了一整个学期,没暖气没电源的教室跑着三台虚拟机,CPU爆满,每天学俩小时就没电了,但还是花了半个学期学完了。今天开始正式的数仓搭建。

1、ODS 层开发

       在实时数仓这里,当我们把数据采集到 Kafka (topic_log 和 topic_db 主题)的时候,其实 ODS 层的数据存储任务就已经完成了(ODS 层的任务:数据的存储和备份)。接下来我们需要做的就是保持数据的有序:

1.1、Kafka 数据有序

       Kafka 只能保证单分区内有序,并不能保证全局有序。

1.1.1、设置 Kafka 分区默认个数

       这里我们需要设置 Kafka 的分区个数为 4,毕竟实时数仓对数据的吞吐量、并发性能的要求是比较高的,所以我们不能为了数据的有序性而把数据到挤到一个分区中:

vim /opt/module/kafka_2.12-3.4.1/config/server.properties
// 修改配置
num.partition=4

1.1.2、设置 Flink 精准一次

       Flink 程序从 Kafka 消费数据时会启动同属于一个消费者组的四个消费者,Kafka 消费者的默认分区分配策略是 Range + CooperativeSticky,消费者数和分区数相同时,每个消费者消费一个分区的数据。只要单分区数据有序,即可保证 Flink 单个并行度数据有序。

       我们这里的 Kafka 版本是 3.0.0,在 Kafka 1.x 及之后的版本中,保证数据单分区有序,条件如下:

不开启 Kafka 幂等性的情况

max.in.flight.requests.per.connection=1

开启 Kafka 幂等性的情况

// 必须小于等于5
max.in.flight.requests.per.connection=5

       在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,故无论如何,都可以保证最近5个request的数据都是有序的。

       默认情况下幂等性是开启的,max.in.flight.requests.per.connection 默认值为 5,所以单分区数据默认是有序的,不需要做任何配置。

综上,我们可以保证 Flink 程序单个并行度的数据有序。

1.2、Maxwell 同步历史维度数据

       我们的实时计算不需要考虑历史的事实数据(比如下单、加购),但要考虑历史维度数据,毕竟没有维度只有业务过程是没法进行计算的。

1.2.1、为什么要同步历史维度数据而不同步历史业务数据

       当一个用户进行下单这个业务过程的时候,比如 user_id 为 1001 的用户在 province_id 为 17 的省份下单了 sku_id 为 10 的商品,并使用了 cupon_id 为 1001 的优惠券。

       当这个数据传到我们实时数仓的时候,我们必须知道用户是谁,它买了什么东西,有没有使用优惠券、使用了什么类型的优惠券、这个省份 id 是什么地方、下单方式是什么。这就涉及到了很多维度数据,所以我们必须提前把维度数据准备好,等到数据来的时候直接拿来用而不是才去业务库同步。

       在 ODS 层这里我们只需要原封不动的把维度数据导入到 Kafka ,等到搭建 DIM 层的时候直接从 ODS 层拿,而不是让 DIM 层去业务数据库中同步。

       在我们这个项目中,我们需要通过 Maxwell 同步下面这些维度表到 Kafka 的

activity_info
activity_rule
activity_sku
base_category1
base_category2
base_category3
base_province
base_region
base_trademark
coupon_info
coupon_range
financial_sku_cost
sku_info
spu_info
user_info

编写同步脚本:

#!/bin/bash
 
# 该脚本的作用是初始化所有的业务数据,只需执行一次
 
MAXWELL_HOME=/opt/module/maxwell
 
import_data() {
    for tab in $@
    do
      $MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table ${tab} --config $MAXWELL_HOME/config.properties
    done
}
 
case $1 in
activity_info | activity_rule | activity_sku | base_category1 | base_category2 | base_category3 | base_dic | base_province | base_region | base_trademark | coupon_info | coupon_range | financial_sku_cost | sku_info | spu_info | user_info)
  import_data $1 
  ;;
"all")
  import_data activity_info activity_rule activity_sku base_category1 base_category2 base_category3 base_dic base_province base_region base_trademark coupon_info coupon_range financial_sku_cost sku_info spu_info user_info
  ;;
esac

思考

       我们之前在学习离线数仓的时候, 使用 Maxwell 和 DataX 来同步业务数据,其中的 Maxwell 在离线数仓中其实并没有什么作用,至于削峰解耦在离线数仓中是根本不用考虑的。而如果不使用 Kafka ,我们可以直接通过 Flume 直接采集到 HDFS。

       在离线数仓中使用 Maxwell 的作用完全是为了现在学习实时数仓时,方便 Flink 来直接从 Kafka 去读取数据。但是 Flume 的数据中包含的 Event Header ,它对于实时数仓来说是完全没有用的,所以我们当时为了妥协实时数仓,就把 Flume 数据中的 Header 给去掉了,但是也就引入了零点漂移的问题,毕竟 Event Header 中保存着 timestamp 信息,而它在经过 Kafka 之后,会被 Kafka 给它添加一个 Header ,Header 中的 timestamp 时间默认为 Kafka 处理的时间,所以我们当时又设置了 Flume 拦截器来把 Header 中的 timestamp 值设置为 body 中的时间戳(因为拦截器只能设置在 Source 和 Channel 之间,所以还需要一个 Flume 再从 Kafka 读出来)。

Flume 拦截器代码

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
 
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
 
public class TimeStampInterceptor implements Interceptor {
 
    private ArrayList<Event> events = new ArrayList<>();
 
    @Override
    public void initialize() {
 
    }
 
    @Override
    public Event intercept(Event event) {
 
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);
 
        JSONObject jsonObject = JSONObject.parseObject(log);
 
        String ts = jsonObject.getString("ts");
        headers.put("timestamp", ts);
 
        return event;
    }
 
    @Override
    public List<Event> intercept(List<Event> list) {
        events.clear();
        for (Event event : list) {
            events.add(intercept(event));
        }
 
        return events;
    }
 
    @Override
    public void close() {
 
    }
 
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimeStampInterceptor();
        }
 
        @Override
        public void configure(Context context) {
        }
    }
}

总结

       实时数仓中 ODS 层的工作很简单,我们只需要用 Maxwell 把业务数据库中的维度表进行实时同步即可。至于服务器里的日志数据我们根本不需要实时处理!一个日志数据没必要实时处理!所以实时数仓中我们也就不需要用 Flume 这个工具。

       所以对于实时数仓的 ODS 层,我们主要用的就是 Maxwell 来同步维度数据,而对于事实数据(比如下单、加购),等到 DWD 层再进行处理。

相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
20天前
|
存储 SQL Java
Flink CDC + Hologres高性能数据同步优化实践
本文整理自阿里云高级技术专家胡一博老师在Flink Forward Asia 2024数据集成(二)专场的分享,主要内容包括:1. Hologres介绍:实时数据仓库,支持毫秒级写入和高QPS查询;2. 写入优化:通过改进缓冲队列、连接池和COPY模式提高吞吐量和降低延迟;3. 消费优化:优化离线场景和分区表的消费逻辑,提升性能和资源利用率;4. 未来展望:进一步简化用户操作,支持更多DDL操作及全增量消费。Hologres 3.0全新升级为一体化实时湖仓平台,提供多项新功能并降低使用成本。
220 1
Flink CDC + Hologres高性能数据同步优化实践
|
1月前
|
SQL 消息中间件 Kafka
Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
本文介绍了阿里云实时数仓Hologres负责人姜伟华在Flink Forward Asia 2024上的分享,涵盖实时数仓的发展历程、从实时数仓到实时湖仓的演进,以及总结。文章通过三代实时数仓架构的演变,详细解析了Lambda架构、Kafka实时数仓分层+OLAP、Hologres实时数仓分层复用等方案,并探讨了未来从实时数仓到实时湖仓的演进方向。最后,结合实际案例和Demo展示了Hologres + Flink + Paimon在实时湖仓中的应用,帮助用户根据业务需求选择合适的方案。
587 20
Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
|
1月前
|
存储 关系型数据库 MySQL
Flink基于Paimon的实时湖仓解决方案的演进
本文整理自阿里云智能集团苏轩楠老师在Flink Forward Asia 2024论坛的分享,涵盖流式湖仓架构的背景介绍、技术演进和未来发展规划。背景部分介绍了ODS、DWD、DWS三层数据架构及关键组件Flink与Paimon的作用;技术演进讨论了全量与增量数据处理优化、宽表构建及Compaction操作的改进;发展规划则展望了Range Partition、Materialized Table等新功能的应用前景。通过这些优化,系统不仅简化了复杂度,还提升了实时与离线处理的灵活性和效率。
390 3
Flink基于Paimon的实时湖仓解决方案的演进
|
1月前
|
SQL 存储 HIVE
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
本文整理自鹰角网络大数据开发工程师朱正军在Flink Forward Asia 2024上的分享,主要涵盖四个方面:鹰角数据平台架构、数据湖选型、湖仓一体建设及未来展望。文章详细介绍了鹰角如何构建基于Paimon的数据湖,解决了Hudi入湖的痛点,并通过Trino引擎和Ranger权限管理实现高效的数据查询与管控。此外,还探讨了湖仓一体平台的落地效果及未来技术发展方向,包括Trino与Paimon的集成增强、StarRocks的应用以及Paimon全面替换Hive的计划。
196 1
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
|
24天前
|
SQL 消息中间件 Serverless
​Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
​Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
|
24天前
|
SQL 存储 HIVE
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
|
2月前
|
SQL 监控 关系型数据库
用友畅捷通在Flink上构建实时数仓、挑战与最佳实践
本文整理自用友畅捷通数据架构师王龙强在FFA2024上的分享,介绍了公司在Flink上构建实时数仓的经验。内容涵盖业务背景、数仓建设、当前挑战、最佳实践和未来展望。随着数据量增长,公司面临数据库性能瓶颈及实时数据处理需求,通过引入Flink技术逐步解决了数据同步、链路稳定性和表结构差异等问题,并计划在未来进一步优化链路稳定性、探索湖仓一体架构以及结合AI技术推进数据资源高效利用。
475 25
用友畅捷通在Flink上构建实时数仓、挑战与最佳实践
|
2月前
|
存储 消息中间件 OLAP
Hologres+Flink企业级实时数仓核心能力介绍-2024实时数仓Hologres线上公开课03
本次分享由阿里云产品经理骆撷冬(观秋)主讲,主题为“Hologres+Flink企业级实时数仓核心能力”,是2024实时数仓Hologres线上公开课的第三期。课程详细介绍了Hologres与Flink结合搭建的企业级实时数仓的核心能力,包括解决实时数仓分层问题、基于Flink Catalog的Streaming Warehouse实践,并通过典型客户案例展示了其应用效果。
75 10
Hologres+Flink企业级实时数仓核心能力介绍-2024实时数仓Hologres线上公开课03
|
24天前
|
存储 关系型数据库 MySQL
Flink基于Paimon的实时湖仓解决方案的演进
Flink基于Paimon的实时湖仓解决方案的演进
|
6月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。

热门文章

最新文章