电商热门商品统计

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 针对常规电商网站进行大数据分析,通过完整大数据处理流程最终对每个区域热门商品进行统计,支持用户决策。

一、电商热门商品统计项目


(一)项目介绍

针对常规电商网站进行大数据分析,通过完整大数据处理流程最终对每个区域热门商品进行统计,支持用户决策。


项目流程及框架:Python-->Flume-->HDFS-->Mapreduce/Spark ETL-->HDFS-->Hive-->Sqoop-->Mysql


(二)需求分析

如何定义热门商品?

简单模型:通过用户对商品的点击量来衡量商品热度。

复杂模型:通过用户点击 + 购买以及收藏等综合数据对商品进行评价,商品热门程度得分模型 = 点击次数 * 2 + 购买次数 * 5 + 收藏次数 * 3, 其中2,5,3为得分权重。(后续更新)

如何获取区域?

通过用户点击日志,获取访问IP,进而获取区域信息。

通过数据库中的订单关联用户表,获取用户的地域信息

深度思考:如何去除爬虫水军(商家为了提高自己的排名,用爬虫来频繁访问自己的店铺)?

一段时间分析用户IP的访问次数

关键字


(三)技术方案

数据采集(ETL)

电商日志一般存储在日志服务器,通过 Flume 拉取到 HDFS 上,本文通过编写python程序模拟日志数据。

业务数据通过 Sqoop 从关系型数据库mysql中读取数据,然后导入到HDFS。

因为要访问数据库,所以会对数据库造成很大的压力,而且在真实的生产环境中,一般没有权限直接访问数据库。可以把数据导出成csv文件,放到日志服务器上,再通过Flume采集到HDFS上。假如有权限访问数据库,数据库也需要设置成读写分离的模式,来缓解压力。


数据清洗

使用 MapReduce 进行数据清洗。

使用 Spark Core 进行数据清洗。

各区域热门商品计算

使用 Hive 进行数据的分析和处理。

使用 Spark SQL 进行数据的分析和处理。

4.热门商品及PV实时计算(后续更新)


使用 Flink进行数据的分析和处理。


(四)实验数据及说明

1.product(商品)表:

屏幕截图 2022-10-24 220615.png


2.area_info(地区信息)表

image.png

补充说明: action_type: 1 收藏,2 加购物车,3 购买 area_id:已经通过IP地址,解析出了区域信息


3.area_hot_product(区域热门商品)表

image.png

补充说明: action_type: 1 收藏,2 加购物车,3 购买 area_id:已经通过IP地址,解析出了区域信息


4.area_hot_product(区域热门商品)表

image.png




(五)技术实现

1.使用Flume采集用户点击日志


Flume配置文件(flume-areahot.conf)

启动 Flume agent,在 Flume 的根目录下执行命令:bin/flume-ng agent -n a4 -f flume-areahot.conf -c conf -Dflume.root.logger=INFO,console

