scala-spark中的groupby、like等的用法

简介: scala-spark中的groupby、like等的用法

groupby的用法

在Scala中使用Spark的groupBy方法可以对RDD或DataFrame进行分组操作。下面分别介绍在RDD和DataFrame中如何使用groupBy方法。

在RDD中使用groupBy方法:

假设你有一个RDD,其中包含键值对(key-value pairs),你可以使用groupBy方法按照键对RDD进行分组。以下是一个简单的示例:

import org.apache.spark.{SparkConf, SparkContext}

object GroupByExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("GroupByExample").setMaster("local")
    val sc = new SparkContext(conf)

    // 创建一个包含键值对的RDD
    val data = List(("cat", 1), ("dog", 2), ("cat", 3), ("dog", 4), ("cat", 5))
    val rdd = sc.parallelize(data)

    // 使用groupBy方法按照键进行分组
    val groupedRDD = rdd.groupBy(pair => pair._1)

    // 打印分组结果
    groupedRDD.foreach { case (key, values) =>
      println(s"$key: ${values.mkString(", ")}")
    }

    sc.stop()
  }
}

在DataFrame中使用groupBy方法

自己生成一个DataFrame

如果你使用DataFrame而不是RDD,你可以使用groupBy方法来对DataFrame进行分组。以下是一个简单的示例:

import org.apache.spark.sql.{SparkSession, DataFrame}

object GroupByDataFrameExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("GroupByDataFrameExample").getOrCreate()

    // 创建一个包含键值对的DataFrame
    val data = Seq(("cat", 1), ("dog", 2), ("cat", 3), ("dog", 4), ("cat", 5))
    val df = spark.createDataFrame(data).toDF("key", "value")

    // 使用groupBy方法按照键进行分组
    val groupedDF: DataFrame = df.groupBy("key")

    // 进行聚合操作,例如求和
    val sumDF: DataFrame = groupedDF.sum("value")

    // 打印结果
    sumDF.show()

    spark.stop()
  }
}

通过查询结果生成DataFrame

如果你想展示根据 dev_id 分组后的结果,包括每个 dev_id 对应的记录数量,你可以使用groupBycount函数来实现。以下是一个示例:

val sql1 = s"select * from t_a where day_id = $acct_month"
val ans1 = spark.sql(sql1)

// 使用groupBy和count函数按照dev_id分组,并计算每个分组的记录数量
val result = ans1.groupBy("dev_id").count()

// 打印结果
result.show()

在这个例子中,groupBy("dev_id")会按照 dev_id 进行分组,然后使用count()函数计算每个分组的记录数量。最后,通过show()方法打印结果。

请注意,这样的操作会生成一个新的DataFrame,其中包含 dev_id 和对应的记录数量。你可以根据需要进一步处理或保存这个DataFrame。

groupyby之后并排序

import org.apache.spark.sql.functions.col

val sql21 = s"select * from t_a"
  val ans21 = spark.sql(sql21)
  println("2-1端口临时表中总共的端口数为"+ans21.count())

  val a211 = ans21.groupBy("bandwidth").count()
  println("不同的带宽数量为"+a211.count())
  a211.orderBy(col("count").desc)
  a211.show(100)

通过引入 import org.apache.spark.sql.functions.col,我们可以使用 col 函数来引用列名。然后,我们可以使用 orderBy 方法对结果进行排序。

gourpby之后对某个字段求和、取最大值、最小值

在Scala中使用Spark进行group by操作后,可以通过agg函数对每个group进行聚合操作,包括求和、取最大值、最小值等。以下是一个简单的示例代码,假设你有一个包含idvalue字段的数据集:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// 创建SparkSession
val spark = SparkSession.builder.appName("GroupByExample").getOrCreate()

// 创建示例数据集
val data = Seq(
  (1, 10),
  (1, 20),
  (2, 30),
  (2, 40),
  (3, 50)
)

// 定义数据集的schema
val schema = List("id", "value")

// 将数据集转换为DataFrame
val df = spark.createDataFrame(data).toDF(schema: _*)

// 对id进行group by,并对value字段进行聚合操作
val result = df.groupBy("id")
  .agg(
    sum("value").alias("sum_value"),
    max("value").alias("max_value"),
    min("value").alias("min_value")
  )

