企业spark案例 —出租车轨迹分析

简介: 企业spark案例 —出租车轨迹分析

一、数据清洗

学习目标

1.如何使用 SparkSQL 读取 CSV 文件

2.如何使用正则表达式清洗掉多余字符串。

将出租车轨迹数据规整化,清洗掉多余的字符串,并使用 DataFrame.show() 打印输出。

清洗掉红框里面的 $ 、@ 字符,由于这两字符出现的次数没有规律,所以需要使用正则匹配。

清洗后内容如下:

import org.apache.spark.sql.SparkSession
object Step1 {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("Step1").master("local").getOrCreate()
        /**********begin**********/
        val frame = spark.read.option("header", true).option("delimiter", "\t").csv("/root/data.csv")
        frame.createTempView("data")
        spark.udf.register("cleanData", (x: String) => {
            x.replaceAll("\\@+", "").replaceAll("\\$+", "")
        })
        spark.sql(
        """
            |select cleanData(TRIP_ID) as TRIP_ID,cleanData(CALL_TYPE) as CALL_TYPE,cleanData(ORIGIN_CALL) as ORIGIN_CALL,
            |cleanData(TAXI_ID) as TAXI_ID,cleanData(ORIGIN_STAND) as ORIGIN_STAND ,cleanData(TIMESTAMP) as TIMESTAMP,
            |cleanData(POLYLINE) as POLYLINE
            |from data
        """.stripMargin).show()
        /**********end**********/
        spark.stop()
    }
}

二、数据分析

使用SparkSQL完成数据分析

import com.alibaba.fastjson.JSON
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType
object Step2 {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("Step1").master("local").getOrCreate()
        spark.sparkContext.setLogLevel("error")
        /**********begin**********/
        val frame = spark.read.option("header", true).option("delimiter", "\t").csv("/root/data2.csv")
        frame.createTempView("data")
        //1.将时间戳转换成时间
        spark.sql("select TRIP_ID,CALL_TYPE,ORIGIN_CALL,TAXI_ID,ORIGIN_STAND,POLYLINE, from_unixtime(TIMESTAMP,'yyyy-MM-dd') as TIME from data").createTempView("data2")
        spark.sql("select * from data2").show()
        //2.将POLYLINE字段,分离出startLocation,endLocation 两个字段
        spark.udf.register("startLocation", (x: String) => {
        val arr = JSON.parseArray(x)
        arr.get(0).toString
        })
        spark.udf.register("endLocation", (x: String) => {
        val arr = JSON.parseArray(x)
        arr.get(arr.size() - 1).toString
        })
        spark.sql(
        """
            |select TRIP_ID,CALL_TYPE,ORIGIN_CALL,TAXI_ID,ORIGIN_STAND,POLYLINE,TIME,startLocation(POLYLINE) as startLocation,endLocation(POLYLINE) as endLocation  from data2
        """.stripMargin).createTempView("data3")
        spark.sql("select * from data3").show()
        //3.计算时长,行程的总行程时间定义为(点数-1)×15秒。
        // 例如,POLYLINE中具有101个数据点的行程具有(101-1)* 15 = 1500秒的长度
        spark.udf.register("timeLen", (x: String) => {
        (JSON.parseArray(x).size() - 1) * 15
        })
        spark.sql(
        """
            |select TRIP_ID,CALL_TYPE,ORIGIN_CALL,TAXI_ID,ORIGIN_STAND,POLYLINE,TIME,startLocation(POLYLINE) as startLocation,endLocation(POLYLINE) as endLocation,timeLen(POLYLINE) as  timeLen  from data3
        """.stripMargin).createTempView("data4")
        spark.sql("select * from data4").show()
        //4.统计每天各种呼叫类型的数量并以CALL_TYPE,TIME升序排序
        spark.sql(
        """
            |select CALL_TYPE ,TIME,count(1) as num from data4 group by TIME,CALL_TYPE order by CALL_TYPE,TIME
        """.stripMargin).show()
        /**********end**********/
        spark.stop()
    }
}

三、出租车轨迹图表展示

使用springboot + echarts 编写一个展示的图表程序:

对此你需要了解可视化分为前后端,也就是我们的MVC设计模式:

M层:

MainMapper

package net.educoder.app.mapper;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import java.util.List;
@Mapper
public interface MainMapper {
    //参考
    @Select("SELECT _num from taxi_trend WHERE _taxi = #{type} ORDER BY _time")
    List<Integer> findTaxiTrendNumByType(String type);
    /**********begin**********/
    @Select("SELECT _time FROM taxi_trend GROUP BY _time ")
    List<String> findTaxiTrendTime();
    @Select("select _taxi from taxi_trend group by _taxi")
    List<String> findTaxiType();
    @Select("SELECT _type from taxi_servicenum GROUP BY _type")
    List<String> findTaxiPlatform();
    @Select("SELECT _serviceType FROM taxi_servicenum GROUP BY _serviceType ORDER BY _serviceType")
    List<String> findAllTaxiService();
    @Select("SELECT _num FROM taxi_servicenum WHERE _type = #{Platform} order BY _serviceType ")
    List<Integer> findServiceNumByPlatform(String Platform);    
    /**********end**********/
}

