大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL DuckDB 分析主实例,集群系列 8核16GB
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(正在更新!)

章节内容

上节完成的内容如下:


Spark案例编写 Scala

计算圆周率

找共同的好友

Super Word Count

需求背景

  • 给定一段文本
  • 将单词全部转换为小写
  • 去除标点符号
  • 去除停用词
  • count值降序保存
  • 结果保存到MySQL
  • 额外要求:标点符合和停用词可以自定义

编写代码

先实现到MySQL保存前的内容,我们需要先编写测试一下我们的代码是否正确

package icu.wzk

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SuperWordCount1 {

  private val stopWords = "in on to from by a an the is are were was i we you your he his".split("\\s+")

  private val punctuation = "[\\)\\.,:;'!\\?]"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ScalaSuperWordCount1")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val lines: RDD[String] = sc.textFile(args(0))
    lines
      .flatMap(_.split("\\s+"))
      .map(_.toLowerCase)
      .map(_.replaceAll(punctuation, ""))
      .filter(word => !stopWords.contains(word) && word.trim.nonEmpty)
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)
      .collect()
      .foreach(println)
    
    sc.stop()
  }

}

详细解释

object SuperWordCount1 { … }

SuperWordCount1 是一个 Scala 对象,定义了一个单例对象用于运行单词计数程序。

private val stopWords = “in on to from by a an the is are were was i we you your he his”.split(“\s+”)

这里定义了一个 stopWords 列表,包含了常见的停用词,这些词在统计单词频率时会被过滤掉。

split(“\s+”) 方法将这些停用词用空白字符分割成数组,便于后续的查找和过滤。

private val punctuation = “[\)\.,:;'!\?]”

定义了一个正则表达式 punctuation,用于匹配常见的标点符号。这些标点符号在统计单词频率时会被去除。

def main(args: Array[String]): Unit = { … }

main 方法是程序的入口点,args 是命令行参数,其中 args(0) 通常表示输入文件的路径。

val conf = new SparkConf().setAppName(“ScalaSuperWordCount1”).setMaster(“local[*]”)

SparkConf() 用于配置 Spark 应用程序。setAppName(“ScalaSuperWordCount1”) 设置应用程序的名称。

setMaster(“local[*]”) 指定应用程序以本地模式运行,使用所有可用的 CPU 核心。

val sc = new SparkContext(conf)

SparkContext 是 Spark 应用程序的核心,用于与 Spark 集群进行交互。

sc.setLogLevel(“WARN”)

设置日志级别为 “WARN”,减少日志输出,方便查看重要信息。

val lines: RDD[String] = sc.textFile(args(0))

sc.textFile(args(0)) 从指定的文本文件路径加载数据,创建一个 RDD[String],其中每一行文本都作为一个字符串元素。

lines 是包含输入文本数据的 RDD。

flatMap(_.split(“\s+”))

flatMap 方法将每一行字符串按空白字符拆分成单词,并将其展开成单个单词的 RDD。

map(_.toLowerCase)

将每个单词转换为小写,以确保统计时不区分大小写。

map(_.replaceAll(punctuation, “”))

使用正则表达式 punctuation 去除单词中的标点符号,使得统计结果更加准确。

filter(word => !stopWords.contains(word) && word.trim.nonEmpty)

filter 方法过滤掉停用词和空白单词:

!stopWords.contains(word) 确保当前单词不在停用词列表中。

word.trim.nonEmpty 确保单词在去除前后空白字符后不是空字符串。

map((_, 1))

将每个单词映射为 (word, 1) 的键值对,表示每个单词出现一次。

reduceByKey(_ + _)

reduceByKey 方法根据键(单词)对值(计数)进行累加,统计每个单词的总出现次数。

sortBy(_._2, false)

将统计结果按值(单词出现的次数)从大到小排序。

collect().foreach(println)

collect() 方法将 RDD 中的数据收集到驱动程序中(即本地),然后使用 foreach(println) 输出每个单词及其出现的次数。

由于 collect 会将数据从分布式环境中拉到本地,需要注意数据量大的情况下可能导致内存不足的问题。

sc.stop()

在计算完成后,调用 sc.stop() 方法停止 SparkContext,释放资源。

添加依赖

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.28</version>
</dependency>

同时我们需要在build的部分,也要加入对应的内容,让驱动可以加载进来:

<build>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>4.4.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.3.0</version>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>cn.lagou.sparkcore.WordCount</mainClass>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

创建库表

我们新建一个数据库,也要新建一个数据表


