Storm中Trident Operations详解(一)

简介: 笔记

一、Trident 操作类型及使用场景


Trident 有 5 类操作:

1.Partition-local operations: 对每个 partition 的局部操作, 不产生网络传输


2.Repartitioning operations: 对 stream (数据流)的重新划分(仅仅是划分, 但不改变内容), 产生网络传输


3.Aggregation operations (聚合操作): 作为 operation (操作)的一部分进行网络传输


4.Operations on grouped streams: 作用在分组流上的操作


5.Merges 和 joins 操作


20.png


二、Tuple 过滤Filter函数操作


each() 方法中操作batch中的每一个tuple内容,一般与Filter 或者Function函数配合使用。 Filters 收到一个输入 tuple , 并决定是否保留该 tuple

/**
 * Filter Operations
 */
private static class OperFilter extends BaseFilter{
    @Override
    public boolean isKeep(TridentTuple tuple) {
        int amt = tuple.getIntegerByField("amt");
        return amt >= 30;
    }
}
Stream operStream = topology.newStream("spout1", spout).shuffle().parallelismHint(5)
        // "time","amt","city","product"    -> Filter ->   amt < 30
        .each(new Fields("time","amt","city","product"),new OperFilter())


三、Function 函数操作


它比Filter多出一个输出字段,作用是每个tuple在经过这个Function函数处理后,输出字段都会被追加到tuple后 面。

/**
 * Function Operations:它比Filter多出一个输出字段,作用是每个tuple在经过这个Function函数处理后,输出字段都会被追加到tuple后面。
 */
private static class OperFunction extends BaseFunction{
    int numPar;
    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        numPar = context.numPartitions();
        super.prepare(conf, context);
    }
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String date = tuple.getStringByField("time");
        int amt = tuple.getIntegerByField("amt");
        String _date = date.substring(0, 10);
        System.out.println("原数据" + "   [" + _date + " : " + amt + "]");
        collector.emit(new Values(_date));
    }
}
Stream operStream = topology.newStream("spout1", spout).shuffle().parallelismHint(5)
                // "time","amt","city","product"    -> Function ->   "time","amt","city","product","_date"
                .each(new Fields("time","amt","city","product"),new OperFunction(),new Fields("_date"))


四、Tuple 投影操作


经 Stream 中的 project 方法处理后的 tuple 仅保持指定字段(相当于过滤字段)

如果你有一个包含字段 [“a”, “b”, “c”, “d”] 的 stream , 执行下面代码:

mystream.project(new Fields("b" "d"))

则 output stream 将仅包含 [“b”, “d”] 字段

// "time","amt","city","product","_date" -> project -> "_date","amt"
.project(new Fields("_date","amt"))


五、Partition重分区操作


21.png

Partition概念


partition中文意思是分区,有人将partition理解为Storm里面的task,即并发的基本执行单位。我理解 应该是像数据库里面的分区,是将一个batch的数据分区,分成多个partition,或者可以理解为多个子 batch,然后多个partition可以并发处理。这里关键的区别是:partition是数据,不是执行的代码。你把数 据(tuple)分区以后,如果你没有多个task(并发度)来处理这些分区后的数据,那分区也是没有作用的。 所以这里的关系是这样的:先有batch,因为Trident内部是基于batch来实现的;然后有partition;分区后 再分配并发度,然后才能进行并发处理。并发度的分配是利用parallelismHint来实现的。


重分区操作有如下几种:

shuffle: 通过随机分配算法来均衡tuple到各个分区

broadcast: 每个tuple都被广播到所有的分区,这种方式在drcp时非常有用,比如在每个分区上做stateQuery

partitionBy: 根据指定的字段列表进行划分,具体做法是用指定字段列表的hash值对分区个数做取模运算,确 保相同字段列表的数据被划分到同一个分区

global: 所有的tuple都被发送到一个分区,这个分区用来处理整个Stream

batchGlobal: 一个Batch中的所有tuple都被发送到同一个分区,不同的Batch会去往不同的分区

Partition: 通过一个自定义的分区函数来进行分区,这个自定义函数实现了 backtype.storm.grouping.CustomStreamGrouping

// "_date","amt" -> partitionBy 按照 "_date" 分区
.partitionBy(new Fields("_date"))
// 设置并行度为3
.each(new Fields("_date","amt"),new PartitionFunction(),new Fields()).parallelismHint(3);


六、Aggregate聚合操作


Trident 中有 aggregate() 和 persistentAggregate() 方法对流进行聚合操作.


aggregate() 在每个 batch 上 独立的执行,对每个 batch 中的所有 tuple 进行聚合

persistemAggregate() 对所有 batch 中的所有 tuple 进行聚合, 并将结果存入 state 源中.

aggregate() 对 Stream 做全局聚合, 当使用 ReduceAggregator 或者 Aggregator 聚合器时, 流先被重新划分成 一个大分区(仅有一个 partition ), 然后对这个 partition 做聚合操作;另外, 当使用 CombinerAggregator 时, Trident 首先对每个 partition 局部聚合, 然后将所有这些 partition 重新划分到一个 partition 中, 完成全局聚合.相 比而言, CombinerAggregator 更高效, 推荐使用.

22.png

(1)ReducerAggregator聚合器详解

ReducerAggregator使用init()方法产生一个初始值,对于每个输入tuple,依次迭代这个初始值,最终产生一个单值输出tuple。


aggregate()对流做全局聚合,当使用ReduceAggregator或者Aggregator聚合器时,流先被重 新划分成一个大分区(仅有一个partition),然后对这个partition做聚合操作;

