Spark Case Library
Case 1: Word Count with Spark RDD
pom.xml
<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>http://repository.jboss.com/nexus/content/groups/public</url>
    </repository>
</repositories>

<properties>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>2.4.5</spark.version>
    <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
</properties>

<dependencies>
    <!-- Scala language -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark Core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Hadoop Client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
</dependencies>

<build>
    <outputDirectory>target/classes</outputDirectory>
    <testOutputDirectory>target/test-classes</testOutputDirectory>
    <resources>
        <resource>
            <directory>${project.basedir}/src/main/resources</directory>
        </resource>
    </resources>
    <!-- Maven compiler plugins -->
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Implementation
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCount {
  def main(args: Array[String]): Unit = {
    // TODO: create the SparkContext instance; first build a SparkConf with the basic application settings
    val sc: SparkContext = {
      // 1. build the SparkConf object, setting the application name and master
      val sparkConf: SparkConf = new SparkConf()
        .setAppName("SparkWordCount")
        .setMaster("local[2]")
      // 2. create the SparkContext instance, passing in the SparkConf
      new SparkContext(sparkConf)
    }

    // TODO: step 1, read the file with sc.textFile and wrap the data in an RDD
    val inputRDD: RDD[String] = sc.textFile("/datas/wordcount.data")

    // TODO: step 2, transform the RDD with the higher-order functions flatMap, map and reduceByKey
    val resultRDD: RDD[(String, Int)] = inputRDD
      // split each line into words on whitespace
      .flatMap(line => line.split("\\s+"))
      // map each word to a tuple meaning "this word occurred once"
      .map(word => word -> 1)
      // group by word and reduce (sum) the counts within each group
      .reduceByKey((tmp, item) => tmp + item)

    // TODO: step 3, print the final result to the console
    resultRDD.foreach(tuple => println(tuple))

    // the application is done, release resources
    sc.stop()
  }
}
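If the counts need to be kept rather than only printed, a minimal variant of step 3 (an assumption for illustration, not part of the original case) writes the result RDD back to the file system; the output path below is hypothetical and must not exist yet.

// Hedged variant sketch: persist the word counts instead of printing them.
resultRDD.coalesce(1).saveAsTextFile("/datas/wordcount-output")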
Case 2: Word Count Taking the Top 3 Words by Descending Frequency
pom.xml (identical to Case 1)
Implementation
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkTopKey {
  def main(args: Array[String]): Unit = {
    // TODO: create the SparkContext instance; first build a SparkConf with the basic application settings
    val sc: SparkContext = {
      // 1. build the SparkConf object, setting the application name and master
      val sparkConf: SparkConf = new SparkConf()
        .setAppName("SparkTopKey")
        .setMaster("local[2]")
      // 2. create the SparkContext instance, passing in the SparkConf
      new SparkContext(sparkConf)
    }

    // TODO: step 1, read the file with sc.textFile and wrap the data in an RDD
    val inputRDD: RDD[String] = sc.textFile("/datas/wordcount.data")

    // TODO: step 2, transform the RDD with the higher-order functions flatMap, map and reduceByKey
    val resultRDD: RDD[(String, Int)] = inputRDD
      // split each line into words on whitespace
      .flatMap(line => line.split("\\s+"))
      // map each word to a tuple meaning "this word occurred once"
      .map(word => word -> 1)
      // group by word and reduce (sum) the counts within each group
      .reduceByKey((tmp, item) => tmp + item)

    resultRDD
      // sort by count in descending order
      .sortBy(tuple => tuple._2, ascending = false)
      // take the top 3 and print them
      .take(3)
      .foreach(tuple => println(tuple))

    // the application is done, release resources
    sc.stop()
  }
}
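As a side note (an alternative not shown in the original), RDD.top returns the largest elements under a given Ordering without globally sorting the data, which is usually cheaper than sortBy followed by take for a small K:

// Hedged alternative sketch: top(3) with an Ordering on the count avoids a full sort.
val top3: Array[(String, Int)] = resultRDD.top(3)(Ordering.by[(String, Int), Int](_._2))
top3.foreach(println)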
Case 3: Building an RDD by Parallelizing a Scala Seq, then Running Word Count
pom.xml (identical to Case 1)
Implementation
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _01SparkParallelizeTest {
  def main(args: Array[String]): Unit = {
    val sc: SparkContext = {
      // SparkConf object
      val sparkConf = new SparkConf()
        // _01SparkParallelizeTest$ -> (.stripSuffix("$")) -> _01SparkParallelizeTest
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // SparkContext instance
      SparkContext.getOrCreate(sparkConf)
    }

    // TODO: 1. the data lives in a Scala Seq collection
    val linesSeq: Seq[String] = Seq(
      "hadoop scala hive spark scala sql sql",
      "hadoop scala spark hdfs hive spark",
      "spark hdfs spark hdfs scala hive spark"
    )

    // TODO: 2. parallelize the collection into an RDD
    val inputRDD: RDD[String] = sc.parallelize(linesSeq, numSlices = 2)

    // TODO: 3. word count
    val resultRDD = inputRDD
      .flatMap(line => line.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey((tmp, item) => tmp + item)

    // TODO: 4. print the result
    resultRDD.foreach(println)

    // the application is done, release resources
    sc.stop()
  }
}
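To see how numSlices = 2 actually distributed the three lines, here is a small sketch (not part of the original case) using glom, which collects each partition into an array:

// Hedged sketch: glom() turns each partition into an array, making the split visible.
inputRDD.glom().collect().zipWithIndex.foreach { case (part, idx) =>
  println(s"partition $idx -> ${part.mkString(" | ")}")
}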
Case 4: Reading Small Files with wholeTextFiles()
pom.xml (identical to Case 1)
Implementation
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _02SparkWholeTextFileTest {
  def main(args: Array[String]): Unit = {
    val sc: SparkContext = {
      // SparkConf object
      val sparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // SparkContext instance
      SparkContext.getOrCreate(sparkConf)
    }

    /*
      def wholeTextFiles(
          path: String,
          minPartitions: Int = defaultMinPartitions
      ): RDD[(String, String)]

      Key:   the path of each small file
      Value: the content of each small file
     */
    val inputRDD: RDD[(String, String)] = sc.wholeTextFiles("datas/ratings100", minPartitions = 2)
    println(s"number of RDD partitions = ${inputRDD.getNumPartitions}")
    inputRDD.take(2).foreach(tuple => println(tuple))

    // the application is done, release resources
    sc.stop()
  }
}
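A hedged follow-up sketch (not in the original case): once the small files are loaded as (path, content) pairs, each file's content can be split into individual lines so that the word-count logic of the earlier cases applies unchanged.

// Hedged sketch: drop the file path and explode each file's content into lines.
val linesRDD: RDD[String] = inputRDD.flatMap { case (_, content) => content.split("\\r?\\n") }
println(s"total number of lines = ${linesRDD.count()}")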
Case 5: RDD Caching: Persisting Data to Memory or Disk and Releasing the Cache
pom.xml (identical to Case 1)
Implementation
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

object _05SparkCacheTest {
  def main(args: Array[String]): Unit = {
    // create the application entry point, a SparkContext instance
    val sc: SparkContext = {
      // 1.a create the SparkConf object with the application settings
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // 1.b build the SparkContext from the SparkConf
      new SparkContext(sparkConf)
    }

    // read the text file
    val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data", minPartitions = 2)

    // cache the data in memory
    inputRDD.persist()
    // an action is needed to actually materialize the cache
    inputRDD.count()

    // release the cache
    inputRDD.unpersist()

    // cache the data again, this time choosing a storage level explicitly
    inputRDD.persist(StorageLevel.MEMORY_AND_DISK)

    // the application is done, release resources
    sc.stop()
  }
}
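As a hedged side note (not in the original case), cache() is simply persist(StorageLevel.MEMORY_ONLY), and the level assigned to an RDD can be inspected with getStorageLevel. Because a storage level can only be assigned once per RDD, the sketch below starts from a freshly read, hypothetical RDD rather than the inputRDD above.

// Hedged sketch: cache() is shorthand for persist(StorageLevel.MEMORY_ONLY).
val freshRDD: RDD[String] = sc.textFile("datas/wordcount.data")
freshRDD.cache()
freshRDD.count()                        // action that fills the cache
println(freshRDD.getStorageLevel)       // prints the assigned StorageLevel
freshRDD.unpersist(blocking = true)     // wait until all cached blocks are removed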
Case 6: Setting a Checkpoint for RDD Data
pom.xml (identical to Case 1)
Implementation
import org.apache.spark.{SparkConf, SparkContext}

object _06SparkCkptTest {
  def main(args: Array[String]): Unit = {
    // create the application entry point, a SparkContext instance
    val sc: SparkContext = {
      // 1.a create the SparkConf object with the application settings
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // 1.b build the SparkContext from the SparkConf
      new SparkContext(sparkConf)
    }

    // TODO: set the checkpoint directory, i.e. where the RDD data will be saved
    sc.setCheckpointDir("datas/ckpt/")

    // read the file
    val datasRDD = sc.textFile("datas/wordcount.data")

    // TODO: call checkpoint() to mark the RDD for backup; an RDD action must run to trigger it
    datasRDD.checkpoint()
    datasRDD.count()

    // TODO: run count again; this time the data is read back from the checkpoint
    println(datasRDD.count())

    // the application is done, release resources
    sc.stop()
  }
}
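One hedged refinement (not shown in the original): checkpointing launches a separate job that recomputes the RDD, so a common pattern is to persist before calling checkpoint. The sketch below is a drop-in alternative for the checkpoint steps above and assumes import org.apache.spark.storage.StorageLevel is added.

// Hedged sketch: persist first so the checkpoint job reuses the cached blocks
// instead of recomputing the lineage from the source file.
datasRDD.persist(StorageLevel.MEMORY_AND_DISK)
datasRDD.checkpoint()
datasRDD.count()                          // triggers both caching and checkpointing
println(datasRDD.isCheckpointed)          // true once the checkpoint has completed
println(datasRDD.getCheckpointFile)       // Some(path under datas/ckpt/)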
Case 7: Broadcast Variables and Accumulators
Implement a word-count program in Scala on Spark that filters out punctuation tokens and counts how often they appear:
-a. filter out the punctuation tokens
    using a broadcast variable
-b. count how many punctuation tokens occurred
    using an accumulator
Implementation
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

object _05SparkSharedVariableTest {
  def main(args: Array[String]): Unit = {
    // 1. the entry point of a Spark application is the SparkContext; it must be created
    //    to load data and schedule the job
    val sc: SparkContext = {
      // create the SparkConf object with the application settings (name and master)
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // build the SparkContext, passing in the SparkConf
      new SparkContext(sparkConf)
    }

    // 2. step 1: read the file from the local file system with sc.textFile into an RDD
    val inputRDD: RDD[String] = sc.textFile("datas/filter/datas.input", minPartitions = 2)

    // TODO: dictionary data: any of these tokens should be filtered out;
    //       the special characters are stored in a List
    val list: List[String] = List(",", ".", "!", "#", "$", "%")

    // TODO: publish the dictionary as a broadcast variable
    val broadcastList: Broadcast[List[String]] = sc.broadcast(list)

    // TODO: define the accumulator
    val accumulator: LongAccumulator = sc.longAccumulator("number_accu")

    // 3. step 2: transform the RDD with the higher-order functions flatMap, map and reduceByKey
    val resultRDD: RDD[(String, Int)] = inputRDD
      // drop empty lines
      .filter(line => null != line && line.trim.length > 0)
      // split into words
      .flatMap(line => line.trim.split("\\s+"))
      // TODO: filter out the non-word tokens
      .filter { word =>
        // read the value of the broadcast variable
        val wordsList: List[String] = broadcastList.value
        // check whether this token is a non-word character
        val flag: Boolean = wordsList.contains(word)
        if (flag) {
          // it is a non-word token: increase the accumulator by 1
          accumulator.add(1L)
        }
        // keep only real words
        !flag
      }
      // group by word and aggregate
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // 4. step 3: save the final result to HDFS or print it to the console
    resultRDD.foreach(println)

    // the accumulator value should only be read after an RDD action has been triggered
    println("Accumulator: " + accumulator.value)

    // 5. when the application finishes, release resources
    sc.stop()
  }
}
Case 8: Saving RDD Data to a MySQL Table (Basic Pattern)
a. Reduce the number of partitions of the result data
b. Operate on each partition as a whole
   When a partition is written to the database, one Connection is created per partition
pom.xml
<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>http://repository.jboss.com/nexus/content/groups/public</url>
    </repository>
</repositories>

<properties>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>2.4.5</spark.version>
    <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
    <hbase.version>1.2.0-cdh5.16.2</hbase.version>
    <mysql.version>8.0.19</mysql.version>
</properties>

<dependencies>
    <!-- Scala language -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark Core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Hadoop Client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <!-- HBase Client -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-hadoop2-compat</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <!-- MySQL Client -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.hankcs/hanlp -->
    <dependency>
        <groupId>com.hankcs</groupId>
        <artifactId>hanlp</artifactId>
        <version>portable-1.7.7</version>
    </dependency>
</dependencies>

<build>
    <outputDirectory>target/classes</outputDirectory>
    <testOutputDirectory>target/test-classes</testOutputDirectory>
    <resources>
        <resource>
            <directory>${project.basedir}/src/main/resources</directory>
        </resource>
    </resources>
    <!-- Maven compiler plugins -->
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Implementation:
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _04SparkWriteMySQL {
  def main(args: Array[String]): Unit = {
    // 1. the entry point of a Spark application is the SparkContext; it must be created
    //    to load data and schedule the job
    val sc: SparkContext = {
      // create the SparkConf object with the application settings (name and master)
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // build the SparkContext, passing in the SparkConf
      new SparkContext(sparkConf)
    }

    // 2. step 1: read the file from the local file system with sc.textFile into an RDD
    val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data")

    // 3. step 2: transform the RDD with the higher-order functions flatMap, map and reduceByKey
    val resultRDD: RDD[(String, Int)] = inputRDD
      // TODO: drop empty lines
      .filter(line => null != line && line.trim.length > 0)
      // a. split each line on whitespace
      .flatMap(line => line.trim.split("\\s+"))
      // b. map each word to a tuple meaning "this word occurred once"
      .map(word => (word, 1))
      .reduceByKey((temp, item) => temp + item)

    // TODO: save the resultRDD to the MySQL table
    resultRDD
      // reduce the number of RDD partitions
      .coalesce(1)
      .foreachPartition { iter =>
        // val xx: Iterator[(String, Int)] = iter
        // save this partition's data to the MySQL table
        saveToMySQL(iter)
      }

    // 5. when the application finishes, release resources
    sc.stop()
  }

  /**
   * Saves the data of one RDD partition to the MySQL table.
   */
  def saveToMySQL(iter: Iterator[(String, Int)]): Unit = {
    // step1. load the driver class
    Class.forName("com.mysql.cj.jdbc.Driver")
    // declare the variables
    var conn: Connection = null
    var pstmt: PreparedStatement = null
    try {
      // step2. open the connection
      conn = DriverManager.getConnection(
        "jdbc:mysql://localhost:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
        "root",
        "123456"
      )
      pstmt = conn.prepareStatement("INSERT INTO db_test.tb_wordcount (word, count) VALUES(?, ?)")
      // step3. insert the data
      iter.foreach { case (word, count) =>
        pstmt.setString(1, word)
        pstmt.setInt(2, count)
        pstmt.execute()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      // step4. close the connection
      if (null != pstmt) pstmt.close()
      if (null != conn) conn.close()
    }
  }
}
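The case assumes that the table db_test.tb_wordcount already exists. Its DDL is not given in the original, so the schema below is an assumption; a primary key on word is also what lets the REPLACE INTO upsert of Case 9 behave as described. A minimal one-off helper sketch:

// Hedged helper sketch: create the (assumed) target database and table once.
def createWordCountTable(): Unit = {
  Class.forName("com.mysql.cj.jdbc.Driver")
  val conn = DriverManager.getConnection(
    "jdbc:mysql://localhost:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
    "root", "123456"
  )
  try {
    val stmt = conn.createStatement()
    stmt.execute("CREATE DATABASE IF NOT EXISTS db_test")
    stmt.execute(
      """CREATE TABLE IF NOT EXISTS db_test.tb_wordcount (
        |  word  VARCHAR(100) NOT NULL,
        |  count INT NOT NULL,
        |  PRIMARY KEY (word)
        |)""".stripMargin
    )
    stmt.close()
  } finally {
    conn.close()
  }
}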
Case 9: Saving RDD Data to a MySQL Table (Advanced Pattern)
Requirements:
a. Reduce the number of partitions of the result data
b. Operate on each partition as a whole
   When a partition is written to the database, one Connection is created per partition
c. Insert each partition's data in batches
   addBatch
   executeBatch
d. Transactional behaviour
   Commit the transaction manually, then restore the original auto-commit setting
e. Handle rows whose primary key already exists
   If the key exists, update the row; otherwise insert it
pom.xml (identical to Case 8)
Implementation:
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _04SparkWriteMySQLV3 {
  def main(args: Array[String]): Unit = {
    // 1. the entry point of a Spark application is the SparkContext; it must be created
    //    to load data and schedule the job
    val sc: SparkContext = {
      // create the SparkConf object with the application settings (name and master)
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // build the SparkContext, passing in the SparkConf
      new SparkContext(sparkConf)
    }

    // 2. step 1: read the file from the local file system with sc.textFile into an RDD
    val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data")

    // 3. step 2: transform the RDD with the higher-order functions flatMap, map and reduceByKey
    val resultRDD: RDD[(String, Int)] = inputRDD
      // TODO: drop empty lines
      .filter(line => null != line && line.trim.length > 0)
      // a. split each line on whitespace
      .flatMap(line => line.trim.split("\\s+"))
      // b. map each word to a tuple meaning "this word occurred once"
      .map(word => (word, 1))
      .reduceByKey((temp, item) => temp + item)

    // TODO: save the resultRDD to the MySQL table
    resultRDD.coalesce(1).foreachPartition(saveToMySQL)

    // 4. when the application finishes, release resources
    sc.stop()
  }

  /**
   * Saves the data of one RDD partition to the MySQL table.
   */
  def saveToMySQL(iter: Iterator[(String, Int)]): Unit = {
    // step1. load the driver class
    Class.forName("com.mysql.cj.jdbc.Driver")
    // declare the variables
    var conn: Connection = null
    var pstmt: PreparedStatement = null
    try {
      // step2. open the connection
      conn = DriverManager.getConnection(
        "jdbc:mysql://localhost:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
        "root",
        "123456"
      )
      // REPLACE INTO updates the row when the primary key already exists, otherwise inserts it
      pstmt = conn.prepareStatement("REPLACE INTO db_test.tb_wordcount (word, count) VALUES(?, ?)")

      // TODO: transactional behaviour: either the whole partition is saved or nothing is
      val autoCommit: Boolean = conn.getAutoCommit // remember the connection's auto-commit setting
      conn.setAutoCommit(false)

      // step3. insert the data
      iter.foreach { case (word, count) =>
        pstmt.setString(1, word)
        pstmt.setInt(2, count)
        // TODO: add the statement to the batch
        pstmt.addBatch()
      }

      // TODO: execute the whole batch
      pstmt.executeBatch()
      conn.commit() // commit manually so the batch is inserted as one transaction
      // restore the connection's original auto-commit setting
      conn.setAutoCommit(autoCommit)
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      // step4. close the connection
      if (null != pstmt) pstmt.close()
      if (null != conn) conn.close()
    }
  }
}
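For large partitions it is common to flush the batch every N rows instead of accumulating everything in one batch. The sketch below is an assumption, not part of the original, and is meant as a drop-in replacement for the step3 loop inside saveToMySQL above, reusing its iter, pstmt and conn:

// Hedged refinement sketch: flush the JDBC batch in chunks of (a hypothetical) 500 rows.
var buffered = 0
iter.foreach { case (word, count) =>
  pstmt.setString(1, word)
  pstmt.setInt(2, count)
  pstmt.addBatch()
  buffered += 1
  if (buffered % 500 == 0) pstmt.executeBatch() // flush a full chunk
}
pstmt.executeBatch() // flush whatever is left
conn.commit()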
Case 10: Reading Data from an HBase Table into an RDD
pom.xml (identical to Case 8)
Implementation:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _03SparkReadHBase {
  def main(args: Array[String]): Unit = {
    // 1. the entry point of a Spark application is the SparkContext; it must be created
    //    to load data and schedule the job
    val sc: SparkContext = {
      // create the SparkConf object with the application settings (name and master)
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
        // TODO: use Kryo serialization
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        // TODO: register the classes that will be serialized
        .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))
      // build the SparkContext, passing in the SparkConf
      new SparkContext(sparkConf)
    }

    // TODO: read the HBase table through the RDD method newAPIHadoopRDD
    val conf: Configuration = HBaseConfiguration.create()
    // ZooKeeper connection properties
    conf.set("hbase.zookeeper.quorum", "node1")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("zookeeper.znode.parent", "/hbase")
    // name of the HBase table to read
    conf.set(TableInputFormat.INPUT_TABLE, "htb_wordcount")

    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
      conf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )

    // print a sample of the HBase table
    hbaseRDD
      .take(6)
      .foreach { case (rowKey, result) =>
        result.rawCells().foreach { cell =>
          println(s"RowKey = ${Bytes.toString(result.getRow)}")
          println(s"\t${Bytes.toString(CellUtil.cloneFamily(cell))}:" +
            s"${Bytes.toString(CellUtil.cloneQualifier(cell))} = " +
            s"${Bytes.toString(CellUtil.cloneValue(cell))}")
        }
      }

    // 5. when the application finishes, release resources
    sc.stop()
  }
}
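A hedged follow-up sketch (not in the original case): the raw (rowkey, Result) pairs can be mapped back into (word, count) tuples; the column family info and qualifier count follow the table layout described in Case 11, where the count was written as a string.

// Hedged sketch: rebuild (word, count) tuples from the HBase Result objects.
val wordCountRDD: RDD[(String, Long)] = hbaseRDD.map { case (_, result) =>
  val word  = Bytes.toString(result.getRow)
  val count = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("count"))).toLong
  (word, count)
}
wordCountRDD.take(3).foreach(println)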
Case 11: Saving RDD Data to an HBase Table
pom.xml (identical to Case 8)
Implementation:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _02SparkWriteHBase {
  def main(args: Array[String]): Unit = {
    // 1. the entry point of a Spark application is the SparkContext; it must be created
    //    to load data and schedule the job
    val sc: SparkContext = {
      // create the SparkConf object with the application settings (name and master)
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // build the SparkContext, passing in the SparkConf
      new SparkContext(sparkConf)
    }

    // 2. step 1: read the file from the local file system with sc.textFile into an RDD
    val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data")

    // 3. step 2: transform the RDD with the higher-order functions flatMap, map and reduceByKey
    val resultRDD: RDD[(String, Int)] = inputRDD
      // drop empty lines
      .filter(line => null != line && line.trim.length > 0)
      // a. split each line on whitespace
      .flatMap(line => line.trim.split("\\s+"))
      // b. map each word to a tuple meaning "this word occurred once"
      .map(word => (word, 1))
      .reduceByKey((temp, item) => temp + item)

    // TODO: step 1. convert the RDD into RDD[(RowKey, Put)]
    /*
      HBase table design:
        table name:    htb_wordcount
        RowKey:        word
        column family: info
        column:        count
      create 'htb_wordcount', 'info'
     */
    val putsRDD: RDD[(ImmutableBytesWritable, Put)] = resultRDD.map { case (word, count) =>
      // 1. build the RowKey object
      val rowKey: ImmutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(word))
      // 2. build the Put object
      val put: Put = new Put(rowKey.get())
      // set the column value
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(count + ""))
      // 3. return the (RowKey, Put) tuple
      rowKey -> put
    }

    // TODO: step 2. save the data with saveAsNewAPIHadoopFile
    val conf: Configuration = HBaseConfiguration.create()
    // ZooKeeper connection properties
    conf.set("hbase.zookeeper.quorum", "node1")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("zookeeper.znode.parent", "/hbase")
    // name of the HBase table the data will be saved to
    conf.set(TableOutputFormat.OUTPUT_TABLE, "htb_wordcount")

    putsRDD.saveAsNewAPIHadoopFile(
      "datas/hbase/htb_wordcount/",
      classOf[ImmutableBytesWritable],
      classOf[Put],
      classOf[TableOutputFormat[ImmutableBytesWritable]],
      conf
    )

    // 5. when the application finishes, release resources
    sc.stop()
  }
}