[Flink]Flink1.3 指南五 指定keys

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 一些转换(例如,join,coGroup,keyBy,groupBy)要求在一组元素上定义一个key。其他转换(Reduce,GroupReduce,Aggregate,Windows)允许在使用这些函数之前对数据进行分组。

一些转换(例如,join,coGroup,keyBy,groupBy)要求在一组元素上定义一个key。其他转换(Reduce,GroupReduce,Aggregate,Windows)允许在使用这些函数之前对数据进行分组。

一个DataSet进行分组如下:

DataSet<...> input = // [...]
DataSet<...> reduced = input.groupBy(/*define key here*/).reduceGroup(/*do something*/);

而使用DataStream可以指定一个key:

DataStream<...> input = // [...]
DataStream<...> windowed = input.keyBy(/*define key here*/).window(/*window specification*/);

Flink的数据模型不是基于键值对。因此,你不需要将数据集类型物理上打包成keys和values。keys是"虚拟":它们只是被定义在实际数据之上的函数,以指导分组算子使用。

备注:

在下面的讨论中,我们将使用DataStream API和keyBy。对于DataSet API,你只需要替换为DataSet和groupBy即可。

下面介绍几种Flink定义keys方法。

1. 为Tuples类型定义keys

最简单的情况是在元组的一个或多个字段上对元组进行分组:

下面是在元组的第一个字段(整数类型)上进行分组。

Java版本:

DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)

Scala版本:

val input: DataStream[(Int, String, Long)] = // [...]
val keyed = input.keyBy(0)

下面,我们将在复合key上对元组进行分组,复合key包含元组的第一个和第二个字段:

Java版本:

DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)

Scala版本:

val input: DataSet[(Int, String, Long)] = // [...]
val grouped = input.groupBy(0,1)

如果你有一个包含嵌套元组的DataStream,例如:

DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;

如果指定keyBy(0),则使用整个Tuple2作为key(以Integer和Float为key)。如果要到嵌套的Tuple2的某个字段中,则必须使用下面说明的字段表达式指定keys。

2. 使用字段表达式定义keys

你可以使用基于字符串的字段表达式来引用嵌套字段以及定义用于分组,排序,连接或coGrouping的key。

字段表达式可以非常容易地选择(嵌套)复合类型(如Tuple和POJO类型)中的字段。

在下面的例子中,我们有一个WC POJO,它有两个字段wordcount。如果想通过字段word分组,我们只需将word传递给keyBy()函数即可。

// some ordinary POJO (Plain old Java Object)
public class WC {
  public String word;
  public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);

字段表达式语法:

(1) 按其字段名称选择POJO字段。例如,user是指POJO类型的user字段。

(2) 通过字段名称或0到字段索引偏移量之间的数值选择元组字段(field name or 0-offset field index)。例如,f05分别指Java元组类型的第一和第六字段。

(3) 你可以在POJO和元组中选择嵌套字段。例如,user.zip是指存储在POJO类型的user字段中的POJO类型的zip字段。支持POJO和Tuples的任意嵌套和混合,如f1.user.zipuser.f3.1.zip

(4) 你可以使用*通配符表达式选择完整类型。这也适用于不是元组或POJO类型的类型。

Example:

public static class WC {
  public ComplexNestedClass complex; //nested POJO
  private int count;
  // getter / setter for private field (count)
  public int getCount() {
    return count;
  }
  public void setCount(int c) {
    this.count = c;
  }
}
public static class ComplexNestedClass {
  public Integer someNumber;
  public float someFloat;
  public Tuple3<Long, Long, String> word;
  public IntWritable hadoopCitizen;
}

下面是上述示例代码的有效字段表达式:

count:WC类中的count字段。
complex:递归地选择复合字段POJO类型ComplexNestedClass的所有字段。
complex.word.f2:选择嵌套字段Tuple3的最后一个字段。
complex.hadoopCitizen:选择Hadoop IntWritable类型。

