Spark项目实战:飞机延误预测项目

简介: Spark项目实战:飞机延误预测项目

0x00 教程内容


  1. 数据准备
  2. 工程实现
  3. 项目讲解
  4. 项目升级

PS:后期还会补充:

1、进行Spark、Scala版本升级

2、继续优化数据,提高预测效果

3、代码优化,代码里有小部分测试代码,应该优化~


0x01 数据准备


1. 下载数据

a. wget参考命令:

wget http://stat-computing.org/dataexpo/2009/2007.csv.bz2 -O /tmp/flights_2007.csv.bz2
wget http://stat-computing.org/dataexpo/2009/2008.csv.bz2 -O /tmp/flights_2008.csv.bz2


PS:

上述链接已失效,请联系博主私下获取。

或者关注公众号,回复:飞机延误预测。


b. 请自行修改名称,如果不是wget方式下载:


flights_2007.csv.bz2

flights_2008.csv.bz2


2. 上传数据到HDFS

a. 上传到HDFS的/tmp/airflightsdelays/路径下


3. 数据字段详细说明

a. 上传到HDFS的/tmp/airflightsdelays/路径下


image.png


说明:可以自己解压一下数据,查看一下前面几条数据(数据信息有待完善及校验!)。


0x02 工程实现


1. 依赖准备
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.shaonaiyi</groupId>
    <artifactId>sparkMLlib</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <spark.version>1.6.3</spark.version>
        <scala.version>2.10.5</scala.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>${scala.version}</version>
        </dependency>
    </dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.3.1</version>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <scalaVersion>${scala.version}</scalaVersion>
            </configuration>
        </plugin>
        <!--<plugin>-->
            <!--<groupId>org.scala-tools</groupId>-->
            <!--<artifactId>maven-scala-plugin</artifactId>-->
            <!--<executions>-->
                <!--<execution>-->
                    <!--<goals>-->
                        <!--<goal>compile</goal>-->
                        <!--<goal>testCompile</goal>-->
                    <!--</goals>-->
                <!--</execution>-->
            <!--</executions>-->
            <!--<configuration>-->
                <!--<scalaVersion>${scala.version}</scalaVersion>-->
                <!--<args>-->
                    <!--<arg>-target:jvm-1.7</arg>-->
                <!--</args>-->
            <!--</configuration>-->
        <!--</plugin>-->
    </plugins>
</build>
</project>


2. 上传一份数据到本地

a. 项目根路径的/tmp/airflightsdelays/


image.png


3. 数据处理代码实现
package com.shaonaiyi
import org.apache.spark.rdd._
import scala.collection.JavaConverters._
import au.com.bytecode.opencsv.CSVReader
import java.io._
import org.apache.spark.mllib.classification.{LogisticRegressionWithSGD, SVMWithSGD}
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.{DecisionTree, RandomForest}
import org.apache.spark.mllib.tree.configuration.Strategy
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.format.DateTimeFormat
import org.joda.time.DateTime
import org.joda.time.Days
/**
  * @Auther: 邵奈一
  * @Date: 2019/05/06 下午 3:08
  * @Description: 飞机延误预测项目
  */
