scala版本和spark版本的对应关系
Spark 2.x 系列支持 Scala 2.11 和 Scala 2.12。Spark 3.x 系列支持 Scala 2.12 和 Scala 2.13。
Spark 的版本与 Scala 的版本对应关系如下表所示:
Spark 版本 | Scala 版本 |
2.4.x | 2.11 |
2.4.x | 2.12 |
3.0.x | 2.12 |
3.0.x | 2.13 |
3.1.x | 2.12 |
3.1.x | 2.13 |
3.2.x | 2.12 |
3.2.x | 2.13 |
因此,如果您要使用 Spark 3.2.1,则需要使用 Scala 2.12 或 Scala 2.13。
需要注意的是,Spark 和 Scala 的版本对应关系并不是强制的。但尽量保持 Spark 和 Scala 版本的兼容,否则可能会遇到一些问题。
引入依赖
使用 sbt 引入依赖可以使用以下步骤:
- 在项目的根目录下创建
build.sbt
文件。 - 在
build.sbt
文件中添加依赖。
以下是 build.sbt
文件的示例:
scalaVersion := "2.13.8" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "3.2.1", "org.apache.spark" %% "spark-sql" % "3.2.1" )
上述代码添加了 Spark 的 spark-core
和 spark-sql
依赖。
libraryDependencies
属性用于指定项目的依赖。Seq()
函数用于创建一个列表。+=
运算符用于将依赖添加到列表中。
依赖的格式如下:
groupId %% artifactId % version
groupId
是依赖的组织 ID。artifactId
是依赖的项目 ID。version
是依赖的版本。
您可以使用 %%
关键字来指定依赖的版本。例如,以下代码指定了 Spark 的 3.2.1 版本:
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.2.1"
您还可以使用 provided
关键字来指定依赖不是打包到 JAR 文件中。例如,以下代码指定了 Spark 依赖不是打包到 JAR 文件中:
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.2.1" % "provided"
在这种情况下,Spark 依赖将在运行 Spark 应用程序时由 Spark 框架提供。
要查看项目的依赖,可以使用 sbt dependencyTree
命令:
sbt dependencyTree
该命令将打印出项目的依赖树。
在scala中使用spark-sql
在 Scala 中使用 Spark SQL 通常涉及创建 SparkSession
对象,这是 Spark 2.0 版本以后引入的一种新的入口,用于使用 Spark 的各种功能,包括 Spark SQL。以下是一个简单的 Scala 示例,演示如何使用 Spark SQL 进行一些基本的操作。
首先,确保你的项目中添加了 Spark SQL 的依赖。如果使用 sbt,可以在 build.sbt
文件中添加以下依赖:
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0"
这里假设你正在使用 Spark 3.0.0 版本。请根据你的实际情况调整版本号。
然后,你可以按照以下步骤在 Scala 中使用 Spark SQL:
- 「导入 Spark SQL 相关的类:」
import org.apache.spark.sql.{SparkSession, Row} // 如果需要使用DataFrame的DSL风格,还需要导入以下: import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._
- 「创建 SparkSession 对象:」
val spark = SparkSession.builder .appName("SparkSQLExample") .master("local") .getOrCreate()
这里的 "SparkSQLExample" 是你的应用程序的名称,"local" 表示在本地运行。
- 「创建 DataFrame:」
val data = Seq(("John", 25), ("Jane", 30), ("Doe", 40)) val schema = List("Name", "Age") val df = spark.createDataFrame(data).toDF(schema: _*)
这里创建了一个简单的 DataFrame,包含两列 "Name" 和 "Age"。
- 「使用 Spark SQL 进行查询:」
df.createOrReplaceTempView("people") val result = spark.sql("SELECT * FROM people WHERE Age >= 30") result.show()
这里首先使用 createOrReplaceTempView
方法将 DataFrame 注册为一个临时表 "people",然后使用 Spark SQL 进行查询。
如果你更喜欢使用 DataFrame 的 DSL 风格,你也可以这样写:
val resultDF = df.filter(col("Age") >= 30)resultDF.show()
- 「关闭 SparkSession:」
spark.stop()
这个简单的例子演示了如何在 Scala 中使用 Spark SQL 进行基本的数据查询。在实际应用中,你可能需要连接到外部数据源、执行更复杂的查询和使用更多的 Spark SQL 功能。希望这个例子能够帮助你入门在 Scala 中使用 Spark SQL。
利用spark进行简单的sql操作和数据验证
spark可以直接在hadoop集群中连接hive定义的数据库元数据,所以我们可以使用spark进行快速分析。
执行简单的sql
定义三个表,然后我们批量
val spark = SparkSession.builder .appName("SparkSQLExample") .master("local") .getOrCreate() val tables = List("t1", "t2", "t2") println(tables.length) for (t <- tables){ println(t) } println("1、检查表的最新账期和数量") for (t <- tables) { val sql = "select day_id,count(*) as cnt from "+t+" group by day_id order by day_id desc" val ans = spark.sql(sql) println(t+" 表") ans.select("day_id", "cnt").show(20) }
SparkSession的常用API
常见的成员变量
SparkSession 的成员变量如下:
- 「catalog」:代表 Spark 的元数据存储系统,用于存储数据库、表、视图等元数据信息。
- 「conf」:代表 Spark 的配置信息,包括 Spark 的运行模式、内存配置、调度器等信息。
- 「emptyDataFrame」:代表一个空的 DataFrame,用于创建新 DataFrame 或初始化 DataFrame 操作。
- 「sessionState」:代表 SparkSession 的会话状态,包括 SparkSession 的创建时间、运行时间、当前操作等信息。
- 「sharedState」:代表 SparkSession 共享的状态信息,包括 SparkSession 的广播变量、累加器等信息。
- 「sparkContext」:代表 Spark 的上下文,用于访问 Spark 的资源,如 RDD、DAG、任务等。
- 「sqlContext」:代表 Spark SQL 上下文,用于执行 SQL 查询。
具体来说:
- 「catalog」 是 Spark 中非常重要的一个组件,它用于存储 Spark 的数据库、表、视图等元数据信息。SparkSession 的 catalog 成员变量指向的就是 Spark 的元数据存储系统。
- 「conf」 是 Spark 的配置信息,它用于控制 Spark 的运行行为。SparkSession 的 conf 成员变量指向的就是 Spark 的配置信息。
- 「emptyDataFrame」 是一个空的 DataFrame,它用于创建新 DataFrame 或初始化 DataFrame 操作。SparkSession 的 emptyDataFrame 成员变量指向的就是这个空的 DataFrame。
- 「sessionState」 是 SparkSession 的会话状态,它用于跟踪 SparkSession 的运行情况。SparkSession 的 sessionState 成员变量指向的就是这个会话状态。
- 「sharedState」 是 SparkSession 共享的状态信息,它用于存储 SparkSession 中共享的资源,如广播变量、累加器等。SparkSession 的 sharedState 成员变量指向的就是这个共享状态。
- 「sparkContext」 是 Spark 的上下文,它用于访问 Spark 的资源,如 RDD、DAG、任务等。SparkSession 的 sparkContext 成员变量指向的就是这个上下文。
- 「sqlContext」 是 Spark SQL 上下文,它用于执行 SQL 查询。SparkSession 的 sqlContext 成员变量指向的就是这个上下文。
sql函数
在Scala中使用Spark的spark.sql()
方法同样有多个重载函数。以下是一些常见的用法和示例:
1. 执行简单的SQL查询:
// 使用字符串形式的SQL查询 val resultDF = spark.sql("SELECT * FROM table_name")
2. 注册临时表并执行查询:
// 注册DataFrame为临时表 df.createOrReplaceTempView("my_temp_table") // 执行查询 val resultDF = spark.sql("SELECT * FROM my_temp_table")
3. 使用参数化的SQL查询:
// 使用问号占位符 val paramValue = "some_value" val resultDF = spark.sql("SELECT * FROM table_name WHERE column_name = ?", paramValue)
在Spark中,对于参数化的SQL查询,你可以使用?
作为占位符,并按照参数的顺序将参数传递给spark.sql()
方法。如果有多个参数,你可以在SQL查询字符串中使用多个占位符,并按照相应的顺序传递参数。
以下是一个使用多个参数的例子:
// 假设有两个参数 val param1 = "value1" val param2 = 42 // 使用两个问号占位符 val sqlQuery = "SELECT * FROM table_name WHERE column1 = ? AND column2 = ?" // 传递参数并执行查询 val resultDF = spark.sql(sqlQuery, param1, param2)
在这个例子中,sqlQuery
字符串中有两个问号占位符,分别对应两个参数。当调用spark.sql()
方法时,依次传递了param1
和param2
,它们会替换SQL查询中的占位符。
注意:确保参数的顺序和数量与SQL查询中的占位符一致,以避免错误。此外,Spark SQL的占位符是按照1-based索引的,而不是0-based索引。
您提供的图像中的函数如下:
「@time [T] (f: => T)」
是一个装饰器函数,用于测量函数的执行时间。
这个函数将函数 f
的执行时间保存在一个 T
类型的变量中。例如,以下代码将函数 f
的执行时间保存在一个 Double
类型的变量 time
中:
@time [Double] def f(): Unit = { // ... } val time = f()
「baseRelationToDataFrame (baseRel: BaseRelation, schema: StructType)」
将一个 BaseRelation 转换为一个 DataFrame。
这个函数将一个 BaseRelation 转换为一个 DataFrame。BaseRelation 是 Spark SQL 中表示关系数据的抽象类。StructType 是 Spark SQL 中表示 DataFrame 列类型的结构体。
例如,以下代码将一个 Hive 表转换为一个 DataFrame:
val hiveTable = spark.sqlContext.sql("SELECT * FROM myTable") val df = hiveTable.baseRelationToDataFrame(hiveTable.schema)
「executeCommand (runner: String, command: String)」
这个函数用于执行一个命令。runner
参数指定命令的执行器,command
参数指定命令的字符串。
例如,以下代码将执行 echo "Hello, world!"
命令:
spark.executeCommand("echo", "Hello, world!")
「emptyDataset [T] (implicit evidence$1: Encoder [T])」
这个函数用于创建一个空的 DataFrame。T
参数指定 DataFrame 的列类型。evidence$1
参数是隐式的编码器,用于将 T
类型的值转换为 Spark SQL 的内部表示。
例如,以下代码将创建一个空的 String
类型 DataFrame:
spark.emptyDataset[String]
「experimental」
这个函数用于访问 Spark 的实验性功能。
例如,以下代码将使用实验性功能 spark.sql.shuffle.partitions
设置 Shuffle 分区的数量:
spark.experimental.sql.shuffle.partitions = 10
「listenerManager」
这个函数用于访问 Spark 的执行监听器管理器。执行监听器可以用来监控 Spark 的执行状态。
例如,以下代码将注册一个执行监听器,该监听器将在 Spark 作业完成时打印日志:
spark.listenerManager.register(new SparkListener { override def onJobEnd(job: SparkJob): Unit = { println(s"Job ${job.jobId} finished.") } })
「newSession()」
这个函数用于创建一个新的 SparkSession。
例如,以下代码将创建一个新的 SparkSession:
val newSparkSession = spark.newSession()
「ne (: AnyRef)」
这个函数用于判断两个对象是否相等。
例如,以下代码将判断两个字符串是否相等:
val str1 = "Hello, world!" val str2 = "Hello, world!" println(str1 ne str2) // false
「range (end: Long)」
这个函数用于创建一个从 0 到 end
的整数序列。
例如,以下代码将创建一个从 0 到 10 的整数序列:
val range = spark.range(10) println(range.collect()) // Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
「read (source: String, format: String)」
这个函数用于读取数据源,并将其转换为 DataFrame。source
参数指定数据源的 URL 或路径。format
参数指定数据源的格式。
例如,以下代码将读取一个 CSV 文件,并将其转换为 DataFrame:
val df = spark.read.csv("data.csv")
「readStream (source: String, format: String)」
这个函数用于读取流式数据源,并将其转换为 DataStreamWriter。source
参数指定流式数据源的 URL 或路径。format
参数指定流式数据源的格式。
例如,以下代码将读取一个 Kafka 流,并将其转换为 DataStreamWriter:
val df = spark.readStream.kafka("localhost:9092", "my-topic")
「streams」
这个函数用于获取 Spark 的流式查询管理器。流式查询管理器用于管理流式查询。
例如,以下代码将创建一个流式查询:
val query = spark.streams.start(df.writeStream.format("console").start())
「table (tableName: String)」
这个函数用于获取指定名称的表。
例如,以下代码将获取名为 myTable
的表:
val df = spark.table("myTable")
「udf (udf: UDF)」
在Spark中,sparkSession.udf
方法用于注册用户定义的函数(User Defined Function,简称UDF)。通过注册UDF,你可以在Spark SQL中使用自定义的函数来处理数据。以下是使用sparkSession.udf
方法的一般步骤:
- 「定义自定义函数:」 首先,你需要定义一个自定义函数,这可以是Scala或Java中的函数。这个函数将用于对DataFrame中的列进行转换或操作。
- 「注册UDF:」 使用
sparkSession.udf.register
方法将自定义函数注册为UDF。 - 「在SQL查询中使用UDF:」 通过SQL查询中的
SELECT
语句来使用注册的UDF。
下面是一个简单的示例,演示如何使用sparkSession.udf
:
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions.udf // 创建SparkSession val spark = SparkSession.builder.appName("UDFExample").getOrCreate() // 创建一个示例DataFrame val data = Seq(("Alice", 25), ("Bob", 30), ("Charlie", 22)) val df = spark.createDataFrame(data).toDF("Name", "Age") // 1. 定义自定义函数 val squareUDF = udf((x: Int) => x * x) // 2. 注册UDF spark.udf.register("square", squareUDF) // 3. 在SQL查询中使用UDF val resultDF = spark.sql("SELECT Name, Age, square(Age) as AgeSquared FROM people") resultDF.show()
「version」
这个函数用于获取 SparkSession 的版本。
例如,以下代码将获取 SparkSession 的版本:
val version = spark.version
「implicits」
这个函数用于获取 SparkSession 的隐式转换。隐式转换可以用于简化 Spark 的编程。
例如,以下代码将使用隐式转换将一个 RDD 转换为 DataFrame:
val rdd = sc.parallelize(Seq(1, 2, 3)) val df = rdd.toDF("a")