Apache Storm 官方文档 —— Trident API 概述

本文涉及的产品
数据传输服务 DTS,数据同步 small 3个月
推荐场景:
数据库上云
数据传输服务 DTS,数据迁移 small 3个月
推荐场景:
MySQL数据库上云
数据传输服务 DTS,数据同步 1个月
简介:

Trident 的核心数据模型是“流”(Stream),不过与普通的拓扑不同的是,这里的流是作为一连串 batch 来处理的。流是分布在集群中的不同节点上运行的,并且对流的操作也是在流的各个 partition 上并行运行的。

Trident 中有 5 类操作:

  1. 针对每个小分区(partition)的本地操作,这类操作不会产生网络数据传输;
  2. 针对一个数据流的重新分区操作,这类操作不会改变数据流中的内容,但是会产生一定的网络传输;
  3. 通过网络数据传输进行的聚合操作;
  4. 针对数据流的分组操作;
  5. 融合与联结操作。

本地分区操作

本地分区操作是在每个分区块上独立运行的操作,其中不涉及网络数据传输。

函数

函数负责接收一个输入域的集合并选择输出或者不输出 tuple。输出 tuple 的域会被添加到原始数据流的输入域中。如果一个函数不输出 tuple,那么原始的输入 tuple 就会被直接过滤掉。否则,每个输出 tuple 都会复制一份输入 tuple 。假设你有下面这样的函数:

public class MyFunction extends BaseFunction {
    public void execute(TridentTuple tuple, TridentCollector collector) {
        for(int i=0; i < tuple.getInteger(0); i++) {
            collector.emit(new Values(i));
        }
    }
}

再假设你有一个名为 “mystream” 的数据流,这个流中包含下面几个 tuple,每个 tuple 中包含有 “a”、“b”、“c” 三个域:

[1, 2, 3]
[4, 1, 6]
[3, 0, 8]

如果你运行这段代码:

mystream.each(new Fields("b"), new MyFunction(), new Fields("d")))

那么最终输出的结果 tuple 就会包含有 “a”、“b”、“c”、“d” 4 个域,就像下面这样:

[1, 2, 3, 0]
[1, 2, 3, 1]
[4, 1, 6, 0]

过滤器

过滤器负责判断输入的 tuple 是否需要保留。以下面的过滤器为例:

public class MyFilter extends BaseFilter {
    public boolean isKeep(TridentTuple tuple) {
        return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;
    }
}

通过使用这段代码:

mystream.each(new Fields("b", "a"), new MyFilter())

就可以将下面这样带有 “a”、“b”、“c” 三个域的 tuple

[1, 2, 3]
[2, 1, 1]
[2, 3, 4]

最终转化成这样的结果 tuple:

[2, 1, 1]

partitionAggregate

partitionAggregate 会在一批 tuple 的每个分区上执行一个指定的功能操作。与上面的函数不同,由 partitionAggregate发送出的 tuple 会将输入 tuple 的域替换。以下面这段代码为例:

mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))

假如输入流中包含有 “a”、“b” 两个域并且有以下几个 tuple 块:

Partition 0:
["a", 1]
["b", 2]

Partition 1:
["a", 3]
["c", 8]

Partition 2:
["e", 1]
["d", 9]
["d", 10]

经过上面的代码之后,输出就会变成带有一个名为 “sum” 的域的数据流,其中的 tuple 就是这样的:

Partition 0:
[3]

Partition 1:
[11]

Partition 2:
[20]

Storm 有三个用于定义聚合器的接口:CombinerAggregatorReducerAggregator 以及 Aggregator

这是 CombinerAggregator 接口:

public interface CombinerAggregator<T> extends Serializable {
    T init(TridentTuple tuple);
    T combine(T val1, T val2);
    T zero();
}

CombinerAggregator 会将带有一个域的一个单独的 tuple 返回作为输出。CombinerAggregator 会在每个输入 tuple 上运行初始化函数,然后使用组合函数来组合所有输入的值。如果在某个分区中没有 tuple, CombinerAggregator 就会输出zero 方法的结果。例如,下面是 Count 的实现代码:

public class Count implements CombinerAggregator<Long> {
    public Long init(TridentTuple tuple) {
        return 1L;
    }

    public Long combine(Long val1, Long val2) {
        return val1 + val2;
    }

    public Long zero() {
        return 0L;
    }
}

如果你使用 aggregate 方法来代替 partitionAggregate 方法,你就会发现 CombinerAggregator 的好处了。在这种情况下,Trident 会在发送 tuple 之前通过分区聚合操作来优化计算过程。

ReducerAggregator 的接口实现是这样的:

public interface ReducerAggregator<T> extends Serializable {
    T init();
    T reduce(T curr, TridentTuple tuple);
}

ReducerAggregator 会使用 init 方法来产生一个初始化的值,然后使用该值对每个输入 tuple 进行遍历,并最终生成并输出一个单独的 tuple,这个 tuple 中就包含有我们需要的计算结果值。例如,下面是将 Count 定义为 ReducerAggregator 的代码:

public class Count implements ReducerAggregator<Long> {
    public Long init() {
        return 0L;
    }

    public Long reduce(Long curr, TridentTuple tuple) {
        return curr + 1;
    }
}

ReducerAggregator 同样可以用于 persistentAggregate,你会在后面看到这一点。

最常用的聚合器接口还是下面的 Aggregator 接口:

public interface Aggregator<T> extends Operation {
    T init(Object batchId, TridentCollector collector);
    void aggregate(T state, TridentTuple tuple, TridentCollector collector);
    void complete(T state, TridentCollector collector);
}

Aggregator 聚合器可以生成任意数量的 tuple,这些 tuple 也可以带有任意数量的域。聚合器可以在执行过程中的任意一点输出tuple,他们的执行过程是这样的:

  1. 在处理一批数据之前先调用 init 方法。init 方法的返回值是一个代表着聚合状态的对象,这个对象接下来会被传入 aggregate 方法和 complete 方法中。
  2. 对于一个区块中的每个 tuple 都会调用 aggregate 方法。这个方法能够更新状态并且有选择地输出 tuple。
  3. 在区块中的所有 tuple 都被 aggregate 方法处理之后就会调用 complete 方法。

下面是使用 Count 作为聚合器的代码:

public class CountAgg extends BaseAggregator<CountState> {
    static class CountState {
        long count = 0;
    }

    public CountState init(Object batchId, TridentCollector collector) {
        return new CountState();
    }

    public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
        state.count+=1;
    }

    public void complete(CountState state, TridentCollector collector) {
        collector.emit(new Values(state.count));
    }
}

有时你可能会需要同时执行多个聚合操作。这个过程叫做链式处理,可以使用下面这样的代码来实现:

mystream.chainedAgg()
        .partitionAggregate(new Count(), new Fields("count"))
        .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
        .chainEnd()

这段代码会在每个分区上分别执行 Count 和 Sum 聚合器,而输出中只会包含一个带有 [“count”, “sum”] 域的单独的 tuple。

stateQuery 与 partitionPersist

stateQuery 与 partitionPersist 会分别查询、更新 state 数据源。你可以参考 Trident State 文档 来了解如何使用它们。

projection

projection 方法只会保留操作中指定的域。如果你有一个带有 [“a”, “b”, “c”, “d”] 域的数据流,通过执行这段代码:

mystream.project(new Fields("b", "d"))

就会使得输出数据流中只包含有 [“b”, “d”] 域。

重分区操作

重分区操作会执行一个用来改变在不同的任务间分配 tuple 的方式的函数。在重分区的过程中分区的数量也可能会发生变化(例如,重分区之后的并行度就有可能会增大)。重分区会产生一定的网络数据传输。下面是重分区操作的几个函数:

  1. shuffle:通过随机轮询算法来重新分配目标区块的所有 tuple。
  2. broadcast:每个 tuple 都会被复制到所有的目标区块中。这个函数在 DRPC 中很有用 —— 比如,你可以使用这个函数来获取每个区块数据的查询结果。
  3. partitionBy:该函数会接收一组域作为参数,并根据这些域来进行分区操作。可以通过对这些域进行哈希化,并对目标分区的数量取模的方法来选取目标区块。partitionBy 函数能够保证来自同一组域的结果总会被发送到相同的目标区间。
  4. global:这种方式下所有的 tuple 都会被发送到同一个目标分区中,而且数据流中的所有的块都会由这个分区处理。
  5. batchGlobal:同一个 batch 块中的所有 tuple 会被发送到同一个区块中。当然,在数据流中的不同区块仍然会分配到不同的区块中。
  6. partition:这个函数使用自定义的分区方法,该方法会实现 backtype.storm.grouping.CustomStreamGrouping 接口。