object DelayRecProject {
  def main(args: Array[String]): Unit = {
    //打包到集群时,注释掉本地测试代码
    val conf = new SparkConf().setMaster("local[5]").setAppName("DelayRecProject")
//    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    //设置log打印级别
    sc.setLogLevel("WARN")
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    //阶段一:数据预处理
    //打包到集群时,注释掉本地测试代码
    val data_2007tmp = prepFlightDelays("tmp/airflightsdelays/flights_2007.csv.bz2",sc)
//    val data_2007tmp = prepFlightDelays("/tmp/airflightsdelays/flights_2007.csv.bz2",sc)
    val data_2007 = data_2007tmp.map(rec => rec.gen_features._2)
    //打包到集群时,注释掉本地测试代码
    val data_2008 = prepFlightDelays("tmp/airflightsdelays/flights_2008.csv.bz2",sc).map(rec => rec.gen_features._2)
//    val data_2008 = prepFlightDelays("/tmp/airflightsdelays/flights_2008.csv.bz2",sc).map(rec => rec.gen_features._2)
    data_2007tmp.toDF().registerTempTable("data_2007tmp")
    data_2007.take(5).map(x => x mkString ",").foreach(println)
    //阶段二:使用Spark和ML-Lib建模
    // Prepare training set
    val parsedTrainData = data_2007.map(parseData)
    parsedTrainData.cache
    val scaler = new StandardScaler(withMean = true, withStd = true).fit(parsedTrainData.map(x => x.features))
    val scaledTrainData = parsedTrainData.map(x => LabeledPoint(x.label, scaler.transform(Vectors.dense(x.features.toArray))))
    scaledTrainData.cache
    // Prepare test/validation set
    val parsedTestData = data_2008.map(parseData)
    parsedTestData.cache
    val scaledTestData = parsedTestData.map(x => LabeledPoint(x.label, scaler.transform(Vectors.dense(x.features.toArray))))
    scaledTestData.cache
    scaledTrainData.take(3).map(x => (x.label, x.features)).foreach(println)
    //阶段三:评估分类指标
    // Function to compute evaluation metrics
    def eval_metrics(labelsAndPreds: RDD[(Double, Double)]) : Tuple2[Array[Double], Array[Double]] = {
      val tp = labelsAndPreds.filter(r => r._1==1 && r._2==1).count.toDouble
      val tn = labelsAndPreds.filter(r => r._1==0 && r._2==0).count.toDouble
      val fp = labelsAndPreds.filter(r => r._1==1 && r._2==0).count.toDouble
      val fn = labelsAndPreds.filter(r => r._1==0 && r._2==1).count.toDouble
      val precision = tp / (tp+fp)
      val recall = tp / (tp+fn)
      val F_measure = 2*precision*recall / (precision+recall)
      val accuracy = (tp+tn) / (tp+tn+fp+fn)
      new Tuple2(Array(tp, tn, fp, fn), Array(precision, recall, F_measure, accuracy))
    }
    class Metrics(labelsAndPreds: RDD[(Double, Double)]) extends java.io.Serializable {
      private def filterCount(lftBnd:Int,rtBnd:Int):Double = labelsAndPreds
        .map(x => (x._1.toInt, x._2.toInt))
        .filter(_ == (lftBnd,rtBnd)).count()
      lazy val tp = filterCount(1,1)  // true positives
      lazy val tn = filterCount(0,0)  // true negatives
      lazy val fp = filterCount(0,1)  // false positives
      lazy val fn = filterCount(1,0)  // false negatives
      lazy val precision = tp / (tp+fp)
      lazy val recall = tp / (tp+fn)
      lazy val F1 = 2*precision*recall / (precision+recall)
      lazy val accuracy = (tp+tn) / (tp+tn+fp+fn)
    }
    //阶段四:构建回归模型
    // Build the Logistic Regression model
    val model_lr = LogisticRegressionWithSGD.train(scaledTrainData, numIterations=100)
    // Predict
    val labelsAndPreds_lr = scaledTestData.map { point =>
      val pred = model_lr.predict(point.features)
      (pred, point.label)
    }
    val m_lr = eval_metrics(labelsAndPreds_lr)._2
    println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_lr(0), m_lr(1), m_lr(2), m_lr(3)))
    println(model_lr.weights)
    //阶段五:构建向量机算法模型
    // Build the SVM model
    val svmAlg = new SVMWithSGD()
    svmAlg.optimizer.setNumIterations(100)
      .setRegParam(1.0)
      .setStepSize(1.0)
    val model_svm = svmAlg.run(scaledTrainData)
    // Predict
    val labelsAndPreds_svm = scaledTestData.map { point =>
      val pred = model_svm.predict(point.features)
      (pred, point.label)
    }
    val m_svm = eval_metrics(labelsAndPreds_svm)._2
    println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_svm(0), m_svm(1), m_svm(2), m_svm(3)))
    //阶段六:构建决策树算法模型
    // Build the Decision Tree model
    val numClasses = 2
    val categoricalFeaturesInfo = Map[Int, Int]()
    val impurity = "gini"
    val maxDepth = 10
    val maxBins = 100
    val model_dt = DecisionTree.trainClassifier(parsedTrainData, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins)
    // Predict
    val labelsAndPreds_dt = parsedTestData.map { point =>
      val pred = model_dt.predict(point.features)
      (pred, point.label)
    }
    val m_dt = eval_metrics(labelsAndPreds_dt)._2
    println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_dt(0), m_dt(1), m_dt(2), m_dt(3)))
    //阶段七:构建随机森林算法模型
    val treeStrategy = Strategy.defaultStrategy("Classification")
    val numTrees = 100
    val featureSubsetStrategy = "auto" // Let the algorithm choose
    val model_rf = RandomForest.trainClassifier(parsedTrainData, treeStrategy, numTrees, featureSubsetStrategy, seed = 123)
    // Predict
    val labelsAndPreds_rf = parsedTestData.map { point =>
      val pred = model_rf.predict(point.features)
      (point.label, pred)
    }
    val m_rf = new Metrics(labelsAndPreds_rf)
    println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f"
      .format(m_rf.precision, m_rf.recall, m_rf.F1, m_rf.accuracy))
  }
  case class DelayRec(year: String,
                      month: String,
                      dayOfMonth: String,
                      dayOfWeek: String,
                      crsDepTime: String,
                      depDelay: String,
                      origin: String,
                      distance: String,
                      cancelled: String) {
    val holidays = List("01/01/2007", "01/15/2007", "02/19/2007", "05/28/2007", "06/07/2007", "07/04/2007",
      "09/03/2007", "10/08/2007" ,"11/11/2007", "11/22/2007", "12/25/2007",
      "01/01/2008", "01/21/2008", "02/18/2008", "05/22/2008", "05/26/2008", "07/04/2008",
      "09/01/2008", "10/13/2008" ,"11/11/2008", "11/27/2008", "12/25/2008")
    def gen_features: (String, Array[Double]) = {
      val values = Array(
        depDelay.toDouble,
        month.toDouble,
        dayOfMonth.toDouble,
        dayOfWeek.toDouble,
        get_hour(crsDepTime).toDouble,
        distance.toDouble,
        days_from_nearest_holiday(year.toInt, month.toInt, dayOfMonth.toInt)
      )
      new Tuple2(to_date(year.toInt, month.toInt, dayOfMonth.toInt), values)
    }
    def get_hour(depTime: String) : String = "%04d".format(depTime.toInt).take(2)
    def to_date(year: Int, month: Int, day: Int) = "%04d%02d%02d".format(year, month, day)
    def days_from_nearest_holiday(year:Int, month:Int, day:Int): Int = {
      val sampleDate = new DateTime(year, month, day, 0, 0)
      holidays.foldLeft(3000) { (r, c) =>
        val holiday = DateTimeFormat.forPattern("MM/dd/yyyy").parseDateTime(c)
        val distance = Math.abs(Days.daysBetween(holiday, sampleDate).getDays)
        math.min(r, distance)
      }
    }
  }
  def prepFlightDelays(infile: String, sc: SparkContext): RDD[DelayRec] = {
    val data = sc.textFile(infile)
    data.map { line =>
      val reader = new CSVReader(new StringReader(line))
      reader.readAll().asScala.toList.map(rec => DelayRec(rec(0),rec(1),rec(2),rec(3),rec(5),rec(15),rec(16),rec(18),rec(21)))
    }.map(list => list(0))
      .filter(rec => rec.year != "Year")
      .filter(rec => rec.cancelled == "0")
      .filter(rec => rec.origin == "ORD")
  }
  def parseData(vals: Array[Double]): LabeledPoint = {
    LabeledPoint(if (vals(0)>=15) 1.0 else 0.0, Vectors.dense(vals.drop(1)))
  }
}


