大数据电商数仓分析项目

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
全局流量管理 GTM,标准版 1个月
简介: 大数据电商数仓分析项目

一、电商热门商品统计模块


(1)需求分析

如何定义热门商品?


简单模型:直接通过用户对商品的点击量来衡量商品热度。


复杂模型:依据各类别权重(后续补充)


如何获取区域?


通过用户点击日志,获取访问IP,进而获取区域信息。


通过数据库中的订单关联用户表,获取用户的地域信息


如何去除爬虫水军(商家为了提高自己的排名,用爬虫来频繁访问自己的店铺)?


一段时间分析用户IP的访问次数(后续补充)


(2)技术方案

数据采集(ETL)


电商日志一般存储在日志服务器,通过 Flume 拉取到 HDFS 上,本文通过编写python程序模拟日志数据。


业务数据通过 Sqoop 从关系型数据库mysql中读取数据,然后导入到HDFS。


因为要访问数据库,所以会对数据库造成很大的压力,而且在真实的生产环境中,一般没有权限直接访问数据库。可以把数据导出成csv文件,放到日志服务器上,再通过Flume采集到HDFS上。假如有权限访问数据库,数据库也需要设置成读写分离的模式,来缓解压力。


数据清洗


使用 MapReduce 进行数据清洗。


使用 Spark Core 进行数据清洗。


各区域热门商品计算


使用 Hive 进行数据的分析和处理。


使用 Spark SQL 进行数据的分析和处理


(3)实验数据及说明

product(商品)表:

image.png

补充说明: status: 下架-1,上架0,预售1


area_info(地区信息)表

image.png


补充说明: action_type: 1 收藏,2 加购物车,3 购买 area_id:已经通过IP地址,解析出了区域信息


area_hot_product(区域热门商品)表

image.png


image.png

(4)技术实现

使用Flume采集用户点击日志


Flume配置文件(flume-areahot.conf)


启动 Flume agent,在 Flume 的根目录下执行命令:bin/flume-ng agent -n a4 -f flume-areahot.conf -c conf -Dflume.root.logger=INFO,console


