【最佳实践】实时计算 Flink 版在金融行业的实时数仓建设实践

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 金融是现代经济的核心。我国金融业在市场化改革和对外开放中不断发展,金融总量大幅增长。金融稳定直接关系到国家经济发展的前途和命运,金融业是国民经济发展的晴雨表。对我国金融业发展现状进行客观分析,对金融业发展趋势进行探索,有助于消除金融隐患,使金融业朝着健康、有序方向发展。

行业背景

  • 行业现状: 

金融是现代经济的核心。我国金融业在市场化改革和对外开放中不断发展,金融总量大幅增长。金融稳定直接关系到国家经济发展的前途和命运,金融业是国民经济发展的晴雨表。对我国金融业发展现状进行客观分析,对金融业发展趋势进行探索,有助于消除金融隐患,使金融业朝着健康、有序方向发展。

  • 大数据在其行业中的作用:

    1. 金融服务和产品创新:借助社交网络等平台产生的海量用户和数据记录着用户群体的兴趣和偏好情绪等信息, 对客户行为模式进行分析,可以带来更贴近客户需求的产品创新。
    2. 增强用户体验:通过大数据分析对客户进行画像,结合客户画像特征,为用户提供个性化服务,增强用户体验。

业务场景

某保险公司开发了个金融类APP,该公司在APP中会投放保险广告、发布优惠活动,用户通过APP进行投保等操作。
业务的构建涉及到几个端:

  1. APP:应用程序,用户访问入口,用户通过APP点击浏览保险广告、优惠活动等,并进行投保下单。
  2. 后台系统:

a.运营人员:
(1)根据用户提交的订单统计指定时间段的总投保人数和总投保金额,辅助优化运营方案。
(2)对用户的日常行为做出分析,分析出每个用户比较关注的信息,作为推荐系统的数据来源。
b:业务经理:
对重点客户的投保金额变动进行监控,将投保金额变动较大的重点客户推送给业务经理,业务经理针对性开展客户挽留等操作。

技术架构

image.png
架构解析:
数据采集:该场景中,数仓的数据主要来源于APP等系统的埋点信息,被实时采集至DATAHUB作为Flink的输入数据。
实时数仓架构:该场景中,整个实时数仓的ETL和BI部分的构建,全部通过Flink完成,Flink实时读取DATAHUB的数据进行处理,并与维表进行关联查询等操作,最终实时统计的结果输入到下游数据库RDS中。

业务指标

  • 运营数据分析

    • 每小时的投保人数
    • 每小时的总保费
    • 每小时总保单数
  • 用户行为监控

    • 用户原投保金额
    • 用户现投保金额
  • 用户行为分析

    • 用户最后访问的页面类型
    • 用户最后访问的页面地址

数据结构

场景一:运营数据分析

本场景用于计算每小时总投保人数和总保费。
用户投保会生成一份订单,订单内容包括用户id、用户姓名、订单号等。flink实时读取订单信息,用where过滤出大于当前小时时间段的数据(数据过滤),然后根据用户id做分组用last_value函数获取每个用户最终生成的订单信息(订单去重),最后按照小时维度聚合统计当前小时的总保费和总投保人数。

输入表

CREATE   TABLE  user_order
(
    id                          bigint    comment '用户id'
    ,order_no                    varchar    comment '订单号'
    ,order_type                  bigint    comment '订单类型'
    ,pay_time                    bigint  comment '支付时间'
    ,order_price                 double    comment '订单价格'
    ,customer_name               varchar    comment '用户姓名'
    ,customer_tel                varchar    comment '用户电话'
    ,certificate_no              varchar    comment '证件号码'
    ,gmt_created                 bigint  comment '创建时间'
    ,gmt_modified                bigint  comment '修改时间'
    ,account_payble             double      comment '应付金额'

) WITH (
       type='datahub',
     topic='user_order'
       ...
)

输出表

CREATE    TABLE hs_order (
    biz_date              varchar COMMENT 'yyyymmddhh'
    ,total_premium         DOUBLE COMMENT '总保费'
    ,policy_cnt            BIGINT COMMENT '保单数'
    ,policy_holder_cnt     BIGINT COMMENT '投保人数'
    ,PRIMARY KEY (biz_date)
) WITH
 (
   type='rds',
   tableName='adm_pfm_zy_order_gmv_msx_hs'
   ...
 ) 
 ;

业务代码

1:数据清洗

