离线数仓(三)【业务日志采集平台搭建】(2)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
日志服务 SLS,月写入数据量 50GB 1个月
简介: 离线数仓(三)【业务日志采集平台搭建】

离线数仓(三)【业务日志采集平台搭建】(1)https://developer.aliyun.com/article/1532363

2.1.23 退单表(order_refund_info

字段名

字段说明

id

编号

user_id

用户id

order_id

订单id

sku_id

skuid

refund_type

退款类型

refund_num

退货件数

refund_amount

退款金额

refund_reason_type

原因类型

refund_reason_txt

原因内容

refund_status

退款状态(0:待审批 1:已退款)

create_time

创建时间

2.1.24 订单状态流水表(order_status_log

维护每一个订单的状态信息

字段名

字段说明

id

编号

order_id

订单编号

order_status

订单状态

operate_time

操作时间

2.1.25 支付表(payment_info

字段名

字段说明

id

编号

out_trade_no

对外业务编号

order_id

订单编号

user_id

用户id

payment_type

支付类型(微信 支付宝)

trade_no

交易编号

total_amount

支付金额

subject

交易内容

payment_status

支付状态

create_time

创建时间

callback_time

回调时间

callback_content

回调信息

2.1.26 退款表(refund_payment

字段名

字段说明

id

编号

out_trade_no

对外业务编号

order_id

订单编号

sku_id

商品sku_id

payment_type

支付类型(微信 支付宝)

trade_no

交易编号

total_amount

退款金额

subject

交易内容

refund_status

退款状态

create_time

创建时间

callback_time

回调时间

callback_content

回调信息

2.1.27 SKU平台属性表(sku_attr_value

字段名

字段说明

id

编号

attr_id

属性id(冗余)

value_id

属性值id

sku_id

skuid

attr_name

属性名称

value_name

属性值名称

2.1.28 SKU信息表(sku_info

字段名

字段说明

id

库存id(itemID)

spu_id

商品id

price

价格

sku_name

sku名称

sku_desc

商品规格描述

weight

重量

tm_id

品牌(冗余)

category3_id

三级分类id(冗余)

sku_default_img

默认显示图片(冗余)

is_sale

是否销售(1:是 0:否)

create_time

创建时间

2.1.29 SKU销售属性表(sku_sale_attr_value

字段名

字段说明

id

id

sku_id

库存单元id

spu_id

spu_id冗余

sale_attr_value_id

销售属性值id

sale_attr_id

销售属性id

sale_attr_name

销售属性值名称

sale_attr_value_name

销售属性值名称

2.1.30 SPU信息表(spu_info

字段名

字段说明

id

商品id

spu_name

商品名称

description

商品描述(后台简述)

category3_id

三级分类id

tm_id

品牌id

2.1.31 SPU销售属性表(spu_sale_attr

字段名

字段说明

id

编号(业务中无关联)

spu_id

商品id

base_sale_attr_id

销售属性id

sale_attr_name

销售属性名称(冗余)

2.1.32 SPU销售属性值表(spu_sale_attr_value

字段名

字段说明

id

销售属性值编号

spu_id

商品id

base_sale_attr_id

销售属性id

sale_attr_value_name

销售属性值名称

sale_attr_name

销售属性名称(冗余)

2.1.33 用户地址表(user_address

字段名

字段说明

id

编号

user_id

用户id

province_id

省份id

user_address

用户地址

consignee

收件人

phone_num

联系方式

is_default

是否是默认

2.1.34 用户信息表(user_info

字段名

字段说明

id

编号

login_name

用户名称

nick_name

用户昵称

passwd

用户密码

name

用户姓名

phone_num

手机号

email

邮箱

head_img

头像

user_level

用户级别

birthday

用户生日

gender

性别 M男,F女

create_time

创建时间

operate_time

修改时间

status

状态

电商业务表结构:

后台管理表结构:

3、业务数据采集模块

       我们的项目架构图中,业务数据是通过两方面采集到离线数仓中的:需要增量同步的数据通过 Maxwell(可以实时捕获和传输MySQL数据库的变更操作,适合增量同步) 发送到 Kafka 集群,然后再通过 flume 发送到离线数仓当中;而 DataX 是每天把全量的数据发送到集群当中的。

       而实时数仓中的业务数据源也是由 Maxwell 发送到 Kafka 集群,然后 Flink 直接从 Kafka 读取的。

3.1、采集通道 Maxwell 配置

开启 mysql binlog

在 /etc/my.cnf 中添加以下配置:

server-id=1
log-bin=mysql-bin
# maxwell要求binlog模式必须为 row
binlog_format=row
binlog-do-db=gmall
修改 maxwell 配置文件

我们这里通过配置文件来启动 maxwell:

# tl;dr config
log_level=info
 
#Maxwell数据发送目的地,可选配置有stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka
#目标Kafka集群地址
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
#目标Kafka topic,可静态配置,例如:maxwell,也可动态配置,例如:%{database}_%{table},相当于给每张表创
建一个 topic
kafka_topic=maxwell
# kafka 分区规则 按照数据库分区
#producer_partition_by=database
 
# mysql 相关配置
host=hadoop102
user=maxwell
password=123456

这里的 kafka_topic 测试完后需要改成我们项目指定的 topic_db

bin/maxwell --config config.properties
启动 Kafka 消费者
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic maxwell
模拟数据生成
cd /opt/module/db_log/
java -jar gmall2020-mock-db-2021-11-14.jar
查看结果:

我们可以看到,我们在 hadoop103 上开启的 Kafka 消费者的控制台,输出了抓取到所有日志:

而且此刻的 mysql 的 binlog 文件的大小也在急剧增加:

到这里,我们的 maxwell 的业务数据采集通道已经打通了。

编写 maxwell 启停脚本
#!/bin/bash
 
MAXWELL_HOME=/opt/module/maxwell-1.29.2
 
status_maxwell(){
    result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`
    return $result
}
 
 
start_maxwell(){
    status_maxwell
    if [[ $? -lt 1 ]]; then
        echo "启动Maxwell"
        $MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon
    else
        echo "Maxwell正在运行"
    fi
}
 
 
stop_maxwell(){
    status_maxwell
    if [[ $? -gt 0 ]]; then
        echo "停止Maxwell"
        ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
    else
        echo "Maxwell未在运行"
    fi
}
 
 
case $1 in
    start )
        start_maxwell
    ;;
    stop )
        stop_maxwell
    ;;
    restart )
       stop_maxwell
       start_maxwell
    ;;
esac

总结

至此,我们的日志数据采集(用户行为日志和业务日志)平台都已经搭建完毕了,我们的日志数据都已经汇总到了 Kafka,接下来我们的离线数仓和实时数仓只需要直接去 Kafka 读取即可。


相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
4天前
|
SQL 消息中间件 Kafka
Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
本文介绍了阿里云实时数仓Hologres负责人姜伟华在Flink Forward Asia 2024上的分享,涵盖实时数仓的发展历程、从实时数仓到实时湖仓的演进,以及总结。文章通过三代实时数仓架构的演变,详细解析了Lambda架构、Kafka实时数仓分层+OLAP、Hologres实时数仓分层复用等方案,并探讨了未来从实时数仓到实时湖仓的演进方向。最后,结合实际案例和Demo展示了Hologres + Flink + Paimon在实时湖仓中的应用,帮助用户根据业务需求选择合适的方案。
329 20
Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
|
3天前
|
SQL 运维 BI
湖仓分析|浙江霖梓基于 Doris + Paimon 打造实时/离线一体化湖仓架构
浙江霖梓早期基于 Apache Doris 进行整体架构与表结构的重构,并基于湖仓一体和查询加速展开深度探索与实践,打造了 Doris + Paimon 的实时/离线一体化湖仓架构,实现查询提速 30 倍、资源成本节省 67% 等显著成效。
湖仓分析|浙江霖梓基于 Doris + Paimon 打造实时/离线一体化湖仓架构
|
4月前
|
存储 运维 监控
超越传统模型:从零开始构建高效的日志分析平台——基于Elasticsearch的实战指南
【10月更文挑战第8天】随着互联网应用和微服务架构的普及,系统产生的日志数据量日益增长。有效地收集、存储、检索和分析这些日志对于监控系统健康状态、快速定位问题以及优化性能至关重要。Elasticsearch 作为一种分布式的搜索和分析引擎,以其强大的全文检索能力和实时数据分析能力成为日志处理的理想选择。
337 6
|
26天前
|
存储 运维 监控
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
中信银行信用卡中心每日新增日志数据 140 亿条(80TB),全量归档日志量超 40PB,早期基于 Elasticsearch 构建的日志云平台,面临存储成本高、实时写入性能差、文本检索慢以及日志分析能力不足等问题。因此使用 Apache Doris 替换 Elasticsearch,实现资源投入降低 50%、查询速度提升 2~4 倍,同时显著提高了运维效率。
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
|
1月前
|
SQL 存储 JSON
实时数仓 Hologres 产品介绍:一体化实时湖仓平台
本次方案的主题是实时数仓 Hologres 产品介绍:一体化实时湖仓平台,介绍了 Hologres 湖仓存储一体,多模式计算一体、分析服务一体和 Data+AI 一体四方面一体化场景,并对其运维监控方面及客户案例进行一定讲解。 1. Hologres :面向未来的一体化实时湖仓 2. 运维监控 3. 客户案例 4. 总结
74 14
|
5月前
|
存储 消息中间件 网络协议
日志平台-ELK实操系列(一)
日志平台-ELK实操系列(一)
|
2月前
|
SQL 人工智能 自然语言处理
DataWorks年度发布:智能化湖仓一体数据开发与治理平台的演进
阿里云在过去15年中持续为268集团提供数据服务,积累了丰富的实践经验,并连续三年在IDC中国数据治理市场份额中排名第一。新一代智能数据开发平台DateWorks推出了全新的DateStudio IDE,支持湖仓一体化开发,新增Flink计算引擎和全面适配locs,优化工作流程系统和数据目录管理。同时,阿里云正式推出个人开发环境模式和个人Notebook,提升开发者体验和效率。此外,DateWorks Copilot通过自然语言生成SQL、代码补全等功能,显著提升了数据开发与分析的效率,已累计帮助开发者生成超过3200万行代码。
|
2月前
|
监控 测试技术 开发者
一行代码改进:Logtail的多行日志采集性能提升7倍的奥秘
一个有趣的现象引起了作者的注意:当启用行首正则表达式处理多行日志时,采集性能出现下降。究竟是什么因素导致了这种现象?本文将探索Logtail多行日志采集性能提升的秘密。
170 23
|
3月前
|
SQL 存储 数据挖掘
快速入门:利用AnalyticDB构建实时数据分析平台
【10月更文挑战第22天】在大数据时代,实时数据分析成为了企业和开发者们关注的焦点。传统的数据仓库和分析工具往往无法满足实时性要求,而AnalyticDB(ADB)作为阿里巴巴推出的一款实时数据仓库服务,凭借其强大的实时处理能力和易用性,成为了众多企业的首选。作为一名数据分析师,我将在本文中分享如何快速入门AnalyticDB,帮助初学者在短时间内掌握使用AnalyticDB进行简单数据分析的能力。
88 2
|
4月前
|
DataWorks 数据挖掘 关系型数据库
基于hologres搭建轻量OLAP分析平台解决方案评测
一文带你详细了解基于hologres搭建轻量OLAP分析平台解决方案的优与劣
685 10