基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 阿里云OpenLake解决方案建立在开放可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。通过元数据管理平台DLF管理结构化、半结构化和非结构化数据,提供湖仓数据表和文件的安全访问及IO加速,并支持大数据、搜索和AI多引擎对接。本文为您介绍以Flink作为Openlake方案的核心计算引擎,通过流式数据湖仓Paimon(使用DLF 2.0存储)和EMR StarRocks搭建流式湖仓。

阿里云OpenLake解决方案建立在开放可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。通过元数据管理平台DLF管理结构化、半结构化和非结构化数据,提供湖仓数据表和文件的安全访问及IO加速,并支持大数据、搜索和AI多引擎对接。本文为您介绍以Flink作为Openlake方案的核心计算引擎,通过流式数据湖仓Paimon(使用DLF 2.0存储)和EMR StarRocks搭建流式湖仓。


涉及云产品与资源:

实时计算Flink版免费试用>>>

EMR StarRocks免费试用>>>

对象存储OSS免费试用>>>

image.png

方案架构和优势

架构

传统的离线数仓通过定时调度离线作业的方式,存在延时长和成本高两大问题。离线作业的调度通常每小时甚至每天才进行一次,数据的消费者仅能看到上一小时甚至昨天的数据。同时,数据的更新多以覆写(overwrite)分区的方式进行,需要重新读取分区中原有的数据,才能与新鲜变更合并,产生新的结果数据。

基于实时计算Flink版流式数据湖仓Paimon(使用DLF 2.0作为元数据存储)和EMR StarRocks的OpenLake方案搭建流式湖仓可以解决上述传统离线数仓存在的问题。利用Flink的实时计算能力,数据可以在数仓分层之间实时流动;利用Paimon高效的更新能力,数据变更可以在分钟级的延时内传递给下游消费者;最后由StarRocks提供查询分析服务。Paimon与Flink深度集成,提供一体化的流式湖仓联合解决方案,在延时和成本上具有双重优势。本文搭建流式湖仓的方案架构如下:

  1. Flink将数据源写入Paimon,形成ODS层。
  2. Flink订阅ODS层的变更数据(Changelog)进行加工,形成DWD层再次写入Paimon。
  3. Flink订阅DWD层的Changelog进行加工,形成DWS层再次写入Paimon。
  4. 最后由EMR StarRocks读取Paimon外部表,对外提供应用查询。

优势

该方案有如下优势:

  • Paimon的每一层数据都可以在分钟级的延时内将变更传递给下游,将传统离线数仓的延时从小时级甚至天级降低至分钟级。
  • Paimon的每一层数据都可以直接接受变更数据,无需覆写分区,极大地降低了传统离线数仓数据更新与订正的成本,解决了中间层数据不易查、不易更新、不易修正的问题。
  • 模型统一,架构简化。ETL链路的逻辑是基于Flink SQL实现的;ODS层、DWD层和DWS层的数据统一存储在Paimon中,可以降低架构复杂度,提高数据处理效率。

该方案依赖于Paimon的三个核心能力,详情如下表所示。

Paimon核心能力

详情

主键表更新

Paimon底层使用LSM Tree数据结构,可以实现高效的数据更新。

关于Paimon主键表、Paimon底层数据结构的介绍请参见Primary Key TableFile Layouts

增量数据产生机制(Changelog Producer)

Paimon可以为任意输入数据流产生完整的增量数据(所有的update_after数据都有对应的update_before数据),保证数据变更可以完整地传递给下游。详情请参见增量数据产生机制

数据合并机制(Merge Engine)

当Paimon主键表收到多条具有相同主键的数据时,为了保持主键的唯一性,Paimon结果表会将这些数据合并成一条数据。Paimon支持去重、部分更新、预聚合等丰富多样的数据合并行为,详情请参见数据合并机制

实践场景

