1. 目标读者
互联网/游戏行业具有数据化运营分析需求企业的CTO,架构师。
企业运营系统产品,运营系统研发。
数据分析师,ClickHouse 数仓用户。
2. 游戏行业用户行为分析需求背景
2.1 业务背景
游戏运营工作主要围绕流量建设与用户维系。流量建设通常是各种渠道和营销活动来提升网站的流量指标,形成流量蓄水池,也就是游戏企业的种子客户。用户维系重点关注在流量和客户进入后持续有效地推动用户的活跃与留存,并发现有价值的、甚至高价值的用户,因为这些用户会持续为产品带来价值、产生收益。
通过对用户行为数据的分析,灵活运用事件分析模型、留存模型、转化漏斗。了解用户为什么来、为什么走、为什么活跃、为什么留存。对新用户的增长,已有用户的活跃和留存,活跃用户促付费,流失用户的挽回都有对应的措施,让所有的决策有迹可循而不是“拍脑袋”,才能真正提升活跃、留存和付费。
2.2 系统需求
基于以上需求游戏厂商通常都会建设用户行为分析系统,对用户在进入游戏后各个环节的行为数据收集和分析,分析用户在游戏过程中的登录时间,登录频次,关卡留存率,道具购买,购买群体,留存群体等,依据分析结果进行下一步的运营活动,优化产品和运营方式,从而提高用户的活跃度和留存率,进而提升游戏的商业化价值转化。
3. 系统架构和产品方案选型
3.1 产品诉求
用户行为数据通常会包含用户的历史行数据和实时行为数据,数量可以达到PB级别,游戏业务高峰期实时写入量也非常巨大。此外游戏用户的行为类型和用户等级,性别,游戏等级等个人标签信息非常之多,因此运营人员分析的维度也会非常的丰富,通过多维度的分析探索对比能得到更加有意义的指标。
基于以上的业务需求,因此用户行为分析系统构建选型的核心产品技术诉求可以归为三个问题:
如何支持用户行为数据的采集和高效写入?
如何支持海量存储?
支持多维实时分析?
3.2 核心技术选型
基于分析系统的产品选型需求分析,核心解决是高效写入,海量存储,以及多维实时问题,概括定义为本质是实时分析数仓选型。云数据库ClickHouse 是基于开源 ClickHouse 提供的分布式实时分析型列式数据库服务。具有高性能、开箱即用、企业特性支持。广泛应用于流量分析、广告营销分析、行为分析、人群划分、客户画像、敏捷BI、数据集市、网络监控、分布式服务和链路监控等业务场景。具有如下几个方面的产品优势:
高效读取,SIMD 高效指令集、向量化执行引擎,较传统方式 100-1000倍查询性能提升,参看性能测试文档。
高效写入,50MB~200MB/S 实时导入能力。
低成本存储,列级高压缩率支持,平均5:1的数据压缩率。
简单易用,一键完成集群部署、副本配置、参数配置、网络安全配置和监控运维体系搭建,开箱即用,企业运维能力,支持数据重分布均衡扩容、多租户资源隔离、在线数据库管理、开发任务编排和全面数据链路打通等企业级特性支持。
稳定可靠,提供磁盘、CPU、内存、IOPS、数据库连接、读写量、TPS、ZK、慢SQL等全方位监控诊断体系支撑;数据库领域专家支持,实时解惑答疑。
基于以上技术诉求和产品特点匹配,首推使用云数据库ClickHouse 作为用户行为分析系统的核心分析产品来快速构建用户行为分析系统。
3.3 系统架构设计
基于云数据库ClickHouse 来设计用户行为分析系统,核心数据流程是用户行为日志写入,海量日志存储以及基于日志数据进行数仓多维分析,最后基于实时数仓进行在线交互式分析,整体系统架构设计如下图所示:
通过 kafka 进行用户实时行为数据的订阅,承接实时行为数据的分析。
通过 flink 对实时日志进行ETL清洗和业务预计算,减少数仓数据量,提高查询效率;如果业务强要求保留原始数据的存储,那么也可以不进行预聚合直接kafka数据订阅写入到ClickHouse
通过DataWorks 批量抽取离线大数据平台ODPS /MaxCompute 中的历史数据,支撑用户历史行为数据和标签数据采集分析。
通过云数据库ClickHouse 承载离线和实时数据的导入和实时写入,对接分析系统,提供实时分析能力。
4. 系统构建实践
4.1 第一步:数仓模型构建
用户的离线数据和实时行为数据最终需要在云数仓 ClickHouse 中分析,在阿里云官网购买云数据库ClickHouse 后,既可在其中基于数仓模型构建。为了数据能够有顺序地流转,设计者和使用者能够清晰地知道数据的整个声明周期,经典的数仓模型通常分为如下几个层面。我们也基于用户日志数据进行分层设计。
如下数据模型分层和对应的数据内容类别,读者可参考设计,其中涉及的具体表字段等只做demo 演示,无实质业务含义,用户可以基于自身业务模型参考设计。
ODS 层
Operation Data Store 数据准备区,也称为贴源层。数据源中的数据,经过抽取、洗净、传输,也就是ETL过程之后进入本层。
从kafka 订阅的原始类日志或者从离线大数据平台导入的原始日志数据就可以直接投递和写入到此类表中。
ReplicatedMergeTree 为多副本的引擎版本,可以实现服务的高可用
PARTITION BY 为数据分区标识,为了快速定位分析数据,提高请求效率,通常基于日期等进行数据分区,数据分区的粒度不能太小,太小的分区粒度会造成分区文件过多,Merge效率低,发生不可预期的OOM故障。
ClickHouse 也支持分布式集群模式,通过创建分布式表提高查询的效率和应用开发的简单性。分布式表的创建可以查看表创建文档,此处不详述。
CREATE TABLE user_action_log_ods(
`time` DateTime DEFAULT '1970-01-01 08:00:00' ,
`action_id` UInt16 ,
`action_name` String ,
`version` String ,
`uid` UInt64 DEFAULT 0 ,
`level` UInt32 DEFAULT 0 ,
`tag` UInt32 DEFAULT 0 ,
`uuid` String DEFAULT '' COMMENT '日志唯一id')
ENGINE = ReplicatedMergeTree('/clickhouse/tables/user_action_log_ods/{shard}', '{replica}') PARTITION BY (toYYYYMMDD(time)) ORDER BY (action_id, uid);
DWD层
data warehouse details 细节数据层,是业务层与数据仓库的隔离层。主要对ODS数据层做一些数据清洗和规范化的操作。主要是经过数据清洗:去除空值、脏数据、超过极限范围的基础数据。
CREATE TABLE user_action_log_dwd (
`time` DateTime DEFAULT '1970-01-01 08:00:00' COMMENT '日志时间',
`action_id` UInt16 DEFAULT 0 COMMENT '行为类型id', `
`uid` UInt64 DEFAULT 0 COMMENT '用户id', `
uuid` String DEFAULT '' COMMENT '日志唯一id')
ENGINE = ReplicatedMergeTree('/clickhouse/tables/user_action_log_dwd/{shard}', '{replica}')
PARTITION BY (toYYYYMMDD(time))
ORDER BY (action_id, uuid);
DWB:data warehouse base
数据基础层,存储的是客观数据,一般用作中间层,可以认为是大量指标的数据层。如如最近1个月用户付费订单明细表
CREATE TABLE user_action_dwb (
`time` DateTime DEFAULT '1970-01-01 08:00:00' COMMENT '日志时间',
`uid` UInt64 DEFAULT 0 COMMENT '用户id', `
`vip_level` int DEFAULT '',
`pay_count` int DEFAULT '付费金额',
uuid` String DEFAULT '' COMMENT '日志唯一id')
ENGINE = ReplicatedMergeTree('/clickhouse/tables/user_action_dwb/{shard}', '{replica}')
PARTITION BY (toYYYYMMDD(time))
ORDER BY (action_id, uuid) TTL time + toIntervalMonth(1);
DWS:data warehouse service
数据服务层,基于DWB上的基础数据,整合汇总成分析某一个主题域的服务数据层,一般是宽表。用于提供后续的业务查询,OLAP分析,数据分发等。用户行为,轻度聚合,主要对ODS/DWD层数据做一些轻度的汇总。如付费用户的月付费总金额表
CREATE TABLE user_pay_dws (
`time` DateTime DEFAULT '1970-01-01 08:00:00' COMMENT '日志时间',
`uid` UInt64 DEFAULT 0 COMMENT '用户id', `
`vip_level` int DEFAULT '',
`year_month` String DEFAULT '',
`pay_count` int DEFAULT '付费金额',
uuid` String DEFAULT '' COMMENT '日志唯一id')
ENGINE = ReplicatedMergeTree('/clickhouse/tables/user_pay_dws/{shard}', '{replica}')
PARTITION BY (toYYYYMMDD(time))
ORDER BY (action_id, uuid) TTL time + toIntervalMonth(1);
数据服务层/应用层(ADS)
ADS:applicationData Service应用数据服务,我们通过说的报表数据,或者说那种大宽表,一般就放在这里。如:留存信息表,用户标签表等,便于运营方随机抽取维度进行分析。读者可以根据自己分析业务主题进行组织。
4.2 第二步:离线数据链路构建
离线数据的主要来源于ODPS/Hdoop 体系,这一类平台可以承载长期大量离线数据的存储和分析。如基于用户的长期行为而产生的用户标签表等。
离线大数据治理平台 DataWorks 可以周期性批量同步到云数仓ClickHouse ,结合实时数据进行实时分析。避免大数据离线分析数据更新周期慢,分析效率低,分析维度固定的弊端。
如下介绍如何通过 DataWorks 平台进行离线数据的同步配置步骤:
第一步:在DataWorks数据源管理页面,创建 MaxCompute数据源 。
第二步创建 云数据库ClickHouse 数据源,详细参数配置说明,参考文档。
第三步:在DataWorks 数据开发页面创建业务流程和数据同步节点。
第四步:配置数据来源和数据去向。
第五步:保存并运行数据同步任务。
4.3 第三步:实时数据链路构建
基于kafka 和fink 的实时链路构建基于步骤构建
kafka 数据同步有多种方案,可以通过云ClickHouse kafka 引擎订阅 ,也可以通过开源组件 canal 订阅。以下演示基于ClickHouse Kafka引擎的订阅方式。
第一步:在云ClickHouse 数仓创建kafka消费表,示例代码如下
CREATE TABLE default.kafka_src_table ON CLUSTER default
( //定义表结构的字段
id Int32,
name String
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'alikafka-post-cn-tl32i5sc****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-tl32i5sc****-2-vpc.alikafka.aliyuncs.com:9092',
kafka_topic_list = 'test',
kafka_group_name = 'test',
kafka_format = 'CSV',
kafka_num_consumers = 1,
kafka_max_block_size = 65536,
kafka_skip_broken_messages = 0,
kafka_auto_offset_reset = 'latest';
第二步:创建存储数据的ClickHouse 目的表
create table default.kafka_table_local ON CLUSTER default (
id Int32,
name String
) ENGINE = MergeTree()
ORDER BY (id);
第三步:创建VIEW将Kafka消费表消费到的数据同步到云数据库ClickHouse目的表。如果使用分布式模式,也可以创建目的分布式表,并同步消费。
CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_local AS SELECT * FROM kafka_src_table;
对于具有实时业务数据清洗,无原始日志数据保留的需求,追求聚合查询效率的可以基于flink 进行kafka数据订阅处理,并写入到ClickHouse 。
第一步:flink 订阅kafka 数据,基于flink SQL 创建kafka 源表,详细参数说明参考文档。
CREATE TABLE kafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`category_id` BIGINT,
`behavior` STRING,
`topic` STRING METADATA VIRTUAL,
`partition` BIGINT METADATA VIRTUAL
) WITH (
'connector' = 'kafka',
'topic' = 'my_excellent_topic',
'properties.bootstrap.servers' = 'mykafka:9092',
'properties.group.id' = 'my_excellent_group'
'format' = 'csv',
'scan.startup.mode' = 'earliest-offset'
)
第二步:flink 实时业务清洗,根据业务逻辑实现。以下基于flinkSQL 滑动窗口函数进行示例。更多函数使用方式参考文档。
CREATE TEMPORARY TABLE user_clicks (
username VARCHAR,
click_url VARCHAR,
eventtime VARCHAR,
ts AS TO_TIMESTAMP(eventtime),
WATERMARK FOR ts AS ts - INTERVAL '2' SECOND --为Rowtime定义Watermark。
) with (
'connector'='sls',
...
);
CREATE TEMPORARY TABLE hop_output (
window_start TIMESTAMP,
window_end TIMESTAMP,
username VARCHAR,
clicks BIGINT
) with (
'connector'='datahub' --目前SLS只支持输出VARCHAR类型的DDL,所以使用DataHub存储。
...
);
INSERT INTO
hop_output
SELECT
HOP_START (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),
HOP_END (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),
username,
COUNT (click_url)
FROM
user_clicks
GROUP BY
HOP (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),username;
第三步:flink 实时写入到云数仓ClickHouse,通过flink SQL进行数据写入。实例代码如下:
--创建源表sls_test_single_local。
CREATE TEMPORARY TABLE sls_test_single_local (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '50'
);
--创建结果表clickhouse_output。
CREATE TEMPORARY TABLE clickhouse_output (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'clickhouse',
'url' = 'jdbc:clickhouse://demo.aliyuncs.com:8123',
'userName' = 'test',
'password' = '280226Ck',
'tableName' = 'sls_test_single_local',
);
--将源表数据插入到结果表。
INSERT INTO clickhouse_output
SELECT
id,
name,
age,
rate
FROM sls_test_single_local;
对于用户行为分析需要结合在线业务数据如实时订单类数据进行分析的情况,云数据库ClickHouse 推出了升级版 MaterializeMySQL数据库引擎,实现了基于MySQL Binlog机制的业务数据库实时同步功能,相比较社区支持表级别的同步。
创建 MaterializedMySQL 引擎库,语法和实例代码如下,参数说明参考文档。
CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster]
ENGINE = MaterializeMySQL('host:port', 'database', 'user', 'password')
[SETTINGS...]
where SETTINGS are:
[ { include_tables | exclude_tables } ]
[ skip_error_count ]
[ skip_unsupported_tables ]
[ query_with_final]
[ order_by_only_primary_key ]
[ enable_binlog_reserved ]
[ shard_model ]
CREATE DATABASE IF NOT EXISTS db_name ON CLUSTER cluster
ENGINE = MaterializeMySQL('rm-bp1**w639t19****.mysql.rds.aliyuncs.com:3306', 'database', 'user', 'password')
SETTINGS
include_tables ='a,b,c',
skip_error_count =0,
skip_unsupported_tables =1,
query_with_final =1,
order_by_only_primary_key =1;
enable_binlog_reserved=1;
shard_model=0;
5. 行为分析业务实践
在系统构建完成后,基于数仓模型内构建的数据表,业务分析人员就可以基于ClickHouse 实时数仓进行用户行为的分析。ClickHouse 内置强大的行为分析函数,支持复杂分析场景的简单开发模式进行数据分析,以下从常用的用户行为分析场景进行demo 示例。
5.1 漏斗转化率分析
通过windowFunnel函数实现漏斗,以指定时长(单位为秒)滑动窗口按序匹配事件链,并返回在窗口内转化到的步数。如有多种匹配,以步数最大(转换最深)的为准。通过对该步数进行统计,示例代码如下:
select site_id, sequenceMatch('(?1)(?t<=15)(?2).*(?3)')(ts_date_time,event_type = 'shtLogon',event_type = 'shtKkclick' AND column_type = 'homePage',event_type = 'shtAddCart') AS is_match FROM ods.analytics_access_log_all WHERE ts_date >= '2020-07-01‘ AND site_id IN (10266,10022,10339,10030)GROUP BY site_id
5.2 留存率分析
基于Retention函数,该函数可以方便地计算留存情况。
该函数接受多个条件,以第一个条件的结果为基准,观察后面的各个条件是否也满足,若满足则置1,不满足则置0,最终返回0和1的数组。通过统计1的数量,统计留存。示例代码如下:
WITH toDate('2022-06-24') AS first_date
select user_id, retention(ts_date = first_date,ts_date = first_date + INTERVAL 1 DAY,ts_date = first_date + INTERVAL 7 DAY) AS retFROM ods.ms_order_done_allWHERE ts_date >= first_date AND ts_date <= first_date + INTERVAL 7 DAYGROUP BY user_id
5.3 用户画像分析
基于标签的统计符合各个标签维度的用户数,基于用户数进行用户画像分析。
select tag, countDistinct(uid) as user_num from user_log_all where log_date='2022-02-16' and action_id in (2, 3, 1102) and log_timestamp < toUnixTimestamp('2022-02-16 23:57:15')*1000 group by tag ;
6. 方案总结
构建用户行为分析系统是支撑互联网/游戏行业进行数据化运营和精准营销的基础。 大数据体系+ 实时数据处理+ 云数据库 ClickHouse 提供了一套完整的系统构建解决方案。方案依赖云ClickHouse 作为核心数仓,凭借其在写入吞吐,查询性能,数据链路打和运维部署简单性的方面优势,相比较其他的数仓引擎选型能更好的解决分析效率和成本的问题。