2.sparkSQL--DataFrames与RDDs的相互转换

简介: Spark SQL支持两种RDDs转换为DataFrames的方式 使用反射获取RDD内的Schema     当已知类的Schema的时候,使用这种基于反射的方法会让代码更加简洁而且效果也很好。 通过编程接口指定Schema     通过Spark SQL的接口创建RDD的Schema,这种方式会让代码比较冗长。
Spark SQL支持两种RDDs转换为DataFrames的方式
使用反射获取RDD内的Schema
    当已知类的Schema的时候,使用这种基于反射的方法会让代码更加简洁而且效果也很好。
通过编程接口指定Schema
    通过Spark SQL的接口创建RDD的Schema,这种方式会让代码比较冗长。
    这种方法的好处是,在运行时才知道数据的列以及列的类型的情况下,可以动态生成Schema。

原文和作者一起讨论:http://www.cnblogs.com/intsmaze/p/6613755.html

微信:intsmaze

使用反射获取Schema(Inferring the Schema Using Reflection)
import org.apache.spark.sql.{DataFrameReader, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object InferringSchema {
  def main(args: Array[String]) {

    //创建SparkConf()并设置App名称
    val conf = new SparkConf().setAppName("SQL-intsmaze")

    //SQLContext要依赖SparkContext
    val sc = new SparkContext(conf)
    //创建SQLContext
    val sqlContext = new SQLContext(sc)

    //从指定的地址创建RDD
    val lineRDD = sc.textFile("hdfs://192.168.19.131:9000/person.tzt").map(_.split(","))

    //创建case class
    //将RDD和case class关联
    val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))

    //导入隐式转换,如果不导入无法将RDD转换成DataFrame
    //将RDD转换成DataFrame
    import sqlContext.implicits._
    val personDF = personRDD.toDF

    //注册表
    personDF.registerTempTable("intsmaze")
    //传入SQL
    val df = sqlContext.sql("select * from intsmaze order by age desc limit 2")

    //将结果以JSON的方式存储到指定位置
    df.write.json("hdfs://192.168.19.131:9000/personresult")

    //停止Spark Context
    sc.stop()
  }
}
//case class一定要放到外面
case class Person(id: Int, name: String, age: Int)
spark shell中不需要导入sqlContext.implicits._是因为spark shell默认已经自动导入了。
打包提交到yarn集群:
/home/hadoop/app/spark/bin/spark-submit --class InferringSchema \
--master yarn \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 2 \
--queue default \
/home/hadoop/sparksql-1.0-SNAPSHOT.jar 

 

通过编程接口指定Schema(Programmatically Specifying the Schema)

当JavaBean不能被预先定义的时候,编程创建DataFrame分为三步:

从原来的RDD创建一个Row格式的RDD.

创建与RDD中Rows结构匹配的StructType,通过该StructType创建表示RDD的Schema.

通过SQLContext提供的createDataFrame方法创建DataFrame,方法参数为RDD的Schema.

 

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkContext, SparkConf}

object SpecifyingSchema {
  def main(args: Array[String]) {
    //创建SparkConf()并设置App名称
    val conf = new SparkConf().setAppName("SQL-intsmaze")
    //SQLContext要依赖SparkContext
    val sc = new SparkContext(conf)
    //创建SQLContext
    val sqlContext = new SQLContext(sc)

    //从指定的地址创建RDD
    val personRDD = sc.textFile(args(0)).map(_.split(","))

    //通过StructType直接指定每个字段的schema
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )

    //将RDD映射到rowRDD
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))

    //将schema信息应用到rowRDD上
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)

    //注册表
    personDataFrame.registerTempTable("intsmaze")
    //执行SQL
    val df = sqlContext.sql("select * from intsmaze order by age desc ")
    //将结果以JSON的方式存储到指定位置
    df.write.json(args(1))
    //停止Spark Context
    sc.stop()
  }
}
将程序打成jar包,上传到spark集群,提交Spark任务
/home/hadoop/app/spark/bin/spark-submit --class SpecifyingSchema \
--master yarn \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 2 \
--queue default \
/home/hadoop/sparksql-1.0-SNAPSHOT.jar \
hdfs://192.168.19.131:9000/person.txt hdfs://192.168.19.131:9000/intsmazeresult

 

