scala-spark中的groupby、like等的用法

简介: scala-spark中的groupby、like等的用法

groupby的用法

在Scala中使用Spark的groupBy方法可以对RDD或DataFrame进行分组操作。下面分别介绍在RDD和DataFrame中如何使用groupBy方法。

在RDD中使用groupBy方法:

假设你有一个RDD,其中包含键值对(key-value pairs),你可以使用groupBy方法按照键对RDD进行分组。以下是一个简单的示例:

import org.apache.spark.{SparkConf, SparkContext}

object GroupByExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("GroupByExample").setMaster("local")
    val sc = new SparkContext(conf)

    // 创建一个包含键值对的RDD
    val data = List(("cat", 1), ("dog", 2), ("cat", 3), ("dog", 4), ("cat", 5))
    val rdd = sc.parallelize(data)

    // 使用groupBy方法按照键进行分组
    val groupedRDD = rdd.groupBy(pair => pair._1)

    // 打印分组结果
    groupedRDD.foreach { case (key, values) =>
      println(s"$key: ${values.mkString(", ")}")
    }

    sc.stop()
  }
}

在DataFrame中使用groupBy方法

自己生成一个DataFrame

如果你使用DataFrame而不是RDD,你可以使用groupBy方法来对DataFrame进行分组。以下是一个简单的示例:

import org.apache.spark.sql.{SparkSession, DataFrame}

object GroupByDataFrameExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("GroupByDataFrameExample").getOrCreate()

    // 创建一个包含键值对的DataFrame
    val data = Seq(("cat", 1), ("dog", 2), ("cat", 3), ("dog", 4), ("cat", 5))
    val df = spark.createDataFrame(data).toDF("key", "value")

    // 使用groupBy方法按照键进行分组
    val groupedDF: DataFrame = df.groupBy("key")

    // 进行聚合操作,例如求和
    val sumDF: DataFrame = groupedDF.sum("value")

    // 打印结果
    sumDF.show()

    spark.stop()
  }
}

通过查询结果生成DataFrame

如果你想展示根据 dev_id 分组后的结果,包括每个 dev_id 对应的记录数量,你可以使用groupBycount函数来实现。以下是一个示例:

val sql1 = s"select * from t_a where day_id = $acct_month"
val ans1 = spark.sql(sql1)

// 使用groupBy和count函数按照dev_id分组,并计算每个分组的记录数量
val result = ans1.groupBy("dev_id").count()

// 打印结果
result.show()

在这个例子中,groupBy("dev_id")会按照 dev_id 进行分组,然后使用count()函数计算每个分组的记录数量。最后,通过show()方法打印结果。

请注意,这样的操作会生成一个新的DataFrame,其中包含 dev_id 和对应的记录数量。你可以根据需要进一步处理或保存这个DataFrame。

groupyby之后并排序

import org.apache.spark.sql.functions.col

val sql21 = s"select * from t_a"
  val ans21 = spark.sql(sql21)
  println("2-1端口临时表中总共的端口数为"+ans21.count())

  val a211 = ans21.groupBy("bandwidth").count()
  println("不同的带宽数量为"+a211.count())
  a211.orderBy(col("count").desc)
  a211.show(100)

通过引入 import org.apache.spark.sql.functions.col,我们可以使用 col 函数来引用列名。然后,我们可以使用 orderBy 方法对结果进行排序。

gourpby之后对某个字段求和、取最大值、最小值

在Scala中使用Spark进行group by操作后,可以通过agg函数对每个group进行聚合操作,包括求和、取最大值、最小值等。以下是一个简单的示例代码,假设你有一个包含idvalue字段的数据集:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// 创建SparkSession
val spark = SparkSession.builder.appName("GroupByExample").getOrCreate()

// 创建示例数据集
val data = Seq(
  (1, 10),
  (1, 20),
  (2, 30),
  (2, 40),
  (3, 50)
)

// 定义数据集的schema
val schema = List("id", "value")

// 将数据集转换为DataFrame
val df = spark.createDataFrame(data).toDF(schema: _*)

// 对id进行group by,并对value字段进行聚合操作
val result = df.groupBy("id")
  .agg(
    sum("value").alias("sum_value"),
    max("value").alias("max_value"),
    min("value").alias("min_value")
  )