再执行python dslog.py向 /log0208 目录里放入用户日志文件(实现方法:此处


Flume 会将 /log0208 目录下的文件采集到 hdfs://master:9000/flume/ 当天日期 目录下。


     2.利用python编写程序模拟日志信息,jian放入/log0208文件夹下,自定义添加不符合字段数据,要经过mr或spark进行数据清洗。


运行dslog.py程序如下:


#coding=utf-8
import random
import time
iplist=[26,23,47,56,108,10,33,48,66,77,101,45,61,52,88,89,108,191,65,177,98,21,34,61,19,11,112,114]
url = "http://mystore.jsp/?productid={query}"
x=[1,2,3,4]
def use_id():
    return random.randint(1,20)
def get_ip():
    return '.'.join(str(x) for x in random.sample(iplist,4))
def urllist():
def sample_references():
    if random.uniform(0,1)>0.8:
        return ""
    query_str=random.sample(x,1)
    return url.format(query=query_str[0])
def get_time():
    return time.strftime('%Y%m%d%H%M%S',time.localtime())
#  action: 1 收藏,2 加购物车,3 购买  area_id代表不同区域
def action():
    return random.randint(1,4)
def area_id():
    return random.randint(1,21)
def get_log(count):
    while count>0:
        log='{},{},{},{},{},{}\n'.format(use_id(),get_ip(),urllist(),get_time(),action(),area_id())
        # with open('/usr/local/src/tmp/1.log','a+')as file:
        with open('/log0208/click.log','a+')as file:
            file.write(log)
        # print(log)
        # time.sleep(1)
        count=count-1
if __name__ == '__main__':
    get_log(10000)

生成日志结果截取:


5,10.26.56.45,http://mystore.jsp/?productid=1,20210222005139,1,19
2,10.101.98.47,http://mystore.jsp/?productid=1,20210222005139,3,8
17,191.88.66.108,http://mystore.jsp/?productid=3,20210222005139,2,14
4,89.21.33.108,,20210222005139,2,10
4,108.23.48.114,http://mystore.jsp/?productid=4,20210222005139,1,21
8,21.48.19.65,,20210222005139,1,3
16,61.21.89.11,http://mystore.jsp/?productid=2,20210222005139,3,11
6,56.47.112.88,,20210222005139,1,3

flume-areahot.conf配置文件如下:


#bin/flume-ng agent -n a4 -f myagent/a4.conf -c conf -Dflume.root.logger=INFO,console
#定义agent名, source、channel、sink的名称
a4.sources = r1
a4.channels = c1
a4.sinks = k1
#具体定义source
a4.sources.r1.type = spooldir
a4.sources.r1.spoolDir = /log0208
#具体定义channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100 
#定义拦截器,为消息添加时间戳
a4.sources.r1.interceptors = i1
a4.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
#具体定义sink
a4.sinks.k1.type = hdfs
a4.sinks.k1.hdfs.path = hdfs://master:9000/flume/%Y%m%d
a4.sinks.k1.hdfs.filePrefix = events-
a4.sinks.k1.hdfs.fileType = DataStream
#不按照条数生成文件
a4.sinks.k1.hdfs.rollCount = 0 
#HDFS上的文件达到128M时生成一个文件
a4.sinks.k1.hdfs.rollSize = 134217728
#HDFS上的文件达到60秒生成一个文件
a4.sinks.k1.hdfs.rollInterval = 60
#组装source、channel、sink
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1


3.数据清洗


需要将用户点击日志里面对于商品的点击识别出来


过滤不满足6个字段的数据


过滤URL为空的数据,即:过滤出包含http开头的日志记录


方式一:使用 MapReduce 程序进行数据的清洗

1. CleanDataMain.java及CleanDataMapper.java代码实现:


import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class CleanDataMain {
  public static void main(String[] args) throws Exception {
  //1、创建Job
  Job job = Job.getInstance(new Configuration());
  job.setJarByClass(CleanDataMain.class);
  //2、指定任务的Mapper和输出的类型
  job.setMapperClass(CleanDataMapper.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(NullWritable.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(NullWritable.class);
  //4、任务的输入和输出
  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  //5、执行
  job.waitForCompletion(true);
  }
}
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/*
过滤不满足6个字段的数据
过滤URL为空的数据,即:过滤出包含http开头的日志记录
 */
public class CleanDataMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
  @Override
  protected void map(LongWritable key1, Text value1, Context context)
    throws IOException, InterruptedException {
  String log = value1.toString();
  //分词
  String[] words = log.split(",");
  if(words.length == 6 && words[2].startsWith("http")){
    context.write(value1, NullWritable.get());
  }
  }
}

2.利用maven clean、maven install打成 jar 包,提交到 yarn 上运行:运行脚本run.sh,输入数据为Flume采集到的路径


HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
OUTPUT_PATH="/output/210219"
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
hadoop jar /ds/MyMapReduceProject-0.0.1-SNAPSHOT.jar mapreduce.clean/CleanDataMain /flume/20210219/events-.1613712374044 /output/210219

3.过滤后结果查看:


[root@master ds]# hadoop fs -cat /output/210219/part-r-00000
1,201.105.101.102,http://mystore.jsp/?productid=1,2017020020,1,1
1,201.105.101.102,http://mystore.jsp/?productid=1,2017020029,2,1
1,201.105.101.102,http://mystore.jsp/?productid=4,2017020021,3,1
2,201.105.101.103,http://mystore.jsp/?productid=2,2017020022,1,1
3,201.105.101.105,http://mystore.jsp/?productid=3,2017020023,1,2
4,201.105.101.107,http://mystore.jsp/?productid=1,2017020025,1,1

 方式二:使用 Spark 程序进行数据的清洗

1.cleanData代码实现:


import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object CleanData {
  def main(args: Array[String]): Unit = {
    // 为了避免执行过程中打印过多的日志
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("CleanData")
    val sc = new SparkContext(conf)
    // 读取数据
    val fileRDD = sc.textFile(args(0))
    // 清洗数据
    val cleanDataRDD = fileRDD.map(_.split(",")).filter(_(2).startsWith("http")).filter(_.length == 6)
    // 将清洗后的结果保存到HDFS
    cleanDataRDD.saveAsTextFile(args(1))
    // 停止SparkContext
    sc.stop()
    println("Finished")
  }
}


   2.同上打成 jar 包,提交到 spark 上运行:


bin/spark-submit /
--class clean.CleanData   /
--master spark://master:7077 /
/ds/people-0.0.1-SNAPSHOT.jar  /
hdfs://master:9000/flume/210219/events-.1613712374044   /
hdfs://master:9000/testOutput/    
(5)各区域热门商品热度统计:基于 Hive 和 Spark SQL
 方式一:使用 Hive 进行统计
# 创建地区表:
create external table area
(area_id string,area_name string)
row format delimited fields terminated by ','
location '/input/hotproject/area';
# 创建商品表
create external table product
(product_id string,product_name string,
marque string,barcode string, price double,
brand_id string,market_price double,stock int,status int)
row format delimited fields terminated by ','
location '/input/hotproject/product';
# 创建一个临时表,用于保存用户点击的初始日志
create external table clicklogTemp
(user_id string,user_ip string,url string,click_time string,action_type string,area_id string)
row format delimited fields terminated by ','
location '/input/hotproject/cleandata';
# 创建用户点击日志表(注意:需要从上面的临时表中解析出product_id)
create external table clicklog
(user_id string,user_ip string,product_id string,click_time string,action_type string,area_id string)
row format delimited fields terminated by ','
location '/input/hotproject/clicklog';
#导入数据,业务一般用sqoop从mysql数据库导入到HDFS
load data  inpath "/input/data/areainfo.txt" into table area;
load data  inpath "/input/data/productinfo.txt" into table product;
#日志通过flume采集到HDFS
load data  inpath "/output/210220/part-r-00000" into table clicklogTemp;
insert into table clicklog
select user_id,user_ip,substring(url,instr(url,"=")+1),
click_time,action_type,area_id from clicklogTemp;
## 查询各地区商品热度
select a.area_id,b.area_name,a.product_id,c.product_name,count(a.product_id) pv 
from clicklog a join area b on a.area_id = b.area_id join product c on a.product_id = c.product_id
group by a.area_id,b.area_name,a.product_id,c.product_name;
注意:在上面的例子中,我们建立一张临时表,然后从临时表中解析出productid 也可以直接使用hive的函数:parse_url进行解析,如下: parse_url(a.url,'QUERY','productid')
# 这样就可以不用创建临时表来保存中间状态的结果,修改后的Hive SQL如下:
select a.area_id,b.area_name,parse_url(a.url,'QUERY','productid'),
c.product_name,count(parse_url(a.url,'QUERY','productid'))
from clicklogtemp a join area b on a.area_id = b.area_id
join product c on parse_url(a.url,'QUERY','productid') = c.product_id
group by a.area_id,b.area_name,parse_url(a.url,'QUERY','productid'),c.product_name;

输出结果,最后一列为PV


a.area_id    b.area_name    a.product_id    c.product_name    pv
1    beijing    2    nike shoes1    2
1    beijing    3    nike shoes2    1
1    beijing    4    nike shoes4    1
10    heilongjiang    2    nike shoes1    3
11    tianjin    2    nike shoes1    1
11    tianjin    3    nike shoes2    1
11    tianjin    4    nike shoes4    2


上述语句可以通过insert into 导入另一个新表,将hive分析结果插入另一个表,通过sqoop导入mysql关系数据库,最终实现电商可视化可视化页面展示。


insert into table result
select a.area_id,b.area_name,parse_url(a.url,'QUERY','productid'),c.product_name,count(parse_url(a.url,'QUERY','productid'))
from clicklogtemp a join area b on a.area_id = b.area_id
join product c on parse_url(a.url,'QUERY','productid') = c.product_id
group by a.area_id,b.area_name,parse_url(a.url,'QUERY','productid'),c.product_name;

注:导入数据把握原则:能不导入数据,就不要导入数据(外部表),输出结果由于日志不同结果不同。


方式二:使用 Spark SQL 进行统计
 1.Hotproduct.scala代码实现
package com.hot
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.sql.SparkSession
//地区表
case class AreaInfo(area_id:String,area_name:String)
//商品表 用不到的数据,不要导入
case class ProductInfo(product_id:String,product_name:String,marque:String,barcode:String,price:Double,brand_id:String,market_price:Double,stock:Int,status:Int)
//经过清洗后的,用户点击日志信息
case class LogInfo(user_id:String,user_ip:String,product_id:String,click_time:String,action_type:String,area_id:String)
object HotProduct {
  def main(args:Array[String]):Unit={
    // 避免打印过多的日志
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    val spark=SparkSession.builder().master("local").appName("").getOrCreate()
//    val spark=SparkSession.builder().appName("").getOrCreate()
    import spark.sqlContext.implicits._
    //获取地区数据
    val areaDF = spark.sparkContext.textFile("hdfs://master:9000/input/data/areainfo1.txt")
      .map(_.split(",")).map(x=> AreaInfo(x(0),x(1))).toDF()
    areaDF.createTempView("area")
    //获取商品数据
    val productDF = spark.sparkContext.textFile("hdfs://master:9000/input/data/productinfo.txt")
      .map(_.split(",")).map(x=>  ProductInfo(x(0),x(1),x(2),x(3),x(4).toDouble,x(5),x(6).toDouble,x(7).toInt,x(8).toInt))
      .toDF()
    productDF.createTempView("product")
    //获取点击日志
    val clickLogDF = spark.sparkContext.textFile("hdfs://master:9000/output/210220/part-r-00000")
      .map(_.split(",")).map(x =>  LogInfo(x(0),x(1),x(2).substring(x(2).indexOf("=")+1),x(3),x(4),x(5)))
      .toDF()
    clickLogDF.createTempView("clicklog")
    //执行SQL
    // 通过SparkSQL分析各区域商品的热度,结果输出到屏幕
    val sql = "select a.area_id,a.area_name,p.product_id,product_name,count(c.product_id) from area a,product p,clicklog c where a.area_id=c.area_id and p.product_id=c.product_id group by a.area_id,a.area_name,p.product_id,p.product_name"
    spark.sql(sql).show()
//    var sql1 = " select concat(a.area_id,',',a.area_name,',',p.product_id,',',p.product_name,',',count(c.product_id)) "
//    sql1 = sql1 + " from area a,product p,clicklog c "
//    sql1 = sql1 + " where a.area_id=c.area_id and p.product_id=c.product_id "
//    sql1 = sql1 + " group by a.area_id,a.area_name,p.product_id,product_name "
//    spark.sql(sql1).repartition(1).write.text(args(3))
    spark.stop()
  }
}

2.Maven打包提交到spark集群上运行:


spark-submit --class com.hot.HotProduct --master spark://master:7077 hotspark-1.0-SNAPSHOT.jar 
#hdfs://master:9000/input/hotproject/area/areainfo.txt \ #hdfs://master:9000/input/hotproject/product/productinfo.txt \ #hdfs://master:9000/output/210219/part-r-00000 hdfs://master:9000/output/analysis
+-------+---------+----------+------------+-----------------+                   
|area_id|area_name|product_id|product_name|count(product_id)|
+-------+---------+----------+------------+-----------------+
|      7|    hubei|         3| nike shoes2|                1|
|     15|  guizhou|         3| nike shoes2|                2|
|     11|  tianjin|         3| nike shoes2|                1|
|      3| shanghai|         3| nike shoes2|                1|
|      8| zhejiang|         3| nike shoes2|                2|
|      5| shenzhen|         3| nike shoes2|                2|
|     17|   fujian|         3| nike shoes2|                1|
|     19|    anhui|         3| nike shoes2|                3|
|      9|     jili|         3| nike shoes2|                1|
|      1|  beijing|         3| nike shoes2|                1|
|     20|    henan|         3| nike shoes2|                4|
|      4| hangzhou|         3| nike shoes2|                1|
|     13|    hebei|         3| nike shoes2|                3|
|     15|  guizhou|         1|  nike shoes|                1|
|      3| shanghai|         1|  nike shoes|                1|
|      8| zhejiang|         1|  nike shoes|                1|
|     18|neimenggu|         1|  nike shoes|                2|
|     17|   fujian|         1|  nike shoes|                2|
|     19|    anhui|         1|  nike shoes|                2|
|      9|     jili|         1|  nike shoes|                2|
+-------+---------+----------+------------+-----------------+


二、业务采集导入模块


(1)业务数据建模

编写数据库脚本实现各表创建,通过本地Navicat 工具实现数据建模,通过外键连接,表结构如下:


sku_info商品表


user_info用户表


base_category1商品一级分类表


base_category2商品二级分类表


base_category3商品三级分类表


order_detail订单详情表


payment_info支付流水表


order_info订单表

image.png



(2)业务数据数仓导入:

通过安装sqoop工具将mysql数据库中业务数据导入到HDFS,再导入hive数仓,sqoop原理是利用mapreduce中的map。(sqoop命令加入--null-string '\\N'、--null-non-string '\\N'字段)


import    把数据从关系型数据库 导到 数据,仓库,自定义InputFormat,


export    把数据从数据仓库 导到 关系型数据库,自定义OutputFormat,


用sqoop从mysql中将八张表的数据导入数仓的ods原始数据层全导导入按查询条件为where 1=1或无条件,增量导入按照当天时间,增量+变化按照创建时间或操作时间。


sqoop脚本解释:


bin/sqoop  import       (在sqoop的安装目录内,import表名是导入)
--connect jdbc:mysql://192.168.52.130:3306/userdb   (连接:协议:数据库类型://ip地址:端口号/数据库)
--username root     (用户名 root)
--password 123456  (密码 123456)
--table emp    (表 emp)
--delete-target-dir  (如果指定目录存在就删除它)
--target-dir /sqoop/emp (导入到指定目录)
--fields-terminated-by '\t'  (指定字段分割符为\t)
--m 1 (--num-mappers:使用几个mapper,写1就可以)

Sqoop定时导入脚本

1)在/root/bin目录下创建脚本sqoop_import.sh


[root@hadoop102 bin]$ vim sqoop_import.sh

在脚本中填写如下内容


#!/bin/bash
export HADOOP_USER_NAME=hive
db_date=$2
echo $db_date
db_name=eshop
import_data() {
sqoop import \
--connect jdbc:mysql://hadoop102:3306/$db_name \
--username root \
--password Yy8266603@ \
--target-dir  /origin_data/$db_name/db/$1/$db_date \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by "\t" \
--query "$2"' and  $CONDITIONS;'
}
import_sku_info(){
  import_data  "sku_info"  "select 
id, spu_id, price, sku_name, sku_desc, weight, tm_id,
category3_id, create_time 
  from sku_info  where 1=1"
}
import_user_info(){
  import_data "user_info" "select 
id, name, birthday, gender, email, user_level, 
create_time 
from user_info where 1=1"
}
import_base_category1(){
  import_data "base_category1" "select 
id, name from base_category1 where 1=1"
}
import_base_category2(){
  import_data "base_category2" "select 
id, name, category1_id from base_category2 where 1=1"
}
import_base_category3(){
  import_data "base_category3" "select id, name, category2_id from base_category3 where 1=1"
}
import_order_detail(){
  import_data   "order_detail"   "select 
    od.id, 
    order_id, 
    user_id, 
    sku_id, 
    sku_name, 
    order_price, 
    sku_num, 
    o.create_time  
  from order_info o , order_detail od 
  where o.id=od.order_id 
  and DATE_FORMAT(create_time,'%Y-%m-%d')='$db_date'"
}
import_payment_info(){
  import_data "payment_info"   "select 
    id,  
    out_trade_no, 
    order_id, 
    user_id, 
    alipay_trade_no, 
    total_amount,  
    subject, 
    payment_type, 
    payment_time 
  from payment_info 
  where DATE_FORMAT(payment_time,'%Y-%m-%d')='$db_date'"
}
import_order_info(){
  import_data   "order_info"   "select 
    id, 
    total_amount, 
    order_status, 
    user_id, 
    payment_way, 
    out_trade_no, 
    create_time, 
    operate_time  
  from order_info 
  where  (DATE_FORMAT(create_time,'%Y-%m-%d')='$db_date' or DATE_FORMAT(operate_time,'%Y-%m-%d')='$db_date')"
}
case $1 in
  "base_category1")
     import_base_category1
