七、persistentAggregate持久化聚合操作
persistemAggregate() 对所有 batch 中的所有 tuple 进行聚合, 并将结果存入 state 源中
.persistentAggregate(new MemoryMapState.Factory(),new Fields("_date","amt"),new MyCombinerAggregate(),new Fields("_amt"));
八、Trident Topology并行度设置详解
如果想单独设置Spout,要在Spout之后,Bolt之前增加一个ParallelismHint,并且还要增加 一个分区操。设置了Spout的并发度,必须调用分区操作,不然是没有效果的,因为Trident是不会自动进行分区操作的。
九、GroupBy 分组操作
groupBy 操作先对流中的指定字段做 partitionBy 操作, 让指定字段相同的 tuple 能被发送到同 一个 partition 里.然后在每个 partition 里根据指定字段值对该分区里的 tuple 进行分组.下面演示了 groupBy 操作的过程:
// 相当于select date,sum(amt) from table group by date; .groupBy(new Fields("_date"))
十、调用链操作
执行 multiple aggregators (多个聚合)操作, 这个可以使用 chaining (链式)操作完成:
这段代码将会在每个分区中执行 Count 和 Sum aggregators (聚合器), 并输出一个tuple 字段 [“count”, “sum”]。
相当于:
select date,count(1),sum(age) from batch where 1=1 group by date
十一、合并与关联操作
Trident 将把新的 merged stream 的 output fields 命名为第一个 stream 的 output fields (输出字段).
另一个合并流的方法是join。类似SQL的join都是对固定输入的。而流的输入是不固定的,所以不能按照sql的方 法做join。
Trident中的join只会在spout发出的每个批次间进行。
如一个流包含字段[“key”,“val1”,“val2”],
另一个流包含字段[“x”,“val1”]:
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key","a","b","c"));
Stream1的“key”和stream2的“x”关联,Trident要求所有的字段要改名字。
首先是join字段。例子中stream1中的“key”对应stream2中的“x”。
接下来,会把非join字段依次列出来,排列顺序按照传给join的顺序。例子中“a”,“b”对应stream1中 的“val1”和“val2”,“c”对应stream2中的“val1”。
十二、Tuple数据TopN详解
取Top N
用法:
stream.applyAssembly(new FirstN(TOP_N, "sortField", true));
每个batch中,取TOP_N个tuple数据,以”sortField”为排序字段, True代表ASC ;false代表DESC