聚类操作

Trident 使用 aggregate 方法和 persistentAggregate 方法来对数据流进行聚类操作。其中,aggregate 方法会分别对数据流中的每个 batch 进行处理,而 persistentAggregate 方法则会对数据流中的所有 batch 执行聚类处理,并将结果存入某个 state 中。

在数据流上执行 aggregate 方法会执行一个全局的聚类操作。在你使用 ReducerAggregator 或者 Aggregator 时,数据流首先会被重新分区成一个单独的分区,然后聚类函数就会在该分区上执行操作。而在你使用 CombinerAggregator 时,Trident 首先会计算每个分区的部分聚类结果,然后将这些结果重分区到一个单独的分区中,最后在网络数据传输完成之后结束这个聚类过程。CombinerAggregator 比其他的聚合器的运行效率更高,在聚类时应该尽可能使用CombinerAggregator

下面是一个使用 aggregate 来获取一个 batch 的全局计数值的例子:

mystream.aggregate(new Count(), new Fields("count"))

与 partitionAggregate 一样,aggregate 的聚合器也可以进行链式处理。然而,如果你在一个处理链中同时使用了CombinerAggregator 和非 CombinerAggregator,Trident 就不能对部分聚类操作进行优化了。

想要了解更多使用 persistentAggregate 的方法,可以参考 Trident State 文档 一文。

对分组数据流的操作

通过对指定的域执行 partitionBy 操作,groupBy 操作可以将数据流进行重分区,使得相同的域的 tuple 分组可以聚集在一起。例如,下面是一个 groupBy 操作的示例:

groupBy

如果你在分组数据流上执行聚合操作,聚合器会在每个分组(而不是整个区块)上运行。persistentAggregate 同样可以在一个分组数据里上运行,这种情况下聚合结果会存储在 MapState 中,其中的 key 就是分组的域名。

和其他操作一样,对分组数据流的聚合操作也可以以链式的方式执行。

融合(Merge)与联结(join)

Trident API 的最后一部分是联结不同的数据流的操作。联结数据流最简单的方式就是将所有的数据流融合到一个流中。你可以使用 TridentTopology 的 merge 方法实现该操作,比如这样:

topology.merge(stream1, stream2, stream3);

Trident 会将融合后的新数据流的域命名为为第一个数据流的输出域。

联结数据流的另外一种方法是使用 join。像 SQL 那样的标准 join 操作只能用于有限的输入数据集,对于无限的数据集就没有用武之地了。Trident 中的 join 只会应用于每个从 spout 中输出的小 batch。

下面是两个流的 join 操作的示例,其中一个流含有 [“key”, “val1″, “val2″] 域,另外一个流含有 [“x”, “val1″] 域:

topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));

上面的例子会使用 “key” 和 “x” 作为 join 的域来联结 stream1 和 stream2。Trident 要求先定义好新流的输出域,因为输入流的域可能会覆盖新流的域名。从 join 中输出的 tuple 中会包含:

  1. join 域的列表。在这个例子里,输出的 “key” 域与 stream1 的 “key” 域以及 stream2 的 “x” 域对应。
  2. 来自所有流的非 join 域的列表。这个列表是按照传入 join 方法的流的顺序排列的。在这个例子里,“ a” 和 “b” 域与 stream1 的 “val1” 和 “val2” 域对应;而 “c” 域则与 stream2 的 “val1” 域相对应。

在对不同的 spout 发送出的流进行 join 时,这些 spout 上会按照他们发送 batch 的方式进行同步处理。也就是说,一个处理中的 batch 中含有每个 spout 发送出的 tuple。