;;
  "base_category2")
     import_base_category2
;;
  "base_category3")
     import_base_category3
;;
  "order_info")
     import_order_info
;;
  "order_detail")
     import_order_detail
;;
  "sku_info")
     import_sku_info
;;
  "user_info")
     import_user_info
;;
  "payment_info")
     import_payment_info
;;
   "all")
   import_base_category1
   import_base_category2
   import_base_category3
   import_order_info
   import_order_detail
   import_sku_info
   import_user_info
   import_payment_info
;;
esac

2)增加脚本执行权限


[root@master bin]$ chmod 777 sqoop_import.sh


3)执行脚本导入数据

[root@master]# sqoop_import.sh all 2019-02-10


三、离线数据仓库搭建


1.origin_data原始数据

sku_info商品表(每日导全量)


user_info用户表(每日导全量)


base_category1商品一级分类表(每日导全量)


base_category2商品二级分类表(每日导全量)


base_category3商品三级分类表(每日导全量)


order_detail  订单详情表 (用户、用户、地区、商品四个维度)事务型事实表(每日导增量)


payment_info支付流水表 事务型事实表(每日导增量)


order_info订单表(每日导增量+变化)


订单表


drop table if exists ods_order_info;
create external table ods_order_info ( 
    `id` string COMMENT '订单编号',
    `total_amount` decimal(10,2) COMMENT '订单金额', 
    `order_status` string COMMENT '订单状态', 
    `user_id` string COMMENT '用户id' ,
    `payment_way` string COMMENT '支付方式',  
    `out_trade_no` string COMMENT '支付流水号',  
    `create_time` string COMMENT '创建时间',  
    `operate_time` string COMMENT '操作时间' 
) COMMENT '订单表'
PARTITIONED BY ( `dt` string)
row format delimited  fields terminated by '\t' 
location '/warehouse/gmall/ods/ods_order_info/'
;
订单详情表(事实表)
drop table if exists ods_order_detail;
create external table ods_order_detail( 
    `id` string COMMENT '订单编号',
    `order_id` string  COMMENT '订单号', 
    `user_id` string COMMENT '用户id' ,
    `sku_id` string COMMENT '商品id',  
    `sku_name` string COMMENT '商品名称',  
    `order_price` string COMMENT '商品价格',  
    `sku_num` string COMMENT '商品数量',  
    `create_time` string COMMENT '创建时间'
) COMMENT '订单明细表'
PARTITIONED BY ( `dt` string)
row format delimited  fields terminated by '\t' 
location '/warehouse/gmall/ods/ods_order_detail/'
;