/home/hadoop/app/spark/bin/spark-submit --class SpecifyingSchema \
--master yarn \
--deploy-mode client \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 2 \
--queue default \
/home/hadoop/sparksql-1.0-SNAPSHOT.jar \
hdfs://192.168.19.131:9000/person.txt hdfs://192.168.19.131:9000/intsmazeresult

maven项目的pom.xml中添加Spark SQL的依赖

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.10</artifactId>
  <version>1.6.2</version>
</dependency>

 

作者: intsmaze(刘洋)
老铁,你的--->推荐,--->关注,--->评论--->是我继续写作的动力。
微信公众号号:Apache技术研究院
由于博主能力有限,文中可能存在描述不正确,欢迎指正、补充!
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
相关文章
|
XML 存储 API
RAG效果优化:高质量文档解析详解
本文介绍了如何通过高质量的文档解析提升RAG系统整体的效果。
16117 15
|
存储 关系型数据库 MySQL
MySQL MATCH 函数如何使用 WITH QUERY EXPANSION?
【9月更文挑战第2天】MySQL MATCH 函数如何使用 WITH QUERY EXPANSION?
161 0
|
人工智能 自然语言处理 算法
阿里云PAI大模型评测最佳实践
在大模型时代,模型评测是衡量性能、精选和优化模型的关键环节,对加快AI创新和实践至关重要。PAI大模型评测平台支持多样化的评测场景,如不同基础模型、微调版本和量化版本的对比分析。本文为您介绍针对于不同用户群体及对应数据集类型,如何实现更全面准确且具有针对性的模型评测,从而在AI领域可以更好地取得成就。
|
开发框架 小程序 JavaScript
【小程序开发框架选型】7大小程序开发框架,哪一个更适合你?
小程序越来越流行,微信小程序、百度小程序、支付宝小程序、头条小程序等等不断涌入我们的生活,随着小程序的火爆,各种小程序框架不断出现。小程序开发公认的7个小程序开发框架: 原生、uni-app、taro、mpvue、wepy、chameleon、remax。各有利弊。
2190 0
|
XML Android开发 数据格式
Android View动画和属性动画
Android View动画和属性动画
Android View动画和属性动画
|
SQL 分布式计算 Cloud Native
在阿里云中实现EMR离线数据分析
E-MapReduce(简称“EMR”)是云原生开源大数据平台,向客户提供简单易集成的Hadoop、Hive、Spark、Flink、Presto、Clickhouse、Delta、Hudi等开源大数据计算和存储引擎。EMR计算资源可以根据业务的需要调整。EMR可以部署在阿里云公有云的ECS和ACK、专有云平台。产品文档地址:https://www.aliyun.com/product/emapreduce
567 0
在阿里云中实现EMR离线数据分析
|
JavaScript 前端开发 索引
javascript unshift()和shift()
console.log('unshift()和shift()方法的行为非常类似于push()和pop(),不一样的是前者在数组的头部而非尾部进行元素的插入和删除操作。'); console.log('unshift() 在数组的头部添加一个或多个元素,并将已存在的元素移动到更高索引的位置来获得足够的空间,最后返回数组新的长度。
1026 0
|
3天前
|
存储 关系型数据库 分布式数据库
PostgreSQL 18 发布,快来 PolarDB 尝鲜!
PostgreSQL 18 发布,PolarDB for PostgreSQL 全面兼容。新版本支持异步I/O、UUIDv7、虚拟生成列、逻辑复制增强及OAuth认证,显著提升性能与安全。PolarDB-PG 18 支持存算分离架构,融合海量弹性存储与极致计算性能,搭配丰富插件生态,为企业提供高效、稳定、灵活的云数据库解决方案,助力企业数字化转型如虎添翼!