scala-spark中的groupby、like等的用法

简介: scala-spark中的groupby、like等的用法

groupby的用法

在Scala中使用Spark的groupBy方法可以对RDD或DataFrame进行分组操作。下面分别介绍在RDD和DataFrame中如何使用groupBy方法。

在RDD中使用groupBy方法:

假设你有一个RDD,其中包含键值对(key-value pairs),你可以使用groupBy方法按照键对RDD进行分组。以下是一个简单的示例:

import org.apache.spark.{SparkConf, SparkContext}

object GroupByExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("GroupByExample").setMaster("local")
    val sc = new SparkContext(conf)

    // 创建一个包含键值对的RDD
    val data = List(("cat", 1), ("dog", 2), ("cat", 3), ("dog", 4), ("cat", 5))
    val rdd = sc.parallelize(data)

    // 使用groupBy方法按照键进行分组
    val groupedRDD = rdd.groupBy(pair => pair._1)

    // 打印分组结果
    groupedRDD.foreach { case (key, values) =>
      println(s"$key: ${values.mkString(", ")}")
    }

    sc.stop()
  }
}

在DataFrame中使用groupBy方法

自己生成一个DataFrame

如果你使用DataFrame而不是RDD,你可以使用groupBy方法来对DataFrame进行分组。以下是一个简单的示例:

import org.apache.spark.sql.{SparkSession, DataFrame}

object GroupByDataFrameExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("GroupByDataFrameExample").getOrCreate()

    // 创建一个包含键值对的DataFrame
    val data = Seq(("cat", 1), ("dog", 2), ("cat", 3), ("dog", 4), ("cat", 5))
    val df = spark.createDataFrame(data).toDF("key", "value")

    // 使用groupBy方法按照键进行分组
    val groupedDF: DataFrame = df.groupBy("key")

    // 进行聚合操作,例如求和
    val sumDF: DataFrame = groupedDF.sum("value")

    // 打印结果
    sumDF.show()

    spark.stop()
  }
}

通过查询结果生成DataFrame

如果你想展示根据 dev_id 分组后的结果,包括每个 dev_id 对应的记录数量,你可以使用groupBycount函数来实现。以下是一个示例:

val sql1 = s"select * from t_a where day_id = $acct_month"
val ans1 = spark.sql(sql1)

// 使用groupBy和count函数按照dev_id分组,并计算每个分组的记录数量
val result = ans1.groupBy("dev_id").count()

// 打印结果
result.show()

在这个例子中,groupBy("dev_id")会按照 dev_id 进行分组,然后使用count()函数计算每个分组的记录数量。最后,通过show()方法打印结果。

请注意,这样的操作会生成一个新的DataFrame,其中包含 dev_id 和对应的记录数量。你可以根据需要进一步处理或保存这个DataFrame。

groupyby之后并排序

import org.apache.spark.sql.functions.col

val sql21 = s"select * from t_a"
  val ans21 = spark.sql(sql21)
  println("2-1端口临时表中总共的端口数为"+ans21.count())

  val a211 = ans21.groupBy("bandwidth").count()
  println("不同的带宽数量为"+a211.count())
  a211.orderBy(col("count").desc)
  a211.show(100)

通过引入 import org.apache.spark.sql.functions.col,我们可以使用 col 函数来引用列名。然后,我们可以使用 orderBy 方法对结果进行排序。

gourpby之后对某个字段求和、取最大值、最小值

在Scala中使用Spark进行group by操作后,可以通过agg函数对每个group进行聚合操作,包括求和、取最大值、最小值等。以下是一个简单的示例代码,假设你有一个包含idvalue字段的数据集:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// 创建SparkSession
val spark = SparkSession.builder.appName("GroupByExample").getOrCreate()

// 创建示例数据集
val data = Seq(
  (1, 10),
  (1, 20),
  (2, 30),
  (2, 40),
  (3, 50)
)

// 定义数据集的schema
val schema = List("id", "value")

// 将数据集转换为DataFrame
val df = spark.createDataFrame(data).toDF(schema: _*)

// 对id进行group by,并对value字段进行聚合操作
val result = df.groupBy("id")
  .agg(
    sum("value").alias("sum_value"),
    max("value").alias("max_value"),
    min("value").alias("min_value")
  )

// 显示结果
result.show()

上述代码中,groupBy("id")表示按照"id"字段进行分组,然后使用agg函数对每个group进行聚合操作,其中sum("value")表示对"value"字段求和,max("value")表示取"value"字段的最大值,min("value")表示取"value"字段的最小值。最后,通过alias函数为聚合后的字段起别名,方便结果显示。

