03-Spark

本文涉及的产品
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
简介: 03-Spark

一.简单介绍

1.什么是 RDD?

RDD 是“Resilient Distributed Dataset”的缩写,翻译为“弹性分布式数据集”。它是 Apache Spark 分布式计算框架中的基本数据抽象,用于在集群上执行并行处理。

RDD 是 Spark 中的核心数据结构,它代表分布式的、不可变的、弹性的数据集合。这意味着 RDD 可以在集群的不同节点上进行并行处理,它们是只读的,一旦创建就不可更改,但可以通过一系列转换操作来生成新的 RDD。此外,RDD 具有弹性,意味着在计算失败时,Spark 能够自动恢复并重新计算丢失的数据。

使用 RDD,Spark 可以将数据分割成一系列的分区,每个分区可以在集群中的不同节点上并行处理。这种并行处理和容错机制使得 Spark 在大规模数据处理和分布式计算方面具有高效性和弹性。

在较新的 Spark 版本中,Dataset 和 DataFrame 已经取代了 RDD 成为更为推荐的数据抽象,因为它们提供了更高层次的抽象和更多的优化机会。但是,RDD 仍然是 Spark 的基石,用于底层的数据处理和分布式计算场景。

2.什么是 DataFrame

Spark 中的 DataFrame 是一种分布式数据集,它是一种高级抽象,构建在弹性分布式数据集(RDD)之上。DataFrame 提供了更丰富的数据结构和 API,使得在 Spark 中进行数据处理更加简单、方便、高效。

DataFrame 可以看作是具有命名列的分布式数据表,类似于传统数据库或数据处理工具中的表格。它具有以下特点:

  1. 结构化数据:DataFrame 是结构化的数据,每个列都有一个名称和数据类型。这样可以更好地组织和处理数据,使得数据分析和查询更加方便。
  2. 不可变性:DataFrame 是不可变的数据结构,一旦创建,其内容不能被直接修改。相比之下,你可以通过转换操作生成新的 DataFrame。
  3. 惰性执行:Spark 中的 DataFrame 具有惰性执行特性。这意味着在进行数据处理时,实际的计算并不会立即执行,而是在遇到行动操作(例如 collect、show、count 等)时才会触发实际的计算。
  4. 分布式计算:DataFrame 是分布式的数据集,可以在集群中的多个节点上进行并行处理。Spark 会自动优化执行计划,以便充分利用集群资源。
  5. 内置函数和优化:Spark 提供了许多内置的高级函数和优化器,用于数据处理、查询、聚合等操作。这些内置函数可以加速处理速度,并且通常比手动编写 RDD 代码更加高效。

创建DataFrame的常见方法包括:

  • 从已有的数据源(如文件、数据库、Hive 表等)中读取数据。
  • 通过在已有的 RDD 上添加模式信息来转换成 DataFrame。
  • 通过编程方式构建数据集。

使用 DataFrame,你可以像 SQL 一样进行查询、过滤、聚合等操作,也可以进行复杂的数据处理和转换。此外,Spark 还提供了与许多其他数据处理库(如 Pandas)的集成,使得在 Spark 中进行数据分析和处理更加灵活和强大。DataFrame 的出现大大简化了 Spark 的数据处理编程,并提高了代码的可读性和可维护性。

3.什么是 DataSet

在 Spark 中,DataSet 是一种更高级的分布式数据集抽象,它是 DataFrame 的扩展,结合了强类型和面向对象的特性。DataSet 在 Spark 1.6 版本中引入,是对 DataFrame 的类型安全扩展。

DataSet 与 DataFrame 的区别在于类型信息:

  1. 强类型:DataSet 是强类型的数据集合,它使用 Scala 或 Java 的类来表示每个记录,这样在编译时就可以检查类型的一致性。这使得在处理数据时更加安全,减少了在运行时出现类型错误的可能性。
  2. 编译时检查:由于 DataSet 是强类型的,所以在对数据进行操作时,编译器可以在编译阶段检查数据的类型,而不是在运行时发现错误。这有助于在开发过程中更早地发现错误,提高代码的可靠性和性能。
  3. 面向对象:DataSet 中的数据被组织为一组对象,每个对象对应于一个记录。这使得在处理复杂数据结构时更加直观和方便。

虽然 DataSet 是 DataFrame 的扩展,但在实际使用中,可以将 DataSet 视为 DataFrame 的一种特殊形式。DataSet API 和 DataFrame API 非常相似,大部分操作都可以在两者之间互相转换。