本文以某个电商平台为例,通过搭建一套流式湖仓,实现数据的加工清洗,并支持上层应用对数据的查询。流式湖仓实现了数据的分层和复用,并支撑各个业务方的报表查询(交易大屏、行为数据分析、用户画像标签)以及个性化推荐等多个业务场景。

  1. 构建ODS层:业务数据库实时入仓MySQL有orders(订单表),orders_pay(订单支付表)和product_catalog(商品类别字典表)三张业务表,这三张表通过Flink实时写入OSS,并以Paimon格式进行存储,作为ODS层。
  2. 构建DWD层:主题宽表将订单表、商品类别字典表、订单支付表利用Paimon的部分更新(partial-update)合并机制进行打宽,以分钟级延时生成DWD层宽表并产出变更数据(Changelog)。
  3. 构建DWS层:指标计算Flink实时消费宽表的变更数据,利用Paimon的预聚合(aggregation)合并机制产出DWM层dwm_users_shops(用户-商户聚合中间表),并最终产出DWS层dws_users(用户聚合指标表)以及dws_shops(商户聚合指标表)。

前提条件

说明

StarRocks实例、DLF数据目录需要与Flink工作空间处于相同地域。

使用限制

仅实时计算引擎VVR 8.0.9及以上版本支持该流式湖仓方案。

OpenLake数据摄取

准备MySQL数据源

本文以云数据库RDS MySQL版为例,创建数据库名称为order_dw,并创建三张业务表及数据。

  1. 快速创建RDS MySQL实例
    重要
    RDS MySQL版实例需要与Flink工作空间处于同一VPC。不在同一VPC下时请参见如何访问跨VPC的其他服务?
  2. 创建数据库和账号
    创建名称为order_dw的数据库,并创建高权限账号或具有数据库order_dw读写权限的普通账号。
    创建三张表,并插入相应数据。