// 显示结果
result.show()

上述代码中,groupBy("id")表示按照"id"字段进行分组,然后使用agg函数对每个group进行聚合操作,其中sum("value")表示对"value"字段求和,max("value")表示取"value"字段的最大值,min("value")表示取"value"字段的最小值。最后,通过alias函数为聚合后的字段起别名,方便结果显示。

使用like语句

在Spark DataFrame中对某个字段进行类似于SQL中的LIKE操作,你可以使用filter方法结合like函数。以下是一个简单的示例代码:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// 创建SparkSession
val spark = SparkSession.builder.appName("LikeExample").getOrCreate()

// 创建示例数据集
val data = Seq(
  (1, "apple"),
  (2, "banana"),
  (3, "orange"),
  (4, "grape"),
  (5, "kiwi")
)

// 定义数据集的schema
val schema = List("id", "fruit")

// 将数据集转换为DataFrame
val df = spark.createDataFrame(data).toDF(schema: _*)

// 对"fruit"字段进行LIKE操作
val result = df.filter(col("fruit").like("%app%"))

// 显示结果
result.show()

上述代码中,col("fruit").like("%app%")表示对"fruit"字段进行LIKE操作,模式字符串"%app%"表示匹配包含"app"的任何字符串。你可以根据实际需求调整模式字符串来进行灵活的匹配操作。在filter方法中,传入条件表达式即可实现对DataFrame的筛选操作。

如何将spark的DataSet转换成DataFrame

在Spark中,DatasetDataFrame是两种不同的API,但它们之间可以相互转换。通常,你可以使用toDF方法将Dataset转换为DataFrame。以下是一个简单的示例:

import org.apache.spark.sql.{SparkSession, Dataset}

// 创建SparkSession
val spark = SparkSession.builder.appName("DatasetToDataFrameExample").getOrCreate()

// 创建一个样例类表示数据结构
case class Person(id: Int, name: String, age: Int)

// 创建一个Dataset
val personDS: Dataset[Person] = Seq(
  Person(1, "Alice", 25),
  Person(2, "Bob", 30),
  Person(3, "Charlie", 35)
).toDS()

// 将Dataset转换为DataFrame
val personDF = personDS.toDF()

// 显示DataFrame
personDF.show()

在上述示例中,toDS()方法将Seq转换为Dataset[Person],然后使用toDF()方法将Dataset转换为DataFrame。最后,通过show()方法显示DataFrame的内容。

请注意,如果你有自定义的类(例如上面的Person类),Spark会自动推断字段名和数据类型。如果没有自定义类,你可以使用createDataFrame方法或toDF方法指定字段名。例如:

val personDF = Seq(
  (1, "Alice", 25),
  (2, "Bob", 30),
  (3, "Charlie", 35)
).toDF("id", "name", "age")

这样可以直接将Seq转换为DataFrame并指定字段名。

目录
相关文章
|
7月前
|
存储 Scala 索引
scala中常见数据结构的用法
scala中常见数据结构的用法
53 1
|
7月前
|
SQL JSON 分布式计算
Spark SQL简介与基本用法
Spark SQL简介与基本用法
|
前端开发 Java 程序员
Scala高级用法 3
Scala高级用法
51 0
|
Java Scala
Scala高级用法 2
Scala高级用法
50 0
|
分布式计算 Java Scala
Scala高级用法 1
Scala高级用法
61 0
|
Java Scala
Scala的高级用法
Scala的高级用法
|
Java Scala
scala 匿名函数的用法实操
1. => 什么意思 => 匿名函数(Anonymous Functions),表示创建一个函数实例。 比如:(x: Int) => x + 1 和如下JAVA方法表示的含义一样:
127 0
scala之list用法史上最全
Scala 列表类似于数组,它们所有元素的类型都相同,但是它们也有所不同:列表是不可变的,值一旦被定义了就不能改变,其次列表 具有递归的结构(也就是链接表结构)而数组不是 下面是list的常用方法,当然了这不是所有的.但都是最常用的.具体看下面的demo.具体可以看代码里面的注释
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
130 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
72 0
下一篇
DataWorks