点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
Hadoop(已更完)
HDFS(已更完)
MapReduce(已更完)
Hive(已更完)
Flume(已更完)
Sqoop(已更完)
Zookeeper(已更完)
HBase(已更完)
Redis (已更完)
Kafka(已更完)
Spark(正在更新!)
章节内容
上节完成的内容如下:
Spark案例编写 Scala
计算圆周率
找共同的好友
Super Word Count
需求背景
- 给定一段文本
- 将单词全部转换为小写
- 去除标点符号
- 去除停用词
- count值降序保存
- 结果保存到MySQL
- 额外要求:标点符合和停用词可以自定义
编写代码
先实现到MySQL保存前的内容,我们需要先编写测试一下我们的代码是否正确
package icu.wzk import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object SuperWordCount1 { private val stopWords = "in on to from by a an the is are were was i we you your he his".split("\\s+") private val punctuation = "[\\)\\.,:;'!\\?]" def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("ScalaSuperWordCount1") .setMaster("local[*]") val sc = new SparkContext(conf) sc.setLogLevel("WARN") val lines: RDD[String] = sc.textFile(args(0)) lines .flatMap(_.split("\\s+")) .map(_.toLowerCase) .map(_.replaceAll(punctuation, "")) .filter(word => !stopWords.contains(word) && word.trim.nonEmpty) .map((_, 1)) .reduceByKey(_ + _) .sortBy(_._2, false) .collect() .foreach(println) sc.stop() } }
详细解释
object SuperWordCount1 { … }
SuperWordCount1 是一个 Scala 对象,定义了一个单例对象用于运行单词计数程序。
private val stopWords = “in on to from by a an the is are were was i we you your he his”.split(“\s+”)
这里定义了一个 stopWords 列表,包含了常见的停用词,这些词在统计单词频率时会被过滤掉。
split(“\s+”) 方法将这些停用词用空白字符分割成数组,便于后续的查找和过滤。
private val punctuation = “[\)\.,:;'!\?]”
定义了一个正则表达式 punctuation,用于匹配常见的标点符号。这些标点符号在统计单词频率时会被去除。
def main(args: Array[String]): Unit = { … }
main 方法是程序的入口点,args 是命令行参数,其中 args(0) 通常表示输入文件的路径。
val conf = new SparkConf().setAppName(“ScalaSuperWordCount1”).setMaster(“local[*]”)
SparkConf() 用于配置 Spark 应用程序。setAppName(“ScalaSuperWordCount1”) 设置应用程序的名称。
setMaster(“local[*]”) 指定应用程序以本地模式运行,使用所有可用的 CPU 核心。
val sc = new SparkContext(conf)
SparkContext 是 Spark 应用程序的核心,用于与 Spark 集群进行交互。
sc.setLogLevel(“WARN”)
设置日志级别为 “WARN”,减少日志输出,方便查看重要信息。
val lines: RDD[String] = sc.textFile(args(0))
sc.textFile(args(0)) 从指定的文本文件路径加载数据,创建一个 RDD[String],其中每一行文本都作为一个字符串元素。
lines 是包含输入文本数据的 RDD。
flatMap(_.split(“\s+”))
flatMap 方法将每一行字符串按空白字符拆分成单词,并将其展开成单个单词的 RDD。
map(_.toLowerCase)
将每个单词转换为小写,以确保统计时不区分大小写。
map(_.replaceAll(punctuation, “”))
使用正则表达式 punctuation 去除单词中的标点符号,使得统计结果更加准确。
filter(word => !stopWords.contains(word) && word.trim.nonEmpty)
filter 方法过滤掉停用词和空白单词:
!stopWords.contains(word) 确保当前单词不在停用词列表中。
word.trim.nonEmpty 确保单词在去除前后空白字符后不是空字符串。
map((_, 1))
将每个单词映射为 (word, 1) 的键值对,表示每个单词出现一次。
reduceByKey(_ + _)
reduceByKey 方法根据键(单词)对值(计数)进行累加,统计每个单词的总出现次数。
sortBy(_._2, false)
将统计结果按值(单词出现的次数)从大到小排序。
collect().foreach(println)
collect() 方法将 RDD 中的数据收集到驱动程序中(即本地),然后使用 foreach(println) 输出每个单词及其出现的次数。
由于 collect 会将数据从分布式环境中拉到本地,需要注意数据量大的情况下可能导致内存不足的问题。
sc.stop()
在计算完成后,调用 sc.stop() 方法停止 SparkContext,释放资源。
添加依赖
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.28</version> </dependency>
同时我们需要在build的部分,也要加入对应的内容,让驱动可以加载进来:
<build> <plugins> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>4.4.0</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.3.0</version> <configuration> <archive> <manifest> <mainClass>cn.lagou.sparkcore.WordCount</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
创建库表
我们新建一个数据库,也要新建一个数据表
CREATE TABLE `wordcount` ( `word` varchar(255) DEFAULT NULL, `count` int(11) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
写入SQL-未优化
我们在 foreach 中保存了数据,此时需要创建大量的MySQL连接,效率是比较低的。
package icu.wzk import com.mysql.cj.xdevapi.PreparableStatement import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import java.sql.{Connection, DriverManager, PreparedStatement} object SuperWordCount2 { private val stopWords = "in on to from by a an the is are were was i we you your he his".split("\\s+") private val punctuation = "[\\)\\.,:;'!\\?]" def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("ScalaSuperWordCount2") .setMaster("local[*]") val sc = new SparkContext(conf) sc.setLogLevel("WARN") val lines: RDD[String] = sc.textFile(args(0)) val words: RDD[String] = lines .flatMap(_.split("\\s+")) .map(_.trim.toLowerCase()) val clearWords: RDD[String] = words .filter(!stopWords.contains(_)) .map(_.replaceAll(punctuation, "")) val result: RDD[(String, Int)] = clearWords .map((_, 1)) .reduceByKey(_ + _) .sortBy(_._2, false) result.foreach(println) // 输出到 MySQL val username = "hive" val password = "hive@wzk.icu" val url = "jdbc:mysql://h122.wzk.icu:3306/spark-test?useUnicode=true&characterEncoding=utf-8&useSSL=false" var conn: Connection = null var stmt: PreparedStatement = null var sql = "insert into wordcount values(?, ?)" result.foreach{ case (word, count) => try { conn = DriverManager.getConnection(url, username, password) stmt = conn.prepareStatement(sql) stmt.setString(1, word) stmt.setInt(2, count) } catch { case e: Exception => e.printStackTrace() } finally { if (stmt != null) { stmt.close() } if (conn != null) { conn.close() } } } sc.stop() } }
写入SQL-优化版
优化后使用 foreachPartition 保存数据,一个分区创建一个链接:cache RDD
注意:
- SparkSQL 有方便的读写MySQL的方法,给参数直接调用即可
- 但掌握这个方法很重要,因为SparkSQL不是支持所有类型的数据库
package icu.wzk import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import java.sql.{Connection, DriverManager, PreparedStatement} object SuperWordCount3 { private val stopWords = "in on to from by a an the is are were was i we you your he his".split("\\s+") private val punctuation = "[\\)\\.,:;'!\\?]" private val username = "hive" private val password = "hive@wzk.icu" private val url = "jdbc:mysql://h122.wzk.icu:3306/spark-test?useUnicode=true&characterEncoding=utf-8&useSSL=false" def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("ScalaSuperWordCount2") .setMaster("local[*]") val sc = new SparkContext(conf) sc.setLogLevel("WARN") val lines: RDD[String] = sc.textFile(args(0)) val words: RDD[String] = lines .flatMap(_.split("\\s+")) .map(_.trim.toLowerCase()) val clearWords: RDD[String] = words .filter(!stopWords.contains(_)) .map(_.replaceAll(punctuation, "")) val result: RDD[(String, Int)] = clearWords .map((_, 1)) .reduceByKey(_ + _) .sortBy(_._2, false) result.foreach(println) result.foreachPartition(saveAsMySQL) sc.stop() } def saveAsMySQL(iter: Iterator[(String, Int)]): Unit = { var conn: Connection = null var stmt: PreparedStatement = null var sql = "insert into wordcount values(?, ?)" try { conn = DriverManager.getConnection(url, username, password) stmt = conn.prepareStatement(sql) iter.foreach{ case (word, count) => stmt.setString(1, word) stmt.setInt(2, count) } } catch { case e: Exception => e.printStackTrace() } finally { if (stmt != null) { stmt.close() } if (conn != null) { conn.close() } } } }
打包上传
mvn clean package
打包并上传到项目:
运行项目
不写入SQL版
不写入SQL版
spark-submit --master local[*] --class icu.wzk.SuperWordCount1 spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/goodtbl.java
运行结果如下图:
写入SQL-未优化版
spark-submit --master local[*] --class icu.wzk.SuperWordCount2 spark-wordcount-1.0-SNA
写入SQL-优化版
spark-submit --master local[*] --class icu.wzk.SuperWordCount3 spark-wordcount-1.0-SN
运行结果如下图:
查看数据
查看数据库,内容如下: