Flink 实时数仓(一)【实时数仓&离线数仓对比】(2)

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
实时计算 Flink 版,5000CU*H 3个月
云原生数据仓库AnalyticDB MySQL版,基础版 8ACU 100GB 1个月
简介: Flink 实时数仓(一)【实时数仓&离线数仓对比】

Flink 实时数仓(一)【实时数仓&离线数仓对比】(1)https://developer.aliyun.com/article/1534237

3、项目架构设计准备

我们接下来会创建一个普通 Maven 项目 gmall2024-realtime ,并创建四个 module:

  1. realtime-common:用于引入公共的第三方依赖,编写工具类、实体类等。
  2. realtime-dim:用于编写DIM层业务代码。
  3. realtime-dwd:用于编写DWD层业务代码。
  4. realtime-dws:用于编写DWS层业务代码。

其中,后三个module统称为业务模块,业务模块都要将realtime-common模块作为依赖引入。

3.1、创建父工程 gmall2024-realtime

在父项目中声明一些依赖,这些依赖不会打包到 jar 包里,但是我们在本地运行的时候需要提供:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>org.lyh</groupId>
    <artifactId>gmall2024-realtime</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>realtime-common</module>
        <module>realtime-dim</module>
        <module>realtime-dwd</module>
        <module>realtime-dws</module>
    </modules>
 
    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.1</flink.version>
        <scala.version>2.12</scala.version>
        <hadoop.version>3.3.4</hadoop.version>
        <flink-cdc.vesion>2.4.0</flink-cdc.vesion>
        <fastjson.version>1.2.83</fastjson.version>
        <hbase.version>2.4.11</hbase.version>
    </properties>
 
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
 
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
 
        <dependency><!--在 idea 运行的时候,可以打开 web 页面-->
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
 
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
 
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
 
        <!--如果保存检查点到hdfs上,需要引入此依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>
 
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-reload4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
 
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
            <scope>provided</scope>
        </dependency>
 
        <!--Flink默认使用的是slf4j记录日志,相当于一个日志的接口,我们这里使用log4j作为具体的日志实现-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
            <scope>provided</scope>
        </dependency>
 
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
            <scope>provided</scope>
        </dependency>
 
 
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
 
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
 
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
 
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
 
 
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>${fastjson.version}</version>
            </dependency>
 
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka</artifactId>
                <version>${flink.version}</version>
            </dependency>
 
            <dependency>
                <groupId>com.ververica</groupId>
                <artifactId>flink-connector-mysql-cdc</artifactId>
                <version>${flink-cdc.vesion}</version>
            </dependency>
 
            <!-- hbase 依赖-->
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>${hbase.version}</version>
            </dependency>
 
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-auth</artifactId>
                <version>${hadoop.version}</version>
                <scope>provided</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-reload4j</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
 
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-hbase-2.2</artifactId>
                <version>${flink.version}</version>
            </dependency>
 
            <!-- https://mvnrepository.com/artifact/org.apache.doris/flink-doris-connector-1.17 -->
            <dependency>
                <groupId>org.apache.doris</groupId>
                <artifactId>flink-doris-connector-1.17</artifactId>
                <version>1.4.0</version>
            </dependency>
 
            <dependency>
                <groupId>commons-beanutils</groupId>
                <artifactId>commons-beanutils</artifactId>
                <version>1.9.4</version>
            </dependency>
 
            <dependency>
                <groupId>com.janeluo</groupId>
                <artifactId>ikanalyzer</artifactId>
                <version>2012_u6</version>
            </dependency>
 
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>3.3.0</version>
            </dependency>
 
            <dependency>
                <groupId>io.lettuce</groupId>
                <artifactId>lettuce-core</artifactId>
                <version>6.2.4.RELEASE</version>
            </dependency>
 
        </dependencies>
 
    </dependencyManagement>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <!--原本是 3.1.1-->
                <version>3.5.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                    <exclude>org.apache.hadoop:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <!-- 打包时不复制META-INF下的签名文件,避免报非法签名文件的SecurityExceptions异常-->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
 
                            <transformers combine.children="append">
                                <!-- The service transformer is needed to merge META-INF/services files -->
                                <!-- connector和format依赖的工厂类打包时会相互覆盖,需要使用ServicesResourceTransformer解决-->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
 
 
</project>

dependencyManagement 标签下的依赖同样不会被打包进 jar 包,它在这里只是起到一个管理版本的作用。

3.2、创建公共模块 realtime-common

新建子模块 realtime-common ,导入依赖,这里的依赖不需要写版本号,因为我们都是继承自父工程 gmall2024-realtime 的,它帮我们进行了版本管理。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>gmall2024-realtime</artifactId>
        <groupId>org.lyh</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
 
    <artifactId>realtime-common</artifactId>
 
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
 
    <dependencies>
 
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
 
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
        </dependency>
 
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
        </dependency>
 
        <!-- hbase 依赖-->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
        </dependency>
 
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hbase-2.2</artifactId>
        </dependency>
 
        <!-- https://mvnrepository.com/artifact/org.apache.doris/flink-doris-connector-1.17 -->
        <dependency>
            <groupId>org.apache.doris</groupId>
            <artifactId>flink-doris-connector-1.17</artifactId>
        </dependency>
 
        <dependency>
            <groupId>commons-beanutils</groupId>
            <artifactId>commons-beanutils</artifactId>
        </dependency>
 
        <dependency>
            <groupId>com.janeluo</groupId>
            <artifactId>ikanalyzer</artifactId>
        </dependency>
 
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>
 
        <dependency>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
        </dependency>
    </dependencies>
 
</project>

       在 realtime-common 模块中,除了要把一些公共的包导进来,还需要把一些全局使用的东西配置一下,比如 log4j 和一些公共的类库:

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %10p (%c:%M) - %m%n
log4j.rootLogger=error,stdout

3.3、创建其它子模块

新建子模块 realtime-dim、这些模块都不再需要导入所以依赖了,只需要继承导入 realtime-common 模块即可。

    <dependencies>
        <dependency>
            <groupId>org.lyh</groupId>
            <artifactId>realtime-common</artifactId>
            <version>1.0-SNAPSHOT</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

总结

       至此,准备工作基本完成了,这一节最重要的就是学习离线数仓和实时数仓的一些区别了,比如每一层的存储方式,在离线数仓中我们不需要考虑时效性,所以存到 HDFS 当中即可,但是实时数仓考虑到时效性,我们的数据尽可能以一个流的形式被处理,所以我们的实时数仓主要借助 Kafka 以及 HBase、Redis 等一些快速或者在大数据场景下相对快速的工具进行数据存储。

相关实践学习
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
6月前
|
分布式计算 运维 监控
Dataphin离线数仓搭建深度测评:数据工程师的实战视角
作为一名金融行业数据工程师,我参与了阿里云Dataphin智能研发版的评测。通过《离线数仓搭建》实践,体验了其在数据治理中的核心能力。Dataphin在环境搭建、管道开发和任务管理上显著提效,如测试环境搭建从3天缩短至2小时,复杂表映射效率提升50%。产品支持全链路治理、智能提效和架构兼容,帮助企业降低40%建设成本,缩短60%需求响应周期。建议加强行业模板库和移动适配功能,进一步提升使用体验。
|
8月前
|
SQL 监控 关系型数据库
用友畅捷通在Flink上构建实时数仓、挑战与最佳实践
本文整理自用友畅捷通数据架构师王龙强在FFA2024上的分享,介绍了公司在Flink上构建实时数仓的经验。内容涵盖业务背景、数仓建设、当前挑战、最佳实践和未来展望。随着数据量增长,公司面临数据库性能瓶颈及实时数据处理需求,通过引入Flink技术逐步解决了数据同步、链路稳定性和表结构差异等问题,并计划在未来进一步优化链路稳定性、探索湖仓一体架构以及结合AI技术推进数据资源高效利用。
631 25
用友畅捷通在Flink上构建实时数仓、挑战与最佳实践
|
7月前
|
SQL 运维 BI
湖仓分析|浙江霖梓基于 Doris + Paimon 打造实时/离线一体化湖仓架构
浙江霖梓早期基于 Apache Doris 进行整体架构与表结构的重构,并基于湖仓一体和查询加速展开深度探索与实践,打造了 Doris + Paimon 的实时/离线一体化湖仓架构,实现查询提速 30 倍、资源成本节省 67% 等显著成效。
314 3
湖仓分析|浙江霖梓基于 Doris + Paimon 打造实时/离线一体化湖仓架构
|
8月前
|
存储 消息中间件 OLAP
Hologres+Flink企业级实时数仓核心能力介绍-2024实时数仓Hologres线上公开课03
本次分享由阿里云产品经理骆撷冬(观秋)主讲,主题为“Hologres+Flink企业级实时数仓核心能力”,是2024实时数仓Hologres线上公开课的第三期。课程详细介绍了Hologres与Flink结合搭建的企业级实时数仓的核心能力,包括解决实时数仓分层问题、基于Flink Catalog的Streaming Warehouse实践,并通过典型客户案例展示了其应用效果。
220 10
Hologres+Flink企业级实时数仓核心能力介绍-2024实时数仓Hologres线上公开课03
|
SQL 监控 大数据
Serverless 应用的监控与调试问题之Flink流式数仓对于工商银行的数据链路要如何简化
Serverless 应用的监控与调试问题之Flink流式数仓对于工商银行的数据链路要如何简化
|
3月前
|
分布式计算 运维 监控
Fusion 引擎赋能:流利说如何用阿里云 Serverless Spark 实现数仓计算加速
本文介绍了流利说与阿里云合作,利用EMR Serverless Spark优化数据处理的全过程。流利说是科技驱动的教育公司,通过AI技术提升用户英语水平。原有架构存在资源管理、成本和性能等痛点,采用EMR Serverless Spark后,实现弹性资源管理、按需计费及性能优化。方案涵盖数据采集、存储、计算到查询的完整能力,支持多种接入方式与高效调度。迁移后任务耗时减少40%,失败率降低80%,成本下降30%。未来将深化合作,探索更多行业解决方案。
133 1
|
4月前
|
SQL 关系型数据库 MySQL
客户说|保险极客引入阿里云AnalyticDB,多业务场景效率大幅提升
“通过引入AnalyticDB,我们在复杂数据查询和实时同步方面取得了显著突破,其分布式、弹性与云计算的优势得以充分体现,帮助企业快速响应业务变化,实现降本增效。AnalyticDB的卓越表现保障了保险极客数据服务的品质和效率。”
|
7月前
|
存储 分布式计算 物联网
美的楼宇科技基于阿里云 EMR Serverless Spark 构建 LakeHouse 湖仓数据平台
美的楼宇科技基于阿里云 EMR Serverless Spark 建设 IoT 数据平台,实现了数据与 AI 技术的有效融合,解决了美的楼宇科技设备数据量庞大且持续增长、数据半结构化、数据价值缺乏深度挖掘的痛点问题。并结合 EMR Serverless StarRocks 搭建了 Lakehouse 平台,最终实现不同场景下整体性能提升50%以上,同时综合成本下降30%。
540 58
|
7月前
|
SQL 存储 OLAP
阿里云 EMR Serverless StarRocks3.x,极速统一的湖仓新范式
阿里云 EMR Serverless StarRocks3.x,极速统一的湖仓新范式
173 0

热门文章

最新文章