大数据进阶之路——Spark SQL日志分析

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 文章目录基本方案数据处理流程数据清洗二次清洗视频访问按照省份按照流量优化数据可视化echarts

基本方案

用户行为日志:用户每次访问网站时所有的行为数据(访问、浏览、搜索、点击…)

用户行为轨迹、流量日志


日志数据内容:


1)访问的系统属性: 操作系统、浏览器等等

2)访问特征:点击的url、从哪个url跳转过来的(referer)、页面上的停留时间等

3)访问信息:session_id、访问ip(访问城市)等

2013-05-19 13:00:00     http://www.taobao.com/17/?tracker_u=1624169&type=1      B58W48U4WKZCJ5D1T3Z9ZY88RU7QA7B1        http://hao.360.cn/      1.196.34.243 

数据处理流程

1) 数据采集

Flume: web日志写入到HDFS


2)数据清洗

脏数据

Spark、Hive、MapReduce 或者是其他的一些分布式计算框架

清洗完之后的数据可以存放在HDFS(Hive/Spark SQL)


3)数据处理

按照我们的需要进行相应业务的统计和分析

Spark、Hive、MapReduce 或者是其他的一些分布式计算框架


4)处理结果入库

结果可以存放到RDBMS、NoSQL


5)数据的可视化

通过图形化展示的方式展现出来:饼图、柱状图、地图、折线图

ECharts、HUE、Zeppelin

image.png

数据清洗

首先通过debug 找到分割后各个字段的对应的

image.png

报错

java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.


执行第一步数据清洗时候,数据能打印出来,但是不能写入本地文件,这是因为本地没有hadoop伪分布式系统


装一个插件即可


https://hiszm.lanzous.com/iWyqmhrgk0f


下载上述插件,然后,新建目录并且放入到目录里面

C:\Data\hadoop\bin


然后再系统环境变量添加

HADOOP_HOME

C:\Data\hadoop

package org.sparksql
import org.apache.spark.sql.SparkSession
object SparkFormatApp {
  def main(args: Array[String]): Unit = {
    //SparkSession是spark的入口类
    val spark = SparkSession.builder().appName("SparkFormatApp")
                .master("local[2]").getOrCreate()
    val access = spark.sparkContext.textFile("10000_access.log")
    //access.take(10).foreach(println)
    access.map(line=>{
      val splits = line.split(" ")
      val ip = splits(0)
      val time = splits(3) + " " + splits(4)
      val traffic = splits(9)
      val url =  splits(11).replace("\"","")
     //(ip,DateUtils.parse(time),traffic,traffic,url)
      DateUtils.parse(time) + "\t" + url + "\t" + traffic + "\t" + ip
    }).saveAsTextFile("output")
    //.take(10).foreach(println)
    //.saveAsTextFile("output")
    spark.stop()
  }
}

image.png

一般的日志处理方式,我们是需要进行分区的,

按照日志中的访问时间进行相应的分区,比如:d,h,m5(每5分钟一个分区)


二次清洗

输入:访问时间、访问URL、耗费的流量、访问IP地址信息

输出:URL、cmsType(video/article)、cmsId(编号)、流量、ip、城市信息、访问时间、天

package org.sparksql
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
//访问日志工具转换类
object AccessConvertUtils {
  val struct=StructType(
    Array(
      StructField("url",StringType),
      StructField("cmsType",StringType),
      StructField("cmsId",LongType),
      StructField("traffic",LongType),
      StructField("ip",StringType),
      StructField("city",StringType),
      StructField("time",StringType),
      StructField("day",StringType)
    )
  )
//根据输入的每一行信息转化成输出的样式
  def parseLog(log:String)={
    try{
      val splits=log.split("\t")
      val url =splits(1)
      val traffic = splits(2).toLong
      val ip = splits(3)
      val domain="http://www.imooc.com/"
      val cms=url.substring(url.indexOf(domain) + domain.length)
      val cmsTypeId = cms.split("/")
      var cmsType = ""
      var cmsId = 0l
      if(cmsTypeId.length > 1){
        cmsType = cmsTypeId(0)
        cmsId = cmsTypeId(1).toLong
      }
      val city = IpUtils.getCity(ip)
      val time = splits(0)
      val day =  time.substring(0,10).replaceAll("-","")
      Row(url,cmsType,cmsId,traffic,ip,city,time,day)
    }catch {
      case e : Exception => Row(0)
    }
  }
}
  • IP=>省份

使用github上已有的开源项目