CREATE TABLE `orders` (
  order_id bigint not null primary key,
  user_id varchar(50) not null,
  shop_id bigint not null,
  product_id bigint not null,
  buy_fee bigint not null,   
  create_time timestamp not null,
  update_time timestamp not null default now(),
  state int not null
);
CREATE TABLE `orders_pay` (
  pay_id bigint not null primary key,
  order_id bigint not null,
  pay_platform int not null, 
  create_time timestamp not null
);
CREATE TABLE `product_catalog` (
  product_id bigint not null primary key,
  catalog_name varchar(50) not null
);
-- 准备数据
INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
INSERT INTO orders VALUES
(100001, 'user_001', 12345, 1, 5000, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
(100002, 'user_002', 12346, 2, 4000, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
(100003, 'user_003', 12347, 3, 3000, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
(100004, 'user_001', 12347, 4, 2000, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
(100005, 'user_002', 12348, 5, 1000, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
(100006, 'user_001', 12348, 1, 1000, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
(100007, 'user_003', 12347, 4, 2000, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
INSERT INTO orders_pay VALUES
(2001, 100001, 1, '2023-02-15 17:40:56'),
(2002, 100002, 1, '2023-02-15 17:40:56'),
(2003, 100003, 0, '2023-02-15 17:40:56'),
(2004, 100004, 0, '2023-02-15 17:40:56'),
(2005, 100005, 0, '2023-02-15 18:40:56'),
(2006, 100006, 0, '2023-02-15 18:40:56'),
(2007, 100007, 0, '2023-02-15 18:40:56');

创建MySQL Catalog

  1. 元数据管理页面,单击创建Catalog
  2. 内置Catalog页签,单击MySQL,单击下一步
  3. 填写以下参数,单击确定,新建名为mysqlcatalog的MySQL Catalog。

配置项

说明

是否必填

备注

catalog name

Catalog名称。

填写为自定义的英文名。本文以mysqlcatalog为例。

hostname

MySQL数据库的IP地址或者Hostname。

详情请参见查看和管理实例连接地址和端口。由于RDS MySQL版实例和Flink全托管处于相同VPC,此处应填写内网地址。

port

MySQL数据库服务的端口号,默认值为3306。

详情请参见查看和管理实例连接地址和端口

default-database

默认的MySQL数据库名称。

本文填写需要同步的数据库名order_dw。

username

MySQL数据库服务的用户名。

本文为准备MySQL数据源中创建的账号和密码。

password

MySQL数据库服务的密码。

  1. 关于MySQL Catalog的更多使用方法详情请参见管理MySQL Catalog

创建Paimon Catalog

  1. 登录实时计算控制台
  2. 在左侧导航栏,选择元数据管理页面,单击创建Catalog
  3. 内置Catalog页签,单击Apache Paimon,单击下一步
  4. 填写以下参数,选择DLF 2.0作为存储类型,单击确定

配置项

说明

是否必填

备注

metastore

元数据存储类型。

此示例选择为dlf存储类型。

catalog name

DLF数据目录名称。

重要

使用RAM用户或角色时,请确保拥有DLF数据读写权限,详情请参见授权管理

推荐使用DLF 2.0,无需您再填写AccessKey等信息,支持快速选择已创建的DLF数据目录,创建数据目录操作请参见数据目录

本示例选择名称为paimoncatalog的数据目录。


  1. 在数据目录下创建相应的order_dw数据库,以便后续同步MySQL中order_dw库下所有表的数据。
    在左侧导航栏,选择数据查询 > 查询脚本,单击新建一个临时查询。
-- 使用paimoncatalog数据源
USE CATALOG paimoncatalog;
-- 新建order_dw数据库
CREATE DATABASE order_dw;
  1. 返回The following statement has been executed successfully!表示创建库成功。

关于Paimon Catalog的更多使用方法详情请参见管理Paimon Catalog

构建ODS层:业务数据库实时入仓

基于Flink CDC,通过数据摄入YAML作业实现MySQL数据同步至Paimon,一次性将ODS层构建出来。

  1. 创建并启动数据摄入YAML同步作业。
  1. 实时计算控制台数据开发 > 数据摄入页面,新建名为ods的YAML空白草稿作业。
  2. 将如下代码复制到编辑器,注意修改相应的用户名和密码等参数。
source:
  type: mysql
  name: MySQL Source
  hostname: rm-bp1e********566g.mysql.rds.aliyuncs.com
  port: 3306
  username: ${secret_values.username}
  password: ${secret_values.password}
  tables: order_dw.\.*  # 支持正则表达,读取order_dw库下所有的表
sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: dlf-paimon
  catalog.properties.dlf.endpoint: dlfnext-vpc.cn-hangzhou.aliyuncs.com
  catalog.properties.dlf.region: cn-hangzhou
  catalog.properties.dlf.catalog.instance.id: clg-paimon-927606********35b48a444ee
  
pipeline:
  name: MySQL to Paimon Pipeline
  execution.checkpointing.max-concurrent-checkpoints: 3   # 减轻检查点长尾的影响
  table.exec.sink.upsert-materialize: NONE    # 消除无用的SinkMaterialize算子
  # Paimon结果表在每次检查点完成之后才会正式提交数据。
  # 此处将检查点间隔缩短为10s,是为了更快地看到结果。
  # 在生产环境下,系统检查点的间隔与两次系统检查点之间的最短时间间隔根据业务对延时要求的不同,一般设置为1分钟到10分钟。
  execution.checkpointing.interval: 10s
  execution.checkpointing.min-pause: 10s

参数

说明

username

MySQL用户名和密码。本示例使用变量,可以避免明文展示密码等信息,详情请参见变量管理

password

catalog.properties.dlf.endpoint

访问域名。本示例为VPC网络,其他域名详情请参见地域及访问域名

catalog.properties.dlf.region

DLF的数据目录所在地域。详情请参见地域及访问域名

catalog.properties.dlf.catalog.instance.id

数据目录唯一标识id。该id可在数据湖控制台中的数据目录列表下进行查看。

  1. Paimon写入性能优化请参见Paimon性能优化
  2. 单击右上方的部署
  3. 运维中心 > 作业运维,单击刚刚部署的ods作业操作列的启动,选择无状态启动启动作业。作业启动配置详情请参见作业启动
  1. 查看MySQL同步到Paimon的三张表的数据。
    实时计算控制台数据开发 > 数据查询页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击右上角的运行
SELECT * FROM paimoncatalog.order_dw.orders ORDER BY order_id;

OpenLake数据ETL加工

构建DWD层:主题宽表

  1. 创建DWD层Paimon宽表dwd_orders。
    实时计算控制台数据开发 > 数据查询页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击右上角的运行
CREATE TABLE paimoncatalog.order_dw.dwd_orders (
    order_id BIGINT,
    order_user_id STRING,
    order_shop_id BIGINT,
    order_product_id BIGINT,
    order_product_catalog_name STRING,
    order_fee BIGINT,
    order_create_time TIMESTAMP,
    order_update_time TIMESTAMP,
    order_state INT,
    pay_id BIGINT,
    pay_platform INT COMMENT 'platform 0: phone, 1: pc',
    pay_create_time TIMESTAMP,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'merge-engine' = 'partial-update', -- 使用部分更新数据合并机制产生宽表
    'changelog-producer' = 'lookup' -- 使用lookup增量数据产生机制以低延时产出变更数据
);
  1. 返回Query has been executed表示创建成功。
  2. 实时消费ODS层orders、orders_pay表的变更数据。
    实时计算控制台数据开发 > ETL页面,新建名为dwd的SQL流作业,并将如下代码复制到SQL编辑器后,部署作业并无状态启动作业。
    通过该SQL作业,orders表会与product_catalog表进行维表关联,关联后的结果将与orders_pay一起写入dwd_orders表中,利用Paimon表的部分更新数据合并机制,将orders表和orders_pay表中order_id相同的数据进行打宽。
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
SET 'table.exec.sink.upsert-materialize' = 'NONE';
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.min-pause' = '10s';
-- Paimon目前暂不支持在同一个作业里通过多条INSERT语句写入同一张表,因此这里使用UNION ALL。
INSERT INTO paimoncatalog.order_dw.dwd_orders 
SELECT 
    o.order_id,
    o.user_id,
    o.shop_id,
    o.product_id,
    dim.catalog_name,
    o.buy_fee,
    o.create_time,
    o.update_time,
    o.state,
    NULL,
    NULL,
    NULL
FROM
    paimoncatalog.order_dw.orders o 
    LEFT JOIN paimoncatalog.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim
    ON o.product_id = dim.product_id
UNION ALL
SELECT
    order_id,
    NULL,
    NULL,
    NULL,
    NULL,
    NULL,
    NULL,
    NULL,
    NULL,
    pay_id,
    pay_platform,
    create_time
FROM
    paimoncatalog.order_dw.orders_pay;
  1. 查看宽表dwd_orders的数据。
    实时计算控制台数据开发 > 数据查询页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击右上角的运行
SELECT * FROM paimoncatalog.order_dw.dwd_orders ORDER BY order_id;

构建DWS层:指标计算

  1. 创建DWS层的聚合表dws_users以及dws_shops。
    实时计算控制台数据开发 > 数据查询页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击右上角的运行
-- 用户维度聚合指标表。
CREATE TABLE paimoncatalog.order_dw.dws_users (
    user_id STRING,
    ds STRING,
    payed_buy_fee_sum BIGINT COMMENT '当日完成支付的总金额',
    PRIMARY KEY (user_id, ds) NOT ENFORCED
) WITH (
    'merge-engine' = 'aggregation', -- 使用预聚合数据合并机制产生聚合表
    'fields.payed_buy_fee_sum.aggregate-function' = 'sum' -- 对 payed_buy_fee_sum 的数据求和产生聚合结果
    -- 由于dws_users表不再被下游流式消费,因此无需指定增量数据产生机制
);
-- 商户维度聚合指标表。
CREATE TABLE paimoncatalog.order_dw.dws_shops (
    shop_id BIGINT,
    ds STRING,
    payed_buy_fee_sum BIGINT COMMENT '当日完成支付总金额',
    uv BIGINT COMMENT '当日不同购买用户总人数',
    pv BIGINT COMMENT '当日购买用户总人次',
    PRIMARY KEY (shop_id, ds) NOT ENFORCED
) WITH (
    'merge-engine' = 'aggregation', -- 使用预聚合数据合并机制产生聚合表
    'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- 对 payed_buy_fee_sum 的数据求和产生聚合结果
    'fields.uv.aggregate-function' = 'sum', -- 对 uv 的数据求和产生聚合结果
    'fields.pv.aggregate-function' = 'sum' -- 对 pv 的数据求和产生聚合结果
    -- 由于dws_shops表不再被下游流式消费,因此无需指定增量数据产生机制
);
-- 为了同时计算用户视角的聚合表以及商户视角的聚合表,另外创建一个以用户 + 商户为主键的中间表。
CREATE TABLE paimoncatalog.order_dw.dwm_users_shops (
    user_id STRING,
    shop_id BIGINT,
    ds STRING,
    payed_buy_fee_sum BIGINT COMMENT '当日用户在商户完成支付的总金额',
    pv BIGINT COMMENT '当日用户在商户购买的次数',
    PRIMARY KEY (user_id, shop_id, ds) NOT ENFORCED
) WITH (
    'merge-engine' = 'aggregation', -- 使用预聚合数据合并机制产生聚合表
    'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- 对 payed_buy_fee_sum 的数据求和产生聚合结果
    'fields.pv.aggregate-function' = 'sum', -- 对 pv 的数据求和产生聚合结果
    'changelog-producer' = 'lookup', -- 使用lookup增量数据产生机制以低延时产出变更数据
    -- dwm层的中间表一般不直接提供上层应用查询,因此可以针对写入性能进行优化。
    'file.format' = 'avro', -- 使用avro行存格式的写入性能更加高效。
    'metadata.stats-mode' = 'none' -- 放弃统计信息会增加OLAP查询代价(对持续的流处理无影响),但会让写入性能更加高效。
);
  1. 返回Query has been executed表示创建成功。
  2. DWD层dwd_orders表的变更数据。
    实时计算控制台数据开发 > ETL页签,新建名为dwm的SQL流作业,并将如下代码复制到SQL编辑器后,部署作业并无状态启动作业。
    通过该SQL作业,dwd_orders表的数据会写入dwm_users_shops表中,利用Paimon表的预聚合数据合并机制,自动对order_fee求和,算出用户在商户的消费总额。同时,自动对1求和,也能算出用户在商户的消费次数。
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
SET 'table.exec.sink.upsert-materialize' = 'NONE';
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.min-pause' = '10s';
INSERT INTO paimoncatalog.order_dw.dwm_users_shops
SELECT
    order_user_id,
    order_shop_id,
    DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
    order_fee,
    1 -- 一条输入记录代表一次消费
FROM paimoncatalog.order_dw.dwd_orders
WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL;
  1. 实时消费DWM层dwm_users_shops表的变更数据。
    实时计算控制台数据开发 > ETL页面,新建名为dws的SQL流作业,并将如下代码复制到SQL编辑器后,部署作业并无状态启动作业。
    通过该SQL作业,dwm_users_shops表的数据会写入dws_users表和dws_shops表中,利用Paimon表的预聚合数据合并机制,在dws_users表中,计算每个用户的总消费额(payed_buy_fee_sum),在dws_shops表中计算商户的总流水(payed_buy_fee_sum),商户的消费用户数量(对1求和)和消费总人次(pv)。
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
SET 'table.exec.sink.upsert-materialize' = 'NONE';
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.min-pause' = '10s';
-- 与dwd不同,此处每一条INSERT语句写入的是不同的Paimon表,可以放在同一个作业中。
BEGIN STATEMENT SET;
INSERT INTO paimoncatalog.order_dw.dws_users
SELECT 
    user_id,
    ds,
    payed_buy_fee_sum
FROM paimoncatalog.order_dw.dwm_users_shops;
-- 以商户为主键,部分热门商户的数据量可能远高于其他商户。
-- 因此使用local merge在写入Paimon之前先在内存中进行预聚合,缓解数据倾斜问题。
INSERT INTO paimoncatalog.order_dw.dws_shops /*+ OPTIONS('local-merge-buffer-size' = '64mb') */
SELECT
    shop_id,
    ds,
    payed_buy_fee_sum,
    1, -- 一条输入记录代表一名用户在该商户的所有消费
    pv
FROM paimoncatalog.order_dw.dwm_users_shops;
END;
  1. 查看dws_users表和dws_shops表的数据
    实时计算控制台数据开发 > 数据查询页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击右上角的运行
--查看dws_users表数据
SELECT * FROM paimoncatalog.order_dw.dws_users ORDER BY user_id;

--查看dws_shops表数据
SELECT * FROM paimoncatalog.order_dw.dws_shops ORDER BY shop_id;

捕捉业务数据库的变化

前面已完成了流式湖仓的构建,下面将测试流式湖仓捕捉业务数据库变化的能力。

  1. 向MySQL的order_dw数据库中插入如下数据。
INSERT INTO orders VALUES
(100008, 'user_001', 12345, 3, 3000, '2023-02-15 17:40:56', '2023-02-15 18:42:56', 1),
(100009, 'user_002', 12348, 4, 1000, '2023-02-15 18:40:56', '2023-02-15 19:42:56', 1),
(100010, 'user_003', 12348, 2, 2000, '2023-02-15 19:40:56', '2023-02-15 20:42:56', 1);
INSERT INTO orders_pay VALUES
(2008, 100008, 1, '2023-02-15 18:40:56'),
(2009, 100009, 1, '2023-02-15 19:40:56'),
(2010, 100010, 0, '2023-02-15 20:40:56');
  1. 查看dws_users表和dws_shops表的数据。 在实时计算控制台数据开发 > 数据查询页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击右上角的运行
  • dws_users表
SELECT * FROM paimoncatalog.order_dw.dws_users ORDER BY user_id;
  • dws_shops表
SELECT * FROM paimoncatalog.order_dw.dws_shops ORDER BY shop_id;

OpenLake数据查询分析

上一小节展示了在Flink中进行Paimon Catalog的创建与Paimon表的写入。本节展示流式湖仓搭建完成后,利用StarRocks进行数据分析的一些简单应用场景。

重要

DLF 2.0的访问控制是RAM级别的。默认情况下所有StarRocks用户均不具备DLF 2.0的任何权限,您必须添加一个已存在的RAM用户并进行授权,后续使用该RAM用户连接StarRocks实例和创建SQL查询,详情请参见使用DLF 2.0 Catalog

创建示例

排名查询

对DWS层聚合表进行分析。本文使用StarRocks查询23年2月15日交易额前三高的商户的代码示例如下。

SELECT ROW_NUMBER() OVER (ORDER BY payed_buy_fee_sum DESC) AS rn, shop_id, payed_buy_fee_sum 
FROM dws_shops
WHERE ds = '20230215'
ORDER BY rn LIMIT 3;

明细查询

对DWD层宽表进行分析。本文使用StarRocks查询某个客户23年2月特定支付平台支付的订单明细的代码示例如下。

SELECT * FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00' AND order_create_time < '2023-03-01 00:00:00'
AND order_user_id = 'user_001'
AND pay_platform = 0
ORDER BY order_create_time;;

数据报表

对DWD层宽表进行分析。本文使用StarRocks查询23年2月内每个品类的订单总量和订单总金额的代码示例如下。

SELECT
  order_create_time AS order_create_date,
  order_product_catalog_name,
  COUNT(*),
  SUM(order_fee)
FROM
  dwd_orders
WHERE
  order_create_time >= '2023-02-01 00:00:00'  and order_create_time < '2023-03-01 00:00:00'
GROUP BY
  order_create_date, order_product_catalog_name
ORDER BY
  order_create_date, order_product_catalog_name;

相关文档

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
目录
相关文章
|
1月前
|
消息中间件 人工智能 监控
Paimon x StarRocks 助力喜马拉雅直播实时湖仓构建
本文由喜马拉雅直播业务与仓库建设负责人王琛撰写,介绍了喜马拉雅直播业务的数据仓库架构迭代升级。文章重点分享了基于 Flink + Paimon + StarRocks 实现实时湖仓的架构及其成效,通过分钟级别的收入监控、实时榜单生成、流量监测和盈亏预警,大幅提升了运营效率与决策质量,并为未来的业务扩展和 AI 项目打下坚实基础。
218 5
Paimon x StarRocks 助力喜马拉雅直播实时湖仓构建
|
2月前
|
存储 数据采集 大数据
Flink实时湖仓,为汽车行业数字化加速!
本文由阿里云计算平台产品专家李鲁兵(云觉)分享,聚焦汽车行业大数据应用。内容涵盖市场趋势、典型大数据架构、产品市场地位及能力解读,以及典型客户案例。文章详细介绍了新能源汽车市场的快速增长、大数据架构分析、实时湖仓方案的优势,以及Flink和Paimon在车联网中的应用案例。
190 8
Flink实时湖仓,为汽车行业数字化加速!
|
1月前
|
数据采集 运维 DataWorks
DataWorks on EMR StarRocks,打造标准湖仓新范式
本文整理自阿里云计算平台产品专家周硕(簌篱)在阿里云DataWorks on EMR StarRocks解决方案介绍中的分享。介绍了阿里云DataWorks与EMR Serverless StarRocks的结合使用,详细阐述了在数据同步、数据消费、数据治理三大场景中的核心能力。DataWorks作为大数据开发治理平台,提供了从数据建模、数据集成、数据开发到数据治理的全链路解决方案,结合StarRocks的高性能分析能力,帮助企业实现OLAP分析、湖仓一体开发及数据综合治理,满足复杂业务场景下的需求,提升数据处理和分析效率。
74 4
|
3月前
|
存储 数据采集 OLAP
饿了么基于Flink+Paimon+StarRocks的实时湖仓探索
饿了么的实时数仓经历了多个阶段的演进。初期通过实时ETL、报表应用、联动及监控构建基础架构,随后形成了涵盖数据采集、加工和服务的整体数据架构。1.0版本通过日志和Binlog采集数据,但在研发效率和数据一致性方面存在问题。2.0版本通过Dataphin构建流批一体化系统,提升了数据一致性和研发效率,但仍面临新业务适应性等问题。最终,饿了么选择Paimon和StarRocks作为实时湖仓方案,显著降低了存储成本并提高了系统稳定性。未来,将进一步优化带宽瓶颈、小文件问题及权限控制,实现更多场景的应用。
426 7
饿了么基于Flink+Paimon+StarRocks的实时湖仓探索
|
3月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
1月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
1158 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
zdl
|
1月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
148 56
|
5月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
859 7
阿里云实时计算Flink在多行业的应用和实践
|
4月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL 运维 数据可视化
阿里云实时计算Flink版产品体验测评
阿里云实时计算Flink基于Apache Flink构建,提供一站式实时大数据分析平台,支持端到端亚秒级实时数据分析,适用于实时大屏、实时报表、实时ETL和风控监测等场景,具备高性价比、开发效率、运维管理和企业安全等优势。