/**
 * ReducerAggregator:ReducerAggregator使用init()方法产生一个初始值,对于每个输入tuple,依次迭代这个初始 值,最终产生一个单值输出tuple。
 */
private static class MyReducerAggregate implements ReducerAggregator {
    @Override
    public Object init() {
        return 0L;
    }
    @Override
    public Object reduce(Object curr, TridentTuple tuple) {
        int amt = tuple.getIntegerByField("amt");
        System.out.println(curr + ":" + amt);
        return (long)curr + amt;
    }
}
// aggregate() 在每个 batch 上 独立的执行,对每个 batch 中的所有 tuple 进行聚合
.aggregate(new Fields("amt"),new MyReducerAggregate(),new Fields("amt"))

(2)CombinerAggregator聚合器详解

一个CombinerAggregator仅输出一个tuple(该tuple也只有一个字段)。每收到一个输入 tuple,CombinerAggregator就会执行init()方法(该方法返回一个初始值),并且用combine() 方法汇总这些值,直到剩下一个值为止(聚合值)。如果partition中没有tuple, CombinerAggregator会发送zero()的返回值。


当使用CombinerAggregator时,Trident首先对每个partition局部聚合,然后将所有这些 partition重新划分到一个partition中,完成全局聚合。相比而言,CombinerAggregator更高效, 推荐使用

/**
 * CombinerAggregator:一个CombinerAggregator仅输出一个tuple(该tuple也只有一个字段)。
 * 每收到一个输入 tuple,CombinerAggregator就会执行init()方法(该方法返回一个初始值),
 * 并且用combine() 方法汇总这些值,直到剩下一个值为止(聚合值)。如果partition中没有tuple, CombinerAggregator会发送zero()的返回值。
 */
private static class MyCombinerAggregate implements CombinerAggregator {
    @Override
    public Object init(TridentTuple tuple) {
        long amt = tuple.getIntegerByField("amt");
        String date = tuple.getStringByField("_date");
        // System.out.println(date + " : " + amt);
        return amt;
    }
    @Override
    public Object combine(Object val1, Object val2) {
        return (long)val1 + (long)val2;
    }
    @Override
    public Object zero() {
        return 0L;
    }
}
.aggregate(new Fields("_date","amt"),new MyCombinerAggregate(),new Fields("_amt"))

(3)Aggregator聚合器详解

Aggregator可以输出任意数量的tuple,且这些tuple的字段也可以有多个。执行过程中的任何时候都 可以输出tuple(三个方法的参数中都有collector)。Aggregator的执行方式如下:


1、处理每个batch之前调用一次init()方法,该方法的返回值是一个对象,代表aggregation的状态,并且会 传递给下面的aggregate()和complete()方法。


2、每个收到一个该batch中的输入tuple就会调用一次aggregate,该方法中可以更新状态(第一点中init() 方法的返回值)。


3、当该batch partition中的所有tuple都被aggregate()方法处理完之后调用complete方法。

/**
 * Aggregator:Aggregator可以输出任意数量的tuple,且这些tuple的字段也可以有多个。
 * 执行过程中的任何时候都 可以输出tuple(三个方法的参数中都有collector)。
 */
private static class MyAgg extends BaseAggregator<MyAgg.CountState>{
    // 分区数
    int parNum;
    // 分区号
    int parIndex;
    static class CountState{
        long count = 0;
        Object _batchId;
        long amtSum = 0;
        String date;
        CountState(Object batchId){
            this._batchId = batchId;
        }
    }
    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        this.parNum = context.numPartitions();
        this.parIndex = context.getPartitionIndex();
    }
    @Override
    public CountState init(Object batchId, TridentCollector collector) {
        return new CountState(batchId);
    }
    @Override
    public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
        int amt = tuple.getIntegerByField("amt");
        String date = tuple.getStringByField("_date");
        state.date = date;
        state.amtSum+=amt;
    }
    @Override
    public void complete(CountState state, TridentCollector collector) {
        // System.out.println("batchId= " + state._batchId + " : amtSum = " + state.amtSum);
        System.out.println("date= " + state.date + "parNum>>>>>" + parNum + " batchId= " + state._batchId + " : sum=" + state.amtSum);
        collector.emit(new Values(state.amtSum));
    }
}
.aggregate(new Fields("_date","amt"),new MyAgg(),new Fields("_amt"));





相关文章
|
Java 数据处理 API
Flink Runtime Architecture(一)|学习笔记
快速学习 Flink Runtime Architecture
127 0
Flink Runtime Architecture(一)|学习笔记
|
消息中间件 存储 并行计算
|
存储 自然语言处理 分布式计算
|
存储 SQL 运维
storm笔记:Trident应用
Trident是基于Storm的实时计算模型的高级抽象。它可以实现高吞吐(每秒数百万条消息)的有状态流处理和低延迟分布式查询。
195 0
storm笔记:Trident应用
|
存储 消息中间件 缓存
storm笔记:Trident状态
在storm笔记:Trident应用中说了下Trident的使用,这里说下Trident几种状态的变化及其对应API的使用。
96 0
storm笔记:Trident状态
|
存储 分布式计算 调度
MapReduce与批处理------《Designing Data-Intensive Applications》读书笔记14
之前的文章大量的内容在和大家探讨分布式存储,接下来的章节进入了分布式计算领域。坦白说,个人之前专业的重心侧重于存储,对许多计算的内容理解可能不是和确切,如果文章中的理解有所不妥,愿虚心赐教。
1361 0