商品表


drop table if exists ods_sku_info;
create external table ods_sku_info( 
    `id` string COMMENT 'skuId',
    `spu_id` string   COMMENT 'spuid', 
    `price` decimal(10,2) COMMENT '价格' ,
    `sku_name` string COMMENT '商品名称',  
    `sku_desc` string COMMENT '商品描述',  
    `weight` string COMMENT '重量',  
    `tm_id` string COMMENT '品牌id',  
    `category3_id` string COMMENT '品类id',  
    `create_time` string COMMENT '创建时间'
) COMMENT '商品表'
PARTITIONED BY ( `dt` string)
row format delimited  fields terminated by '\t' 
location '/warehouse/gmall/ods/ods_sku_info/'
;

用户表


drop table if exists ods_user_info;
create external table ods_user_info( 
    `id` string COMMENT '用户id',
    `name`  string COMMENT '姓名', 
    `birthday` string COMMENT '生日' ,
    `gender` string COMMENT '性别',  
    `email` string COMMENT '邮箱',  
    `user_level` string COMMENT '用户等级',  
    `create_time` string COMMENT '创建时间'
) COMMENT '用户信息'
PARTITIONED BY ( `dt` string)
row format delimited  fields terminated by '\t' 
location '/warehouse/gmall/ods/ods_user_info/'
;

