Spark Hands-On Case Studies Tutorial

Summary: Reference code for these examples: GitHub - GoAlers/Bigdata_project: e-commerce big data project, recommender system (Java and Scala)

Project setup

Reference pom.xml

<dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <!-- Spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>
        <!-- SparkSQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>
        <!-- SparkSQL  ON  Hive-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>
        <!--SparkStreaming-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.6.0</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <!-- SparkStreaming + Kafka -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.10.5</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.10.5</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.10.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.10</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-it</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.6.0</version>
        </dependency>
        <!-- Note: kafka_2.11 is built for Scala 2.11, while the rest of this POM targets Scala 2.10; align the Scala binary versions if you hit runtime conflicts -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.1.1</version>
        </dependency>
        <!-- Jedis client for connecting to Redis -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.6.1</version>
        </dependency>
        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
        <dependency>
            <groupId>com.google.collections</groupId>
            <artifactId>google-collections</artifactId>
            <version>1.0</version>
        </dependency>
    </dependencies>
    <repositories>
        <repository>
            <id>central</id>
            <name>Maven Repository Switchboard</name>
            <layout>default</layout>
            <url>https://repo1.maven.org/maven2</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <plugins>
            <!-- When a Maven project contains both Java and Scala code, maven-scala-plugin lets both be compiled and packaged together -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <!-- JDK version used for Maven compilation -->
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.10</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>



1 demo1: WordCount

Create a data folder under the project directory, then create a world.csv file in it with the following content:

hello,spark
hello,scala,hadoop
hello,hdfs
hello,spark,hadoop
hello

Scala version: SparkWC.scala

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
  * spark wordcount
  */
object SparkWC {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wordcount").setMaster("local")
    val sc = new SparkContext(conf)
    sc.textFile("./data/world.csv").flatMap( _.split(",")).map((_,1)).reduceByKey(_+_).foreach(println)
    sc.stop()
//  Step-by-step breakdown of the one-liner above
//    //conf sets the Spark application name and the run mode (master)
//    val conf = new SparkConf()
//    conf.setAppName("wordcount")
//    conf.setMaster("local")
//    //SparkContext is the entry point to the Spark cluster
//    val sc = new SparkContext(conf)
//
//    val lines: RDD[String] = sc.textFile("./data/world.csv")
//    val words: RDD[String] = lines.flatMap(line => {
//      line.split(",")
//    })
//    val pairWords: RDD[(String, Int)] = words.map(word=>{new Tuple2(word,1)})
//    val result: RDD[(String, Int)] = pairWords.reduceByKey((v1:Int, v2:Int)=>{v1+v2})
//    result.foreach(one=>{
//      println(one)
//    })
  }
}

Test

[Screenshot: run output]


Java version: SparkWordCount.java

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
public class SparkWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("wc");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("./data/world.csv");
        // Spark 1.6: FlatMapFunction.call returns an Iterable (Spark 2.x changed this to an Iterator)
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(","));
            }
        });
        JavaPairRDD<String, Integer> pairWords = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });
        JavaPairRDD<String, Integer> result = pairWords.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> tp) throws Exception {
                System.out.println(tp);
            }
        });
        sc.stop();
    }
}

Test

[Screenshot: run output]
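
As a cross-check of what both versions should report for the sample world.csv, the same counting logic can be sketched with plain JDK collections, without Spark. The class name WordCountCheck and its count method are hypothetical helpers for illustration only; Spark itself prints the pairs in no particular order, whereas this sketch sorts them by word.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the referenced project.
class WordCountCheck {
    // Mirrors flatMap(split(",")) -> map((word, 1)) -> reduceByKey(_ + _) on a local list.
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.split(",")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "hello,spark", "hello,scala,hadoop", "hello,hdfs", "hello,spark,hadoop", "hello");
        System.out.println(count(lines));
        // prints {hadoop=2, hdfs=1, hello=5, scala=1, spark=2}
    }
}
```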


2 demo2: join operators

Code and test output

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer
object Taiko extends App {
  val conf = new SparkConf().setMaster("local").setAppName("wc");
  val sc = new SparkContext(conf)
  //demo1-5 data start
  val nameRDD: RDD[(String, Int)] = sc.parallelize(List[(String, Int)](
    ("zhangsan", 18), ("lisi", 19), ("wangwu", 20), ("zhaoliu", 21)
  ))
  val sourceRDD: RDD[(String, Int)] = sc.parallelize(List[(String, Int)](
    ("zhangsan", 100), ("lisi", 200), ("wangwu", 300), ("tianqi", 400)
  ))
  //demo1-5 data end
  //demo1 join
  //val result: RDD[(String, (Int, Int))] = nameRDD.join(sourceRDD)
  //result.foreach(println)
  /** demo1 result
    * (zhangsan,(18,100))
    * (wangwu,(20,300))
    * (lisi,(19,200))
    */
  //demo2 leftOuterJoin
  //val result: RDD[(String, (Int, Option[Int]))] = nameRDD.leftOuterJoin(sourceRDD)
  //result.foreach(println)
  /** demo2 result
    * (zhangsan,(18,Some(100)))
    * (wangwu,(20,Some(300)))
    * (zhaoliu,(21,None))
    * (lisi,(19,Some(200)))
    */
  /* result.foreach(res => {
     val name = res._1
     val v1 = res._2._1
     val v2 = res._2._2.getOrElse("no score")
     println(s"name=$name,age=$v1,score=$v2")
   })*/
  /** demo2 formatted result
    * name=zhangsan,age=18,score=100
    * name=wangwu,age=20,score=300
    * name=zhaoliu,age=21,score=no score
    * name=lisi,age=19,score=200
    */
  //demo3 rightOuterJoin
  //val result: RDD[(String, (Option[Int], Int))] = nameRDD.rightOuterJoin(sourceRDD)
  //result.foreach(println)
  /** demo3 result
    * (zhangsan,(Some(18),100))
    * (wangwu,(Some(20),300))
    * (tianqi,(None,400))
    * (lisi,(Some(19),200))
    */
  //demo4 fullOuterJoin
  //val result: RDD[(String, (Option[Int], Option[Int]))] = nameRDD.fullOuterJoin(sourceRDD)
  //result.foreach(println)
  /** demo4 result
    * (zhangsan,(Some(18),Some(100)))
    * (wangwu,(Some(20),Some(300)))
    * (zhaoliu,(Some(21),None))
    * (tianqi,(None,Some(400)))
    * (lisi,(Some(19),Some(200)))
    */
  //demo5 union
  //val result: RDD[(String, Int)] = nameRDD.union(sourceRDD)
  //result.foreach(println)
  /** demo5 result
    * (zhangsan,18)
    * (lisi,19)
    * (wangwu,20)
    * (zhaoliu,21)
    * (zhangsan,100)
    * (lisi,200)
    * (wangwu,300)
    * (tianqi,400)
    */
  //demo6 partitions
  val nameRDD1: RDD[(String, Int)] = sc.parallelize(List[(String, Int)](
    ("zhangsan", 18), ("lisi", 19), ("wangwu", 20), ("zhaoliu", 21)
  ), 3)
  val sourceRDD1: RDD[(String, Int)] = sc.parallelize(List[(String, Int)](
    ("zhangsan", 100), ("lisi", 200), ("wangwu", 300), ("tianqi", 400)
  ), 4)
  val p1: Int = nameRDD1.getNumPartitions
  val p2: Int = sourceRDD1.getNumPartitions
  //val result: RDD[(String, (Int, Int))] = nameRDD1.join(sourceRDD1)
  //val p3: Int = result.getNumPartitions
  //println(s"p1:$p1,p2:$p2,p3:$p3")
  /** p1:3,p2:4,p3:4  join takes the larger of the two partition counts (when spark.default.parallelism is not set) */
  //val result: RDD[(String, Int)] = nameRDD1.union(sourceRDD1)
  //val p3: Int = result.getNumPartitions
  //println(s"p1:$p1,p2:$p2,p3:$p3")
  /** p1:3,p2:4,p3:7  union does not move any data; it just concatenates the partitions */
  //demo7 intersection and subtract (set difference)
  val rdd1: RDD[Int] = sc.parallelize(List[Int](1, 2, 3))
  val rdd2: RDD[Int] = sc.parallelize(List[Int](2, 3, 5))
  //rdd1.intersection(rdd2).foreach(println)
  /**
    * 3
    * 2
    */
  //rdd1.subtract(rdd2).foreach(println)
  /** 1  */
  //rdd2.subtract(rdd1).foreach(println)
  /** 5  */
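  //demo8 avoid repeated setup work: mapPartitions handles a whole partition per call
  val rdd: RDD[String] = sc.parallelize(List[String]("hello1", "hello2", "hello3", "hello4"), 2)
  /*  rdd.map(one => {
      println("Opening database connection...")
      println(s"Inserting into database: $one")
      println("Closing database connection...")
      one + "!"
    }).count()*/
  /** With map, a connection is opened and closed for every single element!
    * Opening database connection...
    * Inserting into database: hello1
    * Closing database connection...
    * Opening database connection...
    * Inserting into database: hello2
    * Closing database connection...
    * Opening database connection...
    * Inserting into database: hello3
    * Closing database connection...
    * Opening database connection...
    * Inserting into database: hello4
    * Closing database connection...
    */
  rdd.mapPartitions(iter => {
    val list = new ListBuffer[String]
    println("Opening database connection...")
    while (iter.hasNext) {
      val str = iter.next()
      println(s"Inserting into database: $str")
      list += str
    }
    println("Closing database connection...")
    list.iterator
  }).count()
  /** With mapPartitions, one connection is opened per partition:
    * Opening database connection...
    * Inserting into database: hello1
    * Inserting into database: hello2
    * Closing database connection...
    * Opening database connection...
    * Inserting into database: hello3
    * Inserting into database: hello4
    * Closing database connection...
    */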
}
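
The difference between join and leftOuterJoin in the demo can be illustrated without Spark. The following is a minimal sketch on plain Java maps, assuming each key occurs at most once per side (a real pair RDD may hold duplicate keys, in which case join yields every combination). JoinSketch and its methods are hypothetical names used only for illustration; Optional plays the role of Scala's Option.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, not part of the referenced project.
class JoinSketch {
    // Inner join: a key appears in the result only if both sides contain it.
    static Map<String, SimpleEntry<Integer, Integer>> join(
            Map<String, Integer> left, Map<String, Integer> right) {
        Map<String, SimpleEntry<Integer, Integer>> out = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : left.entrySet()) {
            Integer r = right.get(e.getKey());
            if (r != null) {
                out.put(e.getKey(), new SimpleEntry<>(e.getValue(), r));
            }
        }
        return out;
    }

    // Left outer join: every left key survives; a missing right value becomes Optional.empty().
    static Map<String, SimpleEntry<Integer, Optional<Integer>>> leftOuterJoin(
            Map<String, Integer> left, Map<String, Integer> right) {
        Map<String, SimpleEntry<Integer, Optional<Integer>>> out = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : left.entrySet()) {
            Optional<Integer> r = Optional.ofNullable(right.get(e.getKey()));
            out.put(e.getKey(), new SimpleEntry<>(e.getValue(), r));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> ages = new LinkedHashMap<>();
        ages.put("zhangsan", 18); ages.put("lisi", 19); ages.put("wangwu", 20); ages.put("zhaoliu", 21);
        Map<String, Integer> scores = new LinkedHashMap<>();
        scores.put("zhangsan", 100); scores.put("lisi", 200); scores.put("wangwu", 300); scores.put("tianqi", 400);

        // zhaoliu has no score, so it is dropped by join but kept by leftOuterJoin.
        System.out.println("join keys: " + join(ages, scores).keySet());
        System.out.println("leftOuterJoin keys: " + leftOuterJoin(ages, scores).keySet());
        System.out.println("zhaoliu score present: "
            + leftOuterJoin(ages, scores).get("zhaoliu").getValue().isPresent());
    }
}
```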


3 demo3: verifying on a Spark cluster and on YARN

user_item_score.txt (columns: user, item, score)
1 100001 5
1 100002 3
1 100003 4
3 100001 2
3 100002 5
2 100001 1
2 100002 2
2 100003 4
2 100004 5

userwatchlist

package com.test.scala.spark
import org.apache.spark.{SparkConf, SparkContext}
object userwatchlist {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("userwatchlist test")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("./data/user_item_score.txt")
    val output_path = "./data/userwatchlist_output"
    // Keep only the records whose score is greater than 2
    val data = lines.filter(x => {
      val fields = x.split(" ")
      fields(2).toDouble > 2
    }).map(x => {
      /*
       Raw data
       user item score
       ->
       (user, (item1, score1))
       (user, (item2, score2))
       ->
       (user, ((item1, score1), (item2, score2)))
       -> target
       user -> item item item
      */
      val fields = x.split(" ")
      (fields(0), (fields(1), fields(2)))
    }).groupByKey().map(x => {
      val userid = x._1
      val item_score_tuple_list = x._2
      // Sort by score, descending. Compare as numbers: comparing the raw strings would rank "9" above "10"
      val tmp_arr = item_score_tuple_list.toArray.sortWith(_._2.toDouble > _._2.toDouble)
      // Keep at most the top 5
      val watchlen = math.min(tmp_arr.length, 5)
      val strbuf = new StringBuilder
      for (i <- 0 until watchlen) {
        strbuf ++= tmp_arr(i)._1
        strbuf.append(":")
        strbuf ++= tmp_arr(i)._2
        strbuf.append(" ")
      }
      userid + "\t" + strbuf
    })
    // saveAsTextFile fails if the output directory already exists
    data.saveAsTextFile(output_path)
    sc.stop()
  }
}


3.1 Local verification result

[Screenshot: output of the local run]
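
For reference, the lines the job should produce for the sample user_item_score.txt can be worked out with a small Spark-free sketch. WatchListSketch is a hypothetical helper, not part of the referenced project; it assumes scores are compared numerically, sorts users by id for stable output, and omits the trailing space that the Scala StringBuilder loop leaves at the end of each line.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the referenced project.
class WatchListSketch {
    // Keeps scores > 2, groups by user, sorts each group by score descending, keeps the top n.
    static List<String> watchList(List<String> lines, int n) {
        Map<String, List<String[]>> byUser = new TreeMap<>();
        for (String line : lines) {
            String[] f = line.split(" ");
            if (Double.parseDouble(f[2]) > 2) {
                byUser.computeIfAbsent(f[0], k -> new ArrayList<>()).add(f);
            }
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String[]>> e : byUser.entrySet()) {
            String items = e.getValue().stream()
                .sorted((a, b) -> Double.compare(Double.parseDouble(b[2]), Double.parseDouble(a[2])))
                .limit(n)
                .map(f -> f[1] + ":" + f[2])
                .collect(Collectors.joining(" "));
            out.add(e.getKey() + "\t" + items);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "1 100001 5", "1 100002 3", "1 100003 4",
            "3 100001 2", "3 100002 5",
            "2 100001 1", "2 100002 2", "2 100003 4", "2 100004 5");
        watchList(lines, 5).forEach(System.out::println);
        // prints (tab-separated):
        // 1	100001:5 100003:4 100002:3
        // 2	100004:5 100003:4
        // 3	100002:5
    }
}
```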


3.2 Verifying on a Spark cluster

Modify the Scala class:

[Screenshot: the modified Scala class]

Package the project with Maven.

Upload the data file and the jar to the Linux machine.

Then upload the data file to HDFS:

[Screenshot: uploading the data file to HDFS]

Create run.sh:

/usr/local/src/spark-1.6.0-bin-hadoop2.6/bin/spark-submit \
      --master spark://master:7077 \
      --num-executors 2 \
      --executor-memory 1g \
      --executor-cores 1 \
      --driver-memory 1g \
      --class com.test.scala.spark.userwatchlist /root/test_spark/test-1.0-SNAPSHOT.jar

Run it with bash run.sh

[Screenshots: job run and output on the Spark cluster]


3.3 Verifying on a Hadoop (YARN) cluster

Modify run.sh:

/usr/local/src/spark-1.6.0-bin-hadoop2.6/bin/spark-submit \
      --master yarn-cluster \
      --num-executors 2 \
      --executor-memory 1g \
      --executor-cores 1 \
      --driver-memory 1g \
      --class com.test.scala.spark.userwatchlist /root/test_spark/test-1.0-SNAPSHOT.jar

Delete the previous output path first (not needed if you skipped the Spark cluster run): hadoop fs -rm -r /userwatchlist_output

[Screenshots: job run and output on YARN]



4 demo4: collaborative filtering (CF) algorithm

package com.test.scala.spark
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
import scala.math._
object cf {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("CF")
    val sc = new SparkContext(conf)
    val input_path = args(0).toString
    val output_path = args(1).toString
    val lines = sc.textFile(input_path)
    val max_prefs_per_user = 20
    val topn = 5
    //step1: group by user, re-key by item, then normalize each item's scores
    val ui_rdd = lines.map { x =>
      val fields = x.split(" ")
      (fields(0), (fields(1), fields(2).toDouble))
    }.groupByKey().flatMap { x =>
      val user = x._1
      val item_score_list = x._2
      var is_arr = item_score_list.toArray
      var is_list_len = is_arr.length
      if (is_list_len > max_prefs_per_user) {
        is_list_len = max_prefs_per_user
      }
      // Transpose: re-key each record by item, giving (item, (user, score))
      var i_us_arr = new ArrayBuffer[(String, (String, Double))]()
      for (i <- 0 until is_list_len) {
        i_us_arr += ((is_arr(i)._1,(user,is_arr(i)._2)))
      }
      i_us_arr
    }.groupByKey().flatMap{x=>
      // Normalize: divide each score by the L2 norm of the item's score vector
      val item = x._1
      val u_list = x._2
      val us_arr = u_list.toArray
      var sum:Double = 0.0
      for(i <- 0 until us_arr.length){
        sum += pow(us_arr(i)._2,2)
      }
      sum = sqrt(sum)
      var u_is_arr = new ArrayBuffer[(String, (String, Double))]()
      for(i <- 0 until us_arr.length){
        u_is_arr += ((us_arr(i)._1,(item,us_arr(i)._2 / sum)))
      }
      u_is_arr
      /*
      Test output with the sample data (ui_rdd after the final groupByKey)
      (2,CompactBuffer((100002,0.3244428422615251), (100003,0.7071067811865475), (100004,1.0), (100001,0.18257418583505536)))
      (3,CompactBuffer((100002,0.8111071056538127), (100001,0.3651483716701107)))
      (1,CompactBuffer((100002,0.48666426339228763), (100003,0.7071067811865475), (100001,0.9128709291752769)))
      */
    }.groupByKey()
    //step2: for each user, emit the product of normalized scores for every pair of distinct items (both orderings)
    val unpack_rdd = ui_rdd.flatMap{x=>
      val is_arr = x._2.toArray
      var ii_s_arr = new ArrayBuffer[((String,String),Double)]()
      for(i <- 0 until is_arr.length-1){
        // Start j at i+1 so that each pair is counted once and an item is never paired with itself
        for(j <- i+1 until is_arr.length){
          ii_s_arr += (((is_arr(i)._1,is_arr(j)._1),is_arr(i)._2 * is_arr(j)._2))
          ii_s_arr += (((is_arr(j)._1,is_arr(i)._1),is_arr(i)._2 * is_arr(j)._2))
        }
      }
      ii_s_arr
    /* Sample of the pairs emitted for user 2
      ((100002,100003),0.22941573387056174)
      ((100003,100002),0.22941573387056174)
      ((100002,100004),0.3244428422615251)
      ((100004,100002),0.3244428422615251)
      ((100002,100001),0.05923488777590923)
      ((100001,100002),0.05923488777590923)*/
    }
    //step3: sum the products per item pair to get the cosine similarity, then keep the top N per item
    unpack_rdd.groupByKey().map{x=>
      val ii_pair = x._1
      val s_list = x._2
      val s_arr = s_list.toArray
      var score:Double = 0.0
      for(i <- 0 until s_arr.length){
        score += s_arr(i)
      }
      (ii_pair._1,(ii_pair._2,score))
     /* With j starting at i+1, the sample data should give these similarities
        (rounded to four decimals here; row order may vary)
      (100001,(100002,0.7997))
      (100001,(100003,0.7746))
      (100001,(100004,0.1826))
      (100002,(100001,0.7997))
      (100002,(100003,0.5735))
      (100002,(100004,0.3244))
      (100003,(100001,0.7746))
      (100003,(100002,0.5735))
      (100003,(100004,0.7071))
      (100004,(100001,0.1826))
      (100004,(100002,0.3244))
      (100004,(100003,0.7071))*/
    }.groupByKey().map{x=>
      val item_a = x._1
      val item_list = x._2
      val bs_arr = item_list.toArray.sortWith(_._2 > _._2)
      var len = bs_arr.length
      if(len > topn){
        len=topn
      }
      val s = new StringBuilder
      for(i <- 0 until len){
        val item = bs_arr(i)._1
        val score = "%1.4f" format bs_arr(i)._2
        s.append(item+":"+score)
        if(i<len-1){
          s.append(",")
        }
      }
      item_a + "\t" + s
    }.saveAsTextFile(output_path)
  }
}
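
The three steps (normalize each item's scores, multiply the normalized scores of item pairs per user, sum per pair) aim at the cosine similarity between the rating vectors of two items. A direct, Spark-free computation can serve as a reference when checking the job's output on small inputs. The sketch below is only an illustration: ItemCosineSketch, cosine, and sample are hypothetical names, and it ignores the max_prefs_per_user cap that the job applies.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the referenced project.
class ItemCosineSketch {
    // ratings: user -> (item -> score). An item a user has not rated counts as 0.
    static double cosine(Map<String, Map<String, Double>> ratings, String itemA, String itemB) {
        double dot = 0.0, normA = 0.0, normB = 0.0;
        for (Map<String, Double> userRatings : ratings.values()) {
            double a = userRatings.getOrDefault(itemA, 0.0);
            double b = userRatings.getOrDefault(itemB, 0.0);
            dot += a * b;
            normA += a * a;
            normB += b * b;
        }
        if (normA == 0.0 || normB == 0.0) {
            return 0.0; // no ratings for one of the items
        }
        return dot / (Math.sqrt(normA) * Math.sqrt(normB));
    }

    private static void put(Map<String, Map<String, Double>> r, String user, String item, double score) {
        r.computeIfAbsent(user, k -> new HashMap<>()).put(item, score);
    }

    // The sample data from user_item_score.txt.
    static Map<String, Map<String, Double>> sample() {
        Map<String, Map<String, Double>> r = new HashMap<>();
        put(r, "1", "100001", 5); put(r, "1", "100002", 3); put(r, "1", "100003", 4);
        put(r, "3", "100001", 2); put(r, "3", "100002", 5);
        put(r, "2", "100001", 1); put(r, "2", "100002", 2); put(r, "2", "100003", 4); put(r, "2", "100004", 5);
        return r;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Double>> r = sample();
        System.out.printf("%.4f%n", cosine(r, "100001", "100002")); // prints 0.7997
        System.out.printf("%.4f%n", cosine(r, "100003", "100004")); // prints 0.7071
    }
}
```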

Set the program arguments (input path and output path) and run:

[Screenshot: run configuration with the program arguments]

Result:

[Screenshot: output of the run]