CREATE TABLE `wordcount` (
  `word` varchar(255) DEFAULT NULL,
  `count` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

写入SQL-未优化

我们在 foreach 中保存了数据,此时需要创建大量的MySQL连接,效率是比较低的。

package icu.wzk

import com.mysql.cj.xdevapi.PreparableStatement
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{Connection, DriverManager, PreparedStatement}

object SuperWordCount2 {

  private val stopWords = "in on to from by a an the is are were was i we you your he his".split("\\s+")

  private val punctuation = "[\\)\\.,:;'!\\?]"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ScalaSuperWordCount2")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val lines: RDD[String] = sc.textFile(args(0))

    val words: RDD[String] = lines
      .flatMap(_.split("\\s+"))
      .map(_.trim.toLowerCase())

    val clearWords: RDD[String] = words
      .filter(!stopWords.contains(_))
      .map(_.replaceAll(punctuation, ""))

    val result: RDD[(String, Int)] = clearWords
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)
    result.foreach(println)

    // 输出到 MySQL
    val username = "hive"
    val password = "hive@wzk.icu"
    val url = "jdbc:mysql://h122.wzk.icu:3306/spark-test?useUnicode=true&characterEncoding=utf-8&useSSL=false"

    var conn: Connection = null
    var stmt: PreparedStatement = null
    var sql = "insert into wordcount values(?, ?)"

    result.foreach{
      case (word, count) => try {
        conn = DriverManager.getConnection(url, username, password)
        stmt = conn.prepareStatement(sql)
        stmt.setString(1, word)
        stmt.setInt(2, count)
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        if (stmt != null) {
          stmt.close()
        }
        if (conn != null) {
          conn.close()
        }
      }
    }

    sc.stop()
  }

}

写入SQL-优化版

优化后使用 foreachPartition 保存数据,一个分区创建一个链接:cache RDD

注意:

  • SparkSQL 有方便的读写MySQL的方法,给参数直接调用即可
  • 但掌握这个方法很重要,因为SparkSQL不是支持所有类型的数据库
package icu.wzk

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{Connection, DriverManager, PreparedStatement}

object SuperWordCount3 {

  private val stopWords = "in on to from by a an the is are were was i we you your he his".split("\\s+")
  private val punctuation = "[\\)\\.,:;'!\\?]"
  private val username = "hive"
  private val password = "hive@wzk.icu"
  private val url = "jdbc:mysql://h122.wzk.icu:3306/spark-test?useUnicode=true&characterEncoding=utf-8&useSSL=false"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ScalaSuperWordCount2")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val lines: RDD[String] = sc.textFile(args(0))

    val words: RDD[String] = lines
      .flatMap(_.split("\\s+"))
      .map(_.trim.toLowerCase())

    val clearWords: RDD[String] = words
      .filter(!stopWords.contains(_))
      .map(_.replaceAll(punctuation, ""))

    val result: RDD[(String, Int)] = clearWords
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)
    result.foreach(println)

    result.foreachPartition(saveAsMySQL)

    sc.stop()
  }

  def saveAsMySQL(iter: Iterator[(String, Int)]): Unit = {
    var conn: Connection = null
    var stmt: PreparedStatement = null
    var sql = "insert into wordcount values(?, ?)"

    try {
      conn = DriverManager.getConnection(url, username, password)
      stmt = conn.prepareStatement(sql)
      iter.foreach{
        case (word, count) =>
          stmt.setString(1, word)
          stmt.setInt(2, count)
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (stmt != null) {
        stmt.close()
      }
      if (conn != null) {
        conn.close()
      }
    }
  }

}

打包上传

mvn clean package

打包并上传到项目:

运行项目

不写入SQL版

不写入SQL版

spark-submit --master local[*] --class icu.wzk.SuperWordCount1 spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/goodtbl.java

运行结果如下图:

写入SQL-未优化版

spark-submit --master local[*] --class icu.wzk.SuperWordCount2 spark-wordcount-1.0-SNA

写入SQL-优化版

spark-submit --master local[*] --class icu.wzk.SuperWordCount3 spark-wordcount-1.0-SN

运行结果如下图:

查看数据

