在scala中使用spark

简介: 在scala中使用spark

scala版本和spark版本的对应关系

Spark 2.x 系列支持 Scala 2.11 和 Scala 2.12。Spark 3.x 系列支持 Scala 2.12 和 Scala 2.13。

Spark 的版本与 Scala 的版本对应关系如下表所示:

Spark 版本 Scala 版本
2.4.x 2.11
2.4.x 2.12
3.0.x 2.12
3.0.x 2.13
3.1.x 2.12
3.1.x 2.13
3.2.x 2.12
3.2.x 2.13

因此,如果您要使用 Spark 3.2.1,则需要使用 Scala 2.12 或 Scala 2.13。

需要注意的是,Spark 和 Scala 的版本对应关系并不是强制的。但尽量保持 Spark 和 Scala 版本的兼容,否则可能会遇到一些问题。

引入依赖

使用 sbt 引入依赖可以使用以下步骤:

  1. 在项目的根目录下创建 build.sbt 文件。
  2. build.sbt 文件中添加依赖。

以下是 build.sbt 文件的示例:

scalaVersion := "2.13.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.2.1",
  "org.apache.spark" %% "spark-sql" % "3.2.1"
)

上述代码添加了 Spark 的 spark-corespark-sql 依赖。

libraryDependencies 属性用于指定项目的依赖。Seq() 函数用于创建一个列表。+= 运算符用于将依赖添加到列表中。

依赖的格式如下:


groupId %% artifactId % version

groupId 是依赖的组织 ID。artifactId 是依赖的项目 ID。version 是依赖的版本。

您可以使用 %% 关键字来指定依赖的版本。例如,以下代码指定了 Spark 的 3.2.1 版本:


libraryDependencies += "org.apache.spark" %% "spark-core" % "3.2.1"

您还可以使用 provided 关键字来指定依赖不是打包到 JAR 文件中。例如,以下代码指定了 Spark 依赖不是打包到 JAR 文件中:


libraryDependencies += "org.apache.spark" %% "spark-core" % "3.2.1" % "provided"

在这种情况下,Spark 依赖将在运行 Spark 应用程序时由 Spark 框架提供。

要查看项目的依赖,可以使用 sbt dependencyTree 命令:


sbt dependencyTree

该命令将打印出项目的依赖树。

在scala中使用spark-sql

在 Scala 中使用 Spark SQL 通常涉及创建 SparkSession 对象,这是 Spark 2.0 版本以后引入的一种新的入口,用于使用 Spark 的各种功能,包括 Spark SQL。以下是一个简单的 Scala 示例,演示如何使用 Spark SQL 进行一些基本的操作。

首先,确保你的项目中添加了 Spark SQL 的依赖。如果使用 sbt,可以在 build.sbt 文件中添加以下依赖:


libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0"

这里假设你正在使用 Spark 3.0.0 版本。请根据你的实际情况调整版本号。

然后,你可以按照以下步骤在 Scala 中使用 Spark SQL:

  1. 「导入 Spark SQL 相关的类:」
import org.apache.spark.sql.{SparkSession, Row}

// 如果需要使用DataFrame的DSL风格,还需要导入以下:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
  1. 「创建 SparkSession 对象:」
val spark = SparkSession.builder
  .appName("SparkSQLExample")
  .master("local")
  .getOrCreate()

这里的 "SparkSQLExample" 是你的应用程序的名称,"local" 表示在本地运行。

  1. 「创建 DataFrame:」
val data = Seq(("John", 25), ("Jane", 30), ("Doe", 40))
val schema = List("Name", "Age")

val df = spark.createDataFrame(data).toDF(schema: _*)

这里创建了一个简单的 DataFrame,包含两列 "Name" 和 "Age"。

  1. 「使用 Spark SQL 进行查询:」
df.createOrReplaceTempView("people")

val result = spark.sql("SELECT * FROM people WHERE Age >= 30")
result.show()

这里首先使用 createOrReplaceTempView 方法将 DataFrame 注册为一个临时表 "people",然后使用 Spark SQL 进行查询。

如果你更喜欢使用 DataFrame 的 DSL 风格,你也可以这样写:


val resultDF = df.filter(col("Age") >= 30)resultDF.show()
  1. 「关闭 SparkSession:」


spark.stop()

这个简单的例子演示了如何在 Scala 中使用 Spark SQL 进行基本的数据查询。在实际应用中,你可能需要连接到外部数据源、执行更复杂的查询和使用更多的 Spark SQL 功能。希望这个例子能够帮助你入门在 Scala 中使用 Spark SQL。

利用spark进行简单的sql操作和数据验证

spark可以直接在hadoop集群中连接hive定义的数据库元数据,所以我们可以使用spark进行快速分析。

执行简单的sql

定义三个表,然后我们批量

val spark = SparkSession.builder
  .appName("SparkSQLExample")
  .master("local")
  .getOrCreate()

val tables = List("t1", "t2", "t2")

println(tables.length)
for (t <- tables){
  println(t)
}
println("1、检查表的最新账期和数量")
for (t <- tables) {
  val sql = "select day_id,count(*) as cnt from "+t+" group by day_id order by day_id desc"
  val ans = spark.sql(sql)
  println(t+" 表")
  ans.select("day_id", "cnt").show(20)
}

SparkSession的常用API

常见的成员变量

SparkSession 的成员变量如下:

  • 「catalog」:代表 Spark 的元数据存储系统,用于存储数据库、表、视图等元数据信息。
  • 「conf」:代表 Spark 的配置信息,包括 Spark 的运行模式、内存配置、调度器等信息。
  • 「emptyDataFrame」:代表一个空的 DataFrame,用于创建新 DataFrame 或初始化 DataFrame 操作。
  • 「sessionState」:代表 SparkSession 的会话状态,包括 SparkSession 的创建时间、运行时间、当前操作等信息。
  • 「sharedState」:代表 SparkSession 共享的状态信息,包括 SparkSession 的广播变量、累加器等信息。
  • 「sparkContext」:代表 Spark 的上下文,用于访问 Spark 的资源,如 RDD、DAG、任务等。
  • 「sqlContext」:代表 Spark SQL 上下文,用于执行 SQL 查询。

具体来说:

  • 「catalog」 是 Spark 中非常重要的一个组件,它用于存储 Spark 的数据库、表、视图等元数据信息。SparkSession 的 catalog 成员变量指向的就是 Spark 的元数据存储系统。
  • 「conf」 是 Spark 的配置信息,它用于控制 Spark 的运行行为。SparkSession 的 conf 成员变量指向的就是 Spark 的配置信息。
  • 「emptyDataFrame」 是一个空的 DataFrame,它用于创建新 DataFrame 或初始化 DataFrame 操作。SparkSession 的 emptyDataFrame 成员变量指向的就是这个空的 DataFrame。
  • 「sessionState」 是 SparkSession 的会话状态,它用于跟踪 SparkSession 的运行情况。SparkSession 的 sessionState 成员变量指向的就是这个会话状态。
  • 「sharedState」 是 SparkSession 共享的状态信息,它用于存储 SparkSession 中共享的资源,如广播变量、累加器等。SparkSession 的 sharedState 成员变量指向的就是这个共享状态。
  • 「sparkContext」 是 Spark 的上下文,它用于访问 Spark 的资源,如 RDD、DAG、任务等。SparkSession 的 sparkContext 成员变量指向的就是这个上下文。
  • 「sqlContext」 是 Spark SQL 上下文,它用于执行 SQL 查询。SparkSession 的 sqlContext 成员变量指向的就是这个上下文。

sql函数

在Scala中使用Spark的spark.sql()方法同样有多个重载函数。以下是一些常见的用法和示例:

1. 执行简单的SQL查询:


// 使用字符串形式的SQL查询
val resultDF = spark.sql("SELECT * FROM table_name")

2. 注册临时表并执行查询:

// 注册DataFrame为临时表
df.createOrReplaceTempView("my_temp_table")

// 执行查询
val resultDF = spark.sql("SELECT * FROM my_temp_table")

3. 使用参数化的SQL查询:

// 使用问号占位符
val paramValue = "some_value"
val resultDF = spark.sql("SELECT * FROM table_name WHERE column_name = ?", paramValue)