DataSet的创建方式包括:

  • 从已有的 DataFrame 中转换:可以通过调用as方法,将 DataFrame 转换为 DataSet。这样可以利用强类型特性对数据进行类型安全的处理。
  • 从已有的 RDD 中转换:可以通过调用toDS方法,将 RDD 转换为 DataSet。需要提供一个 Encoder 来描述如何将 RDD 中的数据映射到 DataSet 中的类。
  • 从数据源读取:可以通过 SparkSession 的 read API 从数据源(如文件、数据库、Hive 表等)中读取数据并创建 DataSet。

使用 DataSet,可以利用 Scala 或 Java 中的类和对象来表示数据,而不仅仅是简单的行和列。这使得在编写 Spark 应用程序时,能够更好地结合面向对象的编程范式,编写更具表现力和易于维护的代码。DataSet 对于需要类型安全和面向对象的数据处理场景非常有用。

4.RDD 和 DataFrame 和 DataSet?

在 SparkSQL 中 Spark 为我们提供了两个新的抽象,分别是 DataFrameDataSet首先从版本的产生上来看:

  • Spark1.0 => RDD
  • Spark1.3 => DataFrame
  • Spark1.6 => Dataset

如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同是的他们的执行效率和执行方式。在后期的 Spark 版本中, DataSet 有可能会逐步取代 RDD 和 DataFrame 成为唯一的 API 接口。

三者的共性:RDD 、 DataFrame 、 DataSet 全都是 spark 平台下的分布式弹性数据集,为处理超大型数 据提供便利;

  • 三者都有惰性机制,在进行创建、转换,如 map 方法时,不会立即执行,只有在遇到 Action 如 foreach 时,三者才会开始遍历运算 ;
  • 三者有许多共同的函数,如 filter ,排序等 ;
  • 在对 DataFrame 和 Dataset 进行操作许多操作都需要这个包 : import spark.implicits._ (在 创建好 SparkSession 对象后尽量直接导入)
  • 三者都会根据 Spark 的内存情况自动缓存运算,这样即使数据量很大,也不用担心会 内存溢出
  • 三者都有 partition 的概念。
  • DataFrame 和 DataSet 均可使用模式匹配获取各个字段的值和类型

三者的区别:

  1. RDD
  • RDD 一般和 spark mllib 同时使用
  • RDD 不支持 sparksql 操作
  1. DataFrame
  • 与 RDD 和 Dataset 不同, DataFrame 每一行的类型固定为 Row ,每一列的值没法直 接访问,只有通过解析才能获取各个字段的值
  • DataFrame 与 DataSet 一般不与 spark mllib 同时使用
  • DataFrame 与 DataSet 均支持 SparkSQL 的操作,比如 select , groupby 之类,还能 注册临时表/ 视窗,进行 sql 语句操作
  • DataFrame 与 DataSet 支持一些特别方便的保存方式,比如保存成 csv ,可以带上表 头,这样每一列的字段名一目了然( 后面专门讲解 )
  1. DataSet
  • Dataset 和 DataFrame 拥有完全相同的成员函数,区别只是每一行的数据类型不同 DataFrame 其实就是 DataSet 的一个特例 type DataFrame = Dataset[Row]
  • DataFrame 也可以叫 Dataset[Row], 每一行的类型是 Row ,不解析,每一行究竟有哪 些字段,各个字段又是什么类型都无从得知,只能用上面提到的 getAS 方法或者共性中的第七条提到的模匹配拿出特定字段。而 Dataset 中,每一行是什么类型是 不一定的,在自定义了 case class 之后可以很自由的获得每一行的信息。

5.spark 部署模式

  • Hadoop YARN
  • Apache Mesos (deprecated)
  • Kubernetes
  • Standalone Mode (最简单的方式)

6.作业提交模版

./bin/spark-submit \

 --class <main-class> \        # 应用程序主入口类

 --master <master-url> \       # 集群的 Master Url

 --deploy-mode <deploy-mode> \ # 部署模式

 --conf <key>=<value> \        # 可选配置

 ... # other options

 <application-jar> \           # Jar 包路径

 [application-arguments]       #传递给主入口类的参数

7.Local 模式

# 本地模式提交应用

spark-submit \

--class org.apache.spark.examples.SparkPi \

--master local[2] \

/usr/app/spark-2.4.0-bin-hadoop2.6/examples/jars/spark-examples_2.11-2.4.0.jar \

100   # 传给 SparkPi 的参数

8.Standalone 模式

1.底层设计

