使用Spark DataFrame针对数据进行SQL处理

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介:

简介

    DataFrame让Spark具备了处理大规模结构化数据的能力,在比原有的RDD转化方式易用的前提下,计算性能更还快了两倍。这一个小小的API,隐含着Spark希望大一统「大数据江湖」的野心和决心。DataFrame像是一条联结所有主流数据源并自动转化为可并行处理格式的水渠,通过它Spark能取悦大数据生态链上的所有玩家,无论是善用R的数据科学家,惯用SQL的商业分析师,还是在意效率和实时性的统计工程师。

例子说明

    提供了将结构化数据为DataFrame并注册为表,使用SQL查询的例子

    提供了从RMDB中读取数据为DataFrame的例子

    提供了将数据写入到RMDB中的例子

代码样例

import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import java.io.PrintWriter
import util.control.Breaks._
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import java.sql.DriverManager
import java.sql.PreparedStatement
import java.sql.Connection
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import java.util.Properties
import org.apache.spark.sql.SaveMode

object SimpleDemo extends App {
  val sc = new SparkContext("local[*]", "test")
  val sqlc = new SQLContext(sc)
  val driverUrl = "jdbc:mysql://ip:3306/ding?user=root&password=root&zeroDateTimeBehavior=convertToNull&characterEncoding=utf-8"
  val tableName = "tbaclusterresult"

  //把数据转化为DataFrame,并注册为一个表
  val df = sqlc.read.json("G:/data/json.txt")
  df.registerTempTable("user")
  val res = sqlc.sql("select * from user")
  println(res.count() + "---------------------------")
  res.collect().map { row =>
    {
      println(row.toString())
    }
  }

  //从MYSQL读取数据
  val jdbcDF = sqlc.read
    .options(Map("url" -> driverUrl,
      //      "user" -> "root",
      //      "password" -> "root",
      "dbtable" -> tableName))
    .format("jdbc")
    .load()
  println(jdbcDF.count() + "---------------------------")
  jdbcDF.collect().map { row =>
    {
      println(row.toString())
    }
  }

  //插入数据至MYSQL
  val schema = StructType(
    StructField("name", StringType) ::
      StructField("age", IntegerType)
      :: Nil)

  val data1 = sc.parallelize(List(("blog1", 301), ("iteblog", 29),
    ("com", 40), ("bt", 33), ("www", 23))).
    map(item => Row.apply(item._1, item._2))
  import sqlc.implicits._
  val df1 = sqlc.createDataFrame(data1, schema)
  //  df1.write.jdbc(driverUrl, "sparktomysql", new Properties)
  df1.write.mode(SaveMode.Overwrite).jdbc(driverUrl, "testtable", new Properties)

  //DataFrame类中还有insertIntoJDBC方法,调用该函数必须保证表事先存在,它只用于插入数据,函数原型如下:
  //def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit

  //插入数据到MYSQL
  val data = sc.parallelize(List(("www", 10), ("iteblog", 20), ("com", 30)))
  data.foreachPartition(myFun)

  case class Blog(name: String, count: Int)