在Spark中,对于参数化的SQL查询,你可以使用?作为占位符,并按照参数的顺序将参数传递给spark.sql()方法。如果有多个参数,你可以在SQL查询字符串中使用多个占位符,并按照相应的顺序传递参数。

以下是一个使用多个参数的例子:

// 假设有两个参数
val param1 = "value1"
val param2 = 42

// 使用两个问号占位符
val sqlQuery = "SELECT * FROM table_name WHERE column1 = ? AND column2 = ?"

// 传递参数并执行查询
val resultDF = spark.sql(sqlQuery, param1, param2)

在这个例子中,sqlQuery字符串中有两个问号占位符,分别对应两个参数。当调用spark.sql()方法时,依次传递了param1param2,它们会替换SQL查询中的占位符。

注意:确保参数的顺序和数量与SQL查询中的占位符一致,以避免错误。此外,Spark SQL的占位符是按照1-based索引的,而不是0-based索引。

您提供的图像中的函数如下:

「@time [T] (f: => T)」

是一个装饰器函数,用于测量函数的执行时间。

这个函数将函数 f 的执行时间保存在一个 T 类型的变量中。例如,以下代码将函数 f 的执行时间保存在一个 Double 类型的变量 time 中:

@time [Double] def f(): Unit = {
  // ...
}

val time = f()

「baseRelationToDataFrame (baseRel: BaseRelation, schema: StructType)」

将一个 BaseRelation 转换为一个 DataFrame。

这个函数将一个 BaseRelation 转换为一个 DataFrame。BaseRelation 是 Spark SQL 中表示关系数据的抽象类。StructType 是 Spark SQL 中表示 DataFrame 列类型的结构体。

例如,以下代码将一个 Hive 表转换为一个 DataFrame:

val hiveTable = spark.sqlContext.sql("SELECT * FROM myTable")
val df = hiveTable.baseRelationToDataFrame(hiveTable.schema)

「executeCommand (runner: String, command: String)」

这个函数用于执行一个命令。runner 参数指定命令的执行器,command 参数指定命令的字符串。

例如,以下代码将执行 echo "Hello, world!" 命令:


spark.executeCommand("echo", "Hello, world!")

「emptyDataset [T] (implicit evidence$1: Encoder [T])」

这个函数用于创建一个空的 DataFrame。T 参数指定 DataFrame 的列类型。evidence$1 参数是隐式的编码器,用于将 T 类型的值转换为 Spark SQL 的内部表示。

例如,以下代码将创建一个空的 String 类型 DataFrame:


spark.emptyDataset[String]

「experimental」

这个函数用于访问 Spark 的实验性功能。

例如,以下代码将使用实验性功能 spark.sql.shuffle.partitions 设置 Shuffle 分区的数量:


spark.experimental.sql.shuffle.partitions = 10

「listenerManager」

这个函数用于访问 Spark 的执行监听器管理器。执行监听器可以用来监控 Spark 的执行状态。

例如,以下代码将注册一个执行监听器,该监听器将在 Spark 作业完成时打印日志:

spark.listenerManager.register(new SparkListener {
  override def onJobEnd(job: SparkJob): Unit = {
    println(s"Job ${job.jobId} finished.")
  }
})

「newSession()」

这个函数用于创建一个新的 SparkSession。

例如,以下代码将创建一个新的 SparkSession:


val newSparkSession = spark.newSession()

「ne (: AnyRef)」

这个函数用于判断两个对象是否相等。

例如,以下代码将判断两个字符串是否相等:

val str1 = "Hello, world!"
val str2 = "Hello, world!"
println(str1 ne str2) // false

「range (end: Long)」

这个函数用于创建一个从 0 到 end 的整数序列。

例如,以下代码将创建一个从 0 到 10 的整数序列:

val range = spark.range(10)
println(range.collect()) // Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)

「read (source: String, format: String)」

这个函数用于读取数据源,并将其转换为 DataFrame。source 参数指定数据源的 URL 或路径。format 参数指定数据源的格式。

例如,以下代码将读取一个 CSV 文件,并将其转换为 DataFrame:


val df = spark.read.csv("data.csv")

「readStream (source: String, format: String)」