Standalone 是 Spark 提供的一种内置的集群模式,采用内置的资源管理器进行管理。下面按照如图所示演示 1 个 Mater 和 2 个 Worker 节点的集群配置,这里使用两台主机进行演示:

  • hadoop001: 由于只有两台主机,所以 hadoop001 既是 Master 节点,也是 Worker 节点;
  • hadoop002 : Worker 节点。

2.注意事项

  • 主机名与 IP 地址的映射必须在 /etc/hosts 文件中已经配置,否则就直接使用 IP 地址;
  • 每个主机名必须独占一行;
  • Spark 的 Master 主机是通过 SSH 访问所有的 Worker 节点,所以需要预先配置免密登录。

3.独立模式服务器地址

#spark目录

cd /data/spark/spark-3.1.2-bin-hadoop3.2

4.cluster 模式启动

HiveExercise

./bin/spark-submit \

 --class com.deepexi.toreador.spark.sql.HiveExercise \

 --master spark://deep01.cdh:17077 \

 --deploy-mode cluster \

 --executor-memory 1G \

 --total-executor-cores 2 \

 --conf spark.sql.hive.metastore.version=2.1.1 \

 --conf spark.sql.hive.metastore.jars=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hive/lib/* \

 /data/spark/jars/toreador-spark-0.0.1-SNAPSHOT.jar \

 file:/data/spark/datasets/dealer.json

# 独立模式cluster模式日志存放位置

spark.eventLog.enabled           true

spark.eventLog.dir               hdfs://deep03.cdh:8020/spark/standalong/logs

MysqlExercise

./bin/spark-submit \

 --class com.deepexi.toreador.spark.sql.MysqlExercise \

 --master spark://deep01.cdh:17077 \

 --deploy-mode cluster \

 --executor-memory 1G \

 --total-executor-cores 2 \

 --conf spark.sql.hive.metastore.version=2.1.1 \

 --conf spark.sql.hive.metastore.jars=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hive/lib/* \

 /data/spark/jars/toreador-spark-0.0.1-SNAPSHOT.jar \

 file:/data/spark/datasets/dealer.json

State of driver-20220830191420-0001 is RUNNING

GroupByExercise

./bin/spark-submit \

 --class com.deepexi.toreador.spark.sql.GroupByExercise \

 --master spark://deep01.cdh:17077 \

 --deploy-mode cluster \

 --executor-memory 1G \

 --total-executor-cores 2 \

 --conf spark.sql.hive.metastore.version=2.1.1 \

 --conf spark.sql.hive.metastore.jars=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hive/lib/* \

 /data/spark/jars/toreador-spark-0.0.1-SNAPSHOT.jar \

 file:/data/spark/datasets/dealer.json

5.client 模式启动

./bin/spark-submit \

 --class com.deepexi.toreador.spark.sql.HiveExercise \

 --master spark://deep01.cdh:17077 \

 --deploy-mode client \

 --executor-memory 1G \

 --total-executor-cores 2 \

 --conf spark.sql.hive.metastore.version=2.1.1 \

 --conf spark.sql.hive.metastore.jars=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hive/lib/* \

 /data/spark/jars/toreador-spark-0.0.1-SNAPSHOT.jar \

 file:/data/spark/datasets/dealer.json

6.杀掉任务

./bin/spark-class org.apache.spark.deploy.Client kill  spark://deep01.cdh:17077  driver-20220830191420-0001

9.yarn 模式

1.服务器地址

#账号

ssh -p 22 root@deep03.cdh


#spark目录

cd /data/spark/spark-3.1.2-onyarn


#历史服务器

http://deep03.cdh:28082

2.cluster 模式启动

./bin/spark-submit \

 --class com.deepexi.toreador.spark.sql.HiveExercise \

 --master yarn \

 --deploy-mode cluster \

 --executor-memory 1G \

 --executor-cores 2 \

 --queue root.default \

 --conf spark.sql.hive.metastore.version=2.1.1 \

 --conf spark.sql.hive.metastore.jars=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hive/lib/* \

 /data/spark/jars/toreador-spark-0.0.1-SNAPSHOT.jar

./bin/spark-submit \

 --class com.deepexi.toreador.spark.sql.HiveExercise3 \

 --master yarn \

 --deploy-mode cluster \

 --executor-memory 1G \

 --executor-cores 2 \

 --queue root.default \

 --conf spark.sql.hive.metastore.version=2.1.1 \

 --conf spark.sql.hive.metastore.jars=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hive/lib/* \

 /data/spark/jars/toreador-spark-0.0.1-SNAPSHOT.jar \

   file:/data/spark/datasets/dealer.json

3.client 模式启动

./bin/spark-submit \

 --class com.deepexi.toreador.spark.sql.HiveExercise \

 --master yarn \

 --deploy-mode client \

 --executor-memory 1G \

 --executor-cores 2 \

 --queue root.default \

 --conf spark.sql.hive.metastore.version=2.1.1 \

 --conf spark.sql.hive.metastore.jars=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hive/lib/* \

 /data/spark/jars/toreador-spark-0.0.1-SNAPSHOT.jar

10.spark 调研信息

SQL (Data Retrieval Statements 部分)

yarn 部署模式

独立部署模式

web api

配置

UDF (User-Defined Functions)部分

https://spark.apache.org/docs/latest/sql-ref-syntax.html SQL (Data Retrieval Statements部分)

https://spark.apache.org/docs/latest/running-on-yarn.html (yarn部署模式)

https://spark.apache.org/docs/latest/spark-standalone.html(独立部署模式)

https://spark.apache.org/docs/latest/monitoring.html (web api)

https://spark.apache.org/docs/latest/configuration.html (配置)

https://spark.apache.org/docs/latest/sql-ref-functions.html(UDFs (User-Defined Functions)部分)

RDD,全称为 Resilient Distributed Datasets,是一个容错的、并行的数据结构,弹性分布式数据集.

运行状态

  • active
  • complete
  • pending
  • failed

task 状态

  • RUNNING
  • SUCCESS
  • FAILED
  • KILLED
  • PENDING

二.YARN

1.YARN 介绍

Yarn 的框架也是经典的主从结构,和 HDFS 的一样,大体上 yarn 由一个 ResourceManager 和多个 NodeManager 构成,RM 为主节点,NM 为从节点。

组件名 作用
ResourceManager 是 Master 上一个独立运行的进程,负责集群统一的资源管理、调度、分配等等;
ApplicationManager 相当于这个 Application 的监护人和管理者,负责监控、管理这个 Application 的所有 Attempt 在 cluster 中各个节点上的具体运行,同时负责向 Yarn ResourceManager 申请资源、返还资源等;
NodeManager 是 Slave 上一个独立运行的进程,负责上报节点的状态(磁盘,内存,cpu 等使用信息);
Container 是 yarn 中分配资源的一个单位,包涵内存、CPU 等等资源,YARN 以 Container 为单位分配资源;

ResourceManager1、接收客户端请求2、为系统资源分配3、与 NM 进行心跳交互,监控集群4、调度组件 SchedulerRM 挂掉: 单点故障:基于 Zookeeper 实现 HA,主提供服务, 备同步主的信息,如果主挂掉,立即主备切换ApplicationManager/ApplicationMaster (MR 任务启动时候 jps 有 MRAppmaster,任务完成就没了)1、应用程序的 Master2、每一个 Job 对应一个 AM3、AM 和 RM 不在一个机器4、AM 申请 RM 资源调度5、AM 联合 NM 监控 jobAM 挂掉: RM 负责重启 无需重新运行已完成的任务NodeManager:(只管内存资源)1、对应 1.0TaskTracker 的角色2、负责启动应用程序的 Container3、监控内部容器资源使用情况,心跳 RMNM 挂掉: 心跳消失,RM 通知 AM 进一步处理Container1、任务运行环境的封装2、AM 及普通任务均运行在 Container 中3、资源代表

工作流程

应用程序提交 --> 申请资源 --> 启动 ApplicationMaster --> 申请运行任务的 Container--> 分发 Container --> 运行 task 任务 --> task 任务结束 --> 回收 Container。

2.以集群模式启动

$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \

   --master yarn \

   --deploy-mode cluster \

   --driver-memory 4g \

   --executor-memory 2g \

   --executor-cores 1 \

   --queue thequeue \

   examples/jars/spark-examples*.jar \

   10

3.客户机模式启动

$ ./bin/spark-shell --master yarn --deploy-mode client

4.查看日志

yarn logs -applicationId <app ID>

三.spark 配置

1.Spark 配置系统

  • 可以通过 SparkConf 进行配置.
  • conf/park-env 设置每台机器的设置
  • log4j2.properties 配置日志记录

2.具体配置

Application Properties 里面是配置参数.

val conf = new SparkConf()

            .setMaster("local[2]")

            .setAppName("CountingSheep")

val sc = new SparkContext(conf)

3.运行时配置

val sc = new SparkContext(new SparkConf())

./bin/spark-submit --name "My app" --master local[4] --conf spark.eventLog.enabled=false

 --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar

四.spark 函数

1.数组函数

SELECT array(1, 2, 3);


SELECT array_distinct(array(1, 2, 3, null, 3));


SELECT array_except(array(1, 2, 3), array(1, 3, 5));


SELECT array_join(array('hello', 'world'), ' ');

2.Map Functions

SELECT map(1.0, '2', 3.0, '4');


SELECT map_concat(map(1, 'a', 2, 'b'), map(3, 'c'));


SELECT map_contains_key(map(1, 'a', 2, 'b'), 1);

3.日期函数

SELECT add_months('2016-08-31', 1);


SELECT current_date();


SELECT current_date;


SELECT current_timestamp();


SELECT date_add('2016-07-30', 1);


SELECT date_sub('2016-07-30', 1);


SELECT datediff('2009-07-31', '2009-07-30');


SELECT day('2009-07-30');


SELECT dayofmonth('2009-07-30');


SELECT dayofweek('2009-07-30');


SELECT dayofyear('2016-04-09');


SELECT last_day('2009-01-12');


SELECT year('2016-07-30');

4.JSON Functions

SELECT from_json('{"a":1, "b":0.8}', 'a INT, b DOUBLE');


SELECT get_json_object('{"a":"b"}', '$.a');


SELECT json_array_length('[1,2,3,4]');


5.聚合函数

  • MAX
  • MIN
  • SUM
  • AVG
  • EVERY
  • ANY
  • SOME

SELECT any(col) FROM VALUES (true), (false), (false) AS tab(col);


SELECT avg(col) FROM VALUES (1), (2), (3) AS tab(col);


SELECT collect_list(col) FROM VALUES (1), (2), (1) AS tab(col);


SELECT collect_set(col) FROM VALUES (1), (2), (1) AS tab(col);


-- 方差

SELECT corr(c1, c2) FROM VALUES (3, 2), (3, 3), (6, 4) as tab(c1, c2);


SELECT count(*) FROM VALUES (NULL), (5), (5), (20) AS tab(col);


SELECT count(DISTINCT col) FROM VALUES (NULL), (5), (5), (10) AS tab(col);

6.窗口函数

-- cume_dist() over()  计算一个值相对于分区中所有值的位置。


SELECT a, b, rank(b) OVER (PARTITION BY a ORDER BY b) FROM VALUES ('A1', 2), ('A1', 1), ('A2', 3), ('A1', 1) tab(a, b);


SELECT a, b, row_number() OVER (PARTITION BY a ORDER BY b) FROM VALUES ('A1', 2), ('A1', 1), ('A2', 3), ('A1', 1) tab(a, b);

7.自定义函数

//获取SparkSession

SparkSession spark = SparkSession

 .builder()

 .appName("Java Spark SQL UDF scalar example")

 .getOrCreate();


//定义函数

UserDefinedFunction random = udf(

 () -> Math.random(), DataTypes.DoubleType

);

random.asNondeterministic();

//注册函数

spark.udf().register("random", random);

//执行自定义函数

spark.sql("SELECT random()").show();


spark.udf().register("plusOne",

 (UDF1<Integer, Integer>) x -> x + 1, DataTypes.IntegerType);

spark.sql("SELECT plusOne(5)").show();

五.spark-sql

1.Datasets 和 DataFrames

  • Datasets:据集是数据的分布式集合。RDD 算子
  • DataFrames:DataFrame 是组织成命名列的数据集。

而在 JavaAPI 中,用户需要使用 Dataset < Row > 来表示 DataFrame。

2.spark-java 入口

import org.apache.spark.sql.SparkSession;


SparkSession spark = SparkSession

 .builder()

 .appName("Java Spark SQL basic example")

 .config("spark.some.config.option", "some-value")

 .getOrCreate();

3.创建 DataFrame

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;


Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");


// Displays the content of the DataFrame to stdout

df.show();

4.DataFrame 操作

df.printSchema();


df.select("name").show();


df.select(col("name"), col("age").plus(1)).show();


df.filter(col("age").gt(21)).show();


df.groupBy("age").count().show();


df.createOrReplaceTempView("people");


Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");

sqlDF.show();




//添加对象

Encoder<Person> personEncoder = Encoders.bean(Person.class);

Dataset<Person> javaBeanDS = spark.createDataset(

 Collections.singletonList(person),

 personEncoder

);

javaBeanDS.show();



//获取属性值

Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");

Encoder<String> stringEncoder = Encoders.STRING();

Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(

   (MapFunction<Row, String>) row -> "Name: " + row.getString(0),

   stringEncoder);

teenagerNamesByIndexDF.show();

5.获取数据源

  • 读取文件
  • 读取 hive 库

spark.sql("SELECT * FROM src").show();

6.写入数据库

Dataset<Row> jdbcDF = spark.read()

 .format("jdbc")

 .option("url", "jdbc:postgresql:dbserver")

 .option("dbtable", "schema.tablename")

 .option("user", "username")

 .option("password", "password")

 .load();


Properties connectionProperties = new Properties();

connectionProperties.put("user", "username");

connectionProperties.put("password", "password");

Dataset<Row> jdbcDF2 = spark.read()

 .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);


// Saving data to a JDBC source

jdbcDF.write()

 .format("jdbc")

 .option("url", "jdbc:postgresql:dbserver")

 .option("dbtable", "schema.tablename")

 .option("user", "username")

 .option("password", "password")

 .save();


jdbcDF2.write()

 .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

7.数据类型

Data type Value type in Java API to access or create a data type
ByteType byte or Byte DataTypes.ByteType
ShortType short or Short DataTypes.ShortType
IntegerType int or Integer DataTypes.IntegerType
LongType long or Long DataTypes.LongType
FloatType float or Float DataTypes.FloatType
DoubleType double or Double DataTypes.DoubleType
DecimalType java.math.BigDecimal DataTypes.createDecimalType()DataTypes.createDecimalType(precision, scale).
StringType String DataTypes.StringType
BinaryType byte[] DataTypes.BinaryType
BooleanType boolean or Boolean DataTypes.BooleanType
TimestampType java.sql.Timestamp DataTypes.TimestampType
DateType java.sql.Date DataTypes.DateType
YearMonthIntervalType java.time.Period YearMonthIntervalType
DayTimeIntervalType java.time.Duration DayTimeIntervalType
ArrayType java.util.List DataTypes.createArrayType(elementType) Note: The value of containsNull will be true.DataTypes.createArrayType(elementType, containsNull).
MapType java.util.Map DataTypes.createMapType(keyType, valueType)Note: The value of valueContainsNull will be true.DataTypes.createMapType(keyType, valueType, valueContainsNull)
StructType org.apache.spark.sql.Row DataTypes.createStructType(fields)Note: fields is a List or an array of StructFields.Also, two fields with the same name are not allowed.
StructField The value type in Java of the data type of this field (For example, int for a StructField with the data type IntegerType) DataTypes.createStructField(name, dataType, nullable)

六.SQL 语法

1.ALTER DATABASE

-- 创建数据库

CREATE DATABASE inventory;

CREATE DATABASE IF NOT EXISTS customer_db;

CREATE DATABASE inventory_db COMMENT 'This database is used to maintain Inventory';


-- 选用数据库

USE userdb;


-- 修改数据库的编辑人和编辑时间

ALTER DATABASE inventory SET DBPROPERTIES ('Edited-by' = 'John', 'Edit-date' = '01/01/2001');


-- 展示库的描述信息

DESCRIBE DATABASE EXTENDED inventory;


-- 修改库的存储位置

ALTER DATABASE inventory SET LOCATION 'file:/temp/spark-warehouse/new_inventory.db';


-- 删除库

DROP DATABASE IF EXISTS inventory_db CASCADE;

2.ALTER TABLE

-- 显示表描述

DESC student;


-- 修改表名

ALTER TABLE Student RENAME TO StudentInfo;


-- 显示表分区

SHOW PARTITIONS StudentInfo;


-- 修改表分区

ALTER TABLE default.StudentInfo PARTITION (age='10') RENAME TO PARTITION (age='15');


-- 添加列

ALTER TABLE StudentInfo ADD columns (LastName string, DOB timestamp);


-- 删除列

ALTER TABLE StudentInfo DROP columns (LastName, DOB);


-- 修改列名

ALTER TABLE StudentInfo RENAME COLUMN name TO FirstName;


-- 修改字段备注信息

ALTER TABLE StudentInfo ALTER COLUMN FirstName COMMENT "new comment";


-- 替换字段

ALTER TABLE StudentInfo REPLACE COLUMNS (name string, ID int COMMENT 'new comment');


-- 添加分区

ALTER TABLE StudentInfo ADD IF NOT EXISTS PARTITION (age=18);


-- 删除分区

ALTER TABLE StudentInfo DROP IF EXISTS PARTITION (age=18);


-- 添加多个分区

ALTER TABLE StudentInfo ADD IF NOT EXISTS PARTITION (age=18) PARTITION (age=20);


-- 删除表

DROP TABLE IF EXISTS employeetable;


-- 修复表

MSCK REPAIR TABLE t1;


-- 清空表

TRUNCATE TABLE Student;


-- 创建表

CREATE TABLE students (name VARCHAR(64), address VARCHAR(64))

   USING PARQUET PARTITIONED BY (student_id INT);


-- 插入一条数据

INSERT INTO students VALUES

   ('Amy Smith', '123 Park Ave, San Jose', 111111);


-- 插入多条

INSERT INTO students VALUES

   ('Bob Brown', '456 Taylor St, Cupertino', 222222),

   ('Cathy Johnson', '789 Race Ave, Palo Alto', 333333);


-- 插入select的数据

INSERT INTO students PARTITION (student_id = 444444)

   SELECT name, address FROM persons WHERE name = "Dora Williams";


-- 插入指定分区

INSERT INTO students PARTITION (birthday = date'2019-01-02')

   VALUES ('Amy Smith', '123 Park Ave, San Jose');



-- 覆盖插入

INSERT OVERWRITE students PARTITION (student_id = 222222)

   SELECT name, address FROM persons WHERE name = "Dora Williams";

3.ALTER VIEW

ALTER VIEW tempdb1.v1 RENAME TO tempdb1.v2;

DESCRIBE TABLE EXTENDED tempdb1.v2;

ALTER VIEW tempdb1.v2 AS SELECT * FROM tempdb1.v1;


-- 创建视图

CREATE GLOBAL TEMPORARY VIEW IF NOT EXISTS subscribed_movies

AS

SELECT mo.member_id, mb.full_name, mo.movie_title

FROM movies AS mo

        INNER JOIN members AS mb

                   ON mo.member_id = mb.id;

-- 删除视图

DROP VIEW IF EXISTS employeeView;

4.CREATE FUNCTION

CREATE FUNCTION simple_udf AS 'SimpleUdf'  USING JAR '/tmp/SimpleUdf.jar';

SHOW USER FUNCTIONS;

DROP TEMPORARY FUNCTION IF EXISTS test_avg;

DROP FUNCTION test_avg;

5.查询相关

[ WITH with_query [ , ... ] ]

select_statement [ { UNION | INTERSECT | EXCEPT } [ ALL | DISTINCT ] select_statement, ... ]

   [ ORDER BY { expression [ ASC | DESC ] [ NULLS { FIRST | LAST } ] [ , ... ] } ]

   [ SORT BY { expression [ ASC | DESC ] [ NULLS { FIRST | LAST } ] [ , ... ] } ]

   [ CLUSTER BY { expression [ , ... ] } ]

   [ DISTRIBUTE BY { expression [, ... ] } ]

   [ WINDOW { named_window [ , WINDOW named_window, ... ] } ]

   [ LIMIT { ALL | expression } ]

-- with as语法

WITH t(x, y) AS (SELECT 1, 2)

SELECT *

FROM t

WHERE x = 1

 AND y = 2;


-- 内层with as

WITH t AS (

   WITH t2 AS (SELECT 1)

   SELECT *

   FROM t2

)

SELECT *

FROM t;


-- 分区和排序字段相同

SELECT age, name

FROM person CLUSTER BY age;



-- 分区

SELECT age, name

FROM person DISTRIBUTE BY age;


-- 分组

SELECT id, SUM(quantity)

FROM dealer

GROUP BY id

ORDER BY id;


-- HAVING子句

SELECT city, SUM(quantity) AS SUM

FROM dealer

GROUP BY city

HAVING SUM (quantity) > 15;


-- hints

SELECT /*+ COALESCE(3) */ *

