1. 实验室名称:
大数据实验教学系统
2. 实验项目名称:
案例:企业订单数据分析
一、业务场景
现有某公司销售数据集,包含orders、order details和products数据。其中:830个orders和2,155个order details。现公司领导提出,希望大数据分析部门通过分析该销售数据集,回答以下问题:
• 每个客户下了多少订单?
• 每个国家的订单有多少?
• 每月(年)有多少订单?
• 每个客户的年销售总额是多少?
• 客户每年的平均订单是多少?
二、数据集说明
本案例用到的数据集说明如下:
订单数据集文件:/data/dataset/nw/NW-Orders-01.csv 订单详情数据集文件:/data/dataset/nw/NW-Order-Details.csv
其中,NW-Orders-01.csv数据集包含订单信息,其中部分字段的说明如下:
字段 | 定义 |
OrderID | 订单ID |
CustomerID | 客户ID |
EmployeeID | 员工ID |
OrderDate | 下单日期 |
ShipCountry | 收货国家 |
其中,NW-Order-Details.csv数据集包含订单详细信息,其中部分字段的说明如下:
字段 | 定义 |
OrderID | 订单ID |
ProductID | 产品ID |
UnitPrice | 商品单价 |
Qty | 商品数量 |
Discount | 折扣 |
三、操作步骤
阶段一、启动HDFS、Spark集群服务和zeppelin服务器
1、启动HDFS、Spark集群和Zeppelin服务器
在Linux终端窗口下,输入以下命令,启动HDFS集群、Spark集群和zeppelin服务器:
1. $ start-dfs.sh 2. $ cd /opt/spark 3. $ ./sbin/start-all.sh 4. $ zeppelin-daemon.sh start
2、验证以上进程是否已启动
在Linux终端窗口下,输入以下命令,查看启动的服务进程:
1. $ jps
如果显示以下6个进程,则说明各项服务启动正常,可以继续下一阶段。
1. 2288 NameNode 2. 2402 DataNode 3. 2603 SecondaryNameNode 4. 2769 Master 5. 2891 Worker 6. 2984 ZeppelinServer
阶段二、准备案例中用到的数据集
1、将本案例要用到的数据集上传到HDFS文件系统的”/data/dataset/“目录下。在Linux终端窗口下,输入以下命令:
1. $ hdfs dfs -mkdir -p /data/dataset 2. $ hdfs dfs -put /data/dataset/nw /data/dataset/
2、在Linux终端窗口下,输入以下命令,查看HDFS上是否已经上传了该数据集:
1. $ hdfs dfs -ls /data/dataset/nw
这时应该看到数据集文件已经上传到了HDFS的”/data/dataset/“目录下。
阶段三、对数据集进行探索和分析
1、新建一个zeppelin notebook文件,并命名为nw_project。
2、加载数据集到RDD。在notebook单元格中,输入以下代码,加载数据集到RDD:
1. // 定义要加载数据集的HDFS路径 2. val filePath = "/data/dataset/nw/"
同时按下【Shift+Enter】键,执行以上代码。
3、加载订单数据到DataFrame中,进行简单探索,并输出其Schema。在notebook单元格中,输入以下代码:
1. // 订单数据 2. val orders = spark.read.option("header","true"). 3. option("inferSchema","true"). 4. csv(filePath + "NW-Orders-01.csv") 5. println("订单有" + orders.count() + "个")
同时按下【Shift+Enter】键,执行以上代码,输出内容如下:
orders: org.apache.spark.sql.DataFrame = [OrderID: int, CustomerID: string … 3 more fields] 订单有830个
简单探索,并输出其Schema。在notebook单元格中,输入以下代码:
1. orders.show(5) 2. orders.printSchema()
同时按下【Shift+Enter】键,执行以上代码,输出内容如下:
4、加载订单详情数据到DataFrame中,进行简单探索,并输出其Schema。在notebook单元格中,输入以下代码:
1. // 订单详细信息 2. val orderDetails = spark.read.option("header","true"). 3. option("inferSchema","true"). 4. csv(filePath + "NW-Order-Details.csv") 5. println("订单明细有" + orderDetails.count() + "个")
同时按下【Shift+Enter】键,执行以上代码,输出内容如下:
orderDetails: org.apache.spark.sql.DataFrame = [OrderID: int, ProductId: int … 3 more fields] 订单明细有2155个
简单探索,并输出其Schema。在notebook单元格中,输入以下代码:
1. orderDetails.show(5) 2. orderDetails.printSchema()
同时按下【Shift+Enter】键,执行以上代码,输出内容如下:
5、回答以下问题:每个客户下了多少订单?在notebook单元格中,输入以下代码:
1. val orderByCustomer = orders.groupBy("CustomerID").count() 2. orderByCustomer.sort(orderByCustomer("count").desc).show(5)
同时按下【Shift+Enter】键,执行以上代码,输出内容如下:
在上面的输出中,我们按订单数量由高到低顺序输出。
6、回答以下问题:来自不同国家的订单各有多少。在notebook单元格中,输入以下代码:
1. val orderByCountry = orders.groupBy("ShipCountry").count() 2. orderByCountry.sort($"count".desc).show(5)
同时按下【Shift+Enter】键,执行以上代码,输出内容如下:
由以上输出内容可以看出,订单最多的国家是德国和美国。
7、对于后面三个问题,需要对数据进行一些转换:
- 向Orders DataFrame增加一个OrderTotal列:
- 计算每个订单明细的实际金额
- 根据order id统计每张订单的总金额
- 对order details & orders进行等值内连接,增加订单总金额
- 检查是否有任何null列
- 增加一个date列
- 增加month和year
8、向order details中增加每行的小计(每个订单明细的实际金额)。在notebook单元格中,输入以下代码:
1. val orderDetails1 = orderDetails.select($"OrderID",(($"UnitPrice" * $"Qty") - ($"UnitPrice" * $"Qty") * $"Discount").as("OrderPrice")) 2. 3. orderDetails1.show(5)
同时按下【Shift+Enter】键,执行以上代码,输出内容如下:
由以上输出内容可以看出,订单最多的国家是德国和美国。
9、根据order id统计每张订单的总金额。在notebook单元格中,输入以下代码:
1. val orderTot = orderDetails1.groupBy("OrderID") 2. .agg(sum("OrderPrice").as("OrderTotal")) 3. 4. orderTot.select($"OrderID",bround($"OrderTotal",2)) 5. .sort("OrderID") 6. .show(5)
同时按下【Shift+Enter】键,执行以上代码,,输出内容如下:
10、对order details & orders进行等值内连接,增加订单总金额。在notebook单元格中,输入以下代码:
1. val orders1 = orders.join(orderTot, orders("OrderID").equalTo(orderTot("OrderID")), "inner"). 2. select(orders("OrderID"), 3. orders("CustomerID"), 4. orders("OrderDate"), 5. orders("ShipCountry").alias("ShipCountry"), 6. bround(orderTot("OrderTotal"),2).alias("Total") 7. ) 8. 9. orders1.sort($"Total".desc).show()
同时按下【Shift+Enter】键,执行以上代码,,输出内容如下:
11、增加一个date列。在notebook单元格中,输入以下代码:
1. val orders2 = orders1.withColumn("Date",to_date(orders1("OrderDate"))) 2. 3. orders2.show(2) 4. orders2.printSchema()
同时按下【Shift+Enter】键,执行以上代码,,输出内容如下:
12、增加month列和year列。在notebook单元格中,输入以下代码:
1. val orders3 = orders2.withColumn("Month",month($"OrderDate")) 2. .withColumn("Year",year($"OrderDate")) 3. 4. orders3.show(2)
同时按下【Shift+Enter】键,执行以上代码,,输出内容如下:
13、回答以下问题:每月/年有多少订单?在notebook单元格中,输入以下代码:
1. val ordersByYM = orders3.groupBy("Year","Month") 2. .agg(sum("Total").as("Total")) 3. 4. ordersByYM.select($"Year",$"Month",bround($"Total",2) as "Total") 5. .sort($"Year",$"Month") 6. .show()
同时按下【Shift+Enter】键,执行以上代码,,输出内容如下:
13、回答以下问题:每个客户的年销售总额是多少?在notebook单元格中,输入以下代码:
1. var ordersByCY = orders3.groupBy("CustomerID","Year") 2. .agg(sum("Total").as("Total")) 3. 4. ordersByCY.sort($"CustomerID",$"Year").show()
同时按下【Shift+Enter】键,执行以上代码,,输出内容如下:
14、回答以下问题:客户每年的平均订单是多少?在notebook单元格中,输入以下代码:
1. ordersByCY = orders3.groupBy("CustomerID","Year") 2. .agg(avg("Total").as("Avg")) 3. 4. ordersByCY.select($"CustomerID",$"Year",bround($"Avg",2)) 5. .sort($"CustomerID",$"Year") 6. .show()
同时按下【Shift+Enter】键,执行以上代码,,输出内容如下:
15、回答以下问题:客户的平均订单是多少?在notebook单元格中,输入以下代码:
1. val ordersCA = orders3.groupBy("CustomerID") 2. .agg(avg("Total").as("C-Avg")) 3. 4. ordersCA.sort(col("C-Avg").desc).show()
同时按下【Shift+Enter】键,执行以上代码,,输出内容如下:
阶段四、自行练习
1、使用本案例的数据集,查询指定客户每年的销售额,并可视化呈现结果。