使用like语句

在Spark DataFrame中对某个字段进行类似于SQL中的LIKE操作,你可以使用filter方法结合like函数。以下是一个简单的示例代码:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// 创建SparkSession
val spark = SparkSession.builder.appName("LikeExample").getOrCreate()

// 创建示例数据集
val data = Seq(
  (1, "apple"),
  (2, "banana"),
  (3, "orange"),
  (4, "grape"),
  (5, "kiwi")
)

// 定义数据集的schema
val schema = List("id", "fruit")

// 将数据集转换为DataFrame
val df = spark.createDataFrame(data).toDF(schema: _*)

// 对"fruit"字段进行LIKE操作
val result = df.filter(col("fruit").like("%app%"))

// 显示结果
result.show()

上述代码中,col("fruit").like("%app%")表示对"fruit"字段进行LIKE操作,模式字符串"%app%"表示匹配包含"app"的任何字符串。你可以根据实际需求调整模式字符串来进行灵活的匹配操作。在filter方法中,传入条件表达式即可实现对DataFrame的筛选操作。

如何将spark的DataSet转换成DataFrame

在Spark中,DatasetDataFrame是两种不同的API,但它们之间可以相互转换。通常,你可以使用toDF方法将Dataset转换为DataFrame。以下是一个简单的示例:

import org.apache.spark.sql.{SparkSession, Dataset}

// 创建SparkSession
val spark = SparkSession.builder.appName("DatasetToDataFrameExample").getOrCreate()

// 创建一个样例类表示数据结构
case class Person(id: Int, name: String, age: Int)

// 创建一个Dataset
val personDS: Dataset[Person] = Seq(
  Person(1, "Alice", 25),
  Person(2, "Bob", 30),
  Person(3, "Charlie", 35)
).toDS()

// 将Dataset转换为DataFrame
val personDF = personDS.toDF()

// 显示DataFrame
personDF.show()

在上述示例中,toDS()方法将Seq转换为Dataset[Person],然后使用toDF()方法将Dataset转换为DataFrame。最后,通过show()方法显示DataFrame的内容。

请注意,如果你有自定义的类(例如上面的Person类),Spark会自动推断字段名和数据类型。如果没有自定义类,你可以使用createDataFrame方法或toDF方法指定字段名。例如:

val personDF = Seq(
  (1, "Alice", 25),
  (2, "Bob", 30),
  (3, "Charlie", 35)
).toDF("id", "name", "age")

这样可以直接将Seq转换为DataFrame并指定字段名。

目录
相关文章
|
3月前
|
存储 Scala 索引
scala中常见数据结构的用法
scala中常见数据结构的用法
21 1
|
3月前
|
SQL JSON 分布式计算
Spark SQL简介与基本用法
Spark SQL简介与基本用法
|
11月前
|
前端开发 Java 程序员
Scala高级用法 3
Scala高级用法
32 0
|
11月前
|
Java Scala
Scala高级用法 2
Scala高级用法
35 0
|
11月前
|
分布式计算 Java Scala
Scala高级用法 1
Scala高级用法
44 0
|
Java Scala
Scala的高级用法
Scala的高级用法
|
Java Scala
scala 匿名函数的用法实操
1. => 什么意思 => 匿名函数(Anonymous Functions),表示创建一个函数实例。 比如:(x: Int) => x + 1 和如下JAVA方法表示的含义一样:
107 0
scala之list用法史上最全
Scala 列表类似于数组,它们所有元素的类型都相同,但是它们也有所不同:列表是不可变的,值一旦被定义了就不能改变,其次列表 具有递归的结构(也就是链接表结构)而数组不是 下面是list的常用方法,当然了这不是所有的.但都是最常用的.具体看下面的demo.具体可以看代码里面的注释
|
1月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
84 1
Spark快速大数据分析PDF下载读书分享推荐
|
13天前
|
分布式计算 资源调度 大数据
【决战大数据之巅】:Spark Standalone VS YARN —— 揭秘两大部署模式的恩怨情仇与终极对决!
【8月更文挑战第7天】随着大数据需求的增长,Apache Spark 成为关键框架。本文对比了常见的 Spark Standalone 与 YARN 部署模式。Standalone 作为自带的轻量级集群管理服务,易于设置,适用于小规模或独立部署;而 YARN 作为 Hadoop 的资源管理系统,支持资源的统一管理和调度,更适合大规模生产环境及多框架集成。我们将通过示例代码展示如何在这两种模式下运行 Spark 应用程序。
57 3