一、Trident 操作类型及使用场景
Trident 有 5 类操作:
1.Partition-local operations: 对每个 partition 的局部操作, 不产生网络传输
2.Repartitioning operations: 对 stream (数据流)的重新划分(仅仅是划分, 但不改变内容), 产生网络传输
3.Aggregation operations (聚合操作): 作为 operation (操作)的一部分进行网络传输
4.Operations on grouped streams: 作用在分组流上的操作
5.Merges 和 joins 操作
二、Tuple 过滤Filter函数操作
each() 方法中操作batch中的每一个tuple内容,一般与Filter 或者Function函数配合使用。 Filters 收到一个输入 tuple , 并决定是否保留该 tuple
/** * Filter Operations */ private static class OperFilter extends BaseFilter{ @Override public boolean isKeep(TridentTuple tuple) { int amt = tuple.getIntegerByField("amt"); return amt >= 30; } }
Stream operStream = topology.newStream("spout1", spout).shuffle().parallelismHint(5) // "time","amt","city","product" -> Filter -> amt < 30 .each(new Fields("time","amt","city","product"),new OperFilter())
三、Function 函数操作
它比Filter多出一个输出字段,作用是每个tuple在经过这个Function函数处理后,输出字段都会被追加到tuple后 面。
/** * Function Operations:它比Filter多出一个输出字段,作用是每个tuple在经过这个Function函数处理后,输出字段都会被追加到tuple后面。 */ private static class OperFunction extends BaseFunction{ int numPar; @Override public void prepare(Map conf, TridentOperationContext context) { numPar = context.numPartitions(); super.prepare(conf, context); } @Override public void execute(TridentTuple tuple, TridentCollector collector) { String date = tuple.getStringByField("time"); int amt = tuple.getIntegerByField("amt"); String _date = date.substring(0, 10); System.out.println("原数据" + " [" + _date + " : " + amt + "]"); collector.emit(new Values(_date)); } }
Stream operStream = topology.newStream("spout1", spout).shuffle().parallelismHint(5) // "time","amt","city","product" -> Function -> "time","amt","city","product","_date" .each(new Fields("time","amt","city","product"),new OperFunction(),new Fields("_date"))
四、Tuple 投影操作
经 Stream 中的 project 方法处理后的 tuple 仅保持指定字段(相当于过滤字段)
如果你有一个包含字段 [“a”, “b”, “c”, “d”] 的 stream , 执行下面代码:
mystream.project(new Fields("b" "d"))
则 output stream 将仅包含 [“b”, “d”] 字段
// "time","amt","city","product","_date" -> project -> "_date","amt" .project(new Fields("_date","amt"))
五、Partition重分区操作
Partition概念
partition中文意思是分区,有人将partition理解为Storm里面的task,即并发的基本执行单位。我理解 应该是像数据库里面的分区,是将一个batch的数据分区,分成多个partition,或者可以理解为多个子 batch,然后多个partition可以并发处理。这里关键的区别是:partition是数据,不是执行的代码。你把数 据(tuple)分区以后,如果你没有多个task(并发度)来处理这些分区后的数据,那分区也是没有作用的。 所以这里的关系是这样的:先有batch,因为Trident内部是基于batch来实现的;然后有partition;分区后 再分配并发度,然后才能进行并发处理。并发度的分配是利用parallelismHint来实现的。
重分区操作有如下几种:
shuffle: 通过随机分配算法来均衡tuple到各个分区
broadcast: 每个tuple都被广播到所有的分区,这种方式在drcp时非常有用,比如在每个分区上做stateQuery
partitionBy: 根据指定的字段列表进行划分,具体做法是用指定字段列表的hash值对分区个数做取模运算,确 保相同字段列表的数据被划分到同一个分区
global: 所有的tuple都被发送到一个分区,这个分区用来处理整个Stream
batchGlobal: 一个Batch中的所有tuple都被发送到同一个分区,不同的Batch会去往不同的分区
Partition: 通过一个自定义的分区函数来进行分区,这个自定义函数实现了 backtype.storm.grouping.CustomStreamGrouping
// "_date","amt" -> partitionBy 按照 "_date" 分区 .partitionBy(new Fields("_date")) // 设置并行度为3 .each(new Fields("_date","amt"),new PartitionFunction(),new Fields()).parallelismHint(3);
六、Aggregate聚合操作
Trident 中有 aggregate() 和 persistentAggregate() 方法对流进行聚合操作.
aggregate() 在每个 batch 上 独立的执行,对每个 batch 中的所有 tuple 进行聚合
persistemAggregate() 对所有 batch 中的所有 tuple 进行聚合, 并将结果存入 state 源中.
aggregate() 对 Stream 做全局聚合, 当使用 ReduceAggregator 或者 Aggregator 聚合器时, 流先被重新划分成 一个大分区(仅有一个 partition ), 然后对这个 partition 做聚合操作;另外, 当使用 CombinerAggregator 时, Trident 首先对每个 partition 局部聚合, 然后将所有这些 partition 重新划分到一个 partition 中, 完成全局聚合.相 比而言, CombinerAggregator 更高效, 推荐使用.
(1)ReducerAggregator聚合器详解
ReducerAggregator使用init()方法产生一个初始值,对于每个输入tuple,依次迭代这个初始值,最终产生一个单值输出tuple。
aggregate()对流做全局聚合,当使用ReduceAggregator或者Aggregator聚合器时,流先被重 新划分成一个大分区(仅有一个partition),然后对这个partition做聚合操作;
/** * ReducerAggregator:ReducerAggregator使用init()方法产生一个初始值,对于每个输入tuple,依次迭代这个初始 值,最终产生一个单值输出tuple。 */ private static class MyReducerAggregate implements ReducerAggregator { @Override public Object init() { return 0L; } @Override public Object reduce(Object curr, TridentTuple tuple) { int amt = tuple.getIntegerByField("amt"); System.out.println(curr + ":" + amt); return (long)curr + amt; } }
// aggregate() 在每个 batch 上 独立的执行,对每个 batch 中的所有 tuple 进行聚合 .aggregate(new Fields("amt"),new MyReducerAggregate(),new Fields("amt"))
(2)CombinerAggregator聚合器详解
一个CombinerAggregator仅输出一个tuple(该tuple也只有一个字段)。每收到一个输入 tuple,CombinerAggregator就会执行init()方法(该方法返回一个初始值),并且用combine() 方法汇总这些值,直到剩下一个值为止(聚合值)。如果partition中没有tuple, CombinerAggregator会发送zero()的返回值。
当使用CombinerAggregator时,Trident首先对每个partition局部聚合,然后将所有这些 partition重新划分到一个partition中,完成全局聚合。相比而言,CombinerAggregator更高效, 推荐使用
/** * CombinerAggregator:一个CombinerAggregator仅输出一个tuple(该tuple也只有一个字段)。 * 每收到一个输入 tuple,CombinerAggregator就会执行init()方法(该方法返回一个初始值), * 并且用combine() 方法汇总这些值,直到剩下一个值为止(聚合值)。如果partition中没有tuple, CombinerAggregator会发送zero()的返回值。 */ private static class MyCombinerAggregate implements CombinerAggregator { @Override public Object init(TridentTuple tuple) { long amt = tuple.getIntegerByField("amt"); String date = tuple.getStringByField("_date"); // System.out.println(date + " : " + amt); return amt; } @Override public Object combine(Object val1, Object val2) { return (long)val1 + (long)val2; } @Override public Object zero() { return 0L; } }
.aggregate(new Fields("_date","amt"),new MyCombinerAggregate(),new Fields("_amt"))
(3)Aggregator聚合器详解
Aggregator可以输出任意数量的tuple,且这些tuple的字段也可以有多个。执行过程中的任何时候都 可以输出tuple(三个方法的参数中都有collector)。Aggregator的执行方式如下:
1、处理每个batch之前调用一次init()方法,该方法的返回值是一个对象,代表aggregation的状态,并且会 传递给下面的aggregate()和complete()方法。
2、每个收到一个该batch中的输入tuple就会调用一次aggregate,该方法中可以更新状态(第一点中init() 方法的返回值)。
3、当该batch partition中的所有tuple都被aggregate()方法处理完之后调用complete方法。
/** * Aggregator:Aggregator可以输出任意数量的tuple,且这些tuple的字段也可以有多个。 * 执行过程中的任何时候都 可以输出tuple(三个方法的参数中都有collector)。 */ private static class MyAgg extends BaseAggregator<MyAgg.CountState>{ // 分区数 int parNum; // 分区号 int parIndex; static class CountState{ long count = 0; Object _batchId; long amtSum = 0; String date; CountState(Object batchId){ this._batchId = batchId; } } @Override public void prepare(Map conf, TridentOperationContext context) { this.parNum = context.numPartitions(); this.parIndex = context.getPartitionIndex(); } @Override public CountState init(Object batchId, TridentCollector collector) { return new CountState(batchId); } @Override public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) { int amt = tuple.getIntegerByField("amt"); String date = tuple.getStringByField("_date"); state.date = date; state.amtSum+=amt; } @Override public void complete(CountState state, TridentCollector collector) { // System.out.println("batchId= " + state._batchId + " : amtSum = " + state.amtSum); System.out.println("date= " + state.date + "parNum>>>>>" + parNum + " batchId= " + state._batchId + " : sum=" + state.amtSum); collector.emit(new Values(state.amtSum)); } }
.aggregate(new Fields("_date","amt"),new MyAgg(),new Fields("_amt"));