Storm中Trident Operations详解(一)

简介: 笔记

一、Trident 操作类型及使用场景


Trident 有 5 类操作:

1.Partition-local operations: 对每个 partition 的局部操作, 不产生网络传输


2.Repartitioning operations: 对 stream (数据流)的重新划分(仅仅是划分, 但不改变内容), 产生网络传输


3.Aggregation operations (聚合操作): 作为 operation (操作)的一部分进行网络传输


4.Operations on grouped streams: 作用在分组流上的操作


5.Merges 和 joins 操作


20.png


二、Tuple 过滤Filter函数操作


each() 方法中操作batch中的每一个tuple内容,一般与Filter 或者Function函数配合使用。 Filters 收到一个输入 tuple , 并决定是否保留该 tuple

/**
 * Filter Operations
 */
private static class OperFilter extends BaseFilter{
    @Override
    public boolean isKeep(TridentTuple tuple) {
        int amt = tuple.getIntegerByField("amt");
        return amt >= 30;
    }
}
Stream operStream = topology.newStream("spout1", spout).shuffle().parallelismHint(5)
        // "time","amt","city","product"    -> Filter ->   amt < 30
        .each(new Fields("time","amt","city","product"),new OperFilter())


三、Function 函数操作


它比Filter多出一个输出字段,作用是每个tuple在经过这个Function函数处理后,输出字段都会被追加到tuple后 面。

/**
 * Function Operations:它比Filter多出一个输出字段,作用是每个tuple在经过这个Function函数处理后,输出字段都会被追加到tuple后面。
 */
private static class OperFunction extends BaseFunction{
    int numPar;
    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        numPar = context.numPartitions();
        super.prepare(conf, context);
    }
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String date = tuple.getStringByField("time");
        int amt = tuple.getIntegerByField("amt");
        String _date = date.substring(0, 10);
        System.out.println("原数据" + "   [" + _date + " : " + amt + "]");
        collector.emit(new Values(_date));
    }
}
Stream operStream = topology.newStream("spout1", spout).shuffle().parallelismHint(5)
                // "time","amt","city","product"    -> Function ->   "time","amt","city","product","_date"
                .each(new Fields("time","amt","city","product"),new OperFunction(),new Fields("_date"))


四、Tuple 投影操作


经 Stream 中的 project 方法处理后的 tuple 仅保持指定字段(相当于过滤字段)

如果你有一个包含字段 [“a”, “b”, “c”, “d”] 的 stream , 执行下面代码:

mystream.project(new Fields("b" "d"))

则 output stream 将仅包含 [“b”, “d”] 字段

// "time","amt","city","product","_date" -> project -> "_date","amt"
.project(new Fields("_date","amt"))


五、Partition重分区操作


21.png

Partition概念


partition中文意思是分区,有人将partition理解为Storm里面的task,即并发的基本执行单位。我理解 应该是像数据库里面的分区,是将一个batch的数据分区,分成多个partition,或者可以理解为多个子 batch,然后多个partition可以并发处理。这里关键的区别是:partition是数据,不是执行的代码。你把数 据(tuple)分区以后,如果你没有多个task(并发度)来处理这些分区后的数据,那分区也是没有作用的。 所以这里的关系是这样的:先有batch,因为Trident内部是基于batch来实现的;然后有partition;分区后 再分配并发度,然后才能进行并发处理。并发度的分配是利用parallelismHint来实现的。


重分区操作有如下几种:

shuffle: 通过随机分配算法来均衡tuple到各个分区

broadcast: 每个tuple都被广播到所有的分区,这种方式在drcp时非常有用,比如在每个分区上做stateQuery

partitionBy: 根据指定的字段列表进行划分,具体做法是用指定字段列表的hash值对分区个数做取模运算,确 保相同字段列表的数据被划分到同一个分区

global: 所有的tuple都被发送到一个分区,这个分区用来处理整个Stream

batchGlobal: 一个Batch中的所有tuple都被发送到同一个分区,不同的Batch会去往不同的分区

Partition: 通过一个自定义的分区函数来进行分区,这个自定义函数实现了 backtype.storm.grouping.CustomStreamGrouping

// "_date","amt" -> partitionBy 按照 "_date" 分区
.partitionBy(new Fields("_date"))
// 设置并行度为3
.each(new Fields("_date","amt"),new PartitionFunction(),new Fields()).parallelismHint(3);


六、Aggregate聚合操作