再执行python dslog.py向 /log0208 目录里放入用户日志文件(实现方法:此处

Flume 会将 /log0208 目录下的文件采集到 hdfs://master:9000/flume/ 当天日期 目录下。


2.python编写dslog.py模拟日志放入/log0208文件夹下,自定义添加不符合字段数据,要经过mr或spark进行数据清洗。


#coding=utf-8
import random
import time
iplist=[26,23,47,56,108,10,33,48,66,77,101,45,61,52,88,89,108,191,65,177,98,21,34,61,19,11,112,114]
url = "http://mystore.jsp/?productid={query}"
x=[1,2,3,4]
def use_id():
    return random.randint(1,20)
def get_ip():
    return '.'.join(str(x) for x in random.sample(iplist,4))
def urllist():
def sample_references():
    if random.uniform(0,1)>0.8:
        return ""
    query_str=random.sample(x,1)
    return url.format(query=query_str[0])
def get_time():
    return time.strftime('%Y%m%d%H%M%S',time.localtime())
#  action: 1 收藏,2 加购物车,3 购买  area_id代表不同区域
def action():
    return random.randint(1,4)
def area_id():
    return random.randint(1,21)
def get_log(count):
    while count>0:
        log='{},{},{},{},{},{}\n'.format(use_id(),get_ip(),urllist(),get_time(),action(),area_id())
        # with open('/usr/local/src/tmp/1.log','a+')as file:
        with open('/log0208/click.log','a+')as file:
            file.write(log)
        # print(log)
        # time.sleep(1)
        count=count-1
if __name__ == '__main__':
    get_log(10000)
#bin/flume-ng agent -n a4 -f myagent/a4.conf -c conf -Dflume.root.logger=INFO,console
#定义agent名, source、channel、sink的名称
a4.sources = r1
a4.channels = c1
a4.sinks = k1
#具体定义source
a4.sources.r1.type = spooldir
a4.sources.r1.spoolDir = /log0208
#具体定义channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100 
#定义拦截器,为消息添加时间戳
a4.sources.r1.interceptors = i1
a4.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
#具体定义sink
a4.sinks.k1.type = hdfs
a4.sinks.k1.hdfs.path = hdfs://master:9000/flume/%Y%m%d
a4.sinks.k1.hdfs.filePrefix = events-
a4.sinks.k1.hdfs.fileType = DataStream
#不按照条数生成文件
a4.sinks.k1.hdfs.rollCount = 0 
#HDFS上的文件达到128M时生成一个文件
a4.sinks.k1.hdfs.rollSize = 134217728
#HDFS上的文件达到60秒生成一个文件
a4.sinks.k1.hdfs.rollInterval = 60
#组装source、channel、sink
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1

3.数据清洗


需要将用户点击日志里面对于商品的点击识别出来

过滤不满足6个字段的数据

过滤URL为空的数据,即:过滤出包含http开头的日志记录

实现方式一:使用 MapReduce 程序进行数据的清洗


1. CleanDataMain.java及CleanDataMapper.java代码实现:


import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class CleanDataMain {
  public static void main(String[] args) throws Exception {
  //1、创建Job
  Job job = Job.getInstance(new Configuration());
  job.setJarByClass(CleanDataMain.class);
  //2、指定任务的Mapper和输出的类型
  job.setMapperClass(CleanDataMapper.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(NullWritable.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(NullWritable.class);
  //4、任务的输入和输出
  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  //5、执行
  job.waitForCompletion(true);
  }
}
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/*
过滤不满足6个字段的数据
过滤URL为空的数据,即:过滤出包含http开头的日志记录
 */
public class CleanDataMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
  @Override
  protected void map(LongWritable key1, Text value1, Context context)
    throws IOException, InterruptedException {
  String log = value1.toString();
  //分词
  String[] words = log.split(",");
  if(words.length == 6 && words[2].startsWith("http")){
    context.write(value1, NullWritable.get());
  }
  }
}

2.利用maven clean、maven install打成 jar 包,提交到 yarn 上运行:运行脚本run.sh,输入数据为Flume采集到的路径


HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
OUTPUT_PATH="/output/210219"
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
hadoop jar /ds/MyMapReduceProject-0.0.1-SNAPSHOT.jar mapreduce.clean/CleanDataMain /flume/20210219/events-.1613712374044 /output/210219

3.过滤后结果查看:


[root@master ds]# hadoop fs -cat /output/210219/part-r-00000
1,201.105.101.102,http://mystore.jsp/?productid=1,2017020020,1,1
1,201.105.101.102,http://mystore.jsp/?productid=1,2017020029,2,1
1,201.105.101.102,http://mystore.jsp/?productid=4,2017020021,3,1
2,201.105.101.103,http://mystore.jsp/?productid=2,2017020022,1,1
3,201.105.101.105,http://mystore.jsp/?productid=3,2017020023,1,2
4,201.105.101.107,http://mystore.jsp/?productid=1,2017020025,1,1

 实现方式二:使用 Spark 程序进行数据的清洗


1.cleanData代码实现:


import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object CleanData {
  def main(args: Array[String]): Unit = {
    // 为了避免执行过程中打印过多的日志
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("CleanData")
    val sc = new SparkContext(conf)
    // 读取数据
    val fileRDD = sc.textFile(args(0))
    // 清洗数据
    val cleanDataRDD = fileRDD.map(_.split(",")).filter(_(2).startsWith("http")).filter(_.length == 6)
    // 将清洗后的结果保存到HDFS
    cleanDataRDD.saveAsTextFile(args(1))
    // 停止SparkContext
    sc.stop()
    println("Finished")
  }
}

2.同上打成 jar 包,提交到 spark 上运行:


bin/spark-submit /
--class clean.CleanData   /
--master spark://master:7077 /
/ds/people-0.0.1-SNAPSHOT.jar  /
hdfs://master:9000/flume/210219/events-.1613712374044   /
hdfs://master:9000/testOutput/

各区域热门商品热度统计:基于 Hive 和 Spark SQL

方式一:使用 Hive 进行统计


# 创建地区表:
create external table area
(area_id string,area_name string)
row format delimited fields terminated by ','
location '/input/hotproject/area';
# 创建商品表
create external table product
(product_id string,product_name string,
marque string,barcode string, price double,
brand_id string,market_price double,stock int,status int)
row format delimited fields terminated by ','
location '/input/hotproject/product';
# 创建一个临时表,用于保存用户点击的初始日志
create external table clicklogTemp
(user_id string,user_ip string,url string,click_time string,action_type string,area_id string)
row format delimited fields terminated by ','
location '/input/hotproject/cleandata';
# 创建用户点击日志表(注意:需要从上面的临时表中解析出product_id)
create external table clicklog
(user_id string,user_ip string,product_id string,click_time string,action_type string,area_id string)
row format delimited fields terminated by ','
location '/input/hotproject/clicklog';
#导入数据,业务一般用sqoop从mysql数据库导入到HDFS
load data  inpath "/input/data/areainfo.txt" into table area;
load data  inpath "/input/data/productinfo.txt" into table product;
#日志通过flume采集到HDFS
load data  inpath "/output/210220/part-r-00000" into table clicklogTemp;
insert into table clicklog
select user_id,user_ip,substring(url,instr(url,"=")+1),
click_time,action_type,area_id from clicklogTemp;
## 查询各地区商品热度
select a.area_id,b.area_name,a.product_id,c.product_name,count(a.product_id) pv  
from clicklog a join area b on a.area_id = b.area_id join product c on a.product_id = c.product_id  
group by a.area_id,b.area_name,a.product_id,c.product_name;
注意:在上面的例子中,我们建立一张临时表,然后从临时表中解析出productid 也可以直接使用hive的函数:parse_url进行解析,如下: parse_url(a.url,'QUERY','productid')
# 这样就可以不用创建临时表来保存中间状态的结果,修改后的Hive SQL如下:
select a.area_id,b.area_name,parse_url(a.url,'QUERY','productid'),
c.product_name,count(parse_url(a.url,'QUERY','productid'))  
from clicklog a join area b on a.area_id = b.area_id 
join product c on parse_url(a.url,'QUERY','productid') = c.product_id  
group by a.area_id,b.area_name,parse_url(a.url,'QUERY','productid'),c.product_name;
注:导入数据把握原则:能不导入数据,就不要导入数据(外部表),输出结果由于日志不同结果不同。

方式二:使用 Spark SQL 进行统计


1.Hotproduct.scala代码实现


package com.hot
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.sql.SparkSession
//地区表
case class AreaInfo(area_id:String,area_name:String)
//商品表 用不到的数据,不要导入
case class ProductInfo(product_id:String,product_name:String,marque:String,barcode:String,price:Double,brand_id:String,market_price:Double,stock:Int,status:Int)
//经过清洗后的,用户点击日志信息
case class LogInfo(user_id:String,user_ip:String,product_id:String,click_time:String,action_type:String,area_id:String)
object HotProduct {
  def main(args:Array[String]):Unit={
    // 避免打印过多的日志
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    val spark=SparkSession.builder().master("local").appName("").getOrCreate()
//    val spark=SparkSession.builder().appName("").getOrCreate()
    import spark.sqlContext.implicits._
    //获取地区数据
    val areaDF = spark.sparkContext.textFile("hdfs://master:9000/input/data/areainfo1.txt")
      .map(_.split(",")).map(x=> AreaInfo(x(0),x(1))).toDF()
    areaDF.createTempView("area")
    //获取商品数据
    val productDF = spark.sparkContext.textFile("hdfs://master:9000/input/data/productinfo.txt")
      .map(_.split(",")).map(x=>  ProductInfo(x(0),x(1),x(2),x(3),x(4).toDouble,x(5),x(6).toDouble,x(7).toInt,x(8).toInt))
      .toDF()
    productDF.createTempView("product")
    //获取点击日志
    val clickLogDF = spark.sparkContext.textFile("hdfs://master:9000/output/210220/part-r-00000")
      .map(_.split(",")).map(x =>  LogInfo(x(0),x(1),x(2).substring(x(2).indexOf("=")+1),x(3),x(4),x(5)))
      .toDF()
    clickLogDF.createTempView("clicklog")
    //执行SQL
    // 通过SparkSQL分析各区域商品的热度,结果输出到屏幕
    val sql = "select a.area_id,a.area_name,p.product_id,product_name,count(c.product_id) from area a,product p,clicklog c where a.area_id=c.area_id and p.product_id=c.product_id group by a.area_id,a.area_name,p.product_id,p.product_name"
    spark.sql(sql).show()
//    var sql1 = " select concat(a.area_id,',',a.area_name,',',p.product_id,',',p.product_name,',',count(c.product_id)) "
//    sql1 = sql1 + " from area a,product p,clicklog c "
//    sql1 = sql1 + " where a.area_id=c.area_id and p.product_id=c.product_id "
//    sql1 = sql1 + " group by a.area_id,a.area_name,p.product_id,product_name "
//    spark.sql(sql1).repartition(1).write.text(args(3))
    spark.stop()
  }
}

2.Maven打包提交到spark集群上运行:


spark-submit --class com.hot.HotProduct --master spark://master:7077 hotspark-1.0-SNAPSHOT.jar 
#hdfs://master:9000/input/hotproject/area/areainfo.txt \ #hdfs://master:9000/input/hotproject/product/productinfo.txt \ #hdfs://master:9000/output/210219/part-r-00000 hdfs://master:9000/output/analysis

3.将hive分析结果插入另一个表,通过sqoop导入mysql关系数据库,最终实现电商可视化可视化页面展示。


+-------+---------+----------+------------+-----------------+
|area_id|area_name|product_id|product_name|count(product_id)|
+-------+---------+----------+------------+-----------------+
|      7|    hubei|         3| nike shoes2|                1|
|     15|  guizhou|         3| nike shoes2|                2|
|     11|  tianjin|         3| nike shoes2|                1|
|      3| shanghai|         3| nike shoes2|                1|
|      8| zhejiang|         3| nike shoes2|                2|
|      5| shenzhen|         3| nike shoes2|                2|
|     17|   fujian|         3| nike shoes2|                1|
|     19|    anhui|         3| nike shoes2|                3|
|      9|     jili|         3| nike shoes2|                1|
|      1|  beijing|         3| nike shoes2|                1|
|     20|    henan|         3| nike shoes2|                4|
|      4| hangzhou|         3| nike shoes2|                1|
|     13|    hebei|         3| nike shoes2|                3|
|     15|  guizhou|         1|  nike shoes|                1|
|      3| shanghai|         1|  nike shoes|                1|
|      8| zhejiang|         1|  nike shoes|                1|
|     18|neimenggu|         1|  nike shoes|                2|
|     17|   fujian|         1|  nike shoes|                2|
|     19|    anhui|         1|  nike shoes|                2|
|      9|     jili|         1|  nike shoes|                2|
+-------+---------+----------+------------+-----------------+


二、电商数仓采集


a40326b42ccd8c48d573b28d01954525_1def6cfbd716e47f5a010ba6d4faee2f.png

业务数据数仓模拟搭建:

通过sqoop将mysql数据库中业务数据导入到HDFS,再导入hive数仓,sqoop原理是利用mapreduce中的map。


import    把数据从关系型数据库 导到 数据,仓库,自定义InputFormat,


export    把数据从数据仓库 导到 关系型数据库,自定义OutputFormat,


用sqoop从mysql中将八张表的数据导入数仓的ods原始数据层全量无条件,增量按照创建时间,增量+变化按照创建时间或操作时间。

下图为表结构,依据各表关系进行数据建模。

adea29992237e52aa90aa45f8a44a375_40f744a0cd834cefff873ede60a54a15.png


三、离线数据仓库搭建


1.origin_data原始数据

sku_info商品表(每日导全量)


user_info用户表(每日导全量)


base_category1商品一级分类表(每日导全量)


base_category2商品二级分类表(每日导全量)


base_category3商品三级分类表(每日导全量)


order_detail订单详情表(每日导增量)


payment_info支付流水表(每日导增量)


order_info订单表(每日导增量+变化)


2.ods层

(八张表,表名,字段跟mysql完全相同)


从origin_data把数据导入到ods层,表名在原表名前加ods_


3.dwd层

对ODS层数据进行判空过滤。对商品分类表进行维度退化(降维)。其他数据跟ods层一模一样


订单表 dwd_order_info


订单详情表 dwd_order_detail


用户表 dwd_user_info


支付流水表 dwd_payment_info


商品表 dwd_sku_info


其他表字段不变,唯独商品表,通过关联3张分类表,增加了


category2_id` string COMMENT '2id',


`category1_id` string COMMENT '3id',


`category3_name` string COMMENT '3',


`category2_name` string COMMENT '2',


`category1_name` string COMMENT '1',


4.dws层


从订单表 dwd_order_info 中获取 下单次数 和 下单总金额

从支付流水表 dwd_payment_info 中获取 支付次数 和 支付总金额

最终按照user_id聚合,获得订单支付明细。


相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
目录
相关文章
|
XML JSON API
淘宝天猫API接入说明(淘宝天猫商品详情+关键词搜索商品列表)商品详情数据,商品sku数据,商品优惠券数据
业务场景:作为全球最大的 B2C 电子商务平台之一,淘宝天猫平台提供了丰富的商品资源,吸引了大量的全球买家和卖家。为了方便开发者接入淘宝天猫平台,淘宝天猫平台提供了丰富的 API 接口,其中历史价格接口是非常重要的一部分。大家有探讨稳定采集淘宝(天猫)京东阿里拼多多等平台整站实时商品详情历史价格数据接口,通过该接口开发者可以更好地了解商品的情况,商品详情数据详细信息查询,数据参数包括:商品链接,商品列表主图、价格、标题,sku,库存,销量,店铺昵称,店铺等级,商品详情SKU属性,商品视频,商品优惠券,促销信息,详情属性描述,宝贝ID,区域ID,发货地,发货至,快递费用,物流费用等页面上有的数据
|
6天前
|
JSON 数据挖掘 API
电商信息指南:API接口淘宝关键词、店铺所有商品获取
要获取淘宝关键词商品数据和店铺所有商品的API接口,需先注册淘宝开放平台账号并创建应用,获取API密钥。接着,使用密钥获取访问令牌,详细阅读API文档,构造并发送API请求,解析响应数据。特别地,使用`item_search_shop`接口可获取店铺内所有商品信息。
|
5月前
|
数据库
电商购物系统首页的商品分类
电商购物系统首页的商品分类
|
1月前
|
API 数据安全/隐私保护 开发者
淘宝 API:关键词搜商品列表接口,助力商家按价格销量排序分析数据
此接口用于通过关键词搜索淘宝商品列表。首先需在淘宝开放平台注册并创建应用获取API权限,之后利用应用密钥和访问令牌调用接口。请求参数包括关键词、页码、每页数量、排序方式及价格区间等。返回结果含总商品数量及具体商品详情。使用时需注意签名验证及官方文档更新。
|
6月前
|
数据采集 存储 数据挖掘
京东商品优惠券数据采集
京东商品优惠券数据采集
|
4月前
|
数据采集 供应链 API
电商商品详情数据和店铺所有商品数据
电商商品详情数据和店铺所有商品数据是电商运营中至关重要的两部分数据,它们对于商家来说具有极高的商业价值。以下是对这两部分数据的详细解析:
|
6月前
|
存储 数据采集 搜索推荐
商品比价系统实现
商品比价系统实现
384 4
|
6月前
|
搜索推荐 API
如何搭建私域获取淘宝店铺卖家订单信息trade.fullinfo.get
私域流量主要由已经对企业或品牌感兴趣并进行关注的用户组成,这些用户对企业具有一定的忠诚度和粘性
|
6月前
|
消息中间件 供应链 NoSQL
电商订单待支付(思路分析)
电商订单待支付(思路分析)
|
XML JSON 缓存
Java实现根据店铺ID或店铺名称获取京东店铺所有商品数据方法
Java实现根据店铺ID或店铺名称获取京东店铺所有商品数据方法