离线数仓(三)【业务日志采集平台搭建】(1)https://developer.aliyun.com/article/1532363
2.1.23 退单表(order_refund_info)
字段名 |
字段说明 |
id |
编号 |
user_id |
用户id |
order_id |
订单id |
sku_id |
skuid |
refund_type |
退款类型 |
refund_num |
退货件数 |
refund_amount |
退款金额 |
refund_reason_type |
原因类型 |
refund_reason_txt |
原因内容 |
refund_status |
退款状态(0:待审批 1:已退款) |
create_time |
创建时间 |
2.1.24 订单状态流水表(order_status_log)
维护每一个订单的状态信息
字段名 |
字段说明 |
id |
编号 |
order_id |
订单编号 |
order_status |
订单状态 |
operate_time |
操作时间 |
2.1.25 支付表(payment_info)
字段名 |
字段说明 |
id |
编号 |
out_trade_no |
对外业务编号 |
order_id |
订单编号 |
user_id |
用户id |
payment_type |
支付类型(微信 支付宝) |
trade_no |
交易编号 |
total_amount |
支付金额 |
subject |
交易内容 |
payment_status |
支付状态 |
create_time |
创建时间 |
callback_time |
回调时间 |
callback_content |
回调信息 |
2.1.26 退款表(refund_payment)
字段名 |
字段说明 |
id |
编号 |
out_trade_no |
对外业务编号 |
order_id |
订单编号 |
sku_id |
商品sku_id |
payment_type |
支付类型(微信 支付宝) |
trade_no |
交易编号 |
total_amount |
退款金额 |
subject |
交易内容 |
refund_status |
退款状态 |
create_time |
创建时间 |
callback_time |
回调时间 |
callback_content |
回调信息 |
2.1.27 SKU平台属性表(sku_attr_value)
字段名 |
字段说明 |
id |
编号 |
attr_id |
属性id(冗余) |
value_id |
属性值id |
sku_id |
skuid |
attr_name |
属性名称 |
value_name |
属性值名称 |
2.1.28 SKU信息表(sku_info)
字段名 |
字段说明 |
id |
库存id(itemID) |
spu_id |
商品id |
price |
价格 |
sku_name |
sku名称 |
sku_desc |
商品规格描述 |
weight |
重量 |
tm_id |
品牌(冗余) |
category3_id |
三级分类id(冗余) |
sku_default_img |
默认显示图片(冗余) |
is_sale |
是否销售(1:是 0:否) |
create_time |
创建时间 |
2.1.29 SKU销售属性表(sku_sale_attr_value)
字段名 |
字段说明 |
id |
id |
sku_id |
库存单元id |
spu_id |
spu_id(冗余) |
sale_attr_value_id |
销售属性值id |
sale_attr_id |
销售属性id |
sale_attr_name |
销售属性值名称 |
sale_attr_value_name |
销售属性值名称 |
2.1.30 SPU信息表(spu_info)
字段名 |
字段说明 |
id |
商品id |
spu_name |
商品名称 |
description |
商品描述(后台简述) |
category3_id |
三级分类id |
tm_id |
品牌id |
2.1.31 SPU销售属性表(spu_sale_attr)
字段名 |
字段说明 |
id |
编号(业务中无关联) |
spu_id |
商品id |
base_sale_attr_id |
销售属性id |
sale_attr_name |
销售属性名称(冗余) |
2.1.32 SPU销售属性值表(spu_sale_attr_value)
字段名 |
字段说明 |
id |
销售属性值编号 |
spu_id |
商品id |
base_sale_attr_id |
销售属性id |
sale_attr_value_name |
销售属性值名称 |
sale_attr_name |
销售属性名称(冗余) |
2.1.33 用户地址表(user_address)
字段名 |
字段说明 |
id |
编号 |
user_id |
用户id |
province_id |
省份id |
user_address |
用户地址 |
consignee |
收件人 |
phone_num |
联系方式 |
is_default |
是否是默认 |
2.1.34 用户信息表(user_info)
字段名 |
字段说明 |
id |
编号 |
login_name |
用户名称 |
nick_name |
用户昵称 |
passwd |
用户密码 |
name |
用户姓名 |
phone_num |
手机号 |
邮箱 |
|
head_img |
头像 |
user_level |
用户级别 |
birthday |
用户生日 |
gender |
性别 M男,F女 |
create_time |
创建时间 |
operate_time |
修改时间 |
status |
状态 |
电商业务表结构:
后台管理表结构:
3、业务数据采集模块
我们的项目架构图中,业务数据是通过两方面采集到离线数仓中的:需要增量同步的数据通过 Maxwell(可以实时捕获和传输MySQL数据库的变更操作,适合增量同步) 发送到 Kafka 集群,然后再通过 flume 发送到离线数仓当中;而 DataX 是每天把全量的数据发送到集群当中的。
而实时数仓中的业务数据源也是由 Maxwell 发送到 Kafka 集群,然后 Flink 直接从 Kafka 读取的。
3.1、采集通道 Maxwell 配置
开启 mysql binlog
在 /etc/my.cnf 中添加以下配置:
server-id=1 log-bin=mysql-bin # maxwell要求binlog模式必须为 row binlog_format=row binlog-do-db=gmall
修改 maxwell 配置文件
我们这里通过配置文件来启动 maxwell:
# tl;dr config log_level=info #Maxwell数据发送目的地,可选配置有stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis producer=kafka #目标Kafka集群地址 kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 #目标Kafka topic,可静态配置,例如:maxwell,也可动态配置,例如:%{database}_%{table},相当于给每张表创 建一个 topic kafka_topic=maxwell # kafka 分区规则 按照数据库分区 #producer_partition_by=database # mysql 相关配置 host=hadoop102 user=maxwell password=123456
这里的 kafka_topic 测试完后需要改成我们项目指定的 topic_db
bin/maxwell --config config.properties
启动 Kafka 消费者
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic maxwell
模拟数据生成
cd /opt/module/db_log/ java -jar gmall2020-mock-db-2021-11-14.jar
查看结果:
我们可以看到,我们在 hadoop103 上开启的 Kafka 消费者的控制台,输出了抓取到所有日志:
而且此刻的 mysql 的 binlog 文件的大小也在急剧增加:
到这里,我们的 maxwell 的业务数据采集通道已经打通了。
编写 maxwell 启停脚本
#!/bin/bash MAXWELL_HOME=/opt/module/maxwell-1.29.2 status_maxwell(){ result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l` return $result } start_maxwell(){ status_maxwell if [[ $? -lt 1 ]]; then echo "启动Maxwell" $MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon else echo "Maxwell正在运行" fi } stop_maxwell(){ status_maxwell if [[ $? -gt 0 ]]; then echo "停止Maxwell" ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9 else echo "Maxwell未在运行" fi } case $1 in start ) start_maxwell ;; stop ) stop_maxwell ;; restart ) stop_maxwell start_maxwell ;; esac
总结
至此,我们的日志数据采集(用户行为日志和业务日志)平台都已经搭建完毕了,我们的日志数据都已经汇总到了 Kafka,接下来我们的离线数仓和实时数仓只需要直接去 Kafka 读取即可。