FROM t;

SELECT /*+ REPARTITION(3) */ *

FROM t;

SELECT /*+ REPARTITION_BY_RANGE(c) */ *

FROM t;

SELECT /*+ REBALANCE */ *

FROM t;


-- 内联表

SELECT *

FROM VALUES("one", 1)

  , ("two", 2)

  , ("three", NULL) AS data(a, b);


-- 连接

SELECT id, name, employee.deptno, deptname

FROM employee

        INNER JOIN department ON employee.deptno = department.deptno;


-- like使用

SELECT *

FROM person

WHERE name LIKE 'M%';



-- limit 使用

SELECT name, age

FROM person

ORDER BY name LIMIT 2;


-- 排序

SELECT *

FROM person

ORDER BY name ASC, age DESC;


-- UNION去重

   (SELECT c FROM number1)

   UNION

   DISTINCT

   (

   SELECT C

   FROM number2);


-- UNION不去重

SELECT c

FROM number1

UNION ALL

(SELECT c FROM number2);


-- 按分区,排序

SELECT /*+ REPARTITION(zip_code) */ name, age, zip_code

FROM person SORT BY NAME;


-- where 使用

SELECT *

FROM person

WHERE id BETWEEN 200 AND 300

ORDER BY id;



-- 窗口函数

SELECT name, dept, salary, MIN(salary) OVER (PARTITION BY dept ORDER BY salary) AS MIN