// 显示结果
result.show()

上述代码中,groupBy("id")表示按照"id"字段进行分组,然后使用agg函数对每个group进行聚合操作,其中sum("value")表示对"value"字段求和,max("value")表示取"value"字段的最大值,min("value")表示取"value"字段的最小值。最后,通过alias函数为聚合后的字段起别名,方便结果显示。

使用like语句

在Spark DataFrame中对某个字段进行类似于SQL中的LIKE操作,你可以使用filter方法结合like函数。以下是一个简单的示例代码:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// 创建SparkSession
val spark = SparkSession.builder.appName("LikeExample").getOrCreate()

// 创建示例数据集
val data = Seq(
  (1, "apple"),
  (2, "banana"),
  (3, "orange"),
  (4, "grape"),
  (5, "kiwi")
)

// 定义数据集的schema
val schema = List("id", "fruit")

// 将数据集转换为DataFrame
val df = spark.createDataFrame(data).toDF(schema: _*)

// 对"fruit"字段进行LIKE操作
val result = df.filter(col("fruit").like("%app%"))

// 显示结果
result.show()

上述代码中,col("fruit").like("%app%")表示对"fruit"字段进行LIKE操作,模式字符串"%app%"表示匹配包含"app"的任何字符串。你可以根据实际需求调整模式字符串来进行灵活的匹配操作。在filter方法中,传入条件表达式即可实现对DataFrame的筛选操作。

如何将spark的DataSet转换成DataFrame

在Spark中,DatasetDataFrame是两种不同的API,但它们之间可以相互转换。通常,你可以使用toDF方法将Dataset转换为DataFrame。以下是一个简单的示例:

import org.apache.spark.sql.{SparkSession, Dataset}

// 创建SparkSession
val spark = SparkSession.builder.appName("DatasetToDataFrameExample").getOrCreate()

// 创建一个样例类表示数据结构
case class Person(id: Int, name: String, age: Int)

// 创建一个Dataset
val personDS: Dataset[Person] = Seq(
  Person(1, "Alice", 25),
  Person(2, "Bob", 30),
  Person(3, "Charlie", 35)
).toDS()

// 将Dataset转换为DataFrame
val personDF = personDS.toDF()

// 显示DataFrame
personDF.show()

在上述示例中,toDS()方法将Seq转换为Dataset[Person],然后使用toDF()方法将Dataset转换为DataFrame。最后,通过show()方法显示DataFrame的内容。

请注意,如果你有自定义的类(例如上面的Person类),Spark会自动推断字段名和数据类型。如果没有自定义类,你可以使用createDataFrame方法或toDF方法指定字段名。例如:

val personDF = Seq(
  (1, "Alice", 25),
  (2, "Bob", 30),
  (3, "Charlie", 35)
).toDF("id", "name", "age")

这样可以直接将Seq转换为DataFrame并指定字段名。

目录
相关文章
|
7月前
|
存储 Scala 索引
scala中常见数据结构的用法
scala中常见数据结构的用法
53 1
|
7月前
|
SQL JSON 分布式计算
Spark SQL简介与基本用法
Spark SQL简介与基本用法
|
前端开发 Java 程序员
Scala高级用法 3
Scala高级用法
51 0
|
Java Scala
Scala高级用法 2
Scala高级用法
50 0
|
分布式计算 Java Scala
Scala高级用法 1
Scala高级用法
61 0
|
Java Scala
Scala的高级用法
Scala的高级用法
|
Java Scala
scala 匿名函数的用法实操
1. => 什么意思 => 匿名函数(Anonymous Functions),表示创建一个函数实例。 比如:(x: Int) => x + 1 和如下JAVA方法表示的含义一样:
127 0
scala之list用法史上最全
Scala 列表类似于数组,它们所有元素的类型都相同,但是它们也有所不同:列表是不可变的,值一旦被定义了就不能改变,其次列表 具有递归的结构(也就是链接表结构)而数组不是 下面是list的常用方法,当然了这不是所有的.但都是最常用的.具体看下面的demo.具体可以看代码里面的注释
|
2月前
|
分布式计算 大数据 Java
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
67 5
|
2月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
54 3
下一篇
DataWorks