商品一级分类表


drop table if exists ods_base_category1;
create external table ods_base_category1( 
    `id` string COMMENT 'id',
    `name`  string COMMENT '名称'
) COMMENT '商品一级分类'
PARTITIONED BY ( `dt` string)
row format delimited  fields terminated by '\t' 
location '/warehouse/gmall/ods/ods_base_category1/'
;



商品二级分类表


drop table if exists ods_base_category2;
create external table ods_base_category2( 
    `id` string COMMENT ' id',
    `name`  string COMMENT '名称',
    category1_id string COMMENT '一级品类id'
) COMMENT '商品二级分类'
PARTITIONED BY ( `dt` string)
row format delimited  fields terminated by '\t' 
location '/warehouse/gmall/ods/ods_base_category2/'
;

商品三级分类表


drop table if exists ods_base_category3;
create external table ods_base_category3( 
    `id` string COMMENT ' id',
    `name`  string COMMENT '名称',
    category2_id string COMMENT '二级品类id'
) COMMENT '商品三级分类'
PARTITIONED BY ( `dt` string)
row format delimited  fields terminated by '\t' 
location '/warehouse/gmall/ods/ods_base_category3/'
;

支付流水表


drop table if exists `ods_payment_info`;
create external table  `ods_payment_info`(
    `id`   bigint COMMENT '编号',
    `out_trade_no`    string COMMENT '对外业务编号',
    `order_id`        string COMMENT '订单编号',
    `user_id`         string COMMENT '用户编号',
    `alipay_trade_no` string COMMENT '支付宝交易流水编号',
    `total_amount`    decimal(16,2) COMMENT '支付金额',
    `subject`         string COMMENT '交易内容',
    `payment_type` string COMMENT '支付类型',
    `payment_time`   string COMMENT '支付时间'
   )  COMMENT '支付流水表'
