Spark案例库V1.0版

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: Spark案例库V1.0版

Spark案例库

案例一:使用SparkRDD实现词频统计

pom.xml文件

<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>http://repository.jboss.com/nexus/content/groups/public</url>
    </repository>
</repositories>
<properties>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>2.4.5</spark.version>
    <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
</properties>
<dependencies>
    <!-- 依赖Scala语言 -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark Core 依赖 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Hadoop Client 依赖 -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
</dependencies>
<build>
    <outputDirectory>target/classes</outputDirectory>
    <testOutputDirectory>target/test-classes</testOutputDirectory>
    <resources>
        <resource>
            <directory>${project.basedir}/src/main/resources</directory>
        </resource>
    </resources>
    <!-- Maven 编译的插件 -->
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

实现代码

object SparkWordCount {
  def main(args: Array[String]): Unit = {
    // TODO: 创建SparkContext实例对象,首先构建SparkConf实例,设置应用基本信息
    val sc: SparkContext = {
      // 其一、构建SparkConf对象,设置应用名称和master
      val sparkConf: SparkConf = new SparkConf()
          .setAppName("SparkWordCount")
          .setMaster("local[2]")
      // 其二、创建SparkContext实例,传递sparkConf对象
      new SparkContext(sparkConf)
    }
    // TODO: 第一步、读取文件数据,sc.textFile方法,将数据封装到RDD中
    val inputRDD: RDD[String] = sc.textFile("/datas/wordcount.data")
    // TODO: 第二步、调用RDD中高阶函数,进行处理转换处理,函数:flapMap、map和reduceByKey
    val resultRDD: RDD[(String, Int)] = inputRDD
      // 按照分隔符分割单词
      .flatMap(line => line.split("\\s+"))
      // 转换单词为二元组,表示每个单词出现一次
      .map(word => word -> 1)
      // 按照单词分组,对组内执进行聚合reduce操作,求和
      .reduceByKey((tmp, item) => tmp + item)
    // TODO: 第三步、将最终处理结果打印控制台
    resultRDD.foreach(tuple => println(tuple))
    // 应用结束,关闭资源
    sc.stop()
  }
}

案例二:WordCount程序,按照词频降序排序取Top3

pom.xml

<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>http://repository.jboss.com/nexus/content/groups/public</url>
    </repository>
</repositories>
<properties>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>2.4.5</spark.version>
    <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
</properties>
<dependencies>
    <!-- 依赖Scala语言 -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark Core 依赖 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Hadoop Client 依赖 -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
</dependencies>
<build>
    <outputDirectory>target/classes</outputDirectory>
    <testOutputDirectory>target/test-classes</testOutputDirectory>
    <resources>
        <resource>
            <directory>${project.basedir}/src/main/resources</directory>
        </resource>
    </resources>
    <!-- Maven 编译的插件 -->
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

代码实现

object SparkTopKey {
  def main(args: Array[String]): Unit = {
    // TODO: 创建SparkContext实例对象,首先构建SparkConf实例,设置应用基本信息
    val sc: SparkContext = {
      // 其一、构建SparkConf对象,设置应用名称和master
      val sparkConf: SparkConf = new SparkConf()
          .setAppName("SparkWordCount")
          .setMaster("local[2]")
      // 其二、创建SparkContext实例,传递sparkConf对象
      new SparkContext(sparkConf)
    }
    // TODO: 第一步、读取文件数据,sc.textFile方法,将数据封装到RDD中
    val inputRDD: RDD[String] = sc.textFile("/datas/wordcount.data")
    // TODO: 第二步、调用RDD中高阶函数,进行处理转换处理,函数:flapMap、map和reduceByKey
    val resultRDD: RDD[(String, Int)] = inputRDD
      // 按照分隔符分割单词
      .flatMap(line => line.split("\\s+"))
      // 转换单词为二元组,表示每个单词出现一次
      .map(word => word -> 1)
      // 按照单词分组,对组内执进行聚合reduce操作,求和
      .reduceByKey((tmp, item) => tmp + item)
    resultRDD
      .sortBy(tuple => tuple._2, ascending = false)
      // 打印结果
      .take(3)
      .foreach(tuple => println(tuple))
    // 应用结束,关闭资源
    sc.stop()
  }
}

