Storm starter - RollingTopWords

简介:

计算top N words的topology, 用于比如trending topics or trending images on Twitter.

实现了滑动窗口计数和TopN排序, 比较有意思, 具体分析一下代码 

Topology

这是一个稍微复杂些的topology, 主要体现在使用不同的grouping方式, fieldsGrouping和globalGrouping

 String spoutId = "wordGenerator";
 String counterId = "counter";
 String intermediateRankerId = "intermediateRanker";
 String totalRankerId = "finalRanker";
 builder.setSpout(spoutId, new TestWordSpout(), 5);
 builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word"));
 builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields("obj"));
 builder.setBolt(totalRankerId, new TotalRankingsBolt TOP_N)).globalGrouping(intermediateRankerId);

 

RollingCountBolt

首先使用RollingCountBolt, 并且此处是按照word进行fieldsGrouping的, 所以相同的word会被发送到同一个bolt, 这个field id是在上一级的declareOutputFields时指定的

RollingCountBolt, 用于基于时间窗口的counting, 所以需要两个参数, the length of the sliding window in seconds和the emit frequency in seconds

new RollingCountBolt(9, 3), 意味着output the latest 9 minutes sliding window every 3 minutes

1. 创建SlidingWindowCounter(SlidingWindowCounter和SlotBasedCounter参考下面) 
counter = new SlidingWindowCounter(this.windowLengthInSeconds / this.windowUpdateFrequencyInSeconds); 
如何定义slot数? 对于9 min的时间窗口, 每3 min emit一次数据, 那么就需要9/3=3个slot 
那么在3 min以内, 不停的调用countObjAndAck(tuple)来递增所有对象该slot上的计数 
每3分钟会触发调用emitCurrentWindowCounts, 用于滑动窗口(通过getCountsThenAdvanceWindow), 并emit (Map<obj, 窗口内的计数和>, 实际使用时间) 
因为实际emit触发时间, 不可能刚好是3 min, 会有误差, 所以需要给出实际使用时间

 

2. TupleHelpers.isTickTuple(tuple), TickTuple

前面没有说的一点是, 如何触发emit? 这是比较值得说明的一点, 因为其使用Storm的TickTuple特性. 
这个功能挺有用, 比如数据库批量存储, 或者这里的时间窗口的统计等应用 
"__system" component会定时往task发送 "__tick" stream的tuple 
发送频率由TOPOLOGY_TICK_TUPLE_FREQ_SECS来配置, 可以在default.ymal里面配置 
也可以在代码里面通过getComponentConfiguration()来进行配置,

public Map<String, Object> getComponentConfiguration() {
     Map<String, Object> conf = new HashMap<String, Object>();
     conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
     return conf;

配置完成后, storm就会定期的往task发送ticktuple 
只需要通过isTickTuple来判断是否为tickTuple, 就可以完成定时触发的功能

public static boolean isTickTuple(Tuple tuple) {
    return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) \\ SYSTEM_COMPONENT_ID == "__system"
        && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID); \\ SYSTEM_TICK_STREAM_ID == "__tick"
}

最终, 这个blot的输出为, collector.emit(new Values(obj, count, actualWindowLengthInSeconds)); 
obj, count(窗口内的计数和), 实际使用时间

 

SlotBasedCounter

基于slot的counter, 模板类, 可以指定被计数对象的类型T 
这个类其实很简单, 实现计数对象和一组slot(用long数组实现)的map, 并可以对任意slot做increment或reset等操作

关键结构为Map<T, long[]> objToCounts, 为每个obj都对应于一个大小为numSlots的long数组, 所以对每个obj可以计numSlots个数 
incrementCount, 递增某个obj的某个slot, 如果是第一次需要创建counts数组 
getCount, getCounts, 获取某obj的某slot值, 或某obj的所有slot值的和 
wipeSlot, resetSlotCountToZero, reset所有对象的某solt为0, reset某obj的某slot为0 
wipeZeros, 删除所有total count为0的obj, 以释放空间

public final class SlotBasedCounter<T> implements Serializable {

    private static final long serialVersionUID = 4858185737378394432L;

    private final Map<T, long[]> objToCounts = new HashMap<T, long[]>();
    private final int numSlots;

    public SlotBasedCounter(int numSlots) {
        if (numSlots <= 0) {
            throw new IllegalArgumentException("Number of slots must be greater than zero (you requested " + numSlots
                + ")");
        }
        this.numSlots = numSlots;
    }

    public void incrementCount(T obj, int slot) {
        long[] counts = objToCounts.get(obj);
        if (counts == null) {
            counts = new long[this.numSlots];
            objToCounts.put(obj, counts);
        }
        counts[slot]++;
    }

    public long getCount(T obj, int slot) {
        long[] counts = objToCounts.get(obj);
        if (counts == null) {
            return 0;
        }
        else {
            return counts[slot];
        }
    }