1)git clone https://github.com/wzhe06/ipdatabase.git

2)编译下载的项目:mvn clean package -DskipTests

image.png

3)安装jar包到自己的maven仓库

mvn install:install-file -Dfile=C:\Data\ipdatabase\target\ipdatabase-1.0-SNAPSHOT.jar -DgroupId=com.ggstar -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar

image.png

image.png

  1. 拷贝相关文件不然会报错
  2. image.png
java.io.FileNotFoundException: file:/Users/rocky/maven_repos/com/ggstar/ipdatabase/1.0/ipdatabase-1.0.jar!/ipRegion.xlsx (No such file or directory)
  1. 测试
  2. image.png
package org.sparksql
import org.apache.spark.sql.SparkSession
object SparkCleanApp {
  def main(args: Array[String]): Unit = {
    //SparkSession是spark的入口类
    val spark = SparkSession.builder().appName("SparkFormatApp")
      .master("local[2]").getOrCreate()
    val accessRDD = spark.sparkContext.textFile("access.log")
    //accessRDD.take(10).foreach(println)
    val accessDF = spark.createDataFrame(accessRDD.map(x=>AccessConvertUtils.parseLog(x)),AccessConvertUtils.struct)
    accessDF.printSchema()
    accessDF.show()
    spark.stop
  }
}
root
 |-- url: string (nullable = true)
 |-- cmsType: string (nullable = true)
 |-- cmsId: long (nullable = true)
 |-- traffic: long (nullable = true)
 |-- ip: string (nullable = true)
 |-- city: string (nullable = true)
 |-- time: string (nullable = true)
 |-- day: string (nullable = true)
+--------------------+-------+-----+-------+---------------+----+-------------------+--------+
|                 url|cmsType|cmsId|traffic|             ip|city|               time|     day|
+--------------------+-------+-----+-------+---------------+----+-------------------+--------+
|http://www.imooc....|  video| 4500|    304|  218.75.35.226| 浙江省|2017-05-11 14:09:14|20170511|
|http://www.imooc....|  video|14623|     69| 202.96.134.133| 广东省|2017-05-11 15:25:05|20170511|
|http://www.imooc....|article|17894|    115| 202.96.134.133| 广东省|2017-05-11 07:50:01|20170511|
|http://www.imooc....|article|17896|    804|  218.75.35.226| 浙江省|2017-05-11 02:46:43|20170511|
|http://www.imooc....|article|17893|    893|222.129.235.182| 北京市|2017-05-11 09:30:25|20170511|
|http://www.imooc....|article|17891|    407|  218.75.35.226| 浙江省|2017-05-11 08:07:35|20170511|
|http://www.imooc....|article|17897|     78| 202.96.134.133| 广东省|2017-05-11 19:08:13|20170511|
|http://www.imooc....|article|17894|    658|222.129.235.182| 北京市|2017-05-11 04:18:47|20170511|
|http://www.imooc....|article|17893|    161|   58.32.19.255| 上海市|2017-05-11 01:25:21|20170511|
|http://www.imooc....|article|17895|    701|    218.22.9.56| 安徽省|2017-05-11 13:37:22|20170511|
|http://www.imooc....|article|17892|    986|  218.75.35.226| 浙江省|2017-05-11 05:53:47|20170511|
|http://www.imooc....|  video|14540|    987|   58.32.19.255| 上海市|2017-05-11 18:44:56|20170511|
|http://www.imooc....|article|17892|    610|  218.75.35.226| 浙江省|2017-05-11 17:48:51|20170511|
|http://www.imooc....|article|17893|      0|    218.22.9.56| 安徽省|2017-05-11 16:20:03|20170511|
|http://www.imooc....|article|17891|    262|   58.32.19.255| 上海市|2017-05-11 00:38:01|20170511|
|http://www.imooc....|  video| 4600|    465|  218.75.35.226| 浙江省|2017-05-11 17:38:16|20170511|
|http://www.imooc....|  video| 4600|    833|222.129.235.182| 北京市|2017-05-11 07:11:36|20170511|
|http://www.imooc....|article|17895|    320|222.129.235.182| 北京市|2017-05-11 19:25:04|20170511|
|http://www.imooc....|article|17898|    460| 202.96.134.133| 广东省|2017-05-11 15:14:28|20170511|
|http://www.imooc....|article|17899|    389|222.129.235.182| 北京市|2017-05-11 02:43:15|20170511|
+--------------------+-------+-----+-------+---------------+----+-------------------+--------+

