翻译:彭慧波,FreeWheel 基础架构大数据开发工程师
Spark是一个当下较为热门的,能同时处理结构化数据和非结构化数据的工具。Spark能够支持诸如integer, long, double, string等在内的基本数据类型,同时也支持包括DATE和TIMESTAMP在内的复杂的数据类型。这些复杂的数据类型需要开发人员花费大量的时间来理解和使用它们。本文将会深入介绍DATE和TIMESTAMP,力图使读者对其有一个深入的了解,避免在使用的过程中犯错。本文将分为以下四个部分:
- DATE的定义及其使用的日历,这也将包括Spark3.0对所使用日历的变化;
- TIMESTAMP的定义及其与时区的联系,TIMESTAMP如何通过时区偏移来描述一个具体的时间点,以及Java8和Spark3.0中所使用的新的时间API的变化;
- Spark中如何通过API来构建DATE和TIMESTAMP值;
- Spark driver收集DATE和TIMESTAMP对象的最佳实践和常见误区。
DATE和日历
对于DATE的定义非常简单,Date是由年,月,日组合而成的一个字段,比如2012年12月31日。但是,年,月,日有各自的取值范围,比如,月的取值范围必须是从1到12,日的取值范围必须根据年和月的不同可以取值为1到28/29/30/31。因此Date值代表的是真实存在的一天。
年、月、日的约束条件和取值范围可能是由许多不同的日历定义的。这些日历有不同的应用场景,有些日历只在特定的地区使用,比如农历;有些日历只在历史上使用,比如儒略历;而国际上和人们日常生活中常用的标准是公历。公历诞生于1582年,后来其纪年日期也被扩展到1582年之前。这种扩展的日历也被称作Proleptic Gregorian公历。
Proleptic Gregorian日历目前已经被pandas,R,Apache Arrow等多个数据处理框架所使用, Spark从3.0版本开始使用Proleptic Gregorian公历。在3.0之前的版本中,Spark同时使用了儒略历和普通公历,对于1582年之前的日期使用儒略历,对于1582年之后的日期使用公历。Spark对于日历的这种使用方式是调用Java 8 之前版本中java.sql.Date API造成的,在Java8及其之后的版本中java.time.LocalDate API废弃了原先使用两种日历的模式,转而使用Proleptic Gregorian公历。
当然,DATE类型并不与时区相关。
TIMESTAMP和时区
TIMESTAMP采用新的字段扩展了DATE类型:小时,分钟,秒(可能拥有小数部分)以及一个session范围内的时区。TIMESTAMP定义了地球上一个具体的时间点。比如,2012年12月31日23时59分59.123456秒,session时区是UTC+01:00。当将TimeStamp值写入非文本数据源(如Parquet)时,这些值只是没有时区信息的点(如UTC中的TimeStamp)。如果使用不同的session时区写入和读取TimeStamp值,可能会看到不同的小时/分钟/秒字段值,但它们实际上对应的是相同的时间点。
当然,小时、分钟和秒也有着各自的取值范围,小时是0-23,分钟和秒是0-59,Spark支持最高到微秒的精度,而微秒的有效范围是0到999,999微秒。
在任何时间点,我们都可以根据所在时区的不同,观察到时钟的不同显示值。同样地,一个时钟的显示值也可以根据所在时区的不同代表许多不同的时间点。时区偏移允许明确地将当前的TimeStamp值绑定到一个具体的时间点。时区偏移被定义为格林尼治标准时间(GMT)或协调世界时(UTC+0)的小时偏移。这样的时区信息表示消除了歧义,但对普通用户来说却是比较不方便。用户更喜欢指出全球的某个具体位置,譬如美国/洛杉矶或欧洲/巴黎等。
如果用具体位置来代替具体的时区偏移信息并与TimeStamp进行绑定的话,将会带来一些额外的问题。譬如,我们必须维护一个特殊的时区数据库,以将时区名称映射到具体的偏移量。 由于Spark是运行在JVM上的,因此它将时区到具体偏移量的映射委托给了Java标准库,该库从Internet分配号码授权机构的时区数据库(IANA TZDB)加载数据。 此外,Java标准库中的映射机制在某些方面会影响Spark的行为。 我们在下面重点介绍其中存在的一些问题。
从Java 8开始,JDK公布了用于操作DATE和TIMESTAMP的新API,Spark从3.0版本也迁移到这些的API中。尽管Java 8和Java7对于时区名称到偏移量的映射使用了相同的源数据库IANA TZDB,但是二者实现的方式还是有所不同的。
举例来说,让我们看一下1883年之前的美国/洛杉矶时区的一个TimeStamp:1883-11-10 00:00:00。 这个时间点之所以与众不同是因为在1883年11月18日当天,所有北美铁路都切换到了一个新的标准时间系统来管理其时间表。使用Java 7的时间API,我们可以获得本地TimeStamp为-08:00的时区偏移量:
scala> java.time.ZoneId.systemDefault
res0: java.time.ZoneId = America/Los_Angeles
scala> java.sql.Timestamp.valueOf("1883-11-10 00:00:00").getTimezoneOffset / 60.0
res1: Double = 8.0
Java 8 API却返回了不同的结果:
scala> java.time.ZoneId.of("America/Los_Angeles")
.getRules.getOffset(java.time.LocalDateTime.parse("1883-11-10T00:00:00"))
res2: java.time.ZoneOffset = -07:52:58
在1883年11月18日之前,时间并不是全球统一的,大多数城镇都各自使用某种形式的本地统一时间,该时间由著名的时钟维护(例如,在教堂的尖顶上或在珠宝商的窗户中) 。 这就是为什么我们会看到如此奇怪的时区偏移。
该示例说明了Java 8时间函数更加精确,并考虑了IANA TZDB的历史数据。 切换到Java 8时间API后,Spark 3.0也从这一改进中受益,并在解决时区偏移方面变得更加精确。
正如我们前面提到的,Spark 3.0也将DATE切换为Proleptic Gregorian日历。TIMESTAMP也是如此。Spark 3.0完全符合 ISO SQL:2016提出的标准,TIMESTAMP的有效范围在0001-01-01 00:00:00到9999-12-31 23:59:59.999999之间。并支持该范围内的所有TimeStamp。 而Spark 2.4和更早版本却存在以下几个问题:
- 0001-01-01 00:00:00至1582-10-03 23:59:59.999999这一范围内的时间, Spark 2.4使用儒略历,不符合ISO SQL:2016的标准。 Spark 3.0则使用了Proleptic Gregorian公历来获取年,月,日等信息。由于采用日历不同,Spark 2.4中存在某些Spark3.0中不存在的日期,譬如,1000-02-29不是有效日期,因为公历中的1000年不是一个闰年。 同时,Spark 2.4在这TimeStamp范围内也将时区名称解析到了错误的时区偏移上。
- 1582-10-04 00:00:00至 1582-10-14 23:59:59.999999 这一范围内的时间,是Spark 3.0中有效的本地TimeStamp范围, Spark 2.4则不存在这样的TimeStamp。
- 1582-10-15 00:00:00至1899-12-31 23:59:59.999999 这一范围内的时间, Spark 3.0使用IANA TZDB中的历史数据可以正确解析时区偏移。Spark 2.4则如我们在上面的示例中所描述的,在某些情况下可能无法正确解析时区名称所对应的时区偏移。
- 1900-01-01 00:00:00至2036-12-31 23:59:59.999999 这一范围内的时间, Spark 3.0和Spark 2.4均符合ANSI SQL标准,并在日期/时间操作(例如获取某个给定月份中的某天)中使用公历。
- 2037-01-01 00:00:00至9999-12-31 23:59:59.999999 这一范围内的时间,由于JDK的bug #8073446,Spark 2.4可以解析时区偏移,但是会在某些特定夏令时出现错误解析的情况。 Spark 3.0则不受此bug的影响。
将时区名称映射到偏移量带来的另一方面问题是夏令时(DST)的应用或切换到另一个标准时区偏移量而导致的本地TimeStamp的重叠。例如,2019年11月3日02:00:00,时钟向后调了1小时到01:00:00。2019-11-03 01:30:00 美国 / 洛杉矶对应的时间点可以映射到2019-11-03 01:30:00 UTC-08:00或2019-11-03 01:30:00 UTC-07:00。在Spark3.0中,本地TimeStamp将时区名称映射到偏移量的时候,切换到夏时制可能会导致本地TimeStamp重叠的情况。如果可能的话,建议在构建TimeStamp时指定确切的时区偏移量。如果未指定偏移量,而只是设置时区名称(例如'2019-11-03 01:30:00 美国 / 洛杉矶'),Spark 3.0将采用较早的偏移量,通常对应于“夏季”。Spark 2.4则有所不同,其采用“冬季”偏移。因此,在时钟向前跳的缺口时间范围内将没有有效的偏移量。从上面的示例可以看出,时区名称到时区偏移量的映射是不明确的,并且不是一对一的。 因此再次强调,在构建TimeStamp时指定确切的时区偏移量,例如TimeStamp '2019-11-03 01:30:00 UTC-07:00'。
接下来我们将讨论时区的偏移量映射问题,ANSI SQL标准定义了以下两种TimeStamp:
1.不带时区的TimeStamp:其包含内容为年,月,日,小时,分钟,秒。 这些时间单位不与任何时区绑定,实际上就是我们常看到墙上挂着的时钟时间点。
2.带有时区的TimeStamp:其包含内容为年,月,日,小时,分钟,秒,时区偏移小时,时区偏移分钟。 带有时区的TimeStamp表示了由UTC时区中的时刻与当前时区偏移量根据小时和分钟分别进行计算组合而成的时间值。
带有时区的TimeStamp中包含的时区偏移量不会影响TimeStamp本身所表示的物理时间点。 相反,时区偏移量会影响需要显示或者打印TimeStamp时候的一些操作,比如,日期/时间提取(例如EXTRACT)以及其它与时区相关的操作(例如向TimeStamp添加月份)。
Spark SQL将TimeStamp类型定义为带session时区的TimeStamp,这是由字段年,月,日,小时,分钟,秒,session时区的组合,其中年到秒这部分字段标识了UTC时间的某一时刻。 session时区则是从Spark SQL配置中spark.sql.session.timeZone参数值获取。 其中session时区可以进行如下设置:
1.时区偏移量的形式为 '(+|-) HH:mm', 这种形式能使我们能够准确地定义一个物理时间点。
2.时区名称也称之为时区ID,其具体表现形式为“区域/城市”,例如“ 美国/洛杉矶”。 但是,这种形式的时区信息会使我们遇到上面描述的一些问题,例如本地TimeStamp的重叠。对于任何时区ID,每个UTC时刻都明确地与一个时区偏移量相关联,因此,每个具有时区ID的TimeStamp都可以明确地转换为具有区域偏移量的TimeStamp。
默认情况下,session时区会设置为Java虚拟机的默认时区。
Spark中带有session时区的TimeStamp与以下TimeStamp有所不同:
1.不带时区的TimeStamp,因为不带时区的TimeStamp值可以映射到多个物理时刻,但是带有session时区的TimeStamp的任何值都是具体的物理时刻。因此,写Spark SQL的时候可以通过在所有会话中使用一个固定的时区偏移来进行统一,例如UTC+0。 在这种情况下,我们可以将UTC的TimeStamp视为本地TimeStamp。
2.带有时区的TimeStamp,因为根据SQL标准,该类型的列值可以具有不同的时区偏移量。 Spark SQL不支持该功能。
我们应该注意到,与session时区关联的TimeStamp并不是Spark SQL的新发明。 Oracle等关系型数据库也为TimeStamp提供了类似的类型:当前时区TimeStamp。
构建Date和TimeStamp
Spark SQL提供了一些构造Date和TimeStamp值的方法:
- 不带参数的默认构造函数:
CURRENT_TIMESTAMP()和CURRENT_DATE()
- 来自其他原始Spark SQL类型,例如INT,LONG和STRING;
- 来自外部类型,例如Python datetime或Java类java.time.LocalDate / Instant;
4.从数据源CSV,JSON,Avro,Parquet,ORC或其他类型中反序列化而来。
Spark 3.0中引入的函数MAKE_DATE具有三个参数:年,月,日。这三个参数共同构成了Date值。Spark尽可能将所有输入的参数隐式转换为INT类型。 该函数检查结果Date值在Proleptic Gregorian公历中是否是有效的,如果不是则返回NULL。 例如在PySpark中:
>>> spark.createDataFrame([(2020, 6, 26), (1000, 2, 29), (-44, 1, 1)],
... ['Y', 'M', 'D']).createTempView('YMD')
>>> df = sql('select make_date(Y, M, D) as date from YMD')
>>> df.printSchema()
root
|-- date: date (nullable = true)
为了打印DataFrame的内容,可以调用Spark 中的show() 算子,在executor端将Date转为字符串,并将字符串发送到driver端并在控制台上输出:
>>> df.show()
+-----------+
| date|
+-----------+
| 2020-06-26|
| null|
|-0044-01-01|
+-----------+
同样,可以通过MAKE_TIMESTAMP函数设置TimeStamp值。 与MAKE_DATE函数一样,MAKE_TIMESTAMP对Date字段执行相同的验证,并对时间字段进行额外的验证,小时的允许范围为0-23,分钟的允许范围为0-59,秒的允许范围为0-60。其中,秒的类型为小数,其精度为8,包含6位小数位。由于秒可以进一步细化,因此小数部分的精度最高为微秒。 例如在PySpark中:
>>> df = spark.createDataFrame([(2020, 6, 28, 10, 31, 30.123456),
... (1582, 10, 10, 0, 1, 2.0001), (2019, 2, 29, 9, 29, 1.0)],
... ['YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND'])
>>> df.show()
+----+-----+---+----+------+---------+
|YEAR|MONTH|DAY|HOUR|MINUTE| SECOND|
+----+-----+---+----+------+---------+
|2020| 6| 28| 10| 31|30.123456|
|1582| 10| 10| 0| 1| 2.0001|
|2019| 2| 29| 9| 29| 1.0|
+----+-----+---+----+------+---------+
>>> ts = df.selectExpr("make_timestamp(YEAR, MONTH, DAY, HOUR, MINUTE, SECOND) as MAKE_TIMESTAMP")
>>> ts.printSchema()
root
|-- MAKE_TIMESTAMP: timestamp (nullable = true)
就像处理Date一样,让我们使用show()操作打印ts DataFrame的内容。 以类似的方式,show()将TimeStamp转换为字符串,但是现在它考虑了由Spark SQL配置spark.sql.session.timeZone定义的session时区参数。 在以下示例中,我们将看到这一点。
>>> ts.show(truncate=False)
+--------------------------+
|MAKE_TIMESTAMP |
+--------------------------+
|2020-06-28 10:31:30.123456|
|1582-10-10 00:01:02.0001 |
|null |
Spark无法创建最后一个TimeStamp,因为2019年不是一个闰年,该日期是无效的。
此时你可能会注意到,上面的示例中我们没有提供任何时区信息。 在这种情况下,Spark从spark.sql.session.timeZone中获取时区值,并将其应用于函数调用。当然,也可以通过将其他时区作为MAKE_TIMESTAMP的最后一个参数传递来选择其他时区。以下是PySpark中的示例:
>>> df = spark.createDataFrame([(2020, 6, 28, 10, 31, 30, 'UTC'),
... (1582, 10, 10, 0, 1, 2, 'America/Los_Angeles'),
... (2019, 2, 28, 9, 29, 1, 'Europe/Moscow')],
... ['YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'TZ'])
>>> df = df.selectExpr('make_timestamp(YEAR, MONTH, DAY, HOUR, MINUTE, SECOND, TZ) as MAKE_TIMESTAMP')
>>> df = df.selectExpr("date_format(MAKE_TIMESTAMP, 'yyyy-MM-dd HH:mm:SS VV') AS TIMESTAMP_STRING")
>>> df.show(truncate=False)
+---------------------------------+
|TIMESTAMP_STRING |
+---------------------------------+
|2020-06-28 13:31:00 Europe/Moscow|
|1582-10-10 10:24:00 Europe/Moscow|
|2019-02-28 09:29:00 Europe/Moscow|
+---------------------------------+
如上例所示,Spark考虑了指定时区的问题,但是会将所有本地TimeStamp都调整为session时区。 因此,传递给MAKE_TIMESTAMP函数的原始时区信息将丢失,因为带有session时区的TimeStamp类型假定了所有值都属于一个时区,甚至不为每个值存储一个时区信息。根据带有session时区的TimeStamp的定义,Spark在UTC时区中存储本地TimeStamp,并在提取日期/时间字段或将TimeStamp转换为字符串的时候使用session时区。
同样,可以使用LONG类型构造TimeStamp。 如果LONG类型值是自1970年1月1日00:00:00Z以来的秒数,则可以将其强制转换为Spark SQL中的TimeStamp:
spark-sql> select CAST(-123456789 AS TIMESTAMP);
1966-02-02 05:26:51
不幸的是,当前这种方法不允许我们指定秒的小数部分。 未来,Spark SQL将提供一些特定的函数通过自1970年1月1日00:00:00Z以来的秒,毫秒和微秒来建立
TimeStamp:timestamp_seconds(),timestamp_millis()和timestamp_micros()。
另一种方法是通过STRING类型值来构造Date和TimeStamp。 我们可以使用特殊的关键字来进行转化:
spark-sql> select timestamp '2020-06-28 22:17:33.123456 Europe/Amsterdam', date '2020-07-01';
2020-06-28 23:17:33.123456 2020-07-01
或通过强制转换,可以将转换应用到列中的所有值:
spark-sql> select cast('2020-06-28 22:17:33.123456 Europe/Amsterdam' as timestamp), cast('2020-07-01' as date);
2020-06-28 23:17:33.123456 2020-07-01
如果在输入字符串中省略了时区信息,Spark则将输入TimeStamp字符串解释为指定时区或session时区中的本地TimeStamp。此外,可以使用to_timestamp()函数将具有特定模式的字符串转换为TimeStamp。 在Datetime Patterns for Formatting and Parsing一文中描述了支持进行转换的匹配模式:
spark-sql> select to_timestamp('28/6/2020 22.17.33', 'dd/M/yyyy HH.mm.ss');
2020-06-28 22:17:33
如果未指定任何匹配模式,则to_timestamp()函数的行为类似于强制转换。
为了提高可用性,Spark SQL可以在上述所有接受字符串并返回TimeStamp和Date的方法中识别特殊字符串值:
- epoch是Date ”1970-01-01”或TimeStamp ” 1970-01-01 00:00:00Z”的别名;
- now是session时区的当前Date或TimeStamp。 在单个查询中,它总是产生相同的结果;
- today是TIMESTAMP类型的当前日的开始,或者是DATE类型的当前日的开始;
- tomorrow是TIMESTAMP类型的第二天的开始,或者是DATE类型的第二天的开始。
- yesterday是TIMESTAMP类型的当前日的前一天的开始。
例如,
spark-sql> select timestamp 'yesterday', timestamp 'today', timestamp 'now', timestamp 'tomorrow';
2020-06-27 00:00:00 2020-06-28 00:00:00 2020-06-28 23:07:07.18 2020-06-29 00:00:00
spark-sql> select date 'yesterday', date 'today', date 'now', date 'tomorrow';
2020-06-27 2020-06-28 2020-06-28 2020-06-29
Spark有一个强大的功能是可以从driver端的存在的外部对象集合中构造数据集,并创建相应类型的列。 Spark将外部类型的相关数据转换为语义上等效的内部表示形式。 PySpark允许从Python集合中的DATE和TIMESTAMP列创建数据集,例如:
>>> import datetime
>>> df = spark.createDataFrame([(datetime.datetime(2020, 7, 1, 0, 0, 0),
... datetime.date(2020, 7, 1))], ['timestamp', 'date'])
>>> df.show()
+-------------------+----------+
| timestamp| date|
+-------------------+----------+
|2020-07-01 00:00:00|2020-07-01|
+-------------------+----------+
PySpark使用系统时区在driver端将Python的datetime对象转换为内部Spark SQL表示形式,该时区可能与Spark中spark.sql.session.timeZone设置的session时区不同。Spark SQL内部值不包含任何有关原始时区的信息。 根据带session时区的TimeStamp的定义,接下来并行化对Date和TimeStamp的操作的时候将仅考虑Spark SQL中的session时区。
与我们上面针对Python集合演示的方式类似,Spark在Java / Scala API中将以下类型识别为外部date-time类型:
- java.sql.Date和java.time.LocalDate作为Spark SQL的DATE类型的外部类型;
- java.sql.Timestamp和java.time.Instant作为Spark SQL的TIMESTAMP类型的外部类型。
java.sql. 和java.time. 之间是有所区别的。Java 8中添加了基于Proleptic Gregorian日历的java.time.LocalDate类和java.time.Instant类,Proleptic Gregorian日历同时也应用于Spark3.0中。java.sql.Date和java.sql.Timestamp类却从1582年10月15日以来,引用了儒略历和公历混合的模式,这种方式同时与Spark3.0之前的版本使用的日历方式相同。由于日历系统的不同,Spark必须在转换为内部Spark SQL的时候执行其他操作,并将输入Date/TimeStamp值所引用的日历进行转换。对于1900年以后的TimeStamp而言,重置操作的开销很小,但对于1900年以前的TimeStamp却又不小的负担。
下面的示例显示了如何使用Scala集合制作TimeStamp。 在第一个示例中,说明了如何从字符串构造一个java.sql.Timestamp对象。 valueOf()函数将输入字符串解释为默认JVM时区中的本地TimeStamp,该时区可能与Spark的session时区不同。如果需要在特定时区构造java.sql.Timestamp或java.sql.Date的对象,建议使用java.text.SimpleDateFormat类(及其setTimeZone方法)或java.util.Calendar类。
scala> Seq(java.sql.Timestamp.valueOf("2020-06-29 22:41:30"), new java.sql.Timestamp(0)).toDF("ts").show(false)
+-------------------+
|ts |
+-------------------+
|2020-06-29 22:41:30|
|1970-01-01 03:00:00|
+-------------------+
scala> Seq(java.time.Instant.ofEpochSecond(-12219261484L), java.time.Instant.EPOCH).toDF("ts").show
+-------------------+
| ts|
+-------------------+
|1582-10-15 11:12:13|
|1970-01-01 03:00:00|
+-------------------+
同样,我们可以从java.sql.Date类或java.LocalDate类的对象集合中创建一个日期列。 java.LocalDate对象的转化完全独立于Spark的session时区或JVM默认时区,但是java.sql.Date对象却并非如此:
- java.sql.Date对象表示Spark driver端上默认JVM时区的本地日期;
- 为了正确转换Spark SQL值,driver和executor上的默认JVM时区必须相同。
scala> Seq(java.time.LocalDate.of(2020, 2, 29), java.time.LocalDate.now).toDF("date").show
+----------+
| date|
+----------+
|2020-02-29|
|2020-06-29|
+----------+
为了避免任何与日历和时区相关的问题,我们建议在Java / Scala并行化操作TimeStamp或Date的时候使用Java 8中的java.LocalDate/ Instant作为外部数据类型。
收集Date和TimeStamp
并行化执行的逆操作是将executor端的Date和TimeStamp收集回driver端,并返回一组外部可打印的数据类型。 例如,在上面的示例中,我们可以通过collect()算子将DataFrame拉回到driver端:
>>> df.collect()
[Row(timestamp=datetime.datetime(2020, 7, 1, 0, 0), date=datetime.date(2020, 7, 1))]
Spark将UTC时区的date和timestamps列的值作为时间点从executor端传输到driver端,并在driver端根据系统时区将其转换为Python datetime对象,而不使用Spark SQL会话时区。 collect()算子与上一节中描述的show()算子不同。 show()在将TimeStamp转换为字符串的时候使用session时区,并在driver端收集结果字符串。
在Java和Scala API中,Spark默认执行以下转换:
- Spark SQL中的Date值将转换为java.sql.Date的对象;
- Spark SQL中的TimeStamp值转换为java.sql.Timestamp的对象。
以上两种转换均在driver端的默认JVM时区中执行。 这样,通过Date.getDay(),getHour()等函数以及通过Spark SQL函数DAY,HOUR变量,根据driver端上的默认JVM时区或executor端session时区对于同一时间列进行转化获取的结果是相同的。
与使用java.sql.Date/Timestamp类创建Date和TimeStamp类似,Spark 3.0的执行需要从Proleptic Gregorian日历到混合日历(儒略历+公历)的重新设置。 对于1582年之后Date和1900年之后的TimeStamp而言,此操作几乎没有什么额外的开销,但是对于1582年之前的Date和1900年之前的TimeStamp则可能会带来一些开销。
如果将SQL 配置中spark.sql.datetime.java8API.enabled参数设置为true的话,则可以避免此类与日历相关的问题,此时Spark返回自Java 8开始添加的java.time类型。则Dataset的collect()算子将返回将返回如下结果:
- Spark SQL的DATE类型对应的是java.time.LocalDate类;
- Spark SQL的TIMESTAMP类型对应的是java.time.Instant类。
现在,转换不会遇到与日历相关的问题,因为Java 8类型和Spark SQL 3.0都基于Proleptic Gregorian日历。 collect()算子不再取决于默认的JVM时区,TimeStamp转换完全不取决于时区。 关于Date的转换,其使用了SQL 配置中spark.sql.session.timeZone设置的session时区。 例如,让我们看一个具有DATE和TIMESTAMP列的数据集,将默认JVM时区设置为欧洲/莫斯科,将会话时区设置为美国/洛杉矶。
scala> java.util.TimeZone.getDefault
res1: java.util.TimeZone = sun.util.calendar.ZoneInfo[id="Europe/Moscow",...]
scala> spark.conf.get("spark.sql.session.timeZone")
res2: String = America/Los_Angeles
scala> df.show
+-------------------+----------+
| timestamp| date|
+-------------------+----------+
|2020-07-01 00:00:00|2020-07-01|
+-------------------+----------+
show()算子会打印带有session时区美国/洛杉矶的TimeStamp,但是如果我们需要收集数据集,它将被转换为java.sql.Timestamp对象并通过toString方法显示具有欧洲/莫斯科的字符串:
scala> df.collect()
res16: Array[org.apache.spark.sql.Row] = Array([2020-07-01 10:00:00.0,2020-07-01])
scala> df.collect()(0).getAs[java.sql.Timestamp](0).toString
res18: java.sql.Timestamp = 2020-07-01 10:00:00.0
实际上,本地TimeStamp ‘2020-07-01 00:00:00’对应的是UTC的2020-07-01T07:00:00Z。可以观察到,如果启用Java 8 API并收集数据集将是如下的结果:
scala> df.collect()
res27: Array[org.apache.spark.sql.Row] = Array([2020-07-01T07:00:00Z,2020-07-01])
java.time.Instant对象将被转换为任何本地TimeStamp而不依赖于任何JVM时区信息。这是java.time.Instant相对于java.sql.Timestamp的优点之一。
java.sql.Timestamp要求更改全局JVM设置,这会影响同一JVM上的其他TimeStamp。因此,如果需要应用程序在不同时区中处理Date或TimeStamp,并且通过Java / Scala Dataset.collect() API将数据收集到driver端的时候,应用程序之间不会发生冲突的话,在Spark SQL中配置spark.sql.datetime.java8API.enabled,切换到Java 8 API 的配置。
总结
在此博客文章中,我们描述了Spark SQL DATE和TIMESTAMP类型,展示了如何从其它原始Spark SQL类型和外部Java类型构造Date和TimeStamp列,以及如何将Date和TimeStamp列作为外部Java类型收集回Spark driver端。从Spark 3.0版本开始,Spark从结合了儒略历和普通历的混合日历切换到了Proleptic Gregorian公历(更多信息请参阅SPARK-26651)。如我们先前演示的,这使Spark消除了许多问题。 为了让Spark3.0以前的版本向后兼容,Spark仍可以通过调用collect算子显式指定java.sql.Date和java.sql.Timestamp返回混合日历中的Date和TimeStamp。为避免使用Java / Scala的collect操作时出现日历和时区解析问题,可以通过SQL配置spark.sql.datetime.java8API.enabled来启用Java 8 API。Spark3.0作为Databricks Runtime 7.0的一部分,目前可以在Databricks平台上进行免费试用。
阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!