案例三:采用并行化的方式构建集合Seq中的数据为RDD,进行词频统计

pom.xml

<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>http://repository.jboss.com/nexus/content/groups/public</url>
    </repository>
</repositories>
<properties>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>2.4.5</spark.version>
    <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
</properties>
<dependencies>
    <!-- 依赖Scala语言 -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark Core 依赖 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Hadoop Client 依赖 -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
</dependencies>
<build>
    <outputDirectory>target/classes</outputDirectory>
    <testOutputDirectory>target/test-classes</testOutputDirectory>
    <resources>
        <resource>
            <directory>${project.basedir}/src/main/resources</directory>
        </resource>
    </resources>
    <!-- Maven 编译的插件 -->
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

代码实现

object _01SparkParallelizeTest {
  def main(args: Array[String]): Unit = {
    val sc: SparkContext = {
      // sparkConf对象
      val sparkConf = new SparkConf()
        // _01SparkParallelizeTest$  ->(.stripSuffix("$"))   ->  _01SparkParallelizeTest
          .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
          .setMaster("local[2]")
      // sc 实例对象
      SparkContext.getOrCreate(sparkConf)
    }
    // TODO: 1、Scala中集合Seq序列存储数据
    val linesSeq: Seq[String] = Seq(
      "hadoop scala hive spark scala sql sql", 
      "hadoop scala spark hdfs hive spark", 
      "spark hdfs spark hdfs scala hive spark"
    )
    // TODO: 2、并行化集合
    val inputRDD: RDD[String] = sc.parallelize(linesSeq, numSlices = 2)
    // TODO: 3、词频统计
    val resultRDD = inputRDD
      .flatMap(line => line.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey((tmp, item) => tmp + item)
    // TODO: 4、输出结果
    resultRDD.foreach(println)
    // 应用结束,关闭资源
    sc.stop()
  }
}

案例四:采用wholeTextFiles()方法读取小文件

pom.xml

<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>http://repository.jboss.com/nexus/content/groups/public</url>
    </repository>
</repositories>
<properties>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>2.4.5</spark.version>
    <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
</properties>
<dependencies>
    <!-- 依赖Scala语言 -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark Core 依赖 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Hadoop Client 依赖 -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
</dependencies>
<build>
    <outputDirectory>target/classes</outputDirectory>
    <testOutputDirectory>target/test-classes</testOutputDirectory>
    <resources>
        <resource>
            <directory>${project.basedir}/src/main/resources</directory>
        </resource>
    </resources>
    <!-- Maven 编译的插件 -->
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

代码实现

object _02SparkWholeTextFileTest {
  def main(args: Array[String]): Unit = {
    val sc: SparkContext = {
      // sparkConf对象
      val sparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // sc 实例对象
      SparkContext.getOrCreate(sparkConf)
    }
    /*
      def wholeTextFiles(
          path: String,
          minPartitions: Int = defaultMinPartitions
      ): RDD[(String, String)]
      Key: 每个小文件名称路径
      Value:每个小文件的内容
     */
    val inputRDD: RDD[(String, String)] = sc.wholeTextFiles("datas/ratings100", minPartitions = 2)
    println(s"RDD 分区数目 = ${inputRDD.getNumPartitions}")
    inputRDD.take(2).foreach(tuple => println(tuple))
    // 应用结束,关闭资源
    sc.stop()
  }
} 

案例五:RDD中缓存函数,将数据缓存到内存或磁盘、释放缓存

pom.xml

<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>http://repository.jboss.com/nexus/content/groups/public</url>
    </repository>
</repositories>
<properties>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>2.4.5</spark.version>
    <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
</properties>
<dependencies>
    <!-- 依赖Scala语言 -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark Core 依赖 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Hadoop Client 依赖 -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
</dependencies>
<build>
    <outputDirectory>target/classes</outputDirectory>
    <testOutputDirectory>target/test-classes</testOutputDirectory>
    <resources>
        <resource>
            <directory>${project.basedir}/src/main/resources</directory>
        </resource>
    </resources>
    <!-- Maven 编译的插件 -->
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

代码实现

object _05SparkCacheTest {
  def main(args: Array[String]): Unit = {
    // 创建应用程序入口SparkContext实例对象
    val sc: SparkContext = {
      // 1.a 创建SparkConf对象,设置应用的配置信息
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // 1.b 传递SparkConf对象,构建Context实例
      new SparkContext(sparkConf)
    }
    // 读取文本文件数据
    val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data", minPartitions = 2)
    // 缓存数据: 将数据缓存至内存
    inputRDD.persist()
    // 使用Action函数触发缓存
    inputRDD.count()
    // 释放缓存
    inputRDD.unpersist()
    //缓存数据:选择缓存级别
    inputRDD.persist(StorageLevel.MEMORY_AND_DISK)
    // 应用程序运行结束,关闭资源
    sc.stop()
  }
}

案例六:RDD数据Checkpoint设置案例

pom.xml

<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>http://repository.jboss.com/nexus/content/groups/public</url>
    </repository>
</repositories>
<properties>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>2.4.5</spark.version>
    <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
</properties>
<dependencies>
    <!-- 依赖Scala语言 -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark Core 依赖 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Hadoop Client 依赖 -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
</dependencies>
<build>
    <outputDirectory>target/classes</outputDirectory>
    <testOutputDirectory>target/test-classes</testOutputDirectory>
    <resources>
        <resource>
            <directory>${project.basedir}/src/main/resources</directory>
        </resource>
    </resources>
    <!-- Maven 编译的插件 -->
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

代码实现

object _06SparkCkptTest {
  def main(args: Array[String]): Unit = {
    // 创建应用程序入口SparkContext实例对象
    val sc: SparkContext = {
      // 1.a 创建SparkConf对象,设置应用的配置信息
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // 1.b 传递SparkConf对象,构建Context实例
      new SparkContext(sparkConf)
    }
    // TODO: 设置检查点目录,将RDD数据保存到那个目录
    sc.setCheckpointDir("datas/ckpt/")
    // 读取文件数据
    val datasRDD = sc.textFile("datas/wordcount.data")
    // TODO: 调用checkpoint函数,将RDD进行备份,需要RDD中Action函数触发
    datasRDD.checkpoint()
    datasRDD.count()
    // TODO: 再次执行count函数, 此时从checkpoint读取数据
    println(datasRDD.count())
    // 应用程序运行结束,关闭资源
    sc.stop()
  }
}

案例七:广播变量和累加器案例

基于Spark框架使用Scala语言编程实现词频统计WordCount程序,将符号数据过滤,并统计出现的次数

-a. 过滤标点符号数据

使用广播变量

-b. 统计出标点符号数据出现次数

使用累加器

代码实现

object _05SparkSharedVariableTest {
  def main(args: Array[String]): Unit = {
    // 1. 在Spark 应用程序中,入口为:SparkContext,必须创建实例对象,加载数据和调度程序执行
    val sc: SparkContext = {
      // 创建SparkConf对象,设置应用相关信息,比如名称和master
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // 构建SparkContext实例对象,传递SparkConf
      new SparkContext(sparkConf)
    }
    // 2. 第一步、从LocalFS读取文件数据,sc.textFile方法,将数据封装到RDD中
    val inputRDD: RDD[String] = sc.textFile("datas/filter/datas.input", minPartitions = 2)
    // TODO: 字典数据,只要有这些单词就过滤: 特殊字符存储列表List中
    val list: List[String] = List(",", ".", "!", "#", "$", "%")
    // TODO: 将字典数据进行广播变量
    val broadcastList: Broadcast[List[String]] = sc.broadcast(list)
    // TODO: 定义计数器
    val accumulator: LongAccumulator = sc.longAccumulator("number_accu")
    // 3. 第二步、调用RDD中高阶函数,进行处理转换处理,函数:flapMap、map和reduceByKey
    val resultRDD: RDD[(String, Int)] = inputRDD
      // 过滤空行数据
      .filter(line => null != line && line.trim.length > 0)
      // 分割为单词
      .flatMap(line => line.trim.split("\\s+"))
      // TODO: 过滤非单词字符
            .filter{word =>
          // 获取广播变量的值
              val wordsList: List[String] = broadcastList.value
              // 判断每个单词是否时非单词字符
              val flag: Boolean = wordsList.contains(word)
              if(flag){
                // 如果是非单词字符,累加器加1
                accumulator.add(1L)
              }
              // 返回
              ! flag
        }
      // 按照单词分组,进行聚合操作
            .map(word => (word, 1))
            .reduceByKey(_ + _)
    // 4. 第三步、将最终处理结果RDD保存到HDFS或打印控制台
    resultRDD.foreach(println)
    // 可以累加器的值,必须使用RDD Action函数进行触发
    println("Accumulator: " + accumulator.value)
    // 5. 当应用运行结束以后,关闭资源
    sc.stop()
  }
}

案例八:将RDD数据保存至MySQL表中一般模式

a. 对结果数据降低分区数目
    b. 针对每个分区数据进行操作
      每个分区数据插入数据库时,创建一个连接Connection

pom.xml

<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>http://repository.jboss.com/nexus/content/groups/public</url>
    </repository>
</repositories>
<properties>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>2.4.5</spark.version>
    <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
    <hbase.version>1.2.0-cdh5.16.2</hbase.version>
    <mysql.version>8.0.19</mysql.version>
</properties>
<dependencies>
    <!-- 依赖Scala语言 -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark Core 依赖 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Hadoop Client 依赖 -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <!-- HBase Client 依赖 -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-hadoop2-compat</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <!-- MySQL Client 依赖 -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.hankcs/hanlp -->
    <dependency>
        <groupId>com.hankcs</groupId>
        <artifactId>hanlp</artifactId>
        <version>portable-1.7.7</version>
    </dependency>
</dependencies>
<build>
    <outputDirectory>target/classes</outputDirectory>
    <testOutputDirectory>target/test-classes</testOutputDirectory>
    <resources>
        <resource>
            <directory>${project.basedir}/src/main/resources</directory>
        </resource>
    </resources>
    <!-- Maven 编译的插件 -->
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

代码实现:

object _04SparkWriteMySQL {
  def main(args: Array[String]): Unit = {
    // 1. 在Spark 应用程序中,入口为:SparkContext,必须创建实例对象,加载数据和调度程序执行
    val sc: SparkContext = {
      // 创建SparkConf对象,设置应用相关信息,比如名称和master
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // 构建SparkContext实例对象,传递SparkConf
      new SparkContext(sparkConf)
    }
    // 2. 第一步、从LocalFS读取文件数据,sc.textFile方法,将数据封装到RDD中
    val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data")
    // 3. 第二步、调用RDD中高阶函数,进行处理转换处理,函数:flapMap、map和reduceByKey
    val resultRDD: RDD[(String, Int)] = inputRDD
      // TODO: 过滤
      .filter(line => null != line && line.trim.length > 0 )
      // a. 对每行数据按照分割符分割
      .flatMap(line => line.trim.split("\\s+"))
      // b. 将每个单词转换为二元组,表示出现一次
      .map(word => (word ,1))
      .reduceByKey((temp, item) => temp + item)
    // TODO: 将结果数据resultRDD保存至MySQL表中
    resultRDD
      // 降低RDD分区数目
      .coalesce(1)
      .foreachPartition{iter =>
        // val xx: Iterator[(String, Int)] = iter
        // 直接调用保存分区数据到MySQL表的方法
        saveToMySQL(iter)
      }
    // 5. 当应用运行结束以后,关闭资源
    sc.stop()
  }
  /**
   * 定义一个方法,将RDD中分区数据保存至MySQL表
   */
  def saveToMySQL(iter: Iterator[(String, Int)]): Unit = {
    // step1. 加载驱动类
    Class.forName("com.mysql.cj.jdbc.Driver")
    // 声明变量
    var conn: Connection = null
    var pstmt: PreparedStatement = null
    try{
      // step2. 创建连接
      conn = DriverManager.getConnection(
        "jdbc:mysql://localhost:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
        "root",
        "123456"
      )
      pstmt = conn.prepareStatement("INSERT INTO db_test.tb_wordcount (word, count) VALUES(?, ?)")
      // step3. 插入数据
      iter.foreach{case (word, count) =>
        pstmt.setString(1, word)
        pstmt.setInt(2, count)
        pstmt.execute()
      }
    }catch {
      case e: Exception => e.printStackTrace()
    }finally {
      // step4. 关闭连接
      if(null != pstmt) pstmt.close()
      if(null != conn) conn.close()
    }
  }
}

案例九:将RDD数据保存至MySQL表中高级模式

要求:a. 对结果数据降低分区数目

b. 针对每个分区数据进行操作

每个分区数据插入数据库时,创建一个连接Connection

c. 批次插入每个分区数据

addBatch

executeBatch

d. 事务性

手动提交事务,并且还原原来事务

e. 考虑主键存在时,如何保存数据数据

存在,更新数据;不存在,插入数据

pom.xml

<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>http://repository.jboss.com/nexus/content/groups/public</url>
    </repository>
</repositories>
<properties>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>2.4.5</spark.version>
    <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
    <hbase.version>1.2.0-cdh5.16.2</hbase.version>
    <mysql.version>8.0.19</mysql.version>
</properties>
<dependencies>
    <!-- 依赖Scala语言 -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark Core 依赖 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Hadoop Client 依赖 -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <!-- HBase Client 依赖 -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-hadoop2-compat</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <!-- MySQL Client 依赖 -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.hankcs/hanlp -->
    <dependency>
        <groupId>com.hankcs</groupId>
        <artifactId>hanlp</artifactId>
        <version>portable-1.7.7</version>
    </dependency>
</dependencies>
<build>
    <outputDirectory>target/classes</outputDirectory>
    <testOutputDirectory>target/test-classes</testOutputDirectory>
    <resources>
        <resource>
            <directory>${project.basedir}/src/main/resources</directory>
        </resource>
    </resources>
    <!-- Maven 编译的插件 -->
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

代码实现:

object _04SparkWriteMySQLV3 {
  def main(args: Array[String]): Unit = {
    // 1. 在Spark 应用程序中,入口为:SparkContext,必须创建实例对象,加载数据和调度程序执行
    val sc: SparkContext = {
      // 创建SparkConf对象,设置应用相关信息,比如名称和master
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // 构建SparkContext实例对象,传递SparkConf
      new SparkContext(sparkConf)
    }
    // 2. 第一步、从LocalFS读取文件数据,sc.textFile方法,将数据封装到RDD中
    val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data")
    // 3. 第二步、调用RDD中高阶函数,进行处理转换处理,函数:flapMap、map和reduceByKey
    val resultRDD: RDD[(String, Int)] = inputRDD
      // TODO: 过滤
      .filter(line => null != line && line.trim.length > 0 )
      // a. 对每行数据按照分割符分割
      .flatMap(line => line.trim.split("\\s+"))
      // b. 将每个单词转换为二元组,表示出现一次
      .map(word => (word ,1))
      .reduceByKey((temp, item) => temp + item)
    // TODO: 将结果数据resultRDD保存至MySQL表中
    resultRDD.coalesce(1).foreachPartition(saveToMySQL)
    // 4. 当应用运行结束以后,关闭资源
    sc.stop()
  }
  /**
   * 定义一个方法,将RDD中分区数据保存至MySQL表
   */
  def saveToMySQL(iter: Iterator[(String, Int)]): Unit = {
    // step1. 加载驱动类
    Class.forName("com.mysql.cj.jdbc.Driver")
    // 声明变量
    var conn: Connection = null
    var pstmt: PreparedStatement = null
    try{
      // step2. 创建连接
      conn = DriverManager.getConnection(
        "jdbc:mysql://localhost:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
        "root",
        "123456"
      )
      pstmt = conn.prepareStatement("replace INTO db_test.tb_wordcount (word, count) VALUES(?, ?)")
      // TODO: 考虑事务性,一个分区数据要全部保存,要不都不保存
      val autoCommit: Boolean = conn.getAutoCommit // 获取数据库默认事务提交方式
      conn.setAutoCommit(false)
      // step3. 插入数据
      iter.foreach{case (word, count) =>
        pstmt.setString(1, word)
        pstmt.setInt(2, count)
        // TODO: 加入一个批次中
        pstmt.addBatch()
      }
      // TODO:批量执行批次
      pstmt.executeBatch()
      conn.commit() // 手动提交事务,进行批量插入
      // 还原数据库原来事务
      conn.setAutoCommit(autoCommit)
    }catch {
      case e: Exception => e.printStackTrace()
    }finally {
      // step4. 关闭连接
      if(null != pstmt) pstmt.close()
      if(null != conn) conn.close()
    }
  }
}

案例十:从HBase 表中读取数据,封装到RDD数据集

pom.xml

<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>http://repository.jboss.com/nexus/content/groups/public</url>
    </repository>
</repositories>
<properties>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>2.4.5</spark.version>
    <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
    <hbase.version>1.2.0-cdh5.16.2</hbase.version>
    <mysql.version>8.0.19</mysql.version>
</properties>
<dependencies>
    <!-- 依赖Scala语言 -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark Core 依赖 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Hadoop Client 依赖 -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <!-- HBase Client 依赖 -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-hadoop2-compat</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <!-- MySQL Client 依赖 -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.hankcs/hanlp -->
    <dependency>
        <groupId>com.hankcs</groupId>
        <artifactId>hanlp</artifactId>
        <version>portable-1.7.7</version>
    </dependency>
</dependencies>
<build>
    <outputDirectory>target/classes</outputDirectory>
    <testOutputDirectory>target/test-classes</testOutputDirectory>
    <resources>
        <resource>
            <directory>${project.basedir}/src/main/resources</directory>
        </resource>
    </resources>
    <!-- Maven 编译的插件 -->
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

代码实现:

object _03SparkReadHBase {
  def main(args: Array[String]): Unit = {
    // 1. 在Spark 应用程序中,入口为:SparkContext,必须创建实例对象,加载数据和调度程序执行
    val sc: SparkContext = {
      // 创建SparkConf对象,设置应用相关信息,比如名称和master
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
        // TODO: 设置使用Kryo 序列化方式
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        // TODO: 注册序列化的数据类型
        .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))
      // 构建SparkContext实例对象,传递SparkConf
      new SparkContext(sparkConf)
    }
    // TODO: 从HBase表读取数据,调用RDD方法:newAPIHadoopRDD
    val conf: Configuration = HBaseConfiguration.create()
    // 设置连接Zookeeper属性
    conf.set("hbase.zookeeper.quorum", "node1")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("zookeeper.znode.parent", "/hbase")
    // 设置将数据保存的HBase表的名称
    conf.set(TableInputFormat.INPUT_TABLE, "htb_wordcount")
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
      conf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )
    // 打印HBase表样本数据
    hbaseRDD
      .take(6)
      .foreach{case (rowKey, result) =>
        result.rawCells().foreach{cell =>
          println(s"RowKey = ${Bytes.toString(result.getRow)}")
          println(s"\t${Bytes.toString(CellUtil.cloneFamily(cell))}:" +
            s"${Bytes.toString(CellUtil.cloneQualifier(cell))} = " +
            s"${Bytes.toString(CellUtil.cloneValue(cell))}")
        }
      }
    // 5. 当应用运行结束以后,关闭资源
    sc.stop()
  }
}