create view  last_order
as 
select 
     id                                 as id               
    ,last_value(order_no)               as order_no                   
    ,last_value(customer_tel)           as customer_tel     
    ,last_value(gmt_modified)           as gmt_modified                      
    ,last_value(account_payble)         as account_payble   
    from user_order
    where gmt_modified  >= cast(UNIX_TIMESTAMP(FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd'), 'yyyy-MM-dd')*1000 as bigint)
    group by id
;

2:数据汇总

insert into hs_order
select 
biz_date
,cast (total_premium as double) as total_premium
,cast (policy_cnt as bigint) as policy_cnt
,cast (policy_holder_cnt as bigint) as policy_holder_cnt
from (
select 
    from_unixtime(cast(gmt_modified/1000 as bigint),'yyyyMMddHH')      as biz_date
    ,sum(coalesce(account_payble,0))  as total_premium
    ,count(distinct order_no)   as policy_cnt
    ,count(distinct customer_tel)  as policy_holder_cnt
from  last_order a 
group by 
from_unixtime(cast(gmt_modified/1000 as bigint),'yyyyMMddHH')
)a 
;

场景二:用户行为监控

本场景对投保金额变动较大(总保额变动大于15%)的重点用户进行监控。
Flink实时读取用户新建订单数据,新建订单包括用户的id和现投保金额,通过where过滤没有保存成功的订单。维表中存储了业务经理关注的重点用户数据(如原投保金额),通过新建订单中的用户id与维表进行关联查询,如果维表中存在此用户且总保额下降15%以上,则将该用户的详细信息推送给业务经理,并且在维表中更新该用户投保金额及投保信息。

输入表

create table update_info
(
 id             bigint      comment '用户id'
,channel        varchar     comment '渠道编号'
,open_id        varchar     comment '订单id'
,event          varchar     comment '事件类型'
,now_premium  varchar     comment '现投保金额'
,creator        varchar     comment '创建人'
,modifier       varchar     comment '最后修改人'
,gmt_modified   bigint      comment '修改时间'
,now_info       varchar     comment '现投保信息'
) with (
    type = 'datahub',
    topic = 'dh_prd_dm_account_wechat_trace'
    ...
   
);

维表

 create table raw_info
(
     id                 bigint  comment '用户id'
    ,raw_premium      varchar  comment '原投保金额'
    ,raw_info           varchar  comment '原投保信息'
    ,PRIMARY KEY(id)
    ,PERIOD FOR SYSTEM_TIME
) WITH (
    type='ots',
    tableName='adm_zy_acct_sub_wechat_list'
    ...
);

输出表

create table update_info
(
     id               bigint  comment '用户id'
    ,raw_info         varchar comment '原投保信息'
    ,now_info         varchar comment '现投保信息'
    ,raw_premium      varchar comment '原投保金额'
    ,now_premium      varchar comment '现投保金额'
    ,PRIMARY KEY(id)
) WITH (
    type='rds',
    tableName='wechat_activity_user'
    ...
);

业务代码:

create view info_join as 
select
      t1.id               as  id
    ,t2.raw_info          as  raw_info
    ,t1.now_info          as  now_info  
    ,t2.raw_premium     as raw_premium
    ,t1.now_premium     as now_premium
from update_info t1
inner join raw_info FOR SYSTEM_TIME AS OF PROCTIME() as t2
on t1.id = t2.id ;
insert into update_info
select 
     id                        as id  
    ,raw_info                  as raw_info
    ,now_info                  as now_info
    ,raw_premium               as raw_premium  
    ,now_premium               as now_premium  
from info_join where now_premium<raw_premium*0.85
;
insert into raw_info
select 
     id                        as id  
    ,now_premium               as raw_premium  
    ,now_info                  as raw_info
from info_join
;

场景三:用户行为分析

本场景记录用户最后访问的页面名称和类型,作为用户画像的特征值。
Flink读取用户浏览APP页面的日志信息,如用户id、页面名称和页面类型等信息,根据用户id和设备id进行分组,通过last_value函数获取用户最后一次访问页面的名称和类型,输出到RDS作为推荐系统的输入数据,在下次用户登录的时候为其推送相关广告信息,提升用户广告点击率和下单的成功率。

输入表


create table user_log
(
 log_time                bigint  comment '日志采集日期(Linux时间)' 
,device_id               varchar  comment '设备id'
,account_id              varchar  comment '账户id'
,bury_point_type         varchar  comment '页面类型或埋点类型'
,page_name               varchar  comment '页面名称或埋点时一级目录'
) WITH (
    type = 'datahub',
    topic = 'edw_zy_evt_bury_point_log'
    ...
);

输出表

create table user_last_log
(
     account_id         varchar  comment 'account_id'
    ,device_id          varchar    comment  '设备id'
    ,bury_point_type    varchar  comment '页面类型'
    ,page_name          varchar  comment '页面名称'
    ,primary key(account_id)
) WITH (
    type='rds',
    tableName='adm_zy_moblie_charge_exchg_rs'
    ...
    
);

业务代码


insert into user_last_log
select
    account_id
    ,device_id
    ,last_value(bury_point_type)  as bury_point_type
    ,last_value(page_name)  as page_name
from user_log
where account_id is not null 
group by account_id,device_id

实时计算 Flink 版产品交流群

test

阿里云实时计算Flink - 解决方案:
https://developer.aliyun.com/article/765097
阿里云实时计算Flink - 场景案例:
https://ververica.cn/corporate-practice
阿里云实时计算Flink - 产品详情页:
https://www.aliyun.com/product/bigdata/product/sc

相关文章
|
28天前
|
运维 监控 安全
选择主题1:实时计算Flink版最佳实践测评
本文介绍了使用实时计算Flink版进行用户行为分析的实践,涵盖用户行为趋势、留存分析、用户画像构建及异常检测等方面。与自建Flink集群相比,实时计算Flink版在稳定性、性能、开发运维和安全能力上表现更优,且显著降低了企业的IT支出和运维成本,提升了业务决策效率和系统可靠性,是企业级应用的理想选择。
76 32
|
30天前
|
存储 数据采集 大数据
Flink实时湖仓,为汽车行业数字化加速!
本文由阿里云计算平台产品专家李鲁兵(云觉)分享,聚焦汽车行业大数据应用。内容涵盖市场趋势、典型大数据架构、产品市场地位及能力解读,以及典型客户案例。文章详细介绍了新能源汽车市场的快速增长、大数据架构分析、实时湖仓方案的优势,以及Flink和Paimon在车联网中的应用案例。
168 8
Flink实时湖仓,为汽车行业数字化加速!
|
17天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
48 1
|
28天前
|
运维 监控 安全
实时计算 Flink 版最佳实践测评
本文介绍了结合电商平台用户行为数据的实时计算Flink版实践,涵盖用户行为分析、标签画像构建、业务指标监控和数据分析预测等场景。文章还对比了实时计算Flink版与其他引擎及自建Flink集群在稳定性、性能、开发运维和安全能力方面的差异,分析了其成本与收益。最后,文章评估了实时计算Flink版的产品内引导、文档帮助、功能满足情况,并提出了针对不同业务场景的改进建议和与其他产品的联动可能性。
51 2
|
2月前
|
存储 数据采集 OLAP
饿了么基于Flink+Paimon+StarRocks的实时湖仓探索
饿了么的实时数仓经历了多个阶段的演进。初期通过实时ETL、报表应用、联动及监控构建基础架构,随后形成了涵盖数据采集、加工和服务的整体数据架构。1.0版本通过日志和Binlog采集数据,但在研发效率和数据一致性方面存在问题。2.0版本通过Dataphin构建流批一体化系统,提升了数据一致性和研发效率,但仍面临新业务适应性等问题。最终,饿了么选择Paimon和StarRocks作为实时湖仓方案,显著降低了存储成本并提高了系统稳定性。未来,将进一步优化带宽瓶颈、小文件问题及权限控制,实现更多场景的应用。
325 7
饿了么基于Flink+Paimon+StarRocks的实时湖仓探索
|
2月前
|
SQL 运维 监控
实时计算Flink版最佳实践测评报告
本报告旨在评估阿里云实时计算Flink版在实际应用中的表现,通过一系列的测试和分析来探讨其在稳定性、性能、开发运维及安全性方面的优势。同时,我们将结合具体的业务场景,如用户行为分析、标签画像构建等,来说明其实时数据处理能力,并对比自建Flink集群以及其他实时计算引擎。最后,从成本效益的角度出发,讨论采用全托管服务对企业运营的影响。
64 13
|
27天前
|
消息中间件 运维 分布式计算
实时计算Flink版最佳实践测评
本文介绍了使用阿里云实时计算Flink版进行用户行为分析的实践,详细探讨了其在性能、稳定性和成本方面的优势,以及与自建Flink集群的对比。通过实时计算,能够快速发现用户行为模式,优化产品功能,提升用户体验和市场竞争力。文章还提到了产品的易用性、功能满足度及改进建议,并与其他Flink实时计算产品进行了对比,强调了Flink在实时处理方面的优势。
|
2月前
|
存储 运维 监控
实时计算Flink版最佳实践测评
实时计算Flink版最佳实践测评
93 1
|
2月前
|
消息中间件 Java 数据库连接
Hologres 数据导入与导出的最佳实践
【9月更文第1天】Hologres 是一款高性能的实时数仓服务,旨在提供快速的数据分析能力。无论是从外部数据源导入数据还是将数据导出至其他系统,都需要确保过程既高效又可靠。本文将详细介绍如何有效地导入数据到 Hologres 中,以及如何从 Hologres 导出数据。
93 1
|
3月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版产品使用问题之使用CTAS同步MySQL到Hologres时出现的时区差异,该如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

相关产品

  • 实时计算 Flink版