7 支持的数据类型
Flink对DataSet或DataStream中可以包含的元素类型设置了一些限制。 原因是系统分析类型以确定有效的执行策略。
有六种不同类别的数据类型:
- Java 元组 and Scala Case 类
- Java POJOs
- 原生类型
- Regular Classes
- Values
- Hadoop Writables
- Special Types
7.1 元组 and Case 类
7.1.1 Java版本
元组是包含固定数量的具有各种类型的字段的复合类型。 Java API提供从Tuple0到Tuple25的类。
元组的每个字段都可以是包含更多元组的任意的Flink的类型,从而产生嵌套元组。 可以使用字段名称tuple.f4直接访问元组的字段,也可以使用通用getter方法tuple.getField(int position)。 字段索引从0开始。
这与Scala的元组形成对比,但Java的常规索引更为一致。
DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements( new Tuple2<String, Integer>("hello", 1), new Tuple2<String, Integer>("world", 2)); wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() { @Override public Integer map(Tuple2<String, Integer> value) throws Exception { return value.f1; } }); wordCounts.keyBy(0); // also valid .keyBy("f0")
7.1.2 Scala版本
Scala case类(和Scala元组是case类的特例)是包含固定数量的具有各种类型的字段的复合类型。 元组字段由它们的1偏移名称寻址,例如第一个字段的_1。 字段按名称访问。
case class WordCount(word: String, count: Int) val input = env.fromElements( WordCount("hello", 1), WordCount("world", 2)) // Case Class Data Set input.keyBy("word")// key by field expression "word" val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set input2.keyBy(0, 1) // key by field positions 0 and 1
7.2 POJOs
如果满足以下要求,则Flink将Java和Scala类视为特殊的POJO数据类型:
- public限定
- 它必须有一个没有参数的公共构造函数(默认构造函数)。
- 所有字段都是public的,或者必须通过getter和setter函数访问。 对于名为foo的字段,getter和setter方法必须命名为getFoo()和setFoo()。
- Flink必须支持字段的类型。 目前,Flink使用Avro序列化任意对象(例如Date)。
Flink分析POJO类型的结构,即它了解POJO的字段。 因此,POJO类型比一般类型更容易使用。 此外,Flink可以比一般类型更有效地处理POJO。
以下示例显示了一个包含两个公共字段的简单POJO。
7.2.1 Java版本
public class WordWithCount { public String word; public int count; public WordWithCount() {} public WordWithCount(String word, int count) { this.word = word; this.count = count; } } DataStream<WordWithCount> wordCounts = env.fromElements( new WordWithCount("hello", 1), new WordWithCount("world", 2)); wordCounts.keyBy("word"); // key by field expression "word"
7.2.2 Scala 版本
class WordWithCount(var word: String, var count: Int) { def this() { this(null, -1) } } val input = env.fromElements( new WordWithCount("hello", 1), new WordWithCount("world", 2)) // Case Class Data Set input.keyBy("word")// key by field expression "word"
7.3 原生类型
Flink支持所有Java和Scala原生类型,如Integer,String和Double。
7.4 General Class Types
Flink支持大多数Java和Scala类(API和自定义)。 限制适用于包含无法序列化的字段的类,如文件指针,I / O流或其他本机资源。 遵循Java Beans约定的类通常可以很好地工作。
所有未标识为POJO类型的类都由Flink作为常规类类型处理。 Flink将这些数据类型视为黑盒子,并且无法访问其内容(即,用于有效排序)。 使用序列化框架Kryo对常规类型进行反序列化。
7.5 Values
值类型手动描述其序列化和反序列化。
它们不是通过通用序列化框架,而是通过使用读取和写入方法实现org.apache.flinktypes.Value接口来为这些操作提供自定义代码。当通用序列化效率非常低时,使用值类型是合理的。
一个示例是将元素的稀疏向量实现为数组的数据类型。知道数组大部分为零,可以对非零元素使用特殊编码,而通用序列化只需编写所有数组元素。
org.apache.flinktypes.CopyableValue接口以类似的方式支持手动内部克隆逻辑。
Flink带有与基本数据类型对应的预定义值类型。 (ByteValue,ShortValue,IntValue,LongValue,FloatValue,DoubleValue,StringValue,CharValue,BooleanValue)。这些值类型充当基本数据类型的可变变体:它们的值可以被更改,允许程序员重用对象并从垃圾收集器中消除压力。
7.6 Hadoop Writables
可以使用实现org.apache.hadoop.Writable接口的类型。 write()和readFields()方法中定义的序列化逻辑将用于序列化。
7.7 Special Types
可以使用特殊类型,包括Scala的Either,Option和Try
Java API有自己的自定义Either实现。 与Scala的Either类似,它代表两种可能类型的值,左或右。 两者都可用于错误处理或需要输出两种不同类型记录的运算符。
7.8 Type Erasure & Type Inference
仅适用于Java
Java编译器在编译后抛弃了大部分泛型类型信息。这在Java中称为类型擦除。这意味着在运行时,对象的实例不再知道其泛型类型。例如,DataStream 和DataStream 的实例于JVM看起来相同。
Flink在准备执行程序时(当调用程序的主要方法时)需要类型信息。 Flink Java API尝试重建以各种方式丢弃的类型信息,并将其显式存储在数据集和运算符中。您可以通过DataStream.getType()检索类型。该方法返回TypeInformation的一个实例,这是Flink表示类型的内部方式。
类型推断有其局限性,在某些情况下需要程序员的“合作”。这方面的示例是从集合创建数据集的方法,例如
ExecutionEnvironment.fromCollection()
可以在其中传递描述类型的参数。但是像MapFunction <I,O>这样的通用函数也可能需要额外的类型信息。
ResultTypeQueryable接口可以通过输入格式和函数实现,以明确告知API其返回类型。调用函数的输入类型通常可以通过先前操作的结果类型来推断。