Trident 中有 aggregate() 和 persistentAggregate() 方法对流进行聚合操作.


aggregate() 在每个 batch 上 独立的执行,对每个 batch 中的所有 tuple 进行聚合

persistemAggregate() 对所有 batch 中的所有 tuple 进行聚合, 并将结果存入 state 源中.

aggregate() 对 Stream 做全局聚合, 当使用 ReduceAggregator 或者 Aggregator 聚合器时, 流先被重新划分成 一个大分区(仅有一个 partition ), 然后对这个 partition 做聚合操作;另外, 当使用 CombinerAggregator 时, Trident 首先对每个 partition 局部聚合, 然后将所有这些 partition 重新划分到一个 partition 中, 完成全局聚合.相 比而言, CombinerAggregator 更高效, 推荐使用.

22.png

(1)ReducerAggregator聚合器详解

ReducerAggregator使用init()方法产生一个初始值,对于每个输入tuple,依次迭代这个初始值,最终产生一个单值输出tuple。


aggregate()对流做全局聚合,当使用ReduceAggregator或者Aggregator聚合器时,流先被重 新划分成一个大分区(仅有一个partition),然后对这个partition做聚合操作;

/**
 * ReducerAggregator:ReducerAggregator使用init()方法产生一个初始值,对于每个输入tuple,依次迭代这个初始 值,最终产生一个单值输出tuple。
 */
private static class MyReducerAggregate implements ReducerAggregator {
    @Override
    public Object init() {
        return 0L;
    }
    @Override
    public Object reduce(Object curr, TridentTuple tuple) {
        int amt = tuple.getIntegerByField("amt");
        System.out.println(curr + ":" + amt);
        return (long)curr + amt;
    }
}
// aggregate() 在每个 batch 上 独立的执行,对每个 batch 中的所有 tuple 进行聚合
.aggregate(new Fields("amt"),new MyReducerAggregate(),new Fields("amt"))

(2)CombinerAggregator聚合器详解

一个CombinerAggregator仅输出一个tuple(该tuple也只有一个字段)。每收到一个输入 tuple,CombinerAggregator就会执行init()方法(该方法返回一个初始值),并且用combine() 方法汇总这些值,直到剩下一个值为止(聚合值)。如果partition中没有tuple, CombinerAggregator会发送zero()的返回值。


当使用CombinerAggregator时,Trident首先对每个partition局部聚合,然后将所有这些 partition重新划分到一个partition中,完成全局聚合。相比而言,CombinerAggregator更高效, 推荐使用

/**
 * CombinerAggregator:一个CombinerAggregator仅输出一个tuple(该tuple也只有一个字段)。
 * 每收到一个输入 tuple,CombinerAggregator就会执行init()方法(该方法返回一个初始值),
 * 并且用combine() 方法汇总这些值,直到剩下一个值为止(聚合值)。如果partition中没有tuple, CombinerAggregator会发送zero()的返回值。
 */
private static class MyCombinerAggregate implements CombinerAggregator {
    @Override
    public Object init(TridentTuple tuple) {
        long amt = tuple.getIntegerByField("amt");
        String date = tuple.getStringByField("_date");
        // System.out.println(date + " : " + amt);
        return amt;
    }
    @Override
    public Object combine(Object val1, Object val2) {
        return (long)val1 + (long)val2;
    }
    @Override
    public Object zero() {
        return 0L;
    }
}
.aggregate(new Fields("_date","amt"),new MyCombinerAggregate(),new Fields("_amt"))

(3)Aggregator聚合器详解

Aggregator可以输出任意数量的tuple,且这些tuple的字段也可以有多个。执行过程中的任何时候都 可以输出tuple(三个方法的参数中都有collector)。Aggregator的执行方式如下:


1、处理每个batch之前调用一次init()方法,该方法的返回值是一个对象,代表aggregation的状态,并且会 传递给下面的aggregate()和complete()方法。


2、每个收到一个该batch中的输入tuple就会调用一次aggregate,该方法中可以更新状态(第一点中init() 方法的返回值)。


3、当该batch partition中的所有tuple都被aggregate()方法处理完之后调用complete方法。

/**
 * Aggregator:Aggregator可以输出任意数量的tuple,且这些tuple的字段也可以有多个。
 * 执行过程中的任何时候都 可以输出tuple(三个方法的参数中都有collector)。
 */