    public Map<T, Long> getCounts() {
        Map<T, Long> result = new HashMap<T, Long>();
        for (T obj : objToCounts.keySet()) {
            result.put(obj, computeTotalCount(obj));
        }
        return result;
    }

    private long computeTotalCount(T obj) {
        long[] curr = objToCounts.get(obj);
        long total = 0;
        for (long l : curr) {
            total += l;
        }
        return total;
    }

    /**
     * Reset the slot count of any tracked objects to zero for the given slot.
     * 
     * @param slot
     */
    public void wipeSlot(int slot) {
        for (T obj : objToCounts.keySet()) {
            resetSlotCountToZero(obj, slot);
        }
    }

    private void resetSlotCountToZero(T obj, int slot) {
        long[] counts = objToCounts.get(obj);
        counts[slot] = 0;
    }

    private boolean shouldBeRemovedFromCounter(T obj) {
        return computeTotalCount(obj) == 0;
    }

    /**
     * Remove any object from the counter whose total count is zero (to free up memory).
     */
    public void wipeZeros() {
        Set<T> objToBeRemoved = new HashSet<T>();
        for (T obj : objToCounts.keySet()) {
            if (shouldBeRemovedFromCounter(obj)) {
                objToBeRemoved.add(obj);
            }
        }
        for (T obj : objToBeRemoved) {
            objToCounts.remove(obj);
        }
    }
}

SlidingWindowCounter

SlidingWindowCounter只是对SlotBasedCounter做了进一步的封装, 通过headSlot和tailSlot提供sliding window的概念

incrementCount, 只能对headSlot进行increment, 其他slot作为窗口中的历史数据

核心的操作为, getCountsThenAdvanceWindow 
1. 取出Map<T, Long> counts, 对象和窗口内所有slots求和值的map 
2. 调用wipeZeros, 删除已经不被使用的obj, 释放空间 
3. 最重要的一步, 清除tailSlot, 并advanceHead, 以实现滑动窗口 
    advanceHead的实现, 如何在数组实现循环的滑动窗口

public final class SlidingWindowCounter<T> implements Serializable {

    private static final long serialVersionUID = -2645063988768785810L;

    private SlotBasedCounter<T> objCounter;
    private int headSlot;
    private int tailSlot;
    private int windowLengthInSlots;

    public SlidingWindowCounter(int windowLengthInSlots) {
        if (windowLengthInSlots < 2) {
            throw new IllegalArgumentException("Window length in slots must be at least two (you requested "
                + windowLengthInSlots + ")");
        }
        this.windowLengthInSlots = windowLengthInSlots;
        this.objCounter = new SlotBasedCounter<T>(this.windowLengthInSlots);

        this.headSlot = 0;
        this.tailSlot = slotAfter(headSlot);
    }

    public void incrementCount(T obj) {
        objCounter.incrementCount(obj, headSlot);
    }

    /**
     * Return the current (total) counts of all tracked objects, then advance the window.
     * 
     * Whenever this method is called, we consider the counts of the current sliding window to be available to and
     * successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent
     * objects within the next "chunk" of the sliding window.
     * 
     * @return
     */
    public Map<T, Long> getCountsThenAdvanceWindow() {
        Map<T, Long> counts = objCounter.getCounts();
        objCounter.wipeZeros();
        objCounter.wipeSlot(tailSlot);
        advanceHead();
        return counts;
    }

    private void advanceHead() {
        headSlot = tailSlot;
        tailSlot = slotAfter(tailSlot);
    }

    private int slotAfter(int slot) {
        return (slot + 1) % windowLengthInSlots;
    }
}
 

IntermediateRankingsBolt

这个bolt作用就是对于中间结果的排序, 为什么要增加这步, 应为数据量比较大, 如果直接全放到一个节点上排序, 会负载太重 
所以先通过IntermediateRankingsBolt, 过滤掉一些 
这里仍然使用, 对于obj进行fieldsGrouping, 保证对于同一个obj, 不同时间段emit的统计数据会被发送到同一个task

IntermediateRankingsBolt继承自AbstractRankerBolt(参考下面) 
并实现了updateRankingsWithTuple,

void updateRankingsWithTuple(Tuple tuple) {
    Rankable rankable = RankableObjectWithFields.from(tuple);
    super.getRankings().updateWith(rankable);
}
逻辑很简单, 将Tuple转化Rankable, 并更新Rankings列表
参考AbstractRankerBolt, 该bolt会定时将Ranking列表emit出去

Rankable

Rankable除了继承Comparable接口, 还增加getObject()和getCount()接口

public interface Rankable extends Comparable<Rankable> {
    Object getObject();
    long getCount();
}

RankableObjectWithFields

RankableObjectWithFields实现Rankable接口 
1. 提供将Tuple转化为RankableObject 
Tuple由若干field组成, 第一个field作为obj, 第二个field作为count, 其余的都放到List<Object> otherFields中

