使用Spark DataFrame针对数据进行SQL处理

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介:

简介

    DataFrame让Spark具备了处理大规模结构化数据的能力,在比原有的RDD转化方式易用的前提下,计算性能更还快了两倍。这一个小小的API,隐含着Spark希望大一统「大数据江湖」的野心和决心。DataFrame像是一条联结所有主流数据源并自动转化为可并行处理格式的水渠,通过它Spark能取悦大数据生态链上的所有玩家,无论是善用R的数据科学家,惯用SQL的商业分析师,还是在意效率和实时性的统计工程师。

例子说明

    提供了将结构化数据为DataFrame并注册为表,使用SQL查询的例子

    提供了从RMDB中读取数据为DataFrame的例子

    提供了将数据写入到RMDB中的例子

代码样例

import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import java.io.PrintWriter
import util.control.Breaks._
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import java.sql.DriverManager
import java.sql.PreparedStatement
import java.sql.Connection
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import java.util.Properties
import org.apache.spark.sql.SaveMode

object SimpleDemo extends App {
  val sc = new SparkContext("local[*]", "test")
  val sqlc = new SQLContext(sc)
  val driverUrl = "jdbc:mysql://ip:3306/ding?user=root&password=root&zeroDateTimeBehavior=convertToNull&characterEncoding=utf-8"
  val tableName = "tbaclusterresult"

  //把数据转化为DataFrame,并注册为一个表
  val df = sqlc.read.json("G:/data/json.txt")
  df.registerTempTable("user")
  val res = sqlc.sql("select * from user")
  println(res.count() + "---------------------------")
  res.collect().map { row =>
    {
      println(row.toString())
    }
  }

  //从MYSQL读取数据
  val jdbcDF = sqlc.read
    .options(Map("url" -> driverUrl,
      //      "user" -> "root",
      //      "password" -> "root",
      "dbtable" -> tableName))
    .format("jdbc")
    .load()
  println(jdbcDF.count() + "---------------------------")
  jdbcDF.collect().map { row =>
    {
      println(row.toString())
    }
  }

  //插入数据至MYSQL
  val schema = StructType(
    StructField("name", StringType) ::
      StructField("age", IntegerType)
      :: Nil)

  val data1 = sc.parallelize(List(("blog1", 301), ("iteblog", 29),
    ("com", 40), ("bt", 33), ("www", 23))).
    map(item => Row.apply(item._1, item._2))
  import sqlc.implicits._
  val df1 = sqlc.createDataFrame(data1, schema)
  //  df1.write.jdbc(driverUrl, "sparktomysql", new Properties)
  df1.write.mode(SaveMode.Overwrite).jdbc(driverUrl, "testtable", new Properties)

  //DataFrame类中还有insertIntoJDBC方法,调用该函数必须保证表事先存在,它只用于插入数据,函数原型如下:
  //def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit

  //插入数据到MYSQL
  val data = sc.parallelize(List(("www", 10), ("iteblog", 20), ("com", 30)))
  data.foreachPartition(myFun)

  case class Blog(name: String, count: Int)

  def myFun(iterator: Iterator[(String, Int)]): Unit = {
    var conn: Connection = null
    var ps: PreparedStatement = null
    val sql = "insert into blog(name, count) values (?, ?)"
    try {
      conn = DriverManager.getConnection(driverUrl, "root", "root")
      iterator.foreach(data => {
        ps = conn.prepareStatement(sql)
        ps.setString(1, data._1)
        ps.setInt(2, data._2)
        ps.executeUpdate()
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (ps != null) {
        ps.close()
      }
      if (conn != null) {
        conn.close()
      }
    }
  }
}

将数据写入ORACLE示例

val driverUrl: String = "jdbc:oracle:thin:@IP:1521/sda"
    jdbcDF.foreachPartition(insertDataFunc)
    def insertDataFunc(iterator: Iterator[Row]): Unit = {
      var conn: Connection = null
      var psmt: PreparedStatement = null
      val sql = "INSERT INTO TEST2(ID,NAME,NUM) VALUES ( ?,?, ?)"
      var i = 0
      var num = 0
      try {
        conn = DriverManager.getConnection(driverUrl, "user", "password")
        conn.setAutoCommit(false);
        psmt = conn.prepareStatement(sql)
        iterator.foreach { row =>
          {
            i += 1
            if (i > batchSize) {
              i = 0
              psmt.executeBatch();
              num += psmt.getUpdateCount();
              psmt.clearBatch();
            }
            psmt.setObject(1, row(0))
            psmt.setObject(2, row(1))
            psmt.setObject(3, row(2))
            psmt.addBatch();
          }
        }
        psmt.executeBatch();
        num += psmt.getUpdateCount();
        conn.commit();
        println(num+"..........................")
      } catch {
        case e: Exception => {
          e.printStackTrace()
          try {
            conn.rollback();
          } catch {
            case e: Exception => e.printStackTrace();
          }
        }
      } finally {
        if (psmt != null) {
          psmt.close()
        }
        if (conn != null) {
          conn.close()
        }
      }
    }
相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。   相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情: https://www.aliyun.com/product/rds/mysql 
目录
相关文章
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
788 43
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
284 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
5月前
|
SQL
SQL如何只让特定列中只显示一行数据
SQL如何只让特定列中只显示一行数据
|
5月前
|
SQL JSON 分布式计算
Spark SQL架构及高级用法
Spark SQL基于Catalyst优化器与Tungsten引擎,提供高效的数据处理能力。其架构涵盖SQL解析、逻辑计划优化、物理计划生成及分布式执行,支持复杂数据类型、窗口函数与多样化聚合操作,结合自适应查询与代码生成技术,实现高性能大数据分析。
|
5月前
|
SQL
SQL中如何删除指定查询出来的数据
SQL中如何删除指定查询出来的数据
|
5月前
|
SQL 关系型数据库 MySQL
SQL如何对不同表的数据进行更新
本文介绍了如何将表A的Col1数据更新到表B的Col1中,分别提供了Microsoft SQL和MySQL的实现方法,并探讨了多表合并后更新的优化方式,如使用MERGE语句提升效率。适用于数据库数据同步与批量更新场景。
|
6月前
|
SQL DataWorks 数据管理
SQL血缘分析实战!数据人必会的3大救命场景
1. 开源工具:Apache Atlas(元数据管理)、Spline(血缘追踪) 2. 企业级方案:阿里DataWorks血缘分析、腾讯云CDW血缘引擎 3. 自研技巧:在ETL脚本中植入版本水印,用注释记录业务逻辑变更 📌 重点总结:
|
6月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
310 0
|
9月前
|
存储 分布式计算 Hadoop
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
404 79