Storm中Trident Operations详解(二)

简介: 笔记

七、persistentAggregate持久化聚合操作


persistemAggregate() 对所有 batch 中的所有 tuple 进行聚合, 并将结果存入 state 源中

.persistentAggregate(new MemoryMapState.Factory(),new Fields("_date","amt"),new MyCombinerAggregate(),new Fields("_amt"));


八、Trident Topology并行度设置详解


如果想单独设置Spout,要在Spout之后,Bolt之前增加一个ParallelismHint,并且还要增加 一个分区操。设置了Spout的并发度,必须调用分区操作,不然是没有效果的,因为Trident是不会自动进行分区操作的。


九、GroupBy 分组操作


groupBy 操作先对流中的指定字段做 partitionBy 操作, 让指定字段相同的 tuple 能被发送到同 一个 partition 里.然后在每个 partition 里根据指定字段值对该分区里的 tuple 进行分组.下面演示了 groupBy 操作的过程:

23.png

// 相当于select date,sum(amt) from table group by date;
.groupBy(new Fields("_date"))


十、调用链操作


执行 multiple aggregators (多个聚合)操作, 这个可以使用 chaining (链式)操作完成:

这段代码将会在每个分区中执行 Count 和 Sum aggregators (聚合器), 并输出一个tuple 字段 [“count”, “sum”]。


相当于:

select date,count(1),sum(age) from batch where 1=1 group by date

24.png


十一、合并与关联操作


Trident 将把新的 merged stream 的 output fields 命名为第一个 stream 的 output fields (输出字段).

25.png

另一个合并流的方法是join。类似SQL的join都是对固定输入的。而流的输入是不固定的,所以不能按照sql的方 法做join。

Trident中的join只会在spout发出的每个批次间进行。

如一个流包含字段[“key”,“val1”,“val2”],

另一个流包含字段[“x”,“val1”]:

topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key","a","b","c"));

Stream1的“key”和stream2的“x”关联,Trident要求所有的字段要改名字。


首先是join字段。例子中stream1中的“key”对应stream2中的“x”。


接下来,会把非join字段依次列出来,排列顺序按照传给join的顺序。例子中“a”,“b”对应stream1中 的“val1”和“val2”,“c”对应stream2中的“val1”。26.png


十二、Tuple数据TopN详解


取Top N

用法:

 stream.applyAssembly(new FirstN(TOP_N, "sortField", true));

每个batch中,取TOP_N个tuple数据,以”sortField”为排序字段, True代表ASC ;false代表DESC

27.png

28.png



相关文章
|
消息中间件 存储 并行计算
|
存储 自然语言处理 分布式计算
|
存储 SQL 运维
storm笔记:Trident应用
Trident是基于Storm的实时计算模型的高级抽象。它可以实现高吞吐(每秒数百万条消息)的有状态流处理和低延迟分布式查询。
201 0
storm笔记:Trident应用
|
存储 消息中间件 缓存
storm笔记:Trident状态
在storm笔记:Trident应用中说了下Trident的使用,这里说下Trident几种状态的变化及其对应API的使用。
100 0
storm笔记:Trident状态