Spark-SparkSql

简介: SparkSql 允许spark执行sql语句,hivesql,scala的描述的基于关系的查询。其实是封装了新的RDD-SchemaRDD,由行对象组成,有一个模式描述每列的数据类型。
  1. SparkSql
    允许spark执行sql语句,hivesql,scala的描述的基于关系的查询。其实是封装了新的RDD-SchemaRDD,由行对象组成,有一个模式描述每列的数据类型。SchemaRDD与关系型数据库的表很相似,可以通过存在的RDD/Parquet文件/Json文件/用Hive中的数据HiveSql创建。其中相关功能入口是SQLContext()及其子类。
    如HiveContext可以用HiveQL分析器查询访问HiveUDFs的能力、从Hive中读取数据。SparkSQL的数据源相关操作可以通过SchemaRDD接口来操作,可以是被当作一般的RDD,也可以注册成临时表,在上面进行sql查询。
    有两种创建SchemaRDD的方式,一是已经知道了模式,基于反射推断。二是不知道模式,采取实现接口方法,构造一个模式。
//指定模式
val schema = StructType("name age".split(',').map(fieldName=>StructField(fieldName,StringType,true)))
vak rowRdd = sc.textFile("文件地址").map(_.split(',')).map(p=>Row(p(0),p(1).trim))
val peopleSchemaRdd = sqlContext.applySchema(rowRdd ,schema )
peopleSchemaRdd.registerTable("people")

或者

  DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
 schemaPeople.registerTempTable("people");
 Parquet是柱状的格式,binnaryAsString标记sparksql将二进制文件解释成字符串。cacheMetadata打开缓存提高静态数据的查询速度。comperssion.codec是设置文件的压缩算法(snappy、gzip、lzo)。filterPushdown是该文件过滤器的pushdown优化。