private static class MyAgg extends BaseAggregator<MyAgg.CountState>{
    // 分区数
    int parNum;
    // 分区号
    int parIndex;
    static class CountState{
        long count = 0;
        Object _batchId;
        long amtSum = 0;
        String date;
        CountState(Object batchId){
            this._batchId = batchId;
        }
    }
    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        this.parNum = context.numPartitions();
        this.parIndex = context.getPartitionIndex();
    }
    @Override
    public CountState init(Object batchId, TridentCollector collector) {
        return new CountState(batchId);
    }
    @Override
    public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
        int amt = tuple.getIntegerByField("amt");
        String date = tuple.getStringByField("_date");
        state.date = date;
        state.amtSum+=amt;
    }
    @Override
    public void complete(CountState state, TridentCollector collector) {
        // System.out.println("batchId= " + state._batchId + " : amtSum = " + state.amtSum);
        System.out.println("date= " + state.date + "parNum>>>>>" + parNum + " batchId= " + state._batchId + " : sum=" + state.amtSum);
        collector.emit(new Values(state.amtSum));
    }
}
.aggregate(new Fields("_date","amt"),new MyAgg(),new Fields("_amt"));





相关文章
|
关系型数据库 MySQL 分布式数据库
PolarDB操作报错合集之在执行语句时遇到语法错误,是由什么导致的
在使用阿里云的PolarDB(包括PolarDB-X)时,用户可能会遇到各种操作报错。下面汇总了一些常见的报错情况及其可能的原因和解决办法:1.安装PolarDB-X报错、2.PolarDB安装后无法连接、3.PolarDB-X 使用rpm安装启动卡顿、4.PolarDB执行UPDATE/INSERT报错、5.DDL操作提示“Lock conflict”、6.数据集成时联通PolarDB报错、7.编译DN报错(RockyLinux)、8.CheckStorage报错(源数据库实例被删除)、9.嵌套事务错误(TDDL-4604)。
244 1
|
11月前
|
机器学习/深度学习 监控 搜索推荐
从零开始构建:使用Hologres打造个性化推荐系统的完整指南
【10月更文挑战第9天】随着互联网技术的发展,个性化推荐系统已经成为许多在线服务不可或缺的一部分。一个好的推荐系统可以显著提高用户体验,增加用户粘性,并最终提升业务的转化率。本指南将详细介绍如何使用阿里云的Hologres数据库来构建一个高效的个性化推荐系统。我们将涵盖从数据准备、模型训练到实时推荐的整个流程。
675 0
|
11月前
|
SQL 安全 关系型数据库
SQL自动化注ru-SQLmap入门操作(一)
SQL自动化注ru-SQLmap入门操作(一)
|
10月前
|
JavaScript 持续交付 Docker
解锁新技能:Docker容器化部署在微服务架构中的应用
【10月更文挑战第29天】在数字化转型中,微服务架构因灵活性和可扩展性成为企业首选。Docker容器化技术为微服务的部署和管理带来革命性变化。本文探讨Docker在微服务架构中的应用,包括隔离性、可移植性、扩展性、版本控制等方面,并提供代码示例。
259 1
|
数据采集 分布式计算 并行计算
Dask与Pandas:无缝迁移至分布式数据框架
【8月更文第29天】Pandas 是 Python 社区中最受欢迎的数据分析库之一,它提供了高效且易于使用的数据结构,如 DataFrame 和 Series,以及大量的数据分析功能。然而,随着数据集规模的增大,单机上的 Pandas 开始显现出性能瓶颈。这时,Dask 就成为了一个很好的解决方案,它能够利用多核 CPU 和多台机器进行分布式计算,从而有效地处理大规模数据集。
673 1
|
Kubernetes Cloud Native 云计算
探索K8S的绝佳选择:Killercoda与Play-with-K8s在线练习平台
探索K8S的绝佳选择:Killercoda与Play-with-K8s在线练习平台
2739 1
|
缓存 负载均衡 算法
【Nginx】静态资源部署、反向代理、负载均衡
【Nginx】静态资源部署、反向代理、负载均衡
610 0
No.5 STM32F429IGT6 标准库实战GPIO输出LED灯 点亮闪烁(STM32F429/F767/H743)
No.5 STM32F429IGT6 标准库实战GPIO输出LED灯 点亮闪烁(STM32F429/F767/H743)
|
Java Spring
Javaweb之SpringBootWeb案例之事务进阶的详细解析
Javaweb之SpringBootWeb案例之事务进阶的详细解析
114 0