【最佳实践】实时计算 Flink 版在金融行业的实时数仓建设实践

简介: 金融是现代经济的核心。我国金融业在市场化改革和对外开放中不断发展,金融总量大幅增长。金融稳定直接关系到国家经济发展的前途和命运,金融业是国民经济发展的晴雨表。对我国金融业发展现状进行客观分析,对金融业发展趋势进行探索,有助于消除金融隐患,使金融业朝着健康、有序方向发展。

行业背景

  • 行业现状: 

金融是现代经济的核心。我国金融业在市场化改革和对外开放中不断发展,金融总量大幅增长。金融稳定直接关系到国家经济发展的前途和命运,金融业是国民经济发展的晴雨表。对我国金融业发展现状进行客观分析,对金融业发展趋势进行探索,有助于消除金融隐患,使金融业朝着健康、有序方向发展。

  • 大数据在其行业中的作用:

    1. 金融服务和产品创新:借助社交网络等平台产生的海量用户和数据记录着用户群体的兴趣和偏好情绪等信息, 对客户行为模式进行分析,可以带来更贴近客户需求的产品创新。
    2. 增强用户体验:通过大数据分析对客户进行画像,结合客户画像特征,为用户提供个性化服务,增强用户体验。

业务场景

某保险公司开发了个金融类APP,该公司在APP中会投放保险广告、发布优惠活动,用户通过APP进行投保等操作。
业务的构建涉及到几个端:

  1. APP:应用程序,用户访问入口,用户通过APP点击浏览保险广告、优惠活动等,并进行投保下单。
  2. 后台系统:

a.运营人员:
(1)根据用户提交的订单统计指定时间段的总投保人数和总投保金额,辅助优化运营方案。
(2)对用户的日常行为做出分析,分析出每个用户比较关注的信息,作为推荐系统的数据来源。
b:业务经理:
对重点客户的投保金额变动进行监控,将投保金额变动较大的重点客户推送给业务经理,业务经理针对性开展客户挽留等操作。

技术架构

image.png
架构解析:
数据采集:该场景中,数仓的数据主要来源于APP等系统的埋点信息,被实时采集至DATAHUB作为Flink的输入数据。
实时数仓架构:该场景中,整个实时数仓的ETL和BI部分的构建,全部通过Flink完成,Flink实时读取DATAHUB的数据进行处理,并与维表进行关联查询等操作,最终实时统计的结果输入到下游数据库RDS中。

业务指标

  • 运营数据分析

    • 每小时的投保人数
    • 每小时的总保费
    • 每小时总保单数
  • 用户行为监控

    • 用户原投保金额
    • 用户现投保金额
  • 用户行为分析

    • 用户最后访问的页面类型
    • 用户最后访问的页面地址

数据结构

场景一:运营数据分析

本场景用于计算每小时总投保人数和总保费。
用户投保会生成一份订单,订单内容包括用户id、用户姓名、订单号等。flink实时读取订单信息,用where过滤出大于当前小时时间段的数据(数据过滤),然后根据用户id做分组用last_value函数获取每个用户最终生成的订单信息(订单去重),最后按照小时维度聚合统计当前小时的总保费和总投保人数。

输入表

CREATE   TABLE  user_order
(
    id                          bigint    comment '用户id'
    ,order_no                    varchar    comment '订单号'
    ,order_type                  bigint    comment '订单类型'
    ,pay_time                    bigint  comment '支付时间'
    ,order_price                 double    comment '订单价格'
    ,customer_name               varchar    comment '用户姓名'
    ,customer_tel                varchar    comment '用户电话'
    ,certificate_no              varchar    comment '证件号码'
    ,gmt_created                 bigint  comment '创建时间'
    ,gmt_modified                bigint  comment '修改时间'
    ,account_payble             double      comment '应付金额'

) WITH (
       type='datahub',
     topic='user_order'
       ...
)

输出表

CREATE    TABLE hs_order (
    biz_date              varchar COMMENT 'yyyymmddhh'
    ,total_premium         DOUBLE COMMENT '总保费'
    ,policy_cnt            BIGINT COMMENT '保单数'
    ,policy_holder_cnt     BIGINT COMMENT '投保人数'
    ,PRIMARY KEY (biz_date)
) WITH
 (
   type='rds',
   tableName='adm_pfm_zy_order_gmv_msx_hs'
   ...
 ) 
 ;

业务代码

1:数据清洗