查看数据库,内容如下:

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
目录
相关文章
|
8月前
|
存储 关系型数据库 MySQL
大数据新视界--大数据大厂之MySQL 数据库课程设计:开启数据宇宙的传奇之旅
本文全面剖析数据库课程设计 MySQL,展现其奇幻魅力与严峻挑战。通过实际案例凸显数据库设计重要性,详述数据安全要点及学习目标。深入阐述备份与恢复方法,并分享优秀实践项目案例。为开发者提供 MySQL 数据库课程设计的全面指南,助力提升数据库设计与管理能力,保障数据安全稳定。
大数据新视界--大数据大厂之MySQL 数据库课程设计:开启数据宇宙的传奇之旅
|
7月前
|
负载均衡 算法 关系型数据库
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
本文聚焦 MySQL 集群架构中的负载均衡算法,阐述其重要性。详细介绍轮询、加权轮询、最少连接、加权最少连接、随机、源地址哈希等常用算法,分析各自优缺点及适用场景。并提供 Java 语言代码实现示例,助力直观理解。文章结构清晰,语言通俗易懂,对理解和应用负载均衡算法具有实用价值和参考价值。
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
|
6月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
317 0
|
9月前
|
存储 分布式计算 Hadoop
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
413 79
|
7月前
|
存储 关系型数据库 MySQL
大数据新视界 --面向数据分析师的大数据大厂之 MySQL 基础秘籍:轻松创建数据库与表,踏入大数据殿堂
本文详细介绍了在 MySQL 中创建数据库和表的方法。包括安装 MySQL、用命令行和图形化工具创建数据库、选择数据库、创建表(含数据类型介绍与选择建议、案例分析、最佳实践与注意事项)以及查看数据库和表的内容。文章专业、严谨且具可操作性,对数据管理有实际帮助。
大数据新视界 --面向数据分析师的大数据大厂之 MySQL 基础秘籍:轻松创建数据库与表,踏入大数据殿堂
|
8月前
|
关系型数据库 MySQL 数据安全/隐私保护
大数据新视界--大数据大厂之MySQL 数据库课程设计:数据安全深度剖析与未来展望
本文深入探讨数据库课程设计 MySQL 的数据安全。以医疗、电商、企业案例,详述用户管理、数据加密、备份恢复及网络安全等措施,结合数据安全技术发展趋势,与《大数据新视界 -- 大数据大厂之 MySQL 数据库课程设计》紧密关联,为 MySQL 数据安全提供全面指南。
大数据新视界--大数据大厂之MySQL 数据库课程设计:数据安全深度剖析与未来展望
|
8月前
|
负载均衡 算法 关系型数据库
大数据新视界--大数据大厂之MySQL数据库课程设计:MySQL集群架构负载均衡故障排除与解决方案
本文深入探讨 MySQL 集群架构负载均衡的常见故障及排除方法。涵盖请求分配不均、节点无法响应、负载均衡器故障等现象,介绍多种负载均衡算法及故障排除步骤,包括检查负载均衡器状态、调整算法、诊断修复节点故障等。还阐述了预防措施与确保系统稳定性的方法,如定期监控维护、备份恢复策略、团队协作与知识管理等。为确保 MySQL 数据库系统高可用性提供全面指导。
|
8月前
|
SQL 关系型数据库 MySQL
大数据新视界--大数据大厂之MySQL数据库课程设计:MySQL 数据库 SQL 语句调优方法详解(2-1)
本文深入介绍 MySQL 数据库 SQL 语句调优方法。涵盖分析查询执行计划,如使用 EXPLAIN 命令及理解关键指标;优化查询语句结构,包括避免子查询、减少函数使用、合理用索引列及避免 “OR”。还介绍了索引类型知识,如 B 树索引、哈希索引等。结合与 MySQL 数据库课程设计相关文章,强调 SQL 语句调优重要性。为提升数据库性能提供实用方法,适合数据库管理员和开发人员。
|
8月前
|
关系型数据库 MySQL 大数据
大数据新视界--大数据大厂之MySQL 数据库课程设计:MySQL 数据库 SQL 语句调优的进阶策略与实际案例(2-2)
本文延续前篇,深入探讨 MySQL 数据库 SQL 语句调优进阶策略。包括优化索引使用,介绍多种索引类型及避免索引失效等;调整数据库参数,如缓冲池、连接数和日志参数;还有分区表、垂直拆分等其他优化方法。通过实际案例分析展示调优效果。回顾与数据库课程设计相关文章,强调全面认识 MySQL 数据库重要性。为读者提供综合调优指导,确保数据库高效运行。
|
Scala 分布式计算 Spark
安装Scala-2.11.7——集群学习日记
安装Scala-2.11.7——集群学习日记 前言 在安装Spark之前,我们需要安装Scala语言的支持。在此我选择的是scala-2.11.7版本。 scala-2.11.7下载 为了方便,我现在我的SparkMaster主机上先安装,把目录打开到/usr目录下,与我的Java目录相一致。
1764 0

热门文章

最新文章