一、电商热门商品统计模块
(1)需求分析
如何定义热门商品?
简单模型:直接通过用户对商品的点击量来衡量商品热度。
复杂模型:依据各类别权重(后续补充)
如何获取区域?
通过用户点击日志,获取访问IP,进而获取区域信息。
通过数据库中的订单关联用户表,获取用户的地域信息
如何去除爬虫水军(商家为了提高自己的排名,用爬虫来频繁访问自己的店铺)?
一段时间分析用户IP的访问次数(后续补充)
(2)技术方案
数据采集(ETL)
电商日志一般存储在日志服务器,通过 Flume 拉取到 HDFS 上,本文通过编写python程序模拟日志数据。
业务数据通过 Sqoop 从关系型数据库mysql中读取数据,然后导入到HDFS。
因为要访问数据库,所以会对数据库造成很大的压力,而且在真实的生产环境中,一般没有权限直接访问数据库。可以把数据导出成csv文件,放到日志服务器上,再通过Flume采集到HDFS上。假如有权限访问数据库,数据库也需要设置成读写分离的模式,来缓解压力。
数据清洗
使用 MapReduce 进行数据清洗。
使用 Spark Core 进行数据清洗。
各区域热门商品计算
使用 Hive 进行数据的分析和处理。
使用 Spark SQL 进行数据的分析和处理
(3)实验数据及说明
product(商品)表:
补充说明: status: 下架-1,上架0,预售1
area_info(地区信息)表
补充说明: action_type: 1 收藏,2 加购物车,3 购买 area_id:已经通过IP地址,解析出了区域信息
area_hot_product(区域热门商品)表
(4)技术实现
使用Flume采集用户点击日志
Flume配置文件(flume-areahot.conf)
启动 Flume agent,在 Flume 的根目录下执行命令:bin/flume-ng agent -n a4 -f flume-areahot.conf -c conf -Dflume.root.logger=INFO,console
再执行python dslog.py向 /log0208 目录里放入用户日志文件(实现方法:此处
Flume 会将 /log0208 目录下的文件采集到 hdfs://master:9000/flume/ 当天日期 目录下。
2.利用python编写程序模拟日志信息,jian放入/log0208文件夹下,自定义添加不符合字段数据,要经过mr或spark进行数据清洗。
运行dslog.py程序如下:
#coding=utf-8 import random import time iplist=[26,23,47,56,108,10,33,48,66,77,101,45,61,52,88,89,108,191,65,177,98,21,34,61,19,11,112,114] url = "http://mystore.jsp/?productid={query}" x=[1,2,3,4] def use_id(): return random.randint(1,20) def get_ip(): return '.'.join(str(x) for x in random.sample(iplist,4)) def urllist(): def sample_references(): if random.uniform(0,1)>0.8: return "" query_str=random.sample(x,1) return url.format(query=query_str[0]) def get_time(): return time.strftime('%Y%m%d%H%M%S',time.localtime()) # action: 1 收藏,2 加购物车,3 购买 area_id代表不同区域 def action(): return random.randint(1,4) def area_id(): return random.randint(1,21) def get_log(count): while count>0: log='{},{},{},{},{},{}\n'.format(use_id(),get_ip(),urllist(),get_time(),action(),area_id()) # with open('/usr/local/src/tmp/1.log','a+')as file: with open('/log0208/click.log','a+')as file: file.write(log) # print(log) # time.sleep(1) count=count-1 if __name__ == '__main__': get_log(10000)
生成日志结果截取:
5,10.26.56.45,http://mystore.jsp/?productid=1,20210222005139,1,19 2,10.101.98.47,http://mystore.jsp/?productid=1,20210222005139,3,8 17,191.88.66.108,http://mystore.jsp/?productid=3,20210222005139,2,14 4,89.21.33.108,,20210222005139,2,10 4,108.23.48.114,http://mystore.jsp/?productid=4,20210222005139,1,21 8,21.48.19.65,,20210222005139,1,3 16,61.21.89.11,http://mystore.jsp/?productid=2,20210222005139,3,11 6,56.47.112.88,,20210222005139,1,3
flume-areahot.conf配置文件如下:
#bin/flume-ng agent -n a4 -f myagent/a4.conf -c conf -Dflume.root.logger=INFO,console #定义agent名, source、channel、sink的名称 a4.sources = r1 a4.channels = c1 a4.sinks = k1 #具体定义source a4.sources.r1.type = spooldir a4.sources.r1.spoolDir = /log0208 #具体定义channel a4.channels.c1.type = memory a4.channels.c1.capacity = 10000 a4.channels.c1.transactionCapacity = 100 #定义拦截器,为消息添加时间戳 a4.sources.r1.interceptors = i1 a4.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder #具体定义sink a4.sinks.k1.type = hdfs a4.sinks.k1.hdfs.path = hdfs://master:9000/flume/%Y%m%d a4.sinks.k1.hdfs.filePrefix = events- a4.sinks.k1.hdfs.fileType = DataStream #不按照条数生成文件 a4.sinks.k1.hdfs.rollCount = 0 #HDFS上的文件达到128M时生成一个文件 a4.sinks.k1.hdfs.rollSize = 134217728 #HDFS上的文件达到60秒生成一个文件 a4.sinks.k1.hdfs.rollInterval = 60 #组装source、channel、sink a4.sources.r1.channels = c1 a4.sinks.k1.channel = c1
3.数据清洗
需要将用户点击日志里面对于商品的点击识别出来
过滤不满足6个字段的数据
过滤URL为空的数据,即:过滤出包含http开头的日志记录
方式一:使用 MapReduce 程序进行数据的清洗
1. CleanDataMain.java及CleanDataMapper.java代码实现:
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class CleanDataMain { public static void main(String[] args) throws Exception { //1、创建Job Job job = Job.getInstance(new Configuration()); job.setJarByClass(CleanDataMain.class); //2、指定任务的Mapper和输出的类型 job.setMapperClass(CleanDataMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); //4、任务的输入和输出 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //5、执行 job.waitForCompletion(true); } } import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /* 过滤不满足6个字段的数据 过滤URL为空的数据,即:过滤出包含http开头的日志记录 */ public class CleanDataMapper extends Mapper<LongWritable, Text, Text, NullWritable> { @Override protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException { String log = value1.toString(); //分词 String[] words = log.split(","); if(words.length == 6 && words[2].startsWith("http")){ context.write(value1, NullWritable.get()); } } }
2.利用maven clean、maven install打成 jar 包,提交到 yarn 上运行:运行脚本run.sh,输入数据为Flume采集到的路径
HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop" OUTPUT_PATH="/output/210219" $HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH hadoop jar /ds/MyMapReduceProject-0.0.1-SNAPSHOT.jar mapreduce.clean/CleanDataMain /flume/20210219/events-.1613712374044 /output/210219
3.过滤后结果查看:
[root@master ds]# hadoop fs -cat /output/210219/part-r-00000 1,201.105.101.102,http://mystore.jsp/?productid=1,2017020020,1,1 1,201.105.101.102,http://mystore.jsp/?productid=1,2017020029,2,1 1,201.105.101.102,http://mystore.jsp/?productid=4,2017020021,3,1 2,201.105.101.103,http://mystore.jsp/?productid=2,2017020022,1,1 3,201.105.101.105,http://mystore.jsp/?productid=3,2017020023,1,2 4,201.105.101.107,http://mystore.jsp/?productid=1,2017020025,1,1
方式二:使用 Spark 程序进行数据的清洗
1.cleanData代码实现:
import org.apache.log4j.Logger import org.apache.log4j.Level import org.apache.spark.SparkConf import org.apache.spark.SparkContext object CleanData { def main(args: Array[String]): Unit = { // 为了避免执行过程中打印过多的日志 Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) val conf = new SparkConf().setAppName("CleanData") val sc = new SparkContext(conf) // 读取数据 val fileRDD = sc.textFile(args(0)) // 清洗数据 val cleanDataRDD = fileRDD.map(_.split(",")).filter(_(2).startsWith("http")).filter(_.length == 6) // 将清洗后的结果保存到HDFS cleanDataRDD.saveAsTextFile(args(1)) // 停止SparkContext sc.stop() println("Finished") } }
2.同上打成 jar 包,提交到 spark 上运行:
bin/spark-submit / --class clean.CleanData / --master spark://master:7077 / /ds/people-0.0.1-SNAPSHOT.jar / hdfs://master:9000/flume/210219/events-.1613712374044 / hdfs://master:9000/testOutput/ (5)各区域热门商品热度统计:基于 Hive 和 Spark SQL 方式一:使用 Hive 进行统计 # 创建地区表: create external table area (area_id string,area_name string) row format delimited fields terminated by ',' location '/input/hotproject/area'; # 创建商品表 create external table product (product_id string,product_name string, marque string,barcode string, price double, brand_id string,market_price double,stock int,status int) row format delimited fields terminated by ',' location '/input/hotproject/product'; # 创建一个临时表,用于保存用户点击的初始日志 create external table clicklogTemp (user_id string,user_ip string,url string,click_time string,action_type string,area_id string) row format delimited fields terminated by ',' location '/input/hotproject/cleandata'; # 创建用户点击日志表(注意:需要从上面的临时表中解析出product_id) create external table clicklog (user_id string,user_ip string,product_id string,click_time string,action_type string,area_id string) row format delimited fields terminated by ',' location '/input/hotproject/clicklog'; #导入数据,业务一般用sqoop从mysql数据库导入到HDFS load data inpath "/input/data/areainfo.txt" into table area; load data inpath "/input/data/productinfo.txt" into table product; #日志通过flume采集到HDFS load data inpath "/output/210220/part-r-00000" into table clicklogTemp; insert into table clicklog select user_id,user_ip,substring(url,instr(url,"=")+1), click_time,action_type,area_id from clicklogTemp; ## 查询各地区商品热度 select a.area_id,b.area_name,a.product_id,c.product_name,count(a.product_id) pv from clicklog a join area b on a.area_id = b.area_id join product c on a.product_id = c.product_id group by a.area_id,b.area_name,a.product_id,c.product_name; 注意:在上面的例子中,我们建立一张临时表,然后从临时表中解析出productid 也可以直接使用hive的函数:parse_url进行解析,如下: parse_url(a.url,'QUERY','productid') # 这样就可以不用创建临时表来保存中间状态的结果,修改后的Hive SQL如下: select a.area_id,b.area_name,parse_url(a.url,'QUERY','productid'), c.product_name,count(parse_url(a.url,'QUERY','productid')) from clicklogtemp a join area b on a.area_id = b.area_id join product c on parse_url(a.url,'QUERY','productid') = c.product_id group by a.area_id,b.area_name,parse_url(a.url,'QUERY','productid'),c.product_name;
输出结果,最后一列为PV
a.area_id b.area_name a.product_id c.product_name pv 1 beijing 2 nike shoes1 2 1 beijing 3 nike shoes2 1 1 beijing 4 nike shoes4 1 10 heilongjiang 2 nike shoes1 3 11 tianjin 2 nike shoes1 1 11 tianjin 3 nike shoes2 1 11 tianjin 4 nike shoes4 2
上述语句可以通过insert into 导入另一个新表,将hive分析结果插入另一个表,通过sqoop导入mysql关系数据库,最终实现电商可视化可视化页面展示。
insert into table result select a.area_id,b.area_name,parse_url(a.url,'QUERY','productid'),c.product_name,count(parse_url(a.url,'QUERY','productid')) from clicklogtemp a join area b on a.area_id = b.area_id join product c on parse_url(a.url,'QUERY','productid') = c.product_id group by a.area_id,b.area_name,parse_url(a.url,'QUERY','productid'),c.product_name;
注:导入数据把握原则:能不导入数据,就不要导入数据(外部表),输出结果由于日志不同结果不同。
方式二:使用 Spark SQL 进行统计 1.Hotproduct.scala代码实现 package com.hot import org.apache.log4j.Logger import org.apache.log4j.Level import org.apache.spark.sql.SparkSession //地区表 case class AreaInfo(area_id:String,area_name:String) //商品表 用不到的数据,不要导入 case class ProductInfo(product_id:String,product_name:String,marque:String,barcode:String,price:Double,brand_id:String,market_price:Double,stock:Int,status:Int) //经过清洗后的,用户点击日志信息 case class LogInfo(user_id:String,user_ip:String,product_id:String,click_time:String,action_type:String,area_id:String) object HotProduct { def main(args:Array[String]):Unit={ // 避免打印过多的日志 Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) val spark=SparkSession.builder().master("local").appName("").getOrCreate() // val spark=SparkSession.builder().appName("").getOrCreate() import spark.sqlContext.implicits._ //获取地区数据 val areaDF = spark.sparkContext.textFile("hdfs://master:9000/input/data/areainfo1.txt") .map(_.split(",")).map(x=> AreaInfo(x(0),x(1))).toDF() areaDF.createTempView("area") //获取商品数据 val productDF = spark.sparkContext.textFile("hdfs://master:9000/input/data/productinfo.txt") .map(_.split(",")).map(x=> ProductInfo(x(0),x(1),x(2),x(3),x(4).toDouble,x(5),x(6).toDouble,x(7).toInt,x(8).toInt)) .toDF() productDF.createTempView("product") //获取点击日志 val clickLogDF = spark.sparkContext.textFile("hdfs://master:9000/output/210220/part-r-00000") .map(_.split(",")).map(x => LogInfo(x(0),x(1),x(2).substring(x(2).indexOf("=")+1),x(3),x(4),x(5))) .toDF() clickLogDF.createTempView("clicklog") //执行SQL // 通过SparkSQL分析各区域商品的热度,结果输出到屏幕 val sql = "select a.area_id,a.area_name,p.product_id,product_name,count(c.product_id) from area a,product p,clicklog c where a.area_id=c.area_id and p.product_id=c.product_id group by a.area_id,a.area_name,p.product_id,p.product_name" spark.sql(sql).show() // var sql1 = " select concat(a.area_id,',',a.area_name,',',p.product_id,',',p.product_name,',',count(c.product_id)) " // sql1 = sql1 + " from area a,product p,clicklog c " // sql1 = sql1 + " where a.area_id=c.area_id and p.product_id=c.product_id " // sql1 = sql1 + " group by a.area_id,a.area_name,p.product_id,product_name " // spark.sql(sql1).repartition(1).write.text(args(3)) spark.stop() } }
2.Maven打包提交到spark集群上运行:
spark-submit --class com.hot.HotProduct --master spark://master:7077 hotspark-1.0-SNAPSHOT.jar #hdfs://master:9000/input/hotproject/area/areainfo.txt \ #hdfs://master:9000/input/hotproject/product/productinfo.txt \ #hdfs://master:9000/output/210219/part-r-00000 hdfs://master:9000/output/analysis +-------+---------+----------+------------+-----------------+ |area_id|area_name|product_id|product_name|count(product_id)| +-------+---------+----------+------------+-----------------+ | 7| hubei| 3| nike shoes2| 1| | 15| guizhou| 3| nike shoes2| 2| | 11| tianjin| 3| nike shoes2| 1| | 3| shanghai| 3| nike shoes2| 1| | 8| zhejiang| 3| nike shoes2| 2| | 5| shenzhen| 3| nike shoes2| 2| | 17| fujian| 3| nike shoes2| 1| | 19| anhui| 3| nike shoes2| 3| | 9| jili| 3| nike shoes2| 1| | 1| beijing| 3| nike shoes2| 1| | 20| henan| 3| nike shoes2| 4| | 4| hangzhou| 3| nike shoes2| 1| | 13| hebei| 3| nike shoes2| 3| | 15| guizhou| 1| nike shoes| 1| | 3| shanghai| 1| nike shoes| 1| | 8| zhejiang| 1| nike shoes| 1| | 18|neimenggu| 1| nike shoes| 2| | 17| fujian| 1| nike shoes| 2| | 19| anhui| 1| nike shoes| 2| | 9| jili| 1| nike shoes| 2| +-------+---------+----------+------------+-----------------+
二、业务采集导入模块
(1)业务数据建模
编写数据库脚本实现各表创建,通过本地Navicat 工具实现数据建模,通过外键连接,表结构如下:
sku_info商品表
user_info用户表
base_category1商品一级分类表
base_category2商品二级分类表
base_category3商品三级分类表
order_detail订单详情表
payment_info支付流水表
order_info订单表
(2)业务数据数仓导入:
通过安装sqoop工具将mysql数据库中业务数据导入到HDFS,再导入hive数仓,sqoop原理是利用mapreduce中的map。(sqoop命令加入--null-string '\\N'、--null-non-string '\\N'字段)
import 把数据从关系型数据库 导到 数据,仓库,自定义InputFormat,
export 把数据从数据仓库 导到 关系型数据库,自定义OutputFormat,
用sqoop从mysql中将八张表的数据导入数仓的ods原始数据层全导导入按查询条件为where 1=1或无条件,增量导入按照当天时间,增量+变化按照创建时间或操作时间。
sqoop脚本解释:
bin/sqoop import (在sqoop的安装目录内,import表名是导入) --connect jdbc:mysql://192.168.52.130:3306/userdb (连接:协议:数据库类型://ip地址:端口号/数据库) --username root (用户名 root) --password 123456 (密码 123456) --table emp (表 emp) --delete-target-dir (如果指定目录存在就删除它) --target-dir /sqoop/emp (导入到指定目录) --fields-terminated-by '\t' (指定字段分割符为\t) --m 1 (--num-mappers:使用几个mapper,写1就可以)
Sqoop定时导入脚本
1)在/root/bin目录下创建脚本sqoop_import.sh
[root@hadoop102 bin]$ vim sqoop_import.sh
在脚本中填写如下内容
#!/bin/bash export HADOOP_USER_NAME=hive db_date=$2 echo $db_date db_name=eshop import_data() { sqoop import \ --connect jdbc:mysql://hadoop102:3306/$db_name \ --username root \ --password Yy8266603@ \ --target-dir /origin_data/$db_name/db/$1/$db_date \ --delete-target-dir \ --num-mappers 1 \ --fields-terminated-by "\t" \ --query "$2"' and $CONDITIONS;' } import_sku_info(){ import_data "sku_info" "select id, spu_id, price, sku_name, sku_desc, weight, tm_id, category3_id, create_time from sku_info where 1=1" } import_user_info(){ import_data "user_info" "select id, name, birthday, gender, email, user_level, create_time from user_info where 1=1" } import_base_category1(){ import_data "base_category1" "select id, name from base_category1 where 1=1" } import_base_category2(){ import_data "base_category2" "select id, name, category1_id from base_category2 where 1=1" } import_base_category3(){ import_data "base_category3" "select id, name, category2_id from base_category3 where 1=1" } import_order_detail(){ import_data "order_detail" "select od.id, order_id, user_id, sku_id, sku_name, order_price, sku_num, o.create_time from order_info o , order_detail od where o.id=od.order_id and DATE_FORMAT(create_time,'%Y-%m-%d')='$db_date'" } import_payment_info(){ import_data "payment_info" "select id, out_trade_no, order_id, user_id, alipay_trade_no, total_amount, subject, payment_type, payment_time from payment_info where DATE_FORMAT(payment_time,'%Y-%m-%d')='$db_date'" } import_order_info(){ import_data "order_info" "select id, total_amount, order_status, user_id, payment_way, out_trade_no, create_time, operate_time from order_info where (DATE_FORMAT(create_time,'%Y-%m-%d')='$db_date' or DATE_FORMAT(operate_time,'%Y-%m-%d')='$db_date')" } case $1 in "base_category1") import_base_category1 ;; "base_category2") import_base_category2 ;; "base_category3") import_base_category3 ;; "order_info") import_order_info ;; "order_detail") import_order_detail ;; "sku_info") import_sku_info ;; "user_info") import_user_info ;; "payment_info") import_payment_info ;; "all") import_base_category1 import_base_category2 import_base_category3 import_order_info import_order_detail import_sku_info import_user_info import_payment_info ;; esac
2)增加脚本执行权限
[root@master bin]$ chmod 777 sqoop_import.sh
3)执行脚本导入数据
[root@master]# sqoop_import.sh all 2019-02-10
三、离线数据仓库搭建
1.origin_data原始数据
sku_info商品表(每日导全量)
user_info用户表(每日导全量)
base_category1商品一级分类表(每日导全量)
base_category2商品二级分类表(每日导全量)
base_category3商品三级分类表(每日导全量)
order_detail 订单详情表 (用户、用户、地区、商品四个维度)事务型事实表(每日导增量)
payment_info支付流水表 事务型事实表(每日导增量)
order_info订单表(每日导增量+变化)
订单表
drop table if exists ods_order_info; create external table ods_order_info ( `id` string COMMENT '订单编号', `total_amount` decimal(10,2) COMMENT '订单金额', `order_status` string COMMENT '订单状态', `user_id` string COMMENT '用户id' , `payment_way` string COMMENT '支付方式', `out_trade_no` string COMMENT '支付流水号', `create_time` string COMMENT '创建时间', `operate_time` string COMMENT '操作时间' ) COMMENT '订单表' PARTITIONED BY ( `dt` string) row format delimited fields terminated by '\t' location '/warehouse/gmall/ods/ods_order_info/' ; 订单详情表(事实表) drop table if exists ods_order_detail; create external table ods_order_detail( `id` string COMMENT '订单编号', `order_id` string COMMENT '订单号', `user_id` string COMMENT '用户id' , `sku_id` string COMMENT '商品id', `sku_name` string COMMENT '商品名称', `order_price` string COMMENT '商品价格', `sku_num` string COMMENT '商品数量', `create_time` string COMMENT '创建时间' ) COMMENT '订单明细表' PARTITIONED BY ( `dt` string) row format delimited fields terminated by '\t' location '/warehouse/gmall/ods/ods_order_detail/' ;
商品表
drop table if exists ods_sku_info; create external table ods_sku_info( `id` string COMMENT 'skuId', `spu_id` string COMMENT 'spuid', `price` decimal(10,2) COMMENT '价格' , `sku_name` string COMMENT '商品名称', `sku_desc` string COMMENT '商品描述', `weight` string COMMENT '重量', `tm_id` string COMMENT '品牌id', `category3_id` string COMMENT '品类id', `create_time` string COMMENT '创建时间' ) COMMENT '商品表' PARTITIONED BY ( `dt` string) row format delimited fields terminated by '\t' location '/warehouse/gmall/ods/ods_sku_info/' ;
用户表
drop table if exists ods_user_info; create external table ods_user_info( `id` string COMMENT '用户id', `name` string COMMENT '姓名', `birthday` string COMMENT '生日' , `gender` string COMMENT '性别', `email` string COMMENT '邮箱', `user_level` string COMMENT '用户等级', `create_time` string COMMENT '创建时间' ) COMMENT '用户信息' PARTITIONED BY ( `dt` string) row format delimited fields terminated by '\t' location '/warehouse/gmall/ods/ods_user_info/' ;
商品一级分类表
drop table if exists ods_base_category1; create external table ods_base_category1( `id` string COMMENT 'id', `name` string COMMENT '名称' ) COMMENT '商品一级分类' PARTITIONED BY ( `dt` string) row format delimited fields terminated by '\t' location '/warehouse/gmall/ods/ods_base_category1/' ;
商品二级分类表
drop table if exists ods_base_category2; create external table ods_base_category2( `id` string COMMENT ' id', `name` string COMMENT '名称', category1_id string COMMENT '一级品类id' ) COMMENT '商品二级分类' PARTITIONED BY ( `dt` string) row format delimited fields terminated by '\t' location '/warehouse/gmall/ods/ods_base_category2/' ;
商品三级分类表
drop table if exists ods_base_category3; create external table ods_base_category3( `id` string COMMENT ' id', `name` string COMMENT '名称', category2_id string COMMENT '二级品类id' ) COMMENT '商品三级分类' PARTITIONED BY ( `dt` string) row format delimited fields terminated by '\t' location '/warehouse/gmall/ods/ods_base_category3/' ;
支付流水表
drop table if exists `ods_payment_info`; create external table `ods_payment_info`( `id` bigint COMMENT '编号', `out_trade_no` string COMMENT '对外业务编号', `order_id` string COMMENT '订单编号', `user_id` string COMMENT '用户编号', `alipay_trade_no` string COMMENT '支付宝交易流水编号', `total_amount` decimal(16,2) COMMENT '支付金额', `subject` string COMMENT '交易内容', `payment_type` string COMMENT '支付类型', `payment_time` string COMMENT '支付时间' ) COMMENT '支付流水表' PARTITIONED BY ( `dt` string) row format delimited fields terminated by '\t' location '/warehouse/gmall/ods/ods_payment_info/'; 导入脚本ods_db.sh #!/bin/bash APP=eshop hive=user/local/hive # 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天 if [ -n "$1" ] ;then do_date=$1 else do_date=`date -d "-1 day" +%F` fi sql=" load data inpath '/origin_data/$APP/db/order_info/$do_date' OVERWRITE into table "$APP".ods_order_info partition(dt='$do_date'); load data inpath '/origin_data/$APP/db/order_detail/$do_date' OVERWRITE into table "$APP".ods_order_detail partition(dt='$do_date'); load data inpath '/origin_data/$APP/db/sku_info/$do_date' OVERWRITE into table "$APP".ods_sku_info partition(dt='$do_date'); load data inpath '/origin_data/$APP/db/user_info/$do_date' OVERWRITE into table "$APP".ods_user_info partition(dt='$do_date'); load data inpath '/origin_data/$APP/db/payment_info/$do_date' OVERWRITE into table "$APP".ods_payment_info partition(dt='$do_date'); load data inpath '/origin_data/$APP/db/base_category1/$do_date' OVERWRITE into table "$APP".ods_base_category1 partition(dt='$do_date'); load data inpath '/origin_data/$APP/db/base_category2/$do_date' OVERWRITE into table "$APP".ods_base_category2 partition(dt='$do_date'); load data inpath '/origin_data/$APP/db/base_category3/$do_date' OVERWRITE into table "$APP".ods_base_category3 partition(dt='$do_date'); " $hive -e "$sql"
2.ods层
(八张表,表名,字段跟mysql完全相同)
从origin_data把数据导入到ods层,表名在原表名前加ods_
3.dwd层
对ODS层数据进行判空过滤。对商品分类表进行维度退化(降维)。其他数据跟ods层一模一样。
事实表
1订单表 dwd_order_info
2.订单详情表 dwd_order_detail
3.支付流水表 dwd_payment_info
维度表
用户表 dwd_user_info
商品表 dwd_sku_info
其他表字段不变,唯独商品表,通过关联3张分类表,增加了
category3_id` string COMMENT '3id', category2_id` string COMMENT '2id', `category1_id` string COMMENT '1id', `category3_name` string COMMENT '3', `category2_name` string COMMENT '2', `category1_name` string COMMENT '1',
拉链表
订单表拉链表 dwd_order_info_his
`id` string COMMENT '订单编号', `total_amount` decimal(10,2) COMMENT '订单金额', `order_status` string COMMENT '订单状态', `user_id` string COMMENT '用户id' , `payment_way` string COMMENT '支付方式', `out_trade_no` string COMMENT '支付流水号', `create_time` string COMMENT '创建时间', `operate_time` string COMMENT '操作时间' , `start_date` string COMMENT '有效开始日期', `end_date` string COMMENT '有效结束日期'
1)创建订单表拉链表,字段跟拉链表一样,只增加了有效开始日期和有效结束日期
初始日期,从订单变化表ods_order_info导入数据,且让有效开始时间=当前日期,有效结束日期=9999-99-99
(从mysql导入数仓的时候就只导了新增的和变化的数据ods_order_info,dwd_order_info跟ods_order_info基本一样,只多了一个id的判空处理)
2)建一张拉链临时表dwd_order_info_his_tmp,字段跟拉链表完全一致
3)新的拉链表中应该有这几部分数据,
(1)增加订单变化表dwd_order_info的全部数据
(2)更新旧的拉链表左关联订单变化表dwd_order_info,关联字段:订单id, where 过滤出end_date只等于9999-99-99的数据,如果旧的拉链表中的end_date不等于9999-99-99,说明已经是终态了,不需要再更新
如果dwd_order_info.id is null , 没关联上,说明数据状态没变,让end_date还等于旧的end_date
如果dwd_order_info.id is not null , 关联上了,说明数据状态变了,让end_date等于当前日期-1
把查询结果插入到拉链临时表中
4)把拉链临时表覆盖到旧的拉链表中
4.dws层--按主题建宽表
项目宽表包括用户行为宽表、用户购买商品明细行为宽表、商品宽表等。
为什么要建宽表?
需求目标,把每个用户单日的行为聚合起来组成一张多列宽表,以便之后关联用户维度信息后进行,不同角度的统计分析。
从订单表 dwd_order_info 中获取 下单次数 和 下单总金额
从支付流水表 dwd_payment_info 中获取 支付次数 和 支付总金额
最终按照user_id聚合,获得明细。
5.ADS层
围绕商品、用户主题进行相关指标统计。
商品主题:
需求一:GMV成交总额
从用户行为宽表中dws_user_action,根据统计日期分组,聚合,直接sum就可以了。
1.商品销售排名
2.商品收藏排名
用户主题
需求二:转化率
1 新增用户占日活跃用户比率表
从日活跃数表 ads_uv_count 和 日新增设备数表 ads_new_mid_count 中取即可。
2 用户行为转化率表
漏斗行为分析:
从用户行为宽表dws_user_action中取,下单人数(只要下单次数>0),支付人数(只要支付次数>0)
从日活跃数表 ads_uv_count 中取活跃人数,然后对应的相除就可以了。
需求三:品牌复购率
需求:以月为单位统计,购买2次以上商品的用户
1 用户购买商品明细表(宽表)
2 品牌复购率表
从用户购买商品明细宽表dws_sale_detail_daycount中,根据品牌id--sku_tm_id聚合,计算每个品牌购买的总次数,购买人数a=购买次数>=1,两次及以上购买人数b=购买次数>=2,三次及以上购买人数c=购买次数>=3,
单次复购率=b/a,多次复购率=c/a