  def myFun(iterator: Iterator[(String, Int)]): Unit = {
    var conn: Connection = null
    var ps: PreparedStatement = null
    val sql = "insert into blog(name, count) values (?, ?)"
    try {
      conn = DriverManager.getConnection(driverUrl, "root", "root")
      iterator.foreach(data => {
        ps = conn.prepareStatement(sql)
        ps.setString(1, data._1)
        ps.setInt(2, data._2)
        ps.executeUpdate()
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (ps != null) {
        ps.close()
      }
      if (conn != null) {
        conn.close()
      }
    }
  }
}

将数据写入ORACLE示例

val driverUrl: String = "jdbc:oracle:thin:@IP:1521/sda"
    jdbcDF.foreachPartition(insertDataFunc)
    def insertDataFunc(iterator: Iterator[Row]): Unit = {
      var conn: Connection = null
      var psmt: PreparedStatement = null
      val sql = "INSERT INTO TEST2(ID,NAME,NUM) VALUES ( ?,?, ?)"
      var i = 0
      var num = 0
      try {
        conn = DriverManager.getConnection(driverUrl, "user", "password")
        conn.setAutoCommit(false);
        psmt = conn.prepareStatement(sql)
        iterator.foreach { row =>
          {
            i += 1
            if (i > batchSize) {
              i = 0
              psmt.executeBatch();
              num += psmt.getUpdateCount();
              psmt.clearBatch();
            }
            psmt.setObject(1, row(0))
            psmt.setObject(2, row(1))
            psmt.setObject(3, row(2))
            psmt.addBatch();
          }
        }
        psmt.executeBatch();
        num += psmt.getUpdateCount();
        conn.commit();
        println(num+"..........................")
      } catch {
        case e: Exception => {
          e.printStackTrace()
          try {
            conn.rollback();
          } catch {
            case e: Exception => e.printStackTrace();
          }
        }
      } finally {
        if (psmt != null) {
          psmt.close()
        }
        if (conn != null) {
          conn.close()
        }
      }
    }
相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
28天前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
56 2
|
2月前
|
Java 网络架构 数据格式
Struts 2 携手 RESTful:颠覆传统,重塑Web服务新纪元的史诗级组合!
【8月更文挑战第31天】《Struts 2 与 RESTful 设计:构建现代 Web 服务》介绍如何结合 Struts 2 框架与 RESTful 设计理念,构建高效、可扩展的 Web 服务。Struts 2 的 REST 插件提供简洁的 API 和约定,使开发者能快速创建符合 REST 规范的服务接口。通过在 `struts.xml` 中配置 `<rest>` 命名空间并使用注解如 `@Action`、`@GET` 等,可轻松定义服务路径及 HTTP 方法。
38 0
|
2月前
|
测试技术 Java
全面保障Struts 2应用质量:掌握单元测试与集成测试的关键策略
【8月更文挑战第31天】Struts 2 的测试策略结合了单元测试与集成测试。单元测试聚焦于单个组件(如 Action 类)的功能验证,常用 Mockito 模拟依赖项;集成测试则关注组件间的交互,利用 Cactus 等框架确保框架拦截器和 Action 映射等按预期工作。通过确保高测试覆盖率并定期更新测试用例,可以提升应用的整体稳定性和质量。
59 0
|
2月前
|
数据库 Java 监控
Struts 2 日志管理化身神秘魔法师,洞察应用运行乾坤,演绎奇幻篇章!
【8月更文挑战第31天】在软件开发中,了解应用运行状况至关重要。日志管理作为 Struts 2 应用的关键组件,记录着每个动作和决策,如同监控摄像头,帮助我们迅速定位问题、分析性能和使用情况,为优化提供依据。Struts 2 支持多种日志框架(如 Log4j、Logback),便于配置日志级别、格式和输出位置。通过在 Action 类中添加日志记录,我们能在开发过程中获取详细信息,及时发现并解决问题。合理配置日志不仅有助于调试,还能分析用户行为,提升应用性能和稳定性。
38 0
|
2月前
|
Java 测试技术 容器
从零到英雄:Struts 2 最佳实践——你的Web应用开发超级变身指南!
【8月更文挑战第31天】《Struts 2 最佳实践:从设计到部署的全流程指南》深入介绍如何利用 Struts 2 框架从项目设计到部署的全流程。从初始化配置到采用 MVC 设计模式,再到性能优化与测试,本书详细讲解了如何构建高效、稳定的 Web 应用。通过最佳实践和代码示例,帮助读者掌握 Struts 2 的核心功能,并确保应用的安全性和可维护性。无论是在项目初期还是后期运维,本书都是不可或缺的参考指南。
33 0
|
2月前
|
测试技术 Java
揭秘Struts 2测试的秘密:如何打造无懈可击的Web应用?
【8月更文挑战第31天】在软件开发中,确保代码质量的关键在于全面测试。对于基于Struts 2框架的应用,结合单元测试与集成测试是一种有效的策略。单元测试聚焦于独立组件的功能验证,如Action类的执行逻辑;而集成测试则关注组件间的交互,确保框架各部分协同工作。使用JUnit进行单元测试,可通过简单示例验证Action类的返回值;利用Struts 2 Testing插件进行集成测试,则可模拟HTTP请求,确保Action方法正确处理请求并返回预期结果。这种结合测试的方法不仅提高了代码质量和可靠性,还保证了系统各部分按需协作。
12 0
|
5月前
|
SQL 分布式计算 数据库
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
199 0
|
5月前
|
SQL 分布式计算 大数据
【大数据技术Hadoop+Spark】Spark SQL、DataFrame、Dataset的讲解及操作演示(图文解释)
【大数据技术Hadoop+Spark】Spark SQL、DataFrame、Dataset的讲解及操作演示(图文解释)
133 0
|
2月前
|
SQL 存储 分布式计算
下一篇
无影云桌面