到这里你大概仍然会对如何进行窗口 join 操作感到困惑。窗口操作(包括平滑窗口、滚动窗口等 —— 译者注)主要是指将当前的 tuple 与过去若干小时时间段内的 tuple 联结起来的过程。

你可以使用 partitionPersist 和 stateQuery 来实现这个过程。过去一段时间内的 tuple 会以 join 域为关键字被保存到一个 state 源中。然后就可以使用 stateQuery 查询 join 域来实现这个“联结”(join)的过程。

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
Sqoop 企业级大数据迁移方案实战
Sqoop是一个用于在Hadoop和关系数据库服务器之间传输数据的工具。它用于从关系数据库(如MySQL,Oracle)导入数据到Hadoop HDFS,并从Hadoop文件系统导出到关系数据库。 本课程主要讲解了Sqoop的设计思想及原理、部署安装及配置、详细具体的使用方法技巧与实操案例、企业级任务管理等。结合日常工作实践,培养解决实际问题的能力。本课程由黑马程序员提供。
相关文章
|
1月前
|
API
阿里云短信服务文档与实际API不符
阿里云短信服务文档与实际API不符
|
16天前
|
监控 负载均衡 API
Apache Apisix轻松打造亿级流量Api网关
Apache APISIX 是一个动态、实时、高性能的 API 网关,提供负载均衡、动态上行、灰度发布、熔断、鉴权、可观测等丰富的流量管理功能。适用于处理传统南北向流量、服务间东西向流量及 k8s 入口控制。Airflow 是一个可编程、调度和监控的工作流平台,基于有向无环图 (DAG) 定义和执行任务,提供丰富的命令行工具和 Web 管理界面,方便系统运维和管理。
Apache Apisix轻松打造亿级流量Api网关
|
8天前
|
JSON 前端开发 API
后端开发中的API设计与文档编写指南####
本文探讨了后端开发中API设计的重要性,并详细阐述了如何编写高效、可维护的API接口。通过实际案例分析,文章强调了清晰的API设计对于前后端分离项目的关键作用,以及良好的文档习惯如何促进团队协作和提升开发效率。 ####
|
4月前
|
Java API 开发者
在Spring Boot中集成Swagger API文档
在Spring Boot中集成Swagger API文档
|
19天前
|
供应链 数据挖掘 API
电商API接口介绍——sku接口概述
商品SKU(Stock Keeping Unit)接口是电商API接口中的一种,专门用于获取商品的SKU信息。SKU是库存量单位,用于区分同一商品的不同规格、颜色、尺寸等属性。通过商品SKU接口,开发者可以获取商品的SKU列表、SKU属性、库存数量等详细信息。
|
1月前
|
分布式计算 Java 大数据
大数据-147 Apache Kudu 常用 Java API 增删改查
大数据-147 Apache Kudu 常用 Java API 增删改查
28 1
|
1月前
|
机器学习/深度学习 算法 API
机器学习入门(五):KNN概述 | K 近邻算法 API,K值选择问题
机器学习入门(五):KNN概述 | K 近邻算法 API,K值选择问题
|
2月前
|
JSON 缓存 API
淘系商品详情API接口概述,API文档说明
在成长的路上,我们都是同行者。这篇关于API接口的文章,希望能帮助到您。期待与您继续分享更多API接口的知识,请记得关注Anzexi58哦! 淘宝API接口文档是淘宝开放平台为开发者提供的一套详细的技术规范和使用指南,旨在帮助开发者通过API接口与淘宝平台进行交互,获取商品详情等数据。以下是对淘宝商品详情数据解析的详细说明:
|
3月前
|
Java API 数据中心
百炼平台Java 集成API上传文档到数据中心并添加索引
本文主要演示阿里云百炼产品,如何通过API实现数据中心文档的上传和索引的添加。
|
3月前
|
存储 算法 Oracle
19 Java8概述(Java8概述+lambda表达式+函数式接口+方法引用+Stream+新时间API)
19 Java8概述(Java8概述+lambda表达式+函数式接口+方法引用+Stream+新时间API)
68 8

推荐镜像

更多