FROM employees;



-- case when使用

SELECT *

FROM person

WHERE CASE 1 = 1

         WHEN 100 THEN 'big'

         WHEN 200 THEN 'bigger'

         WHEN 300 THEN 'biggest'

         ELSE 'small'

         END = 'small';


-- 行转列

SELECT *

FROM person PIVOT (

                  SUM(age) AS a, AVG(class) AS C

       FOR NAME IN ('John' AS john, 'Mike' AS mike)

   );



-- 列转行

SELECT c_age, COUNT(1)

FROM person LATERAL VIEW EXPLODE(ARRAY(30, 60)) AS c_age

   LATERAL VIEW EXPLODE(ARRAY(40, 80)) AS d_age

GROUP BY c_age;



-- 执行计划

EXPLAIN

SELECT k, SUM(v)

FROM values(1, 2), (1, 3) t(k, v)

GROUP BY k;

EXPLAIN

EXTENDED

SELECT k, SUM(v)

FROM values(1, 2), (1, 3) t(k, v)

GROUP BY k;

6.辅助信息

-- 添加jar包

ADD JAR /tmp/test.jar;


-- 分析表

ANALYZE

TABLE students COMPUTE STATISTICS FOR COLUMNS NAME;


-- 所有jar文件

LIST

JAR;


