【最佳实践】实时计算 Flink 版在金融行业的实时数仓建设实践

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 金融是现代经济的核心。我国金融业在市场化改革和对外开放中不断发展,金融总量大幅增长。金融稳定直接关系到国家经济发展的前途和命运,金融业是国民经济发展的晴雨表。对我国金融业发展现状进行客观分析,对金融业发展趋势进行探索,有助于消除金融隐患,使金融业朝着健康、有序方向发展。

行业背景

  • 行业现状: 

金融是现代经济的核心。我国金融业在市场化改革和对外开放中不断发展,金融总量大幅增长。金融稳定直接关系到国家经济发展的前途和命运,金融业是国民经济发展的晴雨表。对我国金融业发展现状进行客观分析,对金融业发展趋势进行探索,有助于消除金融隐患,使金融业朝着健康、有序方向发展。

  • 大数据在其行业中的作用:

    1. 金融服务和产品创新:借助社交网络等平台产生的海量用户和数据记录着用户群体的兴趣和偏好情绪等信息, 对客户行为模式进行分析,可以带来更贴近客户需求的产品创新。
    2. 增强用户体验:通过大数据分析对客户进行画像,结合客户画像特征,为用户提供个性化服务,增强用户体验。

业务场景

某保险公司开发了个金融类APP,该公司在APP中会投放保险广告、发布优惠活动,用户通过APP进行投保等操作。
业务的构建涉及到几个端:

  1. APP:应用程序,用户访问入口,用户通过APP点击浏览保险广告、优惠活动等,并进行投保下单。
  2. 后台系统:

a.运营人员:
(1)根据用户提交的订单统计指定时间段的总投保人数和总投保金额,辅助优化运营方案。
(2)对用户的日常行为做出分析,分析出每个用户比较关注的信息,作为推荐系统的数据来源。
b:业务经理:
对重点客户的投保金额变动进行监控,将投保金额变动较大的重点客户推送给业务经理,业务经理针对性开展客户挽留等操作。

技术架构

image.png
架构解析:
数据采集:该场景中,数仓的数据主要来源于APP等系统的埋点信息,被实时采集至DATAHUB作为Flink的输入数据。
实时数仓架构:该场景中,整个实时数仓的ETL和BI部分的构建,全部通过Flink完成,Flink实时读取DATAHUB的数据进行处理,并与维表进行关联查询等操作,最终实时统计的结果输入到下游数据库RDS中。

业务指标

  • 运营数据分析

    • 每小时的投保人数
    • 每小时的总保费
    • 每小时总保单数
  • 用户行为监控

    • 用户原投保金额
    • 用户现投保金额
  • 用户行为分析

    • 用户最后访问的页面类型
    • 用户最后访问的页面地址

数据结构

场景一:运营数据分析

本场景用于计算每小时总投保人数和总保费。
用户投保会生成一份订单,订单内容包括用户id、用户姓名、订单号等。flink实时读取订单信息,用where过滤出大于当前小时时间段的数据(数据过滤),然后根据用户id做分组用last_value函数获取每个用户最终生成的订单信息(订单去重),最后按照小时维度聚合统计当前小时的总保费和总投保人数。

输入表

CREATE   TABLE  user_order
(
    id                          bigint    comment '用户id'
    ,order_no                    varchar    comment '订单号'
    ,order_type                  bigint    comment '订单类型'
    ,pay_time                    bigint  comment '支付时间'
    ,order_price                 double    comment '订单价格'
    ,customer_name               varchar    comment '用户姓名'
    ,customer_tel                varchar    comment '用户电话'
    ,certificate_no              varchar    comment '证件号码'
    ,gmt_created                 bigint  comment '创建时间'
    ,gmt_modified                bigint  comment '修改时间'
    ,account_payble             double      comment '应付金额'

) WITH (
       type='datahub',
     topic='user_order'
       ...
)

输出表

CREATE    TABLE hs_order (
    biz_date              varchar COMMENT 'yyyymmddhh'
    ,total_premium         DOUBLE COMMENT '总保费'
    ,policy_cnt            BIGINT COMMENT '保单数'
    ,policy_holder_cnt     BIGINT COMMENT '投保人数'
    ,PRIMARY KEY (biz_date)
) WITH
 (
   type='rds',
   tableName='adm_pfm_zy_order_gmv_msx_hs'
   ...
 ) 
 ;