4. 执行效果展示

a. 执行:

image.png


0x03 项目讲解


1. 项目整体介绍

在本项目中,我们将演示如何使用Hadoop构建预测模型,这次我们将使用Apache Spark和ML-Lib。


教程通过其Scala API使用Apache Spark来生成我们的特征矩阵,并使用ML-Lib(Spark的机器学习库)来构建和评估我们的分类模型。


构建航班延误的预测模型,源数据集位于我们下载的数据,其中包括1987年至2008年间美国航班的详细信息。后期会加上天气信息丰富数据,包括每日温度(最小/最大),风速,降雪条件和降水量。


我们会建立一个监督学习模型,来预测离开奥黑尔国际机场(ORD)的航班延误情况。最后我们将使用2007年的数据来构建模型,并使用2008年的数据测试其有效性。


2. 使用Hadoop和Spark进行预处理

Apache Spark的基本数据抽象是RDD(弹性分布式数据集),它是一个容错的元素集合,可以在Hadoop集群中并行运行。


Spark的API(以Scala,Python或Java提供)支持各种转换,例如map()和flatMap(),filter(),join()等,以创建和操作RDD。有关API的完整说明,请查看Spark API编程指南:http://spark.apache.org/docs/1.6.3/programming-guide.html


与Scikit-learn演示类似,在我们的第一次迭代中,我们为每个航班生成以下功能:


month(月份):冬季应该比夏季月份延迟更多

day of month(每月的哪一天):这可能不是一个非常具有预测性的变量,但无论如何都要使用它

day of week(星期几):周末与工作日

hour of the day(一天中第几个小时):晚些时候往往有更多的延误

Distance(距离):有趣的是看这个变量是否是延迟的良好预测因子

Days from nearest holiday(距离最近的假期天数):距离最近的美国假期的天数


我们将使用Spark RDD执行相同的预处理,将原始飞行延迟数据集转换为两个特征矩阵:data_2007(我们的训练集)和data_2008(我们的测试集)。

封装航班延误记录的Case类DelayRec表示特征向量,其方法执行大部分繁重工作:


to_date()是一种将年/月/日转换为字符串的辅助方法

gen_features(row)接受一行输入并生成一个键/值元组,其中键是日期字符串(to_date的输出),值是特征值。我们将在第二次迭代中使用它来与天气数据连接。