PARTITIONED BY ( `dt` string)
row format delimited  fields terminated by '\t' 
location '/warehouse/gmall/ods/ods_payment_info/';
导入脚本ods_db.sh
#!/bin/bash
   APP=eshop
   hive=user/local/hive
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;then
    do_date=$1
else 
    do_date=`date -d "-1 day" +%F`
fi
sql=" 
load data inpath '/origin_data/$APP/db/order_info/$do_date' OVERWRITE into table "$APP".ods_order_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/order_detail/$do_date' OVERWRITE into table "$APP".ods_order_detail partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/sku_info/$do_date' OVERWRITE into table "$APP".ods_sku_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/user_info/$do_date' OVERWRITE into table "$APP".ods_user_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/payment_info/$do_date' OVERWRITE into table "$APP".ods_payment_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_category1/$do_date' OVERWRITE into table "$APP".ods_base_category1 partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_category2/$do_date' OVERWRITE into table "$APP".ods_base_category2 partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_category3/$do_date' OVERWRITE into table "$APP".ods_base_category3 partition(dt='$do_date'); 
"
$hive -e "$sql"

2.ods层

(八张表,表名,字段跟mysql完全相同)


从origin_data把数据导入到ods层,表名在原表名前加ods_


3.dwd层

对ODS层数据进行判空过滤。对商品分类表进行维度退化(降维)。其他数据跟ods层一模一样。