调优点:

  1. 控制文件输出的大小: coalesce
  2. 分区字段的数据类型调整:spark.sql.sources.partitionColumnTypeInference.enabled
  3. 批量插入数据库数据,提交使用batch操作
package org.sparksql
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
object TopNApp {
  //最受欢迎
  def videoAccessTopN(spark: SparkSession, accessDF: DataFrame) = {
    import spark.implicits._
    val videoTopNDF = accessDF.filter($"day"==="20170511"&& $"cmsType" === "video")
      .groupBy("day","cmsId").agg(count("cmsId")
      .as("times")).orderBy($"times".desc)
    videoTopNDF.show()
    accessDF.createOrReplaceTempView("access_log")
    val videoTopNDF1 = spark.sql("select day,cmsId,count(1) as times from access_log where day='20170511' and cmsType = 'video' group by day,cmsId order by times desc")
    videoTopNDF1.show()
  }
  def main(args: Array[String]): Unit = {
    //SparkSession是spark的入口类
    val spark = SparkSession.builder().appName("SparkFormatApp")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled","false")
      .master("local[2]").getOrCreate()
    val accessDF= spark.read.format("parquet").load("output2/")
    accessDF.printSchema()
    accessDF.show(false)
    videoAccessTopN(spark,accessDF)
    spark.stop()
  }
}
+--------+-----+------+
|     day|cmsId| times|
+--------+-----+------+
|20170511|14540|111027|
|20170511| 4000| 55734|
|20170511|14704| 55701|
|20170511|14390| 55683|
|20170511|14623| 55621|
|20170511| 4600| 55501|
|20170511| 4500| 55366|
|20170511|14322| 55102|
+--------+-----+------+

视频访问

package org.sparksql
import java.sql.{Connection, DriverManager, PreparedStatement}
object MySqlUtils {
  def getConnection() ={
//    if (!conn.isClosed) System.out.println("已连接上数据库!")
//    else System.out.println("没有连接到数据库!")
    DriverManager.getConnection("jdbc:mysql://localhost:3306/imooc_user?user=root&password=root")
  }
//释放数据库连接资源
  def release(connection:Connection,pstmt:PreparedStatement): Unit ={
    try{
      if(pstmt != null){
        pstmt.close()
      }
    }catch{
      case e:Exception => e.printStackTrace()
    }finally {
      if(connection!=null){
        connection.close()
      }
    }
  }
  def main(args: Array[String]): Unit = {
    println(getConnection())
  }
}

image.png