create view  last_order
as 
select 
     id                                 as id               
    ,last_value(order_no)               as order_no                   
    ,last_value(customer_tel)           as customer_tel     
    ,last_value(gmt_modified)           as gmt_modified                      
    ,last_value(account_payble)         as account_payble   
    from user_order
    where gmt_modified  >= cast(UNIX_TIMESTAMP(FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd'), 'yyyy-MM-dd')*1000 as bigint)
    group by id
;

2:数据汇总

insert into hs_order
select 
biz_date
,cast (total_premium as double) as total_premium
,cast (policy_cnt as bigint) as policy_cnt
,cast (policy_holder_cnt as bigint) as policy_holder_cnt
from (
select 
    from_unixtime(cast(gmt_modified/1000 as bigint),'yyyyMMddHH')      as biz_date
    ,sum(coalesce(account_payble,0))  as total_premium
    ,count(distinct order_no)   as policy_cnt
    ,count(distinct customer_tel)  as policy_holder_cnt
from  last_order a 
group by 
from_unixtime(cast(gmt_modified/1000 as bigint),'yyyyMMddHH')
)a 
;

场景二:用户行为监控

本场景对投保金额变动较大(总保额变动大于15%)的重点用户进行监控。
Flink实时读取用户新建订单数据,新建订单包括用户的id和现投保金额,通过where过滤没有保存成功的订单。维表中存储了业务经理关注的重点用户数据(如原投保金额),通过新建订单中的用户id与维表进行关联查询,如果维表中存在此用户且总保额下降15%以上,则将该用户的详细信息推送给业务经理,并且在维表中更新该用户投保金额及投保信息。

输入表

create table update_info
(
 id             bigint      comment '用户id'
,channel        varchar     comment '渠道编号'
,open_id        varchar     comment '订单id'
,event          varchar     comment '事件类型'
,now_premium  varchar     comment '现投保金额'
,creator        varchar     comment '创建人'
,modifier       varchar     comment '最后修改人'
,gmt_modified   bigint      comment '修改时间'
,now_info       varchar     comment '现投保信息'
) with (
    type = 'datahub',
    topic = 'dh_prd_dm_account_wechat_trace'
    ...
   
);

维表

 create table raw_info
(
     id                 bigint  comment '用户id'
    ,raw_premium      varchar  comment '原投保金额'
    ,raw_info           varchar  comment '原投保信息'
    ,PRIMARY KEY(id)
    ,PERIOD FOR SYSTEM_TIME
) WITH (
    type='ots',
    tableName='adm_zy_acct_sub_wechat_list'
    ...
);

输出表

create table update_info
(
     id               bigint  comment '用户id'
    ,raw_info         varchar comment '原投保信息'
    ,now_info         varchar comment '现投保信息'
    ,raw_premium      varchar comment '原投保金额'
    ,now_premium      varchar comment '现投保金额'
    ,PRIMARY KEY(id)
) WITH (
    type='rds',
    tableName='wechat_activity_user'
    ...
);

业务代码:

create view info_join as 
select
      t1.id               as  id
    ,t2.raw_info          as  raw_info
    ,t1.now_info          as  now_info  
    ,t2.raw_premium     as raw_premium
    ,t1.now_premium     as now_premium
from update_info t1
inner join raw_info FOR SYSTEM_TIME AS OF PROCTIME() as t2
on t1.id = t2.id ;
insert into update_info
select 
     id                        as id  
    ,raw_info                  as raw_info
    ,now_info                  as now_info
    ,raw_premium               as raw_premium  
    ,now_premium               as now_premium  
from info_join where now_premium<raw_premium*0.85
;
insert into raw_info
select 
     id                        as id  
    ,now_premium               as raw_premium  
    ,now_info                  as raw_info
from info_join
;

场景三:用户行为分析

本场景记录用户最后访问的页面名称和类型,作为用户画像的特征值。
Flink读取用户浏览APP页面的日志信息,如用户id、页面名称和页面类型等信息,根据用户id和设备id进行分组,通过last_value函数获取用户最后一次访问页面的名称和类型,输出到RDS作为推荐系统的输入数据,在下次用户登录的时候为其推送相关广告信息,提升用户广告点击率和下单的成功率。

输入表


create table user_log
(
 log_time                bigint  comment '日志采集日期(Linux时间)' 
,device_id               varchar  comment '设备id'
,account_id              varchar  comment '账户id'
,bury_point_type         varchar  comment '页面类型或埋点类型'
,page_name               varchar  comment '页面名称或埋点时一级目录'
) WITH (
    type = 'datahub',
    topic = 'edw_zy_evt_bury_point_log'
    ...
);

输出表

create table user_last_log
(
     account_id         varchar  comment 'account_id'
    ,device_id          varchar    comment  '设备id'
    ,bury_point_type    varchar  comment '页面类型'
    ,page_name          varchar  comment '页面名称'
    ,primary key(account_id)
) WITH (
    type='rds',
    tableName='adm_zy_moblie_charge_exchg_rs'
    ...
    
);

业务代码


insert into user_last_log
select
    account_id
    ,device_id
    ,last_value(bury_point_type)  as bury_point_type
    ,last_value(page_name)  as page_name
from user_log
where account_id is not null 
group by account_id,device_id

实时计算 Flink 版产品交流群

test

阿里云实时计算Flink - 解决方案:
https://developer.aliyun.com/article/765097
阿里云实时计算Flink - 场景案例:
https://ververica.cn/corporate-practice
阿里云实时计算Flink - 产品详情页:
https://www.aliyun.com/product/bigdata/product/sc

相关文章
|
11月前
|
存储 消息中间件 分布式计算
Hologres实时数仓在B站游戏的建设与实践
本文介绍了B站游戏业务中实时数据仓库的构建与优化过程。为满足日益增长的数据实时性需求,采用了Hologres作为核心组件优化传统Lambda架构,实现了存储层面的流批一体化及离线-实时数据的无缝衔接。文章详细描述了架构选型、分层设计(ODS、DWD、DIM、ADS)及关键技术挑战的解决方法,如高QPS点查、数据乱序重写等。目前,该实时数仓已广泛应用于运营分析、广告投放等多个场景,并计划进一步完善实时指标体系、扩展明细层应用及研发数据实时解析能力。
Hologres实时数仓在B站游戏的建设与实践
|
存储 分布式计算 MaxCompute
Hologres实时湖仓能力入门实践
本文由武润雪(栩染)撰写,介绍Hologres 3.0版本作为一体化实时湖仓平台的升级特性。其核心能力包括湖仓存储一体、多模式计算一体、分析服务一体及Data+AI一体,极大提升数据开发效率。文章详细解析了两种湖仓架构:MaxCompute + Hologres实现离线实时一体化,以及Hologres + DLF + OSS构建开放湖仓架构,并深入探讨元数据抽象、权限互通等重点功能,同时提供具体使用说明与Demo演示。
|
SQL 弹性计算 运维
Hologres计算组实例&分时弹性入门实践
本文由骆撷冬(Hologres PD)撰写,围绕Hologres计算组实例与分时弹性的入门实践展开。内容分为三部分:第一部分介绍Hologres计算组实例的原理与架构,解决负载隔离、资源浪费、大任务和运维难题;第二部分演示计算组实例的入门实践,包括管理、授权、连接及监控等操作;第三部分讲解分时弹性的使用,涵盖配置方法、成本优化及监控告警。通过具体案例与操作步骤,帮助用户更好地理解和应用Hologres的弹性计算能力。
|
7月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
694 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
|
SQL 运维 网络安全
【实践】基于Hologres+Flink搭建GitHub实时数据查询
本文介绍了如何利用Flink和Hologres构建GitHub公开事件数据的实时数仓,并对接BI工具实现数据实时分析。流程包括创建VPC、Hologres、OSS、Flink实例,配置Hologres内部表,通过Flink实时写入数据至Hologres,查询实时数据,以及清理资源等步骤。
|
11月前
|
存储 消息中间件 Java
抖音集团电商流量实时数仓建设实践
本文基于抖音集团电商数据工程师姚遥在Flink Forward Asia 2024的分享,围绕电商流量数据处理展开。内容涵盖业务挑战、电商流量建模架构、流批一体实践、大流量任务调优及总结展望五个部分。通过数据建模与优化,实现效率、质量、成本和稳定性全面提升,数据质量达99%以上,任务性能提升70%。未来将聚焦自动化、低代码化与成本优化,探索更高效的流批一体化方案。
688 12
抖音集团电商流量实时数仓建设实践
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
4216 74
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
|
存储 SQL Java
Flink CDC + Hologres高性能数据同步优化实践
本文整理自阿里云高级技术专家胡一博老师在Flink Forward Asia 2024数据集成(二)专场的分享,主要内容包括:1. Hologres介绍:实时数据仓库,支持毫秒级写入和高QPS查询;2. 写入优化:通过改进缓冲队列、连接池和COPY模式提高吞吐量和降低延迟;3. 消费优化:优化离线场景和分区表的消费逻辑,提升性能和资源利用率;4. 未来展望:进一步简化用户操作,支持更多DDL操作及全增量消费。Hologres 3.0全新升级为一体化实时湖仓平台,提供多项新功能并降低使用成本。
852 1
Flink CDC + Hologres高性能数据同步优化实践
|
SQL 监控 关系型数据库
用友畅捷通在Flink上构建实时数仓、挑战与最佳实践
本文整理自用友畅捷通数据架构师王龙强在FFA2024上的分享,介绍了公司在Flink上构建实时数仓的经验。内容涵盖业务背景、数仓建设、当前挑战、最佳实践和未来展望。随着数据量增长,公司面临数据库性能瓶颈及实时数据处理需求,通过引入Flink技术逐步解决了数据同步、链路稳定性和表结构差异等问题,并计划在未来进一步优化链路稳定性、探索湖仓一体架构以及结合AI技术推进数据资源高效利用。
894 25
用友畅捷通在Flink上构建实时数仓、挑战与最佳实践

相关产品

  • 实时计算 Flink版