这个函数用于读取流式数据源,并将其转换为 DataStreamWriter。source 参数指定流式数据源的 URL 或路径。format 参数指定流式数据源的格式。

例如,以下代码将读取一个 Kafka 流,并将其转换为 DataStreamWriter:


val df = spark.readStream.kafka("localhost:9092", "my-topic")

「streams」

这个函数用于获取 Spark 的流式查询管理器。流式查询管理器用于管理流式查询。

例如,以下代码将创建一个流式查询:


val query = spark.streams.start(df.writeStream.format("console").start())

「table (tableName: String)」

这个函数用于获取指定名称的表。

例如,以下代码将获取名为 myTable 的表:


val df = spark.table("myTable")


「udf (udf: UDF)」

在Spark中,sparkSession.udf方法用于注册用户定义的函数(User Defined Function,简称UDF)。通过注册UDF,你可以在Spark SQL中使用自定义的函数来处理数据。以下是使用sparkSession.udf方法的一般步骤:

  1. 「定义自定义函数:」 首先,你需要定义一个自定义函数,这可以是Scala或Java中的函数。这个函数将用于对DataFrame中的列进行转换或操作。
  2. 「注册UDF:」 使用sparkSession.udf.register方法将自定义函数注册为UDF。
  3. 「在SQL查询中使用UDF:」 通过SQL查询中的SELECT语句来使用注册的UDF。

下面是一个简单的示例,演示如何使用sparkSession.udf

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// 创建SparkSession
val spark = SparkSession.builder.appName("UDFExample").getOrCreate()

// 创建一个示例DataFrame
val data = Seq(("Alice", 25), ("Bob", 30), ("Charlie", 22))
val df = spark.createDataFrame(data).toDF("Name", "Age")

// 1. 定义自定义函数
val squareUDF = udf((x: Int) => x * x)

// 2. 注册UDF
spark.udf.register("square", squareUDF)

// 3. 在SQL查询中使用UDF
val resultDF = spark.sql("SELECT Name, Age, square(Age) as AgeSquared FROM people")

resultDF.show()

「version」

这个函数用于获取 SparkSession 的版本。

例如,以下代码将获取 SparkSession 的版本:


val version = spark.version

「implicits」

这个函数用于获取 SparkSession 的隐式转换。隐式转换可以用于简化 Spark 的编程。

例如,以下代码将使用隐式转换将一个 RDD 转换为 DataFrame:


val rdd = sc.parallelize(Seq(1, 2, 3))
val df = rdd.toDF("a")


目录
相关文章
|
SQL 消息中间件 分布式计算
如何查看spark与hadoop、kafka、Scala、flume、hive等兼容版本【适用于任何版本】
如何查看spark与hadoop、kafka、Scala、flume、hive等兼容版本【适用于任何版本】
734 0
如何查看spark与hadoop、kafka、Scala、flume、hive等兼容版本【适用于任何版本】
|
18天前
|
分布式计算 Java Scala
spark 与 scala 的对应版本查看、在idea中maven版本不要选择17,弄了好久,换成11就可以啦
spark 与 scala 的对应版本查看、.在idea中maven版本不要选择17,弄了好久,换成11就可以啦
123 2
|
18天前
|
分布式计算 数据处理 Scala
Spark 集群和 Scala 编程语言的关系
Spark 集群和 Scala 编程语言的关系
41 0
|
18天前
|
分布式计算 Java Scala
Spark编程语言选择:Scala、Java和Python
Spark编程语言选择:Scala、Java和Python
Spark编程语言选择:Scala、Java和Python
|
存储 分布式计算 Scala
Spark-RDD 键值对的操作(Scala版)
Spark-RDD 键值对的操作(Scala版)
|
SQL 存储 JSON
人人都懂Spark-SQL基础操作(Scala版)
人人都懂Spark-SQL基础操作(Scala版)
|
JSON 分布式计算 算法
Spark-编程进阶(Scala版)
Spark-编程进阶(Scala版)
|
JSON 分布式计算 Hadoop
Spark-数据读取与保存(Scala版)
Spark-数据读取与保存(Scala版)
|
存储 缓存 分布式计算
Spark RDD编程基础(Scala版)
Spark RDD编程基础(Scala版)