create table day_video_access_topn_stat (
day varchar(8) not null,
cms_id bigint(10) not null,
times bigint(10) not null,
primary key (day, cms_id)
);
package org.sparksql
import java.sql.{Connection, PreparedStatement}
import scala.collection.mutable.ListBuffer
object StatisticsDAO {
  def insertDayVideoAccessTopN(list:ListBuffer[DayVideoAccessStatistics]): Unit ={
    var connection:Connection = null
    var pstmt:PreparedStatement = null
    try{
      connection= MySqlUtils.getConnection()
      //取消自动提交
      connection.setAutoCommit(false)
      val sql = "insert into day_video_access_topn_stat(day,cms_id,times) value (? ,? ,? )"
      pstmt = connection.prepareStatement(sql)
      for(i<-list){
        pstmt.setString(1,i.day)
        pstmt.setLong(2,i.cmsId)
        pstmt.setLong(3,i.times)
        pstmt.addBatch()
      }
        pstmt.executeBatch()//批量处理
      //手动提交
      connection.commit()
    }catch {
      case e:Exception=>e.printStackTrace()
    }finally {
      MySqlUtils.release(connection,pstmt)
    }
  }
}
package org.sparksql
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import scala.collection.mutable.ListBuffer
object TopNApp {
  //最受欢迎
  def videoAccessTopN(spark: SparkSession, accessDF: DataFrame) = {
    import spark.implicits._
    val videoTopNDF = accessDF.filter($"day"==="20170511"&& $"cmsType" === "video")
      .groupBy("day","cmsId").agg(count("cmsId")
      .as("times")).orderBy($"times".desc)
    videoTopNDF.show()
      try{
        videoTopNDF.foreachPartition(partitionOfRecords =>{
          val list = new ListBuffer[DayVideoAccessStatistics]
          partitionOfRecords.foreach(info =>{
            val day = info.getAs[String]("day")
            val cmsId = info.getAs[Long]("cmsId")
            val times = info.getAs[Long]("times")
            list.append(DayVideoAccessStatistics(day,cmsId,times))
          })
          StatisticsDAO.insertDayVideoAccessTopN(list)
        })
      }catch {
        case e:Exception =>e.printStackTrace()
      }
  }

image.png

java.sql.SQLException: No value specified for parameter 2
• 1

检查插入参数和类型是否一直

按照省份

create table day_video_city_access_topn_stat (
day varchar(8) not null,
cms_id bigint(10) not null,
city varchar(20) not null,
times bigint(10) not null,
times_rank int not null,
primary key (day, cms_id, city)
);
  def cityAccessTopN(spark: SparkSession, accessDF: DataFrame) = {
    import spark.implicits._
    val cityTopNDF = accessDF.filter($"day"==="20170511"&& $"cmsType" === "video")
      .groupBy("day","city","cmsId").agg(count("cmsId")
      .as("times")).orderBy($"times".desc)
    cityTopNDF.show()
   val top3DF =  cityTopNDF.select(
      cityTopNDF("day"),
      cityTopNDF("city"),
      cityTopNDF("cmsId"),
      cityTopNDF("times"),
      row_number().over(Window.partitionBy(cityTopNDF("city"))
        .orderBy(cityTopNDF("times").desc)).as("times_rank")
    ).filter("times_rank <=3")//.show()
    try{
      top3DF.foreachPartition(partitionOfRecords =>{
        val list = new ListBuffer[DayCityAccessStatistics]
        partitionOfRecords.foreach(info =>{
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val city = info.getAs[String]("city")
          val times = info.getAs[Long]("times")
          val timesRank = info.getAs[Int]("times_rank")
          list.append(DayCityAccessStatistics(day,cmsId,city,times,timesRank))
        })
        StatisticsDAO.insertCityVideoAccessTopN(list)
      })
    }catch {
      case e:Exception =>e.printStackTrace()
    }
  }

image.png

按照流量

create table day_video_traffics_topn_stat (
day varchar(8) not null,
cms_id bigint(10) not null,
traffics bigint(20) not null,
primary key (day, cms_id)
);
  def trafficAccessTopN(spark: SparkSession, accessDF: DataFrame) = {
    import spark.implicits._
    val trafficTopNDF = accessDF.filter($"day"==="20170511"&& $"cmsType" === "video")
      .groupBy("day","cmsId").agg(sum("traffic").as("traffics"))
     .orderBy($"traffics".desc)
    trafficTopNDF.show()
    try{
      trafficTopNDF.foreachPartition(partitionOfRecords =>{
        val list = new ListBuffer[DayTrafficAccessStatistics]
        partitionOfRecords.foreach(info =>{
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val traffics = info.getAs[Long]("traffics")
          list.append(DayTrafficAccessStatistics(day,cmsId,traffics))
        })
        StatisticsDAO.insertTrafficVideoAccessTopN(list)
      })
    }catch {
      case e:Exception =>e.printStackTrace()
    }
  }

优化

  • 每次更新删除前面的数据
  def deleteData(day:String)={
    val tables= Array("day_video_traffics_topn_stat","day_video_city_access_topn_stat","day_video_access_topn_stat")
    var connection:Connection = null
    var pstmt:PreparedStatement = null
    try{
      connection = MySqlUtils.getConnection()
      for(table<- tables){
        val deleteSQL = s"delete from $table where day = ?"
        pstmt = connection.prepareStatement(deleteSQL)
        pstmt.setString(1,day)
        pstmt.executeUpdate()
      }
    }catch {
      case e:Exception => e.printStackTrace()
    }finally {
      MySqlUtils.release(connection, pstmt)
    }
  }

数据可视化

数据可视化:一副图片最伟大的价值莫过于它能够使得我们实际看到的比我们期望看到的内容更加丰富

常见的可视化框架

1)echarts

2)highcharts

3)D3.js

4)HUE

5)Zeppelin

echarts

image.png

