创建 DataFrame
在 Scala 中,可以通过以下几种方式创建 DataFrame:
- 从现有的 RDD 转换而来。例如:
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder.appName("Create DataFrame").getOrCreate() import spark.implicits._ case class Person(name: String, age: Int) val rdd = spark.sparkContext.parallelize(Seq(Person("Alice", 25), Person("Bob", 30))) val df = rdd.toDF() df.show()
- 从外部数据源读取。例如,从 JSON 文件中读取数据并创建 DataFrame:
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder.appName("Create DataFrame").getOrCreate() val df = spark.read.json("path/to/json/file") df.show()
- 通过编程方式创建。例如,使用
createDataFrame
方法:
import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} val spark = SparkSession.builder.appName("Create DataFrame").getOrCreate() val schema = StructType( List( StructField("name", StringType, nullable = true), StructField("age", IntegerType, nullable = true) ) ) val data = Seq(Row("Alice", 25), Row("Bob", 30)) val rdd = spark.sparkContext.parallelize(data) val df = spark.createDataFrame(rdd, schema) df.show()
DSL & SQL
在 Spark 中,可以使用两种方式对 DataFrame 进行查询:DSL(Domain-Specific Language)和 SQL。
DSL 是一种特定领域语言,它提供了一组用于操作 DataFrame 的方法。例如,下面是一个使用 DSL 进行查询的例子:
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder.appName("DSL and SQL").getOrCreate() import spark.implicits._ val df = Seq( ("Alice", 25), ("Bob", 30), ("Charlie", 35) ).toDF("name", "age") df.select("name", "age") .filter($"age" > 25) .show()
SQL 是一种结构化查询语言,它用于管理关系数据库系统。在 Spark 中,可以使用 SQL 对 DataFrame 进行查询。例如,下面是一个使用 SQL 进行查询的例子:
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder.appName("DSL and SQL").getOrCreate() import spark.implicits._ val df = Seq( ("Alice", 25), ("Bob", 30), ("Charlie", 35) ).toDF("name", "age") df.createOrReplaceTempView("people") spark.sql("SELECT name, age FROM people WHERE age > 25").show()
DSL 和 SQL 的区别在于语法和风格。DSL 使用方法调用链来构建查询,而 SQL 使用声明式语言来描述查询。选择哪种方式取决于个人喜好和使用场景。
Spark SQL 数据源
Spark SQL 支持多种数据源,包括 Parquet、JSON、CSV、JDBC、Hive 等。
下面是示例代码:
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder.appName("Data Sources Example").getOrCreate() // Parquet val df = spark.read.parquet("path/to/parquet/file") // JSON val df = spark.read.json("path/to/json/file") // CSV val df = spark.read.option("header", "true").csv("path/to/csv/file") // JDBC val df = spark.read .format("jdbc") .option("url", "jdbc:mysql://host:port/database") .option("dbtable", "table") .option("user", "username") .option("password", "password") .load() df.show()
load & save
在 Spark 中,load
函数用于从外部数据源读取数据并创建 DataFrame,而 save
函数用于将 DataFrame 保存到外部数据源。
下面是从 Parquet 文件中读取数据并创建 DataFrame 的示例代码:
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder.appName("Load and Save Example").getOrCreate() val df = spark.read.load("path/to/parquet/file") df.show()
下面是将 DataFrame 保存到 Parquet 文件的示例代码:
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder.appName("Load and Save Example").getOrCreate() import spark.implicits._ val df = Seq( ("Alice", 25), ("Bob", 30), ("Charlie", 35) ).toDF("name", "age") df.write.save("path/to/parquet/file")
函数
Spark SQL 提供了丰富的内置函数,包括数学函数、字符串函数、日期时间函数、聚合函数等。你可以在 Spark SQL 的官方文档中查看所有可用的内置函数。
此外,Spark SQL 还支持自定义函数(User-Defined Function,UDF),可以让用户编写自己的函数并在查询中使用。
下面是一个使用 SQL 语法编写自定义函数的示例代码:
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions.udf val spark = SparkSession.builder.appName("UDF Example").getOrCreate() import spark.implicits._ val df = Seq( ("Alice", 25), ("Bob", 30), ("Charlie", 35) ).toDF("name", "age") df.createOrReplaceTempView("people") val square = udf((x: Int) => x * x) spark.udf.register("square", square) spark.sql("SELECT name, square(age) FROM people").show()
在这个示例中,我们首先定义了一个名为 square
的自定义函数,它接受一个整数参数并返回它的平方。然后,我们使用 createOrReplaceTempView
方法创建一个临时视图,并使用 udf.register
方法注册自定义函数。最后,我们使用 spark.sql
方法执行 SQL 查询,并在查询中调用自定义函数。
DataSet
DataSet 是 Spark 1.6 版本中引入的一种新的数据结构,它提供了 RDD 的强类型和 DataFrame 的查询优化能力。
创建DataSet
在 Scala 中,可以通过以下几种方式创建 DataSet:
- 从现有的 RDD 转换而来。例如:
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder.appName("Create DataSet").getOrCreate() import spark.implicits._ case class Person(name: String, age: Int) val rdd = spark.sparkContext.parallelize(Seq(Person("Alice", 25), Person("Bob", 30))) val ds = rdd.toDS() ds.show()
- 从外部数据源读取。例如,从 JSON 文件中读取数据并创建 DataSet:
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder.appName("Create DataSet").getOrCreate() import spark.implicits._ case class Person(name: String, age: Long) val ds = spark.read.json("path/to/json/file").as[Person] ds.show()
- 通过编程方式创建。例如,使用
createDataset
方法:
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder.appName("Create DataSet").getOrCreate() import spark.implicits._ case class Person(name: String, age: Int) val data = Seq(Person("Alice", 25), Person("Bob", 30)) val ds = spark.createDataset(data) ds.show()
DataSet和DataFrame区别
DataSet 和 DataFrame 都是 Spark 中用于处理结构化数据的数据结构。它们都提供了丰富的操作,包括筛选、聚合、分组、排序等。
它们之间的主要区别在于类型安全性。DataFrame 是一种弱类型的数据结构,它的列只有在运行时才能确定类型。这意味着,在编译时无法检测到类型错误,只有在运行时才会抛出异常。
而 DataSet 是一种强类型的数据结构,它的类型在编译时就已经确定。这意味着,如果你试图对一个不存在的列进行操作,或者对一个列进行错误的类型转换,编译器就会报错。
此外,DataSet 还提供了一些额外的操作,例如 map、flatMap、reduce 等。
RDD & DataFrame & Dataset 转化
RDD、DataFrame、Dataset三者有许多共性,有各自适用的场景常常需要在三者之间转换。
- DataFrame/Dataset转RDD
val rdd1=testDF.rdd val rdd2=testDS.rdd
- RDD转DataFrame
import spark.implicits._ case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型 val testDS = rdd.map {line=> Coltest(line._1,line._2) }.toDS
可以注意到,定义每一行的类型(case class)时,已经给出了字段名和类型,后面只要往case class里面添加值即可。
- Dataset转DataFrame
import spark.implicits._ val testDF = testDS.toDF
- DataFrame转Dataset
import spark.implicits._ case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型 val testDS = testDF.as[Coltest]
这种方法就是在给出每一列的类型后,使用as
方法,转成Dataset,这在数据类型在DataFrame需要针对各个字段处理时极为方便。
注意:在使用一些特殊的操作时,一定要加上 import spark.implicits._
不然toDF
、toDS
无法使用。
Spark Streaming
Spark Streaming 的工作原理是将实时数据流拆分为小批量数据,并使用 Spark 引擎对这些小批量数据进行处理。这种微批处理(Micro-Batch Processing)的方式使得 Spark Streaming 能够以近乎实时的延迟处理大规模的数据流。
下面是一个简单的 Spark Streaming 示例代码:
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} val conf = new SparkConf().setAppName("Spark Streaming Example") val ssc = new StreamingContext(conf, Seconds(1)) val lines = ssc.socketTextStream("localhost", 9999) val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination()
我们首先创建了一个 StreamingContext
对象,并指定了批处理间隔为 1 秒。然后,我们使用 socketTextStream
方法从套接字源创建了一个 DStream。接下来,我们对 DStream 进行了一系列操作,包括 flatMap、map 和 reduceByKey。最后,我们使用 print
方法打印出单词计数的结果。
Spark Streaming 优缺点
Spark Streaming 作为一种实时流处理框架,具有以下优点:
- 高性能:Spark Streaming 基于 Spark 引擎,能够快速处理大规模的数据流。
- 易用性:Spark Streaming 提供了丰富的 API,可以让开发人员快速构建实时流处理应用。
- 容错性:Spark Streaming 具有良好的容错性,能够在节点故障时自动恢复。
- 集成性:Spark Streaming 能够与 Spark 生态系统中的其他组件(如 Spark SQL、MLlib 等)无缝集成。
但是,Spark Streaming 也有一些缺点:
- 延迟:由于 Spark Streaming 基于微批处理模型,因此它的延迟相对较高。对于需要极低延迟的应用场景,Spark Streaming 可能不是最佳选择。
- 复杂性:Spark Streaming 的配置和调优相对复杂,需要一定的经验和技能。
DStream
DStream(离散化流)是 Spark Streaming 中用于表示实时数据流的一种抽象。它由一系列连续的 RDD 组成,每个 RDD 包含一段时间内收集到的数据。
在 Spark Streaming 中,可以通过以下几种方式创建 DStream:
- 从输入源创建。例如,从套接字源创建 DStream:
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} val conf = new SparkConf().setAppName("DStream Example") val ssc = new StreamingContext(conf, Seconds(1)) val lines = ssc.socketTextStream("localhost", 9999) lines.print() ssc.start() ssc.awaitTermination()
- 通过转换操作创建。例如,对现有的 DStream 进行 map 操作:
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} val conf = new SparkConf().setAppName("DStream Example") val ssc = new StreamingContext(conf, Seconds(1)) val lines = ssc.socketTextStream("localhost", 9999) val words = lines.flatMap(_.split(" ")) words.print() ssc.start() ssc.awaitTermination()
- 通过连接操作创建。例如,对两个 DStream 进行 union 操作:
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} val conf = new SparkConf().setAppName("DStream Example") val ssc = new StreamingContext(conf, Seconds(1)) val lines1 = ssc.socketTextStream("localhost", 9999) val lines2 = ssc.socketTextStream("localhost", 9998) val lines = lines1.union(lines2) lines.print() ssc.start() ssc.awaitTermination()
总结:简单来说 DStream 就是对 RDD 的封装,你对 DStream 进行操作,就是对 RDD 进行操作。对于 DataFrame/DataSet/DStream 来说本质上都可以理解成 RDD。
窗口函数
在 Spark Streaming 中,窗口函数用于对 DStream 中的数据进行窗口化处理。它允许你对一段时间内的数据进行聚合操作。
Spark Streaming 提供了多种窗口函数,包括:
window
:返回一个新的 DStream,它包含了原始 DStream 中指定窗口大小和滑动间隔的数据。countByWindow
:返回一个新的单元素 DStream,它包含了原始 DStream 中指定窗口大小和滑动间隔的元素个数。reduceByWindow
:返回一个新的 DStream,它包含了原始 DStream 中指定窗口大小和滑动间隔的元素经过 reduce 函数处理后的结果。reduceByKeyAndWindow
:类似于reduceByWindow
,但是在进行 reduce 操作之前会先按照 key 进行分组。
下面是一个使用窗口函数的示例代码:
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} val conf = new SparkConf().setAppName("Window Example") val ssc = new StreamingContext(conf, Seconds(1)) val lines = ssc.socketTextStream("localhost", 9999) val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10)) wordCounts.print() ssc.start() ssc.awaitTermination()
在这个示例中,我们首先创建了一个 DStream,并对其进行了一系列转换操作。然后,我们使用 reduceByKeyAndWindow
函数对 DStream 进行窗口化处理,指定了窗口大小为 30 秒,滑动间隔为 10 秒。最后,我们使用 print
方法打印出单词计数的结果。
输出操作
Spark Streaming允许DStream的数据输出到外部系统,如数据库或文件系统,输出的数据可以被外部系统所使用,该操作类似于RDD的输出操作。Spark Streaming支持以下输出操作:
print()
: 打印DStream中每个RDD的前10个元素到控制台。saveAsTextFiles(prefix, [suffix])
: 将此DStream中每个RDD的所有元素以文本文件的形式保存。每个批次的数据都会保存在一个单独的目录中,目录名为:prefix-TIME_IN_MS[.suffix]
。saveAsObjectFiles(prefix, [suffix])
: 将此DStream中每个RDD的所有元素以Java对象序列化的形式保存。每个批次的数据都会保存在一个单独的目录中,目录名为:prefix-TIME_IN_MS[.suffix]
。saveAsHadoopFiles(prefix, [suffix])
: 将此DStream中每个RDD的所有元素以Hadoop文件(SequenceFile等)的形式保存。每个批次的数据都会保存在一个单独的目录中,目录名为:prefix-TIME_IN_MS[.suffix]
。foreachRDD(func)
: 最通用的输出操作,将函数func应用于DStream中生成的每个RDD。通过此函数,可以将数据写入任何支持写入操作的数据源。
Structured Streaming
Structured Streaming 是 Spark 2.0 版本中引入的一种新的流处理引擎。它基于 Spark SQL 引擎,提供了一种声明式的 API 来处理结构化数据流。
与 Spark Streaming 相比,Structured Streaming 具有以下优点:
- 易用性:Structured Streaming 提供了与 Spark SQL 相同的 API,可以让开发人员快速构建流处理应用。
- 高性能:Structured Streaming 基于 Spark SQL 引擎,能够快速处理大规模的数据流。
- 容错性:Structured Streaming 具有良好的容错性,能够在节点故障时自动恢复。
- 端到端一致性:Structured Streaming 提供了端到端一致性保证,能够确保数据不丢失、不重复。
下面是一个简单的 Structured Streaming 示例代码:
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate() val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() import spark.implicits._ val words = lines.as[String].flatMap(_.split(" ")) val wordCounts = words.groupBy("value").count() val query = wordCounts.writeStream .outputMode("complete") .format("console") .start() query.awaitTermination()
在这个示例中,我们首先创建了一个 SparkSession
对象。然后,我们使用 readStream
方法从套接字源创建了一个 DataFrame。接下来,我们对 DataFrame 进行了一系列操作,包括 flatMap、groupBy 和 count。最后,我们使用 writeStream
方法将结果输出到控制台。
Structured Streaming 同样支持 DSL 和 SQL 语法。
DSL 语法:
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate() val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() import spark.implicits._ val words = lines.as[String].flatMap(_.split(" ")) val wordCounts = words.groupBy("value").count() val query = wordCounts.writeStream .outputMode("complete") .format("console") .start() query.awaitTermination()
SQL 语法:
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate() val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() lines.createOrReplaceTempView("lines") val wordCounts = spark.sql( """ |SELECT value, COUNT(*) as count |FROM ( | SELECT explode(split(value, ' ')) as value | FROM lines |) |GROUP BY value """.stripMargin) val query = wordCounts.writeStream .outputMode("complete") .format("console") .start() query.awaitTermination()
Source
Structured Streaming 支持多种输入源,包括文件源(如文本文件、Parquet 文件、JSON 文件等)、Kafka、Socket 等。下面是一个使用 Scala 语言从 Kafka 中读取数据的例子:
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate() // 订阅一个主题 val df = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("subscribe", "topic1") .load() df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .as[(String, String)]
Output
Structured Streaming 支持多种输出方式,包括控制台输出、内存输出、文件输出、数据源输出等。下面是将数据写入到 Parquet 文件中的例子:
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate() // 从 socket 中读取数据 val lines = spark .readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() // 将数据写入到 Parquet 文件中 lines.writeStream .format("parquet") .option("path", "path/to/output/dir") .option("checkpointLocation", "path/to/checkpoint/dir") .start()
output mode
每当结果表更新时,我们都希望将更改后的结果行写入外部接收器。
Output mode 指定了数据写入输出接收器的方式。Structured Streaming 支持以下三种 output mode:
Output Mode | 描述 |
Append | 只将流 DataFrame/Dataset 中的新行写入接收器。 |
Complete | 每当有更新时,将流 DataFrame/Dataset 中的所有行写入接收器。 |
Update | 每当有更新时,只将流 DataFrame/Dataset 中更新的行写入接收器。 |
output sink
Output sink 指定了数据写入的位置。Structured Streaming 支持多种输出接收器,包括文件接收器、Kafka 接收器、Foreach 接收器、控制台接收器和内存接收器等。下面是一些使用 Scala 语言将数据写入到不同输出接收器中的例子:
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate() // 从 socket 中读取数据 val lines = spark .readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() // 将数据写入到 Parquet 文件中 lines.writeStream .format("parquet") .option("path", "path/to/output/dir") .option("checkpointLocation", "path/to/checkpoint/dir") .start() // 将数据写入到 Kafka 中 //selectExpr 是一个 DataFrame 的转换操作,它允许你使用 SQL 表达式来选择 DataFrame 中的列。 //selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 表示选择 key 和 value 列,并将它们的类型转换为字符串类型。 //这是因为 Kafka 接收器要求数据必须是字符串类型或二进制类型。 lines.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .writeStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("topic", "topic1") .start() // 将数据写入到控制台中 lines.writeStream .format("console") .start() // 将数据写入到内存中 lines.writeStream .format("memory") .queryName("tableName") .start()
PV,UV统计
下面是用Structured Streaming实现PV,UV统计的例子,我们来感受实战下:
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ object PVUVExample { def main(args: Array[String]): Unit = { val spark = SparkSession.builder.appName("PVUVExample").getOrCreate() import spark.implicits._ // 假设我们有一个包含用户ID和访问的URL的输入流 val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load() val data = lines.as[String].map(line => { val parts = line.split(",") (parts(0), parts(1)) }).toDF("user", "url") // 计算PV val pv = data.groupBy("url").count().withColumnRenamed("count", "pv") val pvQuery = pv.writeStream.outputMode("complete").format("console").start() // 计算UV val uv = data.dropDuplicates().groupBy("url").count().withColumnRenamed("count", "uv") val uvQuery = uv.writeStream.outputMode("complete").format("console").start() pvQuery.awaitTermination() uvQuery.awaitTermination() } }
这段代码演示了如何使用Structured Streaming对数据进行PV和UV统计。它首先从一个socket源读取数据,然后使用groupBy
和count
对数据进行PV统计,最后使用dropDuplicates
、groupBy
和count
对数据进行UV统计。
假设我们在本地启动了一个socket服务器,并向其发送以下数据:
user1,http://example.com/page1 user2,http://example.com/page1 user1,http://example.com/page2 user3,http://example.com/page1 user2,http://example.com/page2 user3,http://example.com/page2
那么程序将输出以下结果:
------------------------------------------- Batch: 0 ------------------------------------------- +--------------------+---+ | url| pv| +--------------------+---+ |http://example.co...| 3| |http://example.co...| 3| +--------------------+---+ ------------------------------------------- Batch: 0 ------------------------------------------- +--------------------+---+ | url| uv| +--------------------+---+ |http://example.co...| 2| |http://example.co...| 3| +--------------------+---+
总结
总之,Spark是一个强大的大数据处理框架,它具有高性能、易用性和灵活性等优点。希望本文能够帮助你入门Spark,并在实际应用中发挥它的强大功能。如果你想深入学习Spark,可以参考官方文档和相关书籍,也可以加入Spark社区,与其他开发人员交流经验。