事实表


1订单表 dwd_order_info


2.订单详情表 dwd_order_detail


3.支付流水表 dwd_payment_info


维度表


用户表 dwd_user_info


商品表 dwd_sku_info


其他表字段不变,唯独商品表,通过关联3张分类表,增加了


category3_id` string COMMENT '3id',  
        category2_id` string COMMENT '2id',  
        `category1_id` string COMMENT '1id',  
        `category3_name` string COMMENT '3',  
        `category2_name` string COMMENT '2',  
        `category1_name` string COMMENT '1',

拉链表



订单表拉链表 dwd_order_info_his

 

`id` string COMMENT '订单编号',
    `total_amount` decimal(10,2) COMMENT '订单金额',
    `order_status` string COMMENT '订单状态',
    `user_id` string COMMENT '用户id' ,
    `payment_way` string COMMENT '支付方式',  
    `out_trade_no` string COMMENT '支付流水号',  
    `create_time` string COMMENT '创建时间',  
    `operate_time` string COMMENT '操作时间' ,
    `start_date`  string COMMENT '有效开始日期',
    `end_date`  string COMMENT '有效结束日期'



1)创建订单表拉链表,字段跟拉链表一样,只增加了有效开始日期和有效结束日期

初始日期,从订单变化表ods_order_info导入数据,且让有效开始时间=当前日期,有效结束日期=9999-99-99

(从mysql导入数仓的时候就只导了新增的和变化的数据ods_order_info,dwd_order_info跟ods_order_info基本一样,只多了一个id的判空处理)



2)建一张拉链临时表dwd_order_info_his_tmp,字段跟拉链表完全一致



3)新的拉链表中应该有这几部分数据,

   (1)增加订单变化表dwd_order_info的全部数据

   (2)更新旧的拉链表左关联订单变化表dwd_order_info,关联字段:订单id, where 过滤出end_date只等于9999-99-99的数据,如果旧的拉链表中的end_date不等于9999-99-99,说明已经是终态了,不需要再更新

   如果dwd_order_info.id is null , 没关联上,说明数据状态没变,让end_date还等于旧的end_date

   如果dwd_order_info.id is not null , 关联上了,说明数据状态变了,让end_date等于当前日期-1

   把查询结果插入到拉链临时表中



4)把拉链临时表覆盖到旧的拉链表中


4.dws层--按主题建宽表

项目宽表包括用户行为宽表、用户购买商品明细行为宽表、商品宽表等。


为什么要建宽表?

需求目标,把每个用户单日的行为聚合起来组成一张多列宽表,以便之后关联用户维度信息后进行,不同角度的统计分析。

image.png

从订单表 dwd_order_info 中获取 下单次数 和 下单总金额

