【指标计算】Spark 计算指定用户与其他用户购买的相同商品

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
实时计算 Flink 版,5000CU*H 3个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
简介: 该代码示例使用Spark SQL解决查找指定用户(user01)与其他用户共同购买商品的问题。首先,创建SparkSession和模拟购买数据,然后通过SQL查询获取user01购买的商品集合。接着,对比所有用户购买记录,筛选出购买过相同商品且非user01的用户。输出显示了这些匹配用户的商品ID。关键在于使用`array_contains`函数检查商品是否在指定用户的购买列表中。遇到类似需求时,可参考Spark SQL官方函数文档。欢迎讨论复杂指标计算问题。

@[toc]

需求说明

计算某一个指定用户与其他用户购买的共同商品,假设用户 user01 购买了商品 123,那么需要找出其他购买过商品 123 的用户。

需求分析

1.指定用户

  • 获取指定用户所购买的商品

2.其他用户

  • 判断其他用户所购买的商品是否被指定用户购买

需求实现

获取指定用户 user01 与其他用户购买过的相同商品。

import org.apache.spark.sql.{DataFrame, SparkSession}

object Test{

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession 对象
        val spark: SparkSession = SparkSession
                .builder()
                .appName("Test")
                .master("local[*]")
                .enableHiveSupport()
                .getOrCreate()

        import spark.implicits._

        // 创建模拟数据
        val df: DataFrame = Seq(
            ("user01", 1),
            ("user01", 2),
            ("user01", 3),
            ("user02", 1),
            ("user02", 2),
            ("user03", 3),
            ("user04", 1),
            ("user04", 4),
            ("user05", 5),
            ("user05", 6),
            ("user06", 10)
        ).toDF("user_id", "product_id")

        // 创建临时视图
        df.createOrReplaceTempView("purchase_data")

        // TODO 1.获取指定用户(user01)所购买的商品
        spark.sql(
            """
              |select
              |     collect_set(product_id) total_purchased_product_id
              |from
              |     purchase_data
              |where
              |     user_id = "user01"
              |group by
              |     user_id
              |""".stripMargin).createOrReplaceTempView("target_data")

        // TODO 2.获取购买过相同商品的用户
        spark.sql(
            """
              |select
              |     user_id,
              |     product_id
              |from
              |     purchase_data,
              |     target_data
              |where
              |     -- 判断商品是否被 user01 所购买
              |     user_id <> "user01"
              |     and
              |     array_contains(total_purchased_product_id,product_id)
              |""".stripMargin).show()

    }

}

输出结果如下所示:

+-------+----------+
|user_id|product_id|
+-------+----------+
| user02|         1|
| user02|         2|
| user03|         3|
| user04|         1|
+-------+----------+

解决这个需求的关键是了解 array_contains 函数:

image.png

平常碰到其它需求可以在 Spark SQL 官方函数文档 找找有没有相关的处理函数,让需求简单化。

如果你有碰到复杂的指标计算需求无法解决,欢迎各位同学在评论区或者私信与我进行讨论(无偿),学无止境,冲呀!

相关文章
|
3月前
|
分布式计算 大数据 Java
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
90 5
|
3月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
66 3
|
3月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
91 0
|
5月前
|
分布式计算 Apache 数据安全/隐私保护
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
78 1
|
8月前
|
SQL 分布式计算 关系型数据库
Spark 分析计算连续三周登录的用户数
本文介绍了如何使用窗口函数`range between`来查询`login_time`为2022-03-10的用户最近连续三周的登录数。首先在MySQL中创建`log_data`表并插入数据,接着定义需求为找出该日期前连续三周活跃的用户数。通过Spark SQL,分步骤实现:1)确定统计周期,2)筛选符合条件的数据,3)计算用户连续登录状态。在初始实现中出现错误,因未考虑日期在周中的位置,修正后正确计算出活跃用户数。
139 6
|
7月前
|
分布式计算 Serverless 数据处理
Serverless Spark计算服务
Serverless Spark计算服务
|
8月前
|
SQL 分布式计算 Hadoop
Spark分布式内存计算框架
Spark分布式内存计算框架
236 0
|
8月前
|
分布式计算 数据处理 Apache
Spark RDD的行动操作与延迟计算
Spark RDD的行动操作与延迟计算
Spark RDD的行动操作与延迟计算
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
220 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
3月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
64 0