get_hour()方法提取出发时间的2位小时部分

days_from_nearest_holiday()方法计算列表假期中任何假日提供的年/月/日的最小距离(以天为单位)。


使用DelayRec,我们的处理将执行以下步骤(在函数prepFlightDelays中):


1、我们使用Spark的SparkContext.textFile方法读取原始输入文件,从而生成RDD。

2、每行使用CSVReader解析为字段,并填充到DelayRec对象中

3、然后,我们在输入RDD上执行一系列RDD转换,以确保我们只有与未被取消并且源自ORD的航班相对应的行。

4、最后,我们使用gen_features方法生成每行的最终特征向量,作为一组双精度。


3. 使用Spark和ML-Lib建模

使用data_2007数据集(用于训练)和data_2008数据集(用于验证)作为RDD,然后用Spark的ML-Lib机器学习库构建预测模型。


ML-Lib是Spark的可扩展机器学习库,包括各种学习算法和实用程序,包括分类,回归,聚类,协同过滤,降维等。


要使用ML-Lib的机器学习算法,首先我们将我们的特征矩阵解析为LabeledPoint对象的RDD(用于训练和测试数据集)。LabeledPoint是ML-Lib对带有标签的特征向量的抽象。


我们将15分钟或更长时间的航班延误视为“延迟”,并将其标记为1.0,并在15分钟内标记为“非延迟”,并将其标记为0.0。


我们还使用ML-Lib的StandardScaler类来标准化训练和验证集的特征值。这很重要,因为ML-Lib使用随机梯度下降,如果特征向量被归一化,该随机梯度下降表现会最佳。


0x04 打包到服务器执行


注释掉相应的代码:

spark-submit --master yarn --class com.shainaiyi.DelayRecProject --name DelayRecProject /home/hadoop-sny/sparkMLlib-1.0-SNAPSHOT.jar

效果与本地是一样的!


0x05 项目升级


待补充!


0xFF 总结


通过本实验,我们综合了前面所学习到的知识,比如,构建Maven项目,IDEA编写Scala代码,打包到服务器,本地测试代码,机器学习建模操作,结果校验等等操作,认真学习一定会收获满满的,请自行查看更多知识,举一反三,学以致用。

执行的过程中,如果发现跑起来非常慢,可以考虑减少数据量,相应的准确率也一样会降低,另外,跑作业的时候,尽量注释掉不需要执行的代码,可以加速执行作业的效率。


相关文章
|
20天前
|
数据采集 分布式计算 Linux
Spark实时(数据采集)项目小知识点--sed -i命令详解及入门攻略
Spark实时(数据采集)项目小知识点--sed -i命令详解及入门攻略
67 0
|
2月前
|
机器学习/深度学习 分布式计算 大数据
【云计算与大数据技术】Spark实战项目之判别西瓜好坏(附源码和数据集)
【云计算与大数据技术】Spark实战项目之判别西瓜好坏(附源码和数据集)
30 0
|
3月前
|
分布式计算 Java Scala
配置spark,并在idea中搭建项目
配置spark,并在idea中搭建项目
40 0
|
8月前
|
存储 分布式计算 数据可视化
【大数据学习篇12】 Spark项目实战-数据可视化(三)
【大数据学习篇12】 Spark项目实战-数据可视化
221 0
|
8月前
|
分布式计算 数据可视化 Java
【大数据学习篇12】 Spark项目实战-数据可视化(二)
【大数据学习篇12】 Spark项目实战-数据可视化
248 0
|
8月前
|
SQL 分布式计算 数据可视化
【大数据学习篇12】 Spark项目实战-数据可视化(一)
【大数据学习篇12】 Spark项目实战-数据可视化
220 0
|
8月前
|
SQL JSON 分布式计算
【大数据学习篇10】Spark项目实战~网站转化率统计
【大数据学习篇10】Spark项目实战~网站转化率统计
391 0
【大数据学习篇10】Spark项目实战~网站转化率统计
|
9月前
|
机器学习/深度学习 人工智能 分布式计算
SparK项目原作解读:卷积模型的首个BERT预训练
SparK项目原作解读:卷积模型的首个BERT预训练
156 0
|
消息中间件 分布式计算 Kafka
Spark Streaming实时流处理项目实战笔记——使用KafkaSInk将Flume收集到的数据输出到Kafka
Spark Streaming实时流处理项目实战笔记——使用KafkaSInk将Flume收集到的数据输出到Kafka
|
分布式计算 流计算 Spark
Spark Streaming实时流处理项目实战笔记——实战之黑名单过滤
Spark Streaming实时流处理项目实战笔记——实战之黑名单过滤
Spark Streaming实时流处理项目实战笔记——实战之黑名单过滤

相关产品

  • 云迁移中心