3. 使用key选择器函数定义keys

定义key的另一种方法是key选择器函数。key选择器函数将单个元素作为输入,并返回元素的key。key可以是任何类型的。

以下示例显示了一个key选择器函数,它只返回一个对象的字段:

Java版本:

public class WC {
  public String word; public int count;
}

DataStream<WC> words = // [...]
KeyedStream<WC> kyed = words.keyBy(new KeySelector<WC, String>() {
     public String getKey(WC wc) { return wc.word; }
});

Scala版本:

case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val keyed = words.keyBy( _.word )

备注:

Flink版本:1.3

原文:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/api_concepts.html#specifying-keys

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
流计算 数据安全/隐私保护 Windows
[Flink]Flink1.3 Stream指南七 理解事件时间与Watermarks
Flink实现了数据流模型(Dataflow Model)中许多技术。如果想对事件时间(event time)和水位线(watermarks)更详细的了解,请参阅下面的文章: The world beyond batch: Streaming 101 The Dataflow Model 支持事件时间的流处理器需要一种方法来衡量事件时间的进度。
1539 0
|
流计算
[Flink]Flink1.3 Stream指南八 图解事件时间与Watermarks
如果你正在构建实时流处理应用程序,那么事件时间处理是你迟早必须使用的功能之一。因为在现实世界的大多数用例中,消息到达都是无序的,应该有一些方法,通过你建立的系统知道消息可能延迟到达,并且有相应的处理方案。
1683 0
|
流计算 Java Scala
[Flink]Flink1.3 Stream指南六 事件时间与处理时间
Flink在数据流中支持几种不同概念的时间。 1. 处理时间 Processing Time Processing Time(处理时间)是指执行相应操作机器的系统时间(Processing time refers to the system time of the machine that is executing the respective operation.)。
1841 0
|
流计算 API Windows
[Flink]Flink1.3 Stream指南五 窗口触发器与驱逐器
1. 窗口触发器 触发器(Trigger)确定窗口(由窗口分配器形成)何时准备好被窗口函数处理。每个窗口分配器都带有默认触发器。
2631 0
|
Java Scala 流计算
[Flink]Flink1.3 Stream指南三 窗口分配器
1.4版本:Flink1.4 窗口概述 Windows(窗口)是处理无限数据流的核心。Windows将流分解成有限大小的"桶",在上面我们可以进行计算。
1983 0
|
Java 流计算 Maven
[Flink]Flink1.3 Batch指南二 集群运行
Flink程序可以分布在许多机器的群集上。有两种方式可以将程序发送到集群上运行: (1) 命令行接口 (2) 远程环境 1. 命令行接口 命令行接口允许你将打包程序(JAR)提交到集群(或单机配置)。
1559 0
|
Java Scala 流计算
[Flink]Flink1.3 Stream指南四 窗口函数
1.4版本:Flink1.4 窗口函数 在定义窗口分配器之后,我们需要在每个窗口上指定我们要执行的计算。这是窗口函数的责任,一旦系统确定窗口准备好处理数据,窗口函数就处理每个窗口中的元素。
1724 0
|
算法 Java 流计算
[Flink]Flink1.3 Batch指南一 本地运行
Flink可以在单台机器上运行,甚至可以在单个Java虚拟机中运行。 这运行机制可以方便用户在本地测试和调试Flink程序。
2129 0
|
资源调度 Java 流计算
[Flink]Flink1.3 指南四 命令行接口
Flink提供了一个命令行接口(CLI)用来运行打成JAR包的程序,并且可以控制程序的运行。命令行接口在Flink安装完之后即可拥有,本地单节点或是分布式的部署安装都会有命令行接口。
3425 0
|
Java API Apache
[Flink]Flink1.3 指南二 安装与启动
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/SunnyYoona/article/details/78276595 1. 下载 Flink 可以运行在 Linux, Mac OS X和Windows上。
1739 0