案例十一:将RDD数据保存至HBase表中

pom.xml

<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>http://repository.jboss.com/nexus/content/groups/public</url>
    </repository>
</repositories>
<properties>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>2.4.5</spark.version>
    <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
    <hbase.version>1.2.0-cdh5.16.2</hbase.version>
    <mysql.version>8.0.19</mysql.version>
</properties>
<dependencies>
    <!-- 依赖Scala语言 -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark Core 依赖 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Hadoop Client 依赖 -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <!-- HBase Client 依赖 -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-hadoop2-compat</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <!-- MySQL Client 依赖 -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.hankcs/hanlp -->
    <dependency>
        <groupId>com.hankcs</groupId>
        <artifactId>hanlp</artifactId>
        <version>portable-1.7.7</version>
    </dependency>
</dependencies>
<build>
    <outputDirectory>target/classes</outputDirectory>
    <testOutputDirectory>target/test-classes</testOutputDirectory>
    <resources>
        <resource>
            <directory>${project.basedir}/src/main/resources</directory>
        </resource>
    </resources>
    <!-- Maven 编译的插件 -->
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

代码实现:

object _02SparkWriteHBase {
  def main(args: Array[String]): Unit = {
    // 1. 在Spark 应用程序中,入口为:SparkContext,必须创建实例对象,加载数据和调度程序执行
    val sc: SparkContext = {
      // 创建SparkConf对象,设置应用相关信息,比如名称和master
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // 构建SparkContext实例对象,传递SparkConf
      new SparkContext(sparkConf)
    }
    // 2. 第一步、从LocalFS读取文件数据,sc.textFile方法,将数据封装到RDD中
    val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data")
    // 3. 第二步、调用RDD中高阶函数,进行处理转换处理,函数:flapMap、map和reduceByKey
    val resultRDD: RDD[(String, Int)] = inputRDD
      // 过滤
      .filter(line => null != line && line.trim.length > 0 )
      // a. 对每行数据按照分割符分割
      .flatMap(line => line.trim.split("\\s+"))
      // b. 将每个单词转换为二元组,表示出现一次
      .map(word => (word ,1))
      .reduceByKey((temp, item) => temp + item)
    // TODO: step 1. 转换RDD为RDD[(RowKey, Put)]
    /*
      * HBase表的设计:
        * 表的名称:htb_wordcount
        * Rowkey: word
        * 列簇: info
        * 字段名称: count
      create 'htb_wordcount', 'info'
     */
    val putsRDD: RDD[(ImmutableBytesWritable, Put)] = resultRDD.map{case (word, count) =>
      // 其一、构建RowKey对象
      val rowKey: ImmutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(word))
      // 其二、构建Put对象
      val put: Put = new Put(rowKey.get())
      // 设置字段的值
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(count + ""))
      // 其三、返回二元组(RowKey, Put)
      rowKey -> put
    }
    // TODO: step2. 调用RDD中saveAsNewAPIHadoopFile保存数据
    val conf: Configuration = HBaseConfiguration.create()
    // 设置连接Zookeeper属性
    conf.set("hbase.zookeeper.quorum", "node1")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("zookeeper.znode.parent", "/hbase")
    // 设置将数据保存的HBase表的名称
    conf.set(TableOutputFormat.OUTPUT_TABLE, "htb_wordcount")
    putsRDD.saveAsNewAPIHadoopFile(
      "datas/hbase/htb_wordcount/",
      classOf[ImmutableBytesWritable],
      classOf[Put],
      classOf[TableOutputFormat[ImmutableBytesWritable]],
      conf
    )
    // 5. 当应用运行结束以后,关闭资源
    sc.stop()
  }
}


相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
8月前
|
SQL 分布式计算 数据可视化
Spark SQL案例【电商购买数据分析】
Spark SQL案例【电商购买数据分析】
|
SQL 分布式计算 资源调度
线上 hive on spark 作业执行超时问题排查案例分享
线上 hive on spark 作业执行超时问题排查案例分享
|
3月前
|
分布式计算 大数据 Java
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
80 5
|
3月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
61 3
|
3月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
81 0
|
3月前
|
分布式计算 大数据 Spark
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
53 1
|
3月前
|
设计模式 数据采集 分布式计算
企业spark案例 —出租车轨迹分析
企业spark案例 —出租车轨迹分析
126 0
|
3月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
70 0
|
3月前
|
SQL 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
61 0
|
3月前
|
存储 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
60 0
下一篇
开通oss服务