-- 刷新表

REFRESH

TABLE tempDB.view1;


-- 展示列

SHOW

COLUMNS IN salesdb.customer;


-- 展示建表信息

SHOW

CREATE TABLE test;


-- 展示分区

SHOW

PARTITIONS salesdb.customer;


-- 显示所有表

SHOW

TABLES IN userdb;


-- 显示视图

SHOW

VIEWS FROM userdb;

七.ddl-spark

1.获取节点关系

Map<String, Set<String>> flowCache = dag.getFlows().stream()

 .collect(Collectors.groupingBy(Flow::getTo, Collectors.mapping(Flow::getFrom, Collectors.toSet())));

2.获取实现 node

//通过反射获取到实现类并调用initFromDdlNode方法

AbstractNode node = Reflect.onClass(nodeTypeEnum.getClassName()).create().call("initFromDdlNode", ddlNode, flowCache.get(ddlNode.getNodeKey())).get();

nodeCache.put(node.getNodeKey(), node);

3.java-api

在 Dataset 中有算子的 api

Dataset<T>

4.agg 函数

//agg函数不能直接传入Columns

result=groupDs.agg(first, rest);


相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
3月前
|
SQL 机器学习/深度学习 分布式计算
Spark适合处理哪些任务?
【9月更文挑战第1天】Spark适合处理哪些任务?
219 3
|
3月前
|
SQL 分布式计算 Hadoop
初识 Spark
【9月更文挑战第1天】. 初识 Spark
61 2
|
7月前
|
存储 缓存 分布式计算
spark BlockManager粗讲
spark BlockManager粗讲
|
SQL 机器学习/深度学习 分布式计算
【Spark】(一)初识 Spark
【Spark】(一)初识 Spark
174 0
【Spark】(一)初识 Spark
|
SQL 机器学习/深度学习 分布式计算
|
存储 机器学习/深度学习 缓存
五分钟零基础介绍 spark
相信大家都听说过火的不能再火、炒得不能再炒的新一代大数据处理框架 Spark. 那么 Spark 是何方神圣?为何大有取代 Hadoop 的势头?Spark 内部又是如何工作的呢?我们会用几篇文章为大家一一介绍。 Hadoop:我不想知道我是怎么来的,我就想知道我是怎么没的? 还是从 Hadoop 处理海量数据的架构说起,一个 Hadoop job 通常都是这样的: 从 HDFS 读取输入数据; 在 Map 阶段使用用户定义的 mapper function, 然后把结果写入磁盘; 在 Reduce 阶段,从各个处于 Map 阶段的机器中读取 Map 计算的中间结果,使用用户定义的 r
140 0
|
分布式计算 Kubernetes Spark
Spark on k8s
前言 Spark 自从2.3版本以来就支持运行在k8s上,本文主要介绍如何运行Spark在阿里云容器服务-Kubernetes。 前提条件 1、 已经购买阿里云容器服务-Kubernetes。购买链接:Kubernetes控制台。
3074 0
|
SQL 分布式计算 大数据
初学Spark
介绍大数据处理引擎Spark的特点,以及它的技术栈
2154 0
|
机器学习/深度学习 分布式计算 TensorFlow