业务代码

1:数据清洗

create view  last_order
as 
select 
     id                                 as id               
    ,last_value(order_no)               as order_no                   
    ,last_value(customer_tel)           as customer_tel     
    ,last_value(gmt_modified)           as gmt_modified                      
    ,last_value(account_payble)         as account_payble   
    from user_order
    where gmt_modified  >= cast(UNIX_TIMESTAMP(FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd'), 'yyyy-MM-dd')*1000 as bigint)
    group by id
;

2:数据汇总

insert into hs_order
select 
biz_date
,cast (total_premium as double) as total_premium
,cast (policy_cnt as bigint) as policy_cnt
,cast (policy_holder_cnt as bigint) as policy_holder_cnt
from (
select 
    from_unixtime(cast(gmt_modified/1000 as bigint),'yyyyMMddHH')      as biz_date
    ,sum(coalesce(account_payble,0))  as total_premium
    ,count(distinct order_no)   as policy_cnt
    ,count(distinct customer_tel)  as policy_holder_cnt
from  last_order a 
group by 
from_unixtime(cast(gmt_modified/1000 as bigint),'yyyyMMddHH')
)a 
;

场景二:用户行为监控

本场景对投保金额变动较大(总保额变动大于15%)的重点用户进行监控。
Flink实时读取用户新建订单数据,新建订单包括用户的id和现投保金额,通过where过滤没有保存成功的订单。维表中存储了业务经理关注的重点用户数据(如原投保金额),通过新建订单中的用户id与维表进行关联查询,如果维表中存在此用户且总保额下降15%以上,则将该用户的详细信息推送给业务经理,并且在维表中更新该用户投保金额及投保信息。

输入表

create table update_info
(
 id             bigint      comment '用户id'
,channel        varchar     comment '渠道编号'
,open_id        varchar     comment '订单id'
,event          varchar     comment '事件类型'
,now_premium  varchar     comment '现投保金额'
,creator        varchar     comment '创建人'
,modifier       varchar     comment '最后修改人'
,gmt_modified   bigint      comment '修改时间'
,now_info       varchar     comment '现投保信息'
) with (
    type = 'datahub',
    topic = 'dh_prd_dm_account_wechat_trace'
    ...
   
);

维表

 create table raw_info
(
     id                 bigint  comment '用户id'
    ,raw_premium      varchar  comment '原投保金额'
    ,raw_info           varchar  comment '原投保信息'
    ,PRIMARY KEY(id)
    ,PERIOD FOR SYSTEM_TIME
) WITH (
    type='ots',
    tableName='adm_zy_acct_sub_wechat_list'
    ...
);

输出表

create table update_info
(
     id               bigint  comment '用户id'
    ,raw_info         varchar comment '原投保信息'
    ,now_info         varchar comment '现投保信息'
    ,raw_premium      varchar comment '原投保金额'
    ,now_premium      varchar comment '现投保金额'
    ,PRIMARY KEY(id)
) WITH (
    type='rds',
    tableName='wechat_activity_user'
    ...
);

业务代码:

create view info_join as 
select
      t1.id               as  id
    ,t2.raw_info          as  raw_info
    ,t1.now_info          as  now_info  
    ,t2.raw_premium     as raw_premium
    ,t1.now_premium     as now_premium
from update_info t1
inner join raw_info FOR SYSTEM_TIME AS OF PROCTIME() as t2
on t1.id = t2.id ;
insert into update_info
select 
     id                        as id  
    ,raw_info                  as raw_info
    ,now_info                  as now_info
    ,raw_premium               as raw_premium  
    ,now_premium               as now_premium  
from info_join where now_premium<raw_premium*0.85
;
insert into raw_info
select 
     id                        as id  
    ,now_premium               as raw_premium  
    ,now_info                  as raw_info
from info_join
;

场景三:用户行为分析

本场景记录用户最后访问的页面名称和类型,作为用户画像的特征值。
Flink读取用户浏览APP页面的日志信息,如用户id、页面名称和页面类型等信息,根据用户id和设备id进行分组,通过last_value函数获取用户最后一次访问页面的名称和类型,输出到RDS作为推荐系统的输入数据,在下次用户登录的时候为其推送相关广告信息,提升用户广告点击率和下单的成功率。

输入表


create table user_log
(
 log_time                bigint  comment '日志采集日期(Linux时间)' 
,device_id               varchar  comment '设备id'
,account_id              varchar  comment '账户id'
,bury_point_type         varchar  comment '页面类型或埋点类型'
,page_name               varchar  comment '页面名称或埋点时一级目录'
) WITH (
    type = 'datahub',
    topic = 'edw_zy_evt_bury_point_log'
    ...
);

输出表

create table user_last_log
(
     account_id         varchar  comment 'account_id'
    ,device_id          varchar    comment  '设备id'
    ,bury_point_type    varchar  comment '页面类型'
    ,page_name          varchar  comment '页面名称'
    ,primary key(account_id)
) WITH (
    type='rds',
    tableName='adm_zy_moblie_charge_exchg_rs'
    ...
    
);

业务代码


insert into user_last_log
select
    account_id
    ,device_id
    ,last_value(bury_point_type)  as bury_point_type
    ,last_value(page_name)  as page_name
from user_log
where account_id is not null 
group by account_id,device_id

实时计算 Flink 版产品交流群

test

阿里云实时计算Flink - 解决方案:
https://developer.aliyun.com/article/765097
阿里云实时计算Flink - 场景案例:
https://ververica.cn/corporate-practice
阿里云实时计算Flink - 产品详情页:
https://www.aliyun.com/product/bigdata/product/sc

相关文章
|
3月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
1月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
1161 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
zdl
|
1月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
155 56
|
9天前
|
DataWorks 数据挖掘 大数据
方案实践测评 | DataWorks集成Hologres构建一站式高性能的OLAP数据分析
DataWorks在任务开发便捷性、任务运行速度、产品使用门槛等方面都表现出色。在数据处理场景方面仍有改进和扩展的空间,通过引入更多的智能技术、扩展数据源支持、优化任务调度和可视化功能以及提升团队协作效率,DataWorks将能够为企业提供更全面、更高效的数据处理解决方案。
|
1月前
|
SQL 流计算 关系型数据库
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
阿里云OpenLake解决方案建立在开放可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。通过元数据管理平台DLF管理结构化、半结构化和非结构化数据,提供湖仓数据表和文件的安全访问及IO加速,并支持大数据、搜索和AI多引擎对接。本文为您介绍以Flink作为Openlake方案的核心计算引擎,通过流式数据湖仓Paimon(使用DLF 2.0存储)和EMR StarRocks搭建流式湖仓。
356 4
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
|
2月前
|
存储 数据采集 大数据
Flink实时湖仓,为汽车行业数字化加速!
本文由阿里云计算平台产品专家李鲁兵(云觉)分享,聚焦汽车行业大数据应用。内容涵盖市场趋势、典型大数据架构、产品市场地位及能力解读,以及典型客户案例。文章详细介绍了新能源汽车市场的快速增长、大数据架构分析、实时湖仓方案的优势,以及Flink和Paimon在车联网中的应用案例。
193 8
Flink实时湖仓,为汽车行业数字化加速!
|
1月前
|
SQL 运维 数据可视化
阿里云实时计算Flink版产品体验测评
阿里云实时计算Flink基于Apache Flink构建,提供一站式实时大数据分析平台,支持端到端亚秒级实时数据分析,适用于实时大屏、实时报表、实时ETL和风控监测等场景,具备高性价比、开发效率、运维管理和企业安全等优势。
|
1月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
67 1
|
2月前
|
运维 搜索推荐 数据安全/隐私保护
阿里云实时计算Flink版测评报告
阿里云实时计算Flink版在用户行为分析与标签画像场景中表现出色,通过实时处理电商平台用户行为数据,生成用户兴趣偏好和标签,提升推荐系统效率。该服务具备高稳定性、低延迟、高吞吐量,支持按需计费,显著降低运维成本,提高开发效率。
82 1
|
2月前
|
运维 数据处理 Apache
数据实时计算产品对比测评报告:阿里云实时计算Flink版
数据实时计算产品对比测评报告:阿里云实时计算Flink版

相关产品

  • 实时计算 Flink版
  • 下一篇
    DataWorks