package org.sparkSQL.Utils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class MySqlUtils {
    private  static final String USERNAME = "root";
    private  static final String PASSWORD = "root";
    private  static final String DRIVERCLASS = "com.mysql.jdbc.Driver";
    private  static final String URL = "jdbc:mysql://localhost:3306/imooc_user";
    public static Connection getConnection(){
        Connection connection =  null;
        try {
            Class.forName(DRIVERCLASS);
            connection  = DriverManager.getConnection(URL,USERNAME,PASSWORD);
        }catch (Exception e){
            e.printStackTrace();
        }
        return connection;
    }
    public static void  release(Connection connection, PreparedStatement pstmt , ResultSet rs){
        if(rs != null){
            try{
                rs.close();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
        if(connection != null){
            try{
                connection.close();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
        if(pstmt != null){
            try{
                pstmt.close();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        System.out.println(getConnection());
    }
}
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>ECharts</title>
    <!-- 引入 echarts.js -->
    <script src="https://cdn.bootcss.com/echarts/3.7.1/echarts.min.js"></script>
    <script src="https://s3.pstatp.com/cdn/expire-1-M/jquery/3.1.1/jquery.min.js"></script>
</head>
<body>
<!-- 为ECharts准备一个具备大小(宽高)的Dom -->
<div id="main" style="width: 600px;height:400px;"></div>
<script type="text/javascript">
    // 基于准备好的dom,初始化echarts实例
    var myChart = echarts.init(document.getElementById('main'));
    // 指定图表的配置项和数据
    var option = {
        title: {
            text: '最受欢迎的TOPN',
            subtext: '测试',
            left: 'center'
        },
        tooltip: {
            trigger: 'item',
            formatter: '{a} <br/>{b} : {c} ({d}%)'
        },
        legend: {
            orient: 'vertical',
            left: 'left',
            data: ['直接访问', '邮件营销', '联盟广告', '视频广告', '搜索引擎']
        },
        series: [
            {
                name: '访问次数',
                type: 'pie',
                radius: '55%',
                center: ['50%', '60%'],
                data: (function(){
                    var courses= [];
                    $.ajax({
                        type:"GET",
                        url:"stat?day=20170511",
                        dataType:'json',
                        async:false,
                        success:function (result){
                            for(var i=0;i<result.length;i++){
                                courses.push({"value":result[i].value,"name":result[i].name})
                            }
                        }
                    })
                    return courses;
                })(),
                emphasis: {
                    itemStyle: {
                        shadowBlur: 10,
                        shadowOffsetX: 0,
                        shadowColor: 'rgba(0, 0, 0, 0.5)'
                    }
                }
            }
        ]
    };
    // 使用刚指定的配置项和数据显示图表。
    myChart.setOption(option);
</script>
</body>
</html>

image.png

[hadoop@hadoop001 software]$ tar -zxvf zeppelin-0.7.1-bin-all.tgz -C ~/app/

image.png

[hadoop@hadoop001 bin]$ ./zeppelin-daemon.sh start
Log dir doesn't exist, create /home/hadoop/app/zeppelin-0.7.1-bin-all/logs
Pid dir doesn't exist, create /home/hadoop/app/zeppelin-0.7.1-bin-all/run
Zeppelin start       

image.png



相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
3月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
221 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
17天前
|
SQL 分布式计算 Java
Spark SQL向量化执行引擎框架Gluten-Velox在AArch64使能和优化
本文摘自 Arm China的工程师顾煜祺关于“在 Arm 平台上使用 Native 算子库加速 Spark”的分享,主要内容包括以下四个部分: 1.技术背景 2.算子库构成 3.算子操作优化 4.未来工作
|
3月前
|
SQL 算法 大数据
为什么大数据平台会回归SQL
在大数据领域,尽管非结构化数据占据了大数据平台80%以上的存储空间,结构化数据分析依然是核心任务。SQL因其广泛的应用基础和易于上手的特点成为大数据处理的主要语言,各大厂商纷纷支持SQL以提高市场竞争力。然而,SQL在处理复杂计算时表现出的性能和开发效率低下问题日益凸显,如难以充分利用现代硬件能力、复杂SQL优化困难等。为了解决这些问题,出现了像SPL这样的开源计算引擎,它通过提供更高效的开发体验和计算性能,以及对多种数据源的支持,为大数据处理带来了新的解决方案。
|
3月前
|
SQL 存储 算法
比 SQL 快出数量级的大数据计算技术
SQL 是大数据计算中最常用的工具,但在实际应用中,SQL 经常跑得很慢,浪费大量硬件资源。例如,某银行的反洗钱计算在 11 节点的 Vertica 集群上跑了 1.5 小时,而用 SPL 重写后,单机只需 26 秒。类似地,电商漏斗运算和时空碰撞任务在使用 SPL 后,性能也大幅提升。这是因为 SQL 无法写出低复杂度的算法,而 SPL 提供了更强大的数据类型和基础运算,能够实现高效计算。
|
3月前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
140 6
|
3月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
164 2
|
3月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
155 1
|
3月前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
3月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
92 1
|
4月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
100 1