离线数仓(三)【业务日志采集平台搭建】(2)

本文涉及的产品
RDS AI 助手,专业版
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
RDS Agent(兼容OpenClaw),2核4GB
简介: 离线数仓(三)【业务日志采集平台搭建】

离线数仓(三)【业务日志采集平台搭建】(1)https://developer.aliyun.com/article/1532363

2.1.23 退单表(order_refund_info

字段名

字段说明

id

编号

user_id

用户id

order_id

订单id

sku_id

skuid

refund_type

退款类型

refund_num

退货件数

refund_amount

退款金额

refund_reason_type

原因类型

refund_reason_txt

原因内容

refund_status

退款状态(0:待审批 1:已退款)

create_time

创建时间

2.1.24 订单状态流水表(order_status_log

维护每一个订单的状态信息

字段名

字段说明

id

编号

order_id

订单编号

order_status

订单状态

operate_time

操作时间

2.1.25 支付表(payment_info

字段名

字段说明

id

编号

out_trade_no

对外业务编号

order_id

订单编号

user_id

用户id

payment_type

支付类型(微信 支付宝)

trade_no

交易编号

total_amount

支付金额

subject

交易内容

payment_status

支付状态

create_time

创建时间

callback_time

回调时间

callback_content

回调信息

2.1.26 退款表(refund_payment

字段名

字段说明

id

编号

out_trade_no

对外业务编号

order_id

订单编号

sku_id

商品sku_id

payment_type

支付类型(微信 支付宝)

trade_no

交易编号

total_amount

退款金额

subject

交易内容

refund_status

退款状态

create_time

创建时间

callback_time

回调时间

callback_content

回调信息

2.1.27 SKU平台属性表(sku_attr_value

字段名

字段说明

id

编号

attr_id

属性id(冗余)

value_id

属性值id

sku_id

skuid

attr_name

属性名称

value_name

属性值名称

2.1.28 SKU信息表(sku_info

字段名

字段说明

id

库存id(itemID)

spu_id

商品id

price

价格

sku_name

sku名称

sku_desc

商品规格描述

weight

重量

tm_id

品牌(冗余)

category3_id

三级分类id(冗余)

sku_default_img

默认显示图片(冗余)

is_sale

是否销售(1:是 0:否)

create_time

创建时间

2.1.29 SKU销售属性表(sku_sale_attr_value

字段名

字段说明

id

id

sku_id

库存单元id

spu_id

spu_id冗余

sale_attr_value_id

销售属性值id

sale_attr_id

销售属性id

sale_attr_name

销售属性值名称

sale_attr_value_name

销售属性值名称

2.1.30 SPU信息表(spu_info

字段名

字段说明

id

商品id

spu_name

商品名称

description

商品描述(后台简述)

category3_id

三级分类id

tm_id

品牌id

2.1.31 SPU销售属性表(spu_sale_attr

字段名

字段说明

id

编号(业务中无关联)

spu_id

商品id

base_sale_attr_id

销售属性id

sale_attr_name

销售属性名称(冗余)

2.1.32 SPU销售属性值表(spu_sale_attr_value

字段名

字段说明

id

销售属性值编号

spu_id

商品id

base_sale_attr_id

销售属性id

sale_attr_value_name

销售属性值名称

sale_attr_name

销售属性名称(冗余)

2.1.33 用户地址表(user_address

字段名

字段说明

id

编号

user_id

用户id

province_id

省份id

user_address

用户地址

consignee

收件人

phone_num

联系方式

is_default

是否是默认

2.1.34 用户信息表(user_info

字段名

字段说明

id

编号

login_name

用户名称

nick_name

用户昵称

passwd

用户密码

name

用户姓名

phone_num

手机号

email

邮箱

head_img

头像

user_level

用户级别

birthday

用户生日

gender

性别 M男,F女

create_time

创建时间

operate_time

修改时间

status

状态

电商业务表结构:

后台管理表结构:

3、业务数据采集模块

       我们的项目架构图中,业务数据是通过两方面采集到离线数仓中的:需要增量同步的数据通过 Maxwell(可以实时捕获和传输MySQL数据库的变更操作,适合增量同步) 发送到 Kafka 集群,然后再通过 flume 发送到离线数仓当中;而 DataX 是每天把全量的数据发送到集群当中的。

       而实时数仓中的业务数据源也是由 Maxwell 发送到 Kafka 集群,然后 Flink 直接从 Kafka 读取的。

3.1、采集通道 Maxwell 配置

开启 mysql binlog

在 /etc/my.cnf 中添加以下配置:

server-id=1
log-bin=mysql-bin
# maxwell要求binlog模式必须为 row
binlog_format=row
binlog-do-db=gmall
修改 maxwell 配置文件

我们这里通过配置文件来启动 maxwell:

# tl;dr config
log_level=info
 
#Maxwell数据发送目的地,可选配置有stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka
#目标Kafka集群地址
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
#目标Kafka topic,可静态配置,例如:maxwell,也可动态配置,例如:%{database}_%{table},相当于给每张表创
建一个 topic
kafka_topic=maxwell
# kafka 分区规则 按照数据库分区
#producer_partition_by=database
 
# mysql 相关配置
host=hadoop102
user=maxwell
password=123456

这里的 kafka_topic 测试完后需要改成我们项目指定的 topic_db

bin/maxwell --config config.properties
启动 Kafka 消费者
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic maxwell
模拟数据生成
cd /opt/module/db_log/
java -jar gmall2020-mock-db-2021-11-14.jar
查看结果:

我们可以看到,我们在 hadoop103 上开启的 Kafka 消费者的控制台,输出了抓取到所有日志:

而且此刻的 mysql 的 binlog 文件的大小也在急剧增加:

到这里,我们的 maxwell 的业务数据采集通道已经打通了。

编写 maxwell 启停脚本
#!/bin/bash
 
MAXWELL_HOME=/opt/module/maxwell-1.29.2
 
status_maxwell(){
    result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`
    return $result
}
 
 
start_maxwell(){
    status_maxwell
    if [[ $? -lt 1 ]]; then
        echo "启动Maxwell"
        $MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon
    else
        echo "Maxwell正在运行"
    fi
}
 
 
stop_maxwell(){
    status_maxwell
    if [[ $? -gt 0 ]]; then
        echo "停止Maxwell"
        ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
    else
        echo "Maxwell未在运行"
    fi
}
 
 
case $1 in
    start )
        start_maxwell
    ;;
    stop )
        stop_maxwell
    ;;
    restart )
       stop_maxwell
       start_maxwell
    ;;
esac

总结

至此,我们的日志数据采集(用户行为日志和业务日志)平台都已经搭建完毕了,我们的日志数据都已经汇总到了 Kafka,接下来我们的离线数仓和实时数仓只需要直接去 Kafka 读取即可。


相关实践学习
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
12月前
|
监控 Kubernetes Go
日志采集效能跃迁:iLogtail 到 LoongCollector 的全面升级
LoongCollector 在日志场景中实现了全面的重磅升级,从功能、性能、稳定性等各个方面均进行了深度优化和提升,本文我们将对 LoongCollector 的升级进行详细介绍。
803 88
|
10月前
|
数据采集 存储 大数据
大数据之路:阿里巴巴大数据实践——日志采集与数据同步
本资料全面介绍大数据处理技术架构,涵盖数据采集、同步、计算与服务全流程。内容包括Web/App端日志采集方案、数据同步工具DataX与TimeTunnel、离线与实时数仓架构、OneData方法论及元数据管理等核心内容,适用于构建企业级数据平台体系。
923 1
|
存储 运维 开发工具
警惕日志采集失败的 6 大经典雷区:从本地管理反模式到 LoongCollector 标准实践
本文探讨了日志管理中的常见反模式及其潜在问题,强调科学的日志管理策略对系统可观测性的重要性。文中分析了6种反模式:copy truncate轮转导致的日志丢失或重复、NAS/OSS存储引发的采集不一致、多进程写入造成的日志混乱、创建文件空洞释放空间的风险、频繁覆盖写带来的数据完整性问题,以及使用vim编辑日志文件导致的重复采集。针对这些问题,文章提供了最佳实践建议,如使用create模式轮转日志、本地磁盘存储、单线程追加写入等方法,以降低日志采集风险,提升系统可靠性。最后总结指出,遵循这些实践可显著提高故障排查效率和系统性能。
1861 22
|
分布式计算 运维 监控
Dataphin离线数仓搭建深度测评:数据工程师的实战视角
作为一名金融行业数据工程师,我参与了阿里云Dataphin智能研发版的评测。通过《离线数仓搭建》实践,体验了其在数据治理中的核心能力。Dataphin在环境搭建、管道开发和任务管理上显著提效,如测试环境搭建从3天缩短至2小时,复杂表映射效率提升50%。产品支持全链路治理、智能提效和架构兼容,帮助企业降低40%建设成本,缩短60%需求响应周期。建议加强行业模板库和移动适配功能,进一步提升使用体验。
|
11月前
|
存储 运维 开发工具
警惕日志采集失败的 6 大经典雷区:从本地管理反模式到 LoongCollector 标准实践
本文总结了日志管理中的六大反模式及优化建议,涵盖日志轮转、存储选择、并发写入等常见问题,帮助提升日志采集的完整性与系统可观测性,适用于运维及开发人员优化日志管理策略。
370 8
|
12月前
|
存储 缓存 Apache
StarRocks+Paimon 落地阿里日志采集:万亿级实时数据秒级查询
本文介绍了阿里集团A+流量分析平台的日志查询优化方案,针对万亿级日志数据的写入与查询挑战,提出基于Flink、Paimon和StarRocks的技术架构。通过Paimon存储日志数据,结合StarRocks高效计算能力,实现秒级查询性能。具体包括分桶表设计、数据缓存优化及文件大小控制等措施,解决高并发、大数据量下的查询效率问题。最终,日志查询耗时从分钟级降至秒级,显著提升业务响应速度,并为未来更低存储成本、更高性能及更多业务场景覆盖奠定基础。
|
7月前
|
数据采集 缓存 大数据
【赵渝强老师】大数据日志采集引擎Flume
Apache Flume 是一个分布式、可靠的数据采集系统,支持从多种数据源收集日志信息,并传输至指定目的地。其核心架构由Source、Channel、Sink三组件构成,通过Event封装数据,保障高效与可靠传输。
405 1
|
8月前
|
存储 Kubernetes 监控
Kubernetes日志管理:使用Loki进行日志采集
通过以上步骤,在Kubernetes环境下利用LoKi进行有效率且易于管理地logs采集变成可能。此外,在实施过程中需要注意版本兼容性问题,并跟进社区最新动态以获取功能更新或安全补丁信息。
502 16
|
9月前
|
存储 缓存 Apache
StarRocks+Paimon 落地阿里日志采集:万亿级实时数据秒级查询
A+流量分析平台是阿里集团统一的全域流量数据分析平台,致力于通过埋点、采集、计算构建流量数据闭环,助力业务提升流量转化。面对万亿级日志数据带来的写入与查询挑战,平台采用Flink+Paimon+StarRocks技术方案,实现高吞吐写入与秒级查询,优化存储成本与扩展性,提升日志分析效率。
1200 1
|
监控 算法 测试技术
突破极限: 高负载场景下的单机300M多行正则日志采集不是梦
在当今数字化时代,日志数据已成为企业 IT 运营和业务分析的关键资源。然而,随着业务规模的扩大和系统复杂度的提升,日志数据的体量呈现爆发式增长,给日志采集和处理系统带来了巨大挑战。
711 101