V层:

index.html

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title></title>
</head>
<script src="echarts.min.js"></script>
<script src="jquery-3.1.1.min.js"></script>
<body>
<div id="main" style="width: 1000px;height:600px;"></div>
<div id="main2" style="width: 1000px;height:600px;"></div>
</body>
<script>
    var myChart = echarts.init(document.getElementById('main'));
    $.ajax({
        /**********begin**********/
        url: "/Line_Chart",
        /**********end**********/
        success: function (data) {
            option = {
                title: {
                    text: '各出租车平台年使用率'
                },
                tooltip: {
                    trigger: 'axis'
                },
                legend: {
                    data: ['A', 'B', 'C']
                },
                grid: {
                    left: '3%',
                    right: '4%',
                    bottom: '3%',
                    containLabel: true
                },
                toolbox: {
                    feature: {
                        saveAsImage: {}
                    }
                },
                xAxis: {
                    type: 'category',
                    boundaryGap: false,
                    /**********begin**********/
                    data:data.timeList
                    /**********end**********/
                },
                yAxis: {
                    type: 'value'
                },
                /**********begin**********/
                series:data.resultData
                /**********end**********/
            };
            myChart.setOption(option);
        },
        dataType: "json",
        type: "post"
    });
    var myChart2 = echarts.init(document.getElementById('main2'));
    $.ajax({
        /**********begin**********/
        url:"/Radar_Chart",
        /**********end**********/
        success:function (data) {
            option = {
                title: {
                    text: '各平台各服务数量'
                },
                tooltip: {},
                legend: {
                    /**********begin**********/
                    data:data.taxiPlatform
                    /**********end**********/
                },
                radar: {
                    name: {
                        textStyle: {
                            color: '#fff',
                            backgroundColor: '#999',
                            borderRadius: 3,
                            padding: [3, 5]
                        }
                    },
                    /**********begin**********/
                    indicator:data.indicator
                    /**********end**********/
                },
                series: [{
                    type: 'radar',
                    /**********begin**********/
                    data:data.resultData
                    /**********end**********/
                }]
            };
            myChart2.setOption(option);
        },
        dataType:"json",
        type:"post"
    });
</script>
</html>
• 1

C层:

MainController :

package net.educoder.app.controller;
import net.educoder.app.entity.Chart_Line;
import net.educoder.app.entity.Chart_Radar;
import net.educoder.app.mapper.MainMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Controller
public class MainController {
    /**********begin**********/
    @Autowired
    MainMapper mainMapper;
    @RequestMapping("/index")
    public String index() {
        return "index";
    }
    @RequestMapping("/Line_Chart")
    @ResponseBody
    public Map<String, Object> Line_Chart() {
        List<String> taxiType = mainMapper.findTaxiType();
        Map<String, Object> map = new HashMap<>();
        List<Chart_Line> resultList = new ArrayList<>();
        for (String s : taxiType) {
            List<Integer> list = mainMapper.findTaxiTrendNumByType(s);
            Chart_Line chart_line = new Chart_Line(s, "line", list);
            resultList.add(chart_line);
        }
        List<String> taxiTrendTimeList = mainMapper.findTaxiTrendTime();
        map.put("timeList", taxiTrendTimeList);
        map.put("resultData", resultList);
        return map;
    }
    @RequestMapping("/Radar_Chart")
    @ResponseBody
    public Map<String, Object> Radar_Chart() {
        Map<String, Object> map = new HashMap<>();
        List<String> allTaxiService = mainMapper.findAllTaxiService();
        List<HashMap<String, Object>> indicatorList = new ArrayList<>();
        for (String s : allTaxiService) {
            HashMap<String, Object> stringIntegerHashMap = new HashMap<>();
            stringIntegerHashMap.put("name", s);
            stringIntegerHashMap.put("max", 100);
            indicatorList.add(stringIntegerHashMap);
        }
        List<String> taxiPlatform = mainMapper.findTaxiPlatform();
        List<Chart_Radar> resultList = new ArrayList<>();
        for (String s : taxiPlatform) {
            List<Integer> serviceNumByPlatform = mainMapper.findServiceNumByPlatform(s);
            Chart_Radar chart_radar = new Chart_Radar(s, serviceNumByPlatform);
            resultList.add(chart_radar);
        }
        map.put("resultData", resultList);
        map.put("legendData", taxiPlatform);
        map.put("indicator", indicatorList);
        return map;
    }
    /**********end**********/
}
相关文章
|
1月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
59 0
|
21天前
|
SQL 分布式计算 Serverless
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
110 2
|
1月前
|
分布式计算 大数据 Spark
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
39 1
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
55 0
|
1月前
|
SQL 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
29 0
|
1月前
|
存储 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
43 0
|
1月前
|
存储 SQL 分布式计算
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(一)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(一)
38 0
|
16天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
48 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
40 0
|
1月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
82 0