2. 实现Rankable定义的getObject()和getCount()接口

3. 实现Comparable接口, 包含compareTo, equals

public class RankableObjectWithFields implements Rankable
public static RankableObjectWithFields from(Tuple tuple) {
    List<Object> otherFields = Lists.newArrayList(tuple.getValues());
    Object obj = otherFields.remove(0);
    Long count = (Long) otherFields.remove(0);
    return new RankableObjectWithFields(obj, count, otherFields.toArray());
}

Rankings

Rankings维护需要排序的List, 并提供对List相应的操作

核心的数据结构如下, 用来存储rankable对象的list 
List<Rankable> rankedItems = Lists.newArrayList();

提供一些简单的操作, 比如设置maxsize(list size), getRankings(返回rankedItems, 排序列表)

核心的操作是,

public void updateWith(Rankable r) {
    addOrReplace(r);
    rerank();
    shrinkRankingsIfNeeded();
}
上一级的blot会定期的发送某个时间窗口的(obj, count), 所以obj之间的排序是在不断变化的 
1. 替换已有的, 或新增rankable对象(包含obj, count) 
2. 从新排序(Collections.sort) 
3. 由于只需要topN, 所以大于maxsize的需要删除

AbstractRankerBolt

首先以TopN为参数, 创建Rankings对象

private final Rankings rankings;
public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) {
    count = topN;
    this.emitFrequencyInSeconds = emitFrequencyInSeconds;
    rankings = new Rankings(count);
}

在execute中, 也是定时触发emit, 同样是通过emitFrequencyInSeconds来配置tickTuple 
一般情况, 只是使用updateRankingsWithTuple不断更新Rankings 
这里updateRankingsWithTuple是abstract函数, 需要子类重写具体的update逻辑

public final void execute(Tuple tuple, BasicOutputCollector collector) {
    if (TupleHelpers.isTickTuple(tuple)) {
        emitRankings(collector);
    }
    else {
        updateRankingsWithTuple(tuple);
    }
}
最终将整个rankings列表emit出去 
private void emitRankings(BasicOutputCollector collector) {
    collector.emit(new Values(rankings));
    getLogger().info("Rankings: " + rankings);
}

TotalRankingsBolt

该bolt会使用globalGrouping, 意味着所有的数据都会被发送到同一个task进行最终的排序. 
TotalRankingsBolt同样继承自AbstractRankerBolt

void updateRankingsWithTuple(Tuple tuple) {
    Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0);
    super.getRankings().updateWith(rankingsToBeMerged);
}
唯一的不同是, 这里updateWith的参数是个rankable列表, 在Rankings里面的实现一样, 只是多了遍历 

最终可以得到, 全局的TopN的Rankings列表


本文章摘自博客园,原文发布日期:2013-05-22

目录
相关文章
|
存储 Java 大数据
Spring Boot 2.x :通过 spring-boot-starter-hbase 集成 HBase
HBase 是在 Hadoop 分布式文件系统(简称:HDFS)之上的分布式面向列的数据库。而且是 2007 最初原型,历史悠久。 那追根究底,Hadoop 是什么?Hadoop是一个分布式环境存储并处理大数据。本文介绍通过 spring-boot-starter-hbase 集成 HBase。
13015 0
|
4月前
|
消息中间件 Java Kafka
spring boot 整合kafka
spring boot 整合kafka
55 8
|
5月前
|
存储 Java 分布式数据库
Spring Boot 优雅实现hbase功能
【6月更文挑战第24天】要在 Spring Boot 项目中实现 HBase 和 Memcached 的功能,首先需要理解各自的原理和作用,然后通过实际操作将其集成到 Spring Boot 项目中。
141 6
|
4月前
|
消息中间件 Java Kafka
Spring Boot与Apache Kafka Streams的集成
Spring Boot与Apache Kafka Streams的集成
|
4月前
|
消息中间件 Java Kafka
Spring Boot与Kafka的集成应用
Spring Boot与Kafka的集成应用
|
5月前
|
消息中间件 Java Kafka
教程:Spring Boot集成Kafka Streams流处理框架
教程:Spring Boot集成Kafka Streams流处理框架
|
6月前
|
Java 调度 流计算
在使用Spring Boot启动Flink处理任务时
在使用Spring Boot启动Flink处理任务时【1月更文挑战第22天】【1月更文挑战第108篇】
229 1
|
6月前
|
消息中间件 Java Kafka
spring boot 集成kafka
spring boot 集成kafka
97 0
spring boot 集成kafka
|
消息中间件 XML Java
Kafka与Spring的整合使用
Kafka与Spring的整合使用
158 0
|
消息中间件 存储 Java
「Spring和Kafka」Kafka整合Spring 深入挖掘第2部分:Kafka和Spring Cloud Stream
「Spring和Kafka」Kafka整合Spring 深入挖掘第2部分:Kafka和Spring Cloud Stream