对于SparkSql的性能调优可以通过缓存数据和打开一些设置选项来调优。
如cacheTable缓存柱状格式的表spark会只浏览需要的列并且自动的去压缩数据减少内存的使用以及垃圾回收的压力。uncacheTable()可以删除临时表,spark.sql.inMemoryColumarStorage.compressed 基于数据的统计信息每列自动的选择一个压缩算法,
spark.sql.inMemoryColumarStorage.batchSize柱状缓存的批数据大小,越大的数据可以提高内存的利用率和压缩效率,但是OOM是个问题啊,据说spark 2.0的钨丝计划会解决spark申请内存的管理问题。
2. 实例

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class JavaSparkSQL {
  public static class Person implements Serializable {
    private String name;
    private int age;
    public String getName() {
      return name;
    }
    public void setName(String name) {
      this.name = name;
    }
    public int getAge() {
      return age;
    }
    public void setAge(int age) {
      this.age = age;
    }
  }

  public static void main(String[] args) throws Exception {
    /**
     * 初始化
     */
    SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL").setMaster("local[*]");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    SQLContext sqlContext = new SQLContext(ctx);
    System.out.println("=== Data source: RDD ===");
    /**
     * 加载本地文件转换成Bean
     */
    JavaRDD<String> a = ctx.textFile("resources/people.txt");
    System.out.println(a.toDebugString());//rdd的(血统)其实就是RDD得的转换
    JavaRDD<Person> people = ctx.textFile("resources/people.txt").map(
      new Function<String, Person>() {
        @Override
        public Person call(String line) {
          String[] parts = line.split(",");
          Person person = new Person();
          person.setName(parts[0]);
          person.setAge(Integer.parseInt(parts[1].trim()));
          return person;
        }
      });
    //注册表 javabean形RDD即对象
    DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
    schemaPeople.registerTempTable("people");
    // SQL
    DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
    // The results of SQL queries are DataFrames and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by ordinal.
    List<String> teenagerNames = teenagers.toJavaRDD().map(new Function<Row, String>() {
      @Override
      public String call(Row row) {
        return "Name: " + row.getString(0);
      }
    }).collect();
    for (String name: teenagerNames) {
      System.out.println(name);
    }

  /*  System.out.println("=== Data source: Parquet File ===");
    // DataFrames can be saved as parquet files, maintaining the schema information.
    schemaPeople.write().parquet("testdata/people.parquet");

    // Read in the parquet file created above.
    // Parquet files are self-describing so the schema is preserved.
    // The result of loading a parquet file is also a DataFrame.
    DataFrame parquetFile = sqlContext.read().parquet("testdata/people.parquet");

    //Parquet files can also be registered as tables and then used in SQL statements.
    parquetFile.registerTempTable("testdata/parquetFile");
    DataFrame teenagers2 =
      sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
    teenagerNames = teenagers2.toJavaRDD().map(new Function<Row, String>() {
      @Override
      public String call(Row row) {
          return "Name: " + row.getString(0);
      }
    }).collect();
    for (String name: teenagerNames) {
      System.out.println(name);
    }*/
    /**
     * 读取本地json文件
     */
    System.out.println("=== Data source: JSON Dataset ===");
    String path = "resources/people.json";
    // Because the schema of a JSON dataset is automatically inferred, to write queries,
    DataFrame peopleFromJsonFile = sqlContext.read().json(path);
    peopleFromJsonFile.printSchema();
    // root
    //  |-- age: IntegerType
    //  |-- name: StringType
    peopleFromJsonFile.registerTempTable("people");
    DataFrame teenagers3 = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
    teenagerNames = teenagers3.toJavaRDD().map(new Function<Row, String>() {
      @Override
      public String call(Row row) { return "Name: " + row.getString(0); }
    }).collect();
    for (String name: teenagerNames) {
      System.out.println(name);
    }
    /**
     * 测试
     */
    List<String> jsonData = Arrays.asList(
          "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
    JavaRDD<String> anotherPeopleRDD = ctx.parallelize(jsonData);
    DataFrame peopleFromJsonRDD = sqlContext.read().json(anotherPeopleRDD.rdd());
    peopleFromJsonRDD.printSchema();
    // root
    //  |-- address: StructType
    //  |    |-- city: StringType
    //  |    |-- state: StringType
    //  |-- name: StringType
    peopleFromJsonRDD.registerTempTable("people2");
    DataFrame peopleWithCity = sqlContext.sql("SELECT name, address.city FROM people2");
    List<String> nameAndCity = peopleWithCity.toJavaRDD().map(new Function<Row, String>() {
      @Override
      public String call(Row row) {
        return "Name: " + row.getString(0) + ", City: " + row.getString(1);
      }
    }).collect();
    for (String name: nameAndCity) {
      System.out.println(name);
    }
    ctx.stop();
  }
}
目录
相关文章
|
人工智能
AI 绘画Stable Diffusion 研究(五)sd文生图功能详解(下)(3)
AI 绘画Stable Diffusion 研究(五)sd文生图功能详解(下)
670 0
|
6月前
|
SQL Java 编译器
深入理解 Spring Data JPA 的导入与使用:以 UserRepository为例
本文深入解析了 Spring Data JPA 中 `UserRepository` 的导入与使用。通过示例代码,详细说明了为何需要导入 `User` 实体类、`JpaRepository` 接口及 `@Repository` 注解。这些导入语句分别用于定义操作实体、提供数据库交互方法和标识数据访问组件。文章还探讨了未导入时的编译问题,并展示了实际应用场景,如用户保存、查询与删除操作。合理使用导入语句,可让代码更简洁高效,充分发挥 Spring Data JPA 的优势。
378 0
|
机器学习/深度学习 人工智能 自然语言处理
【大模型】使用哪些资源来了解 LLM 的最新进展?
【5月更文挑战第9天】【大模型】使用哪些资源来了解 LLM 的最新进展?
|
XML Android开发 数据格式
🌐Android国际化与本地化全攻略!让你的App走遍全球无障碍!🌍
在全球化背景下,实现Android应用的国际化与本地化至关重要。本文以一款旅游指南App为例,详细介绍如何通过资源文件拆分与命名、适配布局与方向、处理日期时间及货币格式、考虑文化习俗等步骤,完成多语言支持和本地化调整。通过邀请用户测试并收集反馈,确保应用能无缝融入不同市场,提升用户体验与满意度。
465 3
|
SQL 开发框架 前端开发
在C#开发中使用第三方组件LambdaParser、DynamicExpresso、Z.Expressions,实现动态解析/求值字符串表达式
在C#开发中使用第三方组件LambdaParser、DynamicExpresso、Z.Expressions,实现动态解析/求值字符串表达式
5分钟明白LangChain 的输出解析器和链
本文介绍 LangChain 的输出解析器OutputParser的使用,和基于LangChain的LCEL构建链。
|
编解码 人工智能 对象存储
EAS
EAS
498 3
|
数据安全/隐私保护 Windows
Windows使用远程桌面连接树莓派
Windows使用远程桌面连接树莓派
1316 0
Windows使用远程桌面连接树莓派
|
安全 网络协议 前端开发
为什么要用SOCKS5代理?有什么优势?
SOCKS5代理是网络通信的中间层协议,超越HTTP和HTTPS代理,能处理各种协议如HTTP、HTTPS、POP3,提供灵活的流量兼容性和高效的数据处理。其优势在于:支持任何类型的流量,尤其适合流量密集型任务;专注于数据包传输,确保数据准确性;且具有更强的安全性,通过身份验证增强通信安全。广泛应用于大数据传输、远程连接和网页抓取等领域,尤其在需要高安全性和大量数据处理时,SOCKS5代理成为理想选择。
端口扫描 -- Masscan-Gui
端口扫描 -- Masscan-Gui
211 0