从支付流水表 dwd_payment_info 中获取 支付次数 和 支付总金额

最终按照user_id聚合,获得明细。


5.ADS层

围绕商品、用户主题进行相关指标统计。

image.png

商品主题:

需求一:GMV成交总额



从用户行为宽表中dws_user_action,根据统计日期分组,聚合,直接sum就可以了。


1.商品销售排名


2.商品收藏排名


用户主题

需求二:转化率

1 新增用户占日活跃用户比率表


image.png

从日活跃数表 ads_uv_count 和 日新增设备数表 ads_new_mid_count 中取即可。


2 用户行为转化率表

漏斗行为分析:

image.png


从用户行为宽表dws_user_action中取,下单人数(只要下单次数>0),支付人数(只要支付次数>0)

从日活跃数表 ads_uv_count 中取活跃人数,然后对应的相除就可以了。


需求三:品牌复购率

需求:以月为单位统计,购买2次以上商品的用户


1 用户购买商品明细表(宽表)

image.png


2 品牌复购率表

image.png


从用户购买商品明细宽表dws_sale_detail_daycount中,根据品牌id--sku_tm_id聚合,计算每个品牌购买的总次数,购买人数a=购买次数>=1,两次及以上购买人数b=购买次数>=2,三次及以上购买人数c=购买次数>=3,

单次复购率=b/a,多次复购率=c/a

相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
目录
相关文章
|
22天前
|
机器学习/深度学习 算法 搜索推荐
从理论到实践,Python算法复杂度分析一站式教程,助你轻松驾驭大数据挑战!
【10月更文挑战第4天】在大数据时代,算法效率至关重要。本文从理论入手,介绍时间复杂度和空间复杂度两个核心概念,并通过冒泡排序和快速排序的Python实现详细分析其复杂度。冒泡排序的时间复杂度为O(n^2),空间复杂度为O(1);快速排序平均时间复杂度为O(n log n),空间复杂度为O(log n)。文章还介绍了算法选择、分而治之及空间换时间等优化策略,帮助你在大数据挑战中游刃有余。
46 4
|
22天前
|
SQL 消息中间件 分布式计算
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
57 5
|
1天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
11 1
|
1天前
|
人工智能 供应链 搜索推荐
大数据分析:解锁商业智能的秘密武器
【10月更文挑战第31天】在信息爆炸时代,大数据分析成为企业解锁商业智能的关键工具。本文探讨了大数据分析在客户洞察、风险管理、供应链优化、产品开发和决策支持等方面的应用,强调了明确分析目标、选择合适工具、培养专业人才和持续优化的重要性,并展望了未来的发展趋势。
|
5天前
|
SQL 分布式计算 Serverless
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
44 2
|
27天前
|
分布式计算 大数据 Serverless
云栖实录 | 开源大数据全面升级:Native 核心引擎、Serverless 化、湖仓架构引领云上大数据发展
在2024云栖大会开源大数据专场上,阿里云宣布推出实时计算Flink产品的新一代向量化流计算引擎Flash,该引擎100%兼容Apache Flink标准,性能提升5-10倍,助力企业降本增效。此外,EMR Serverless Spark产品启动商业化,提供全托管Serverless服务,性能提升300%,并支持弹性伸缩与按量付费。七猫免费小说也分享了其在云上数据仓库治理的成功实践。其次 Flink Forward Asia 2024 将于11月在上海举行,欢迎报名参加。
135 1
云栖实录 | 开源大数据全面升级:Native 核心引擎、Serverless 化、湖仓架构引领云上大数据发展
|
14天前
|
机器学习/深度学习 监控 搜索推荐
电商平台如何精准抓住你的心?揭秘大数据背后的神秘推荐系统!
【10月更文挑战第12天】在信息爆炸时代,数据驱动决策成为企业优化决策的关键方法。本文以某大型电商平台的商品推荐系统为例,介绍其通过收集用户行为数据,经过预处理、特征工程、模型选择与训练、评估优化及部署监控等步骤,实现个性化商品推荐,提升用户体验和销售额的过程。
54 1
|
17天前
|
存储 SQL 分布式计算
湖仓一体架构深度解析:构建企业级数据管理与分析的新基石
【10月更文挑战第7天】湖仓一体架构深度解析:构建企业级数据管理与分析的新基石
22 1
|
22天前
|
存储 消息中间件 大数据
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
23 4
|
21天前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
26 2