storm 1.0版本滑动窗口的实现及原理

简介: 滑动窗口在监控和统计应用的场景比较广泛,比如每隔一段时间(10s)统计最近30s的请求量或者异常次数,根据请求或者异常次数采取相应措施。在storm1.0版本之前,没有提供关于滑动窗口的实现,需要开发者自己实现滑动窗口的功能(storm1.0以前实现滑动窗口的实现原理可以自行百度)。

滑动窗口在监控和统计应用的场景比较广泛,比如每隔一段时间(10s)统计最近30s的请求量或者异常次数,根据请求或者异常次数采取相应措施。在storm1.0版本之前,没有提供关于滑动窗口的实现,需要开发者自己实现滑动窗口的功能(storm1.0以前实现滑动窗口的实现原理可以自行百度)。

原文和作者一起讨论:http://www.cnblogs.com/intsmaze/p/6481588.html

微信:intsmaze

这里主要演示在storm1.0以后如何通过继承storm1.0提供的类来快速开发出窗口滑动的功能。窗口可以从时间或数量上来划分,由如下两个因素决定:窗口的长度,可以是时间间隔或Tuple数量;滑动间隔(sliding Interval),可以是时间间隔或Tuple数量。比如:每两秒统计最近6秒的请求数量;每接收2个Tuple就统计最近接收的6个Tuple的平均值......。

storm1.0支持的时间和数量的排列组合有如下:

withWindow(Count windowLength, Count slidingInterval)

  每收到slidingInterval条数据统计最近的windowLength条数据。

withWindow(Count windowLength)

  每收到1条数据统计最近的windowLength条数据。

withWindow(Count windowLength, Duration slidingInterval)

  每过slidingInterval秒统计最近的windowLength条数据。

withWindow(Duration windowLength, Count slidingInterval)

  每收到slidingInterval条数据统计最近的windowLength秒的数据。

withWindow(Duration windowLength, Duration slidingInterval)

  每过slidingInterval秒统计最近的windowLength秒的数据。

public withWindow(Duration windowLength)

  每收到1条数据统计最近的windowLength秒的数据。

接下来,简单的演示如何使用storm1.0实现滑动窗口的功能,先编写spout类,RandomSentenceSpout负责发送一个整形数值,数值每次发送都会自动加一,且RandomSentenceSpout固定每隔两秒向bolt发送一次数据。RandomSentenceSpout和前面关于spout的讲解一样。

1.public class RandomSentenceSpout extends BaseRichSpout {
2.
3.    private static final long serialVersionUID = 5028304756439810609L;  
4.
5.    private SpoutOutputCollector collector;  
6.
7.    int intsmaze=0;
8.
9.    public void declareOutputFields(OutputFieldsDeclarer declarer) {
10.        declarer.declare(new Fields("intsmaze"));
11.    }
12.
13.    public void open(Map conf, TopologyContext context, 
14.                          SpoutOutputCollector collector) {
15.        this.collector = collector;
16.    }
17.
18.    public void nextTuple() {
19.        System.out.println("发送数据:"+intsmaze);
20.        collector.emit(new Values(intsmaze++));
21.        try {
22.            Thread.sleep(2000);
23.//         Thread.sleep(1000);
24.        } catch (InterruptedException e) {
25.            e.printStackTrace();
26.        }
27.    }
}

滑动窗口的逻辑实现的重点是bolt类,这里我们编写SlidingWindowBolt类让它继承一个新的类名为BaseWindowedBolt来获得窗口计数的功能。BaseWindowedBolt和前面的BaseBaseBoltBaseWindowedBolt提供的方法名都一样,只是execute方法的参数类型为TupleWindow,TupleWindow参数里面装载了一个窗口长度类的tuple数据。通过对TupleWindow遍历,我们可以计算这一个窗口内tuple数的平均值或总和等指标。具体见代码12-16行,统计了一个窗口内的数值型数据的总和。

1.public class SlidingWindowBolt extends BaseWindowedBolt {
2.
3.    private OutputCollector collector;
4.
5.    @Override
6.    public void prepare(Map stormConf, TopologyContext context, 
7.            OutputCollector collector) {
8.        this.collector = collector;
9.    }
10.
11.    public void execute(TupleWindow inputWindow) {        
12.        int sum=0;
13.        System.out.print("一个窗口内的数据");
14.        for(Tuple tuple: inputWindow.get()) {
15.            int str=(Integer) tuple.getValueByField("intsmaze");
16.            System.out.print(" "+str);
17.            sum+=str;
18.        }
19.        System.out.println("======="+sum);
20. //        collector.emit(new Values(sum));
21.    }
22.
23.    @Override
24.    public void declareOutputFields(OutputFieldsDeclarer declarer) {
25.//       declarer.declare(new Fields("count"));
26.    }
}

我们已经实现了窗口计数的逻辑代码,现在我们需要提供topology来指明各个组件的关系,以及指定SlidingWindowBolt的窗口的组合,这里我们演示了如何每两秒统计最近6秒的数值总和,如果注释掉10-13行代码,去掉5-8行的注释,这个topology就是告诉SlidingWindowBolt每接收到两条tuple就统计最近接收到的6条tuple的数值的总和。

1.public class WindowsTopology {
2.
3.    public static void main(String[] args) throws Exception {
4.       TopologyBuilder builder = new TopologyBuilder();
5.       builder.setSpout("spout1", new RandomSentenceSpout(), 1);
6.//       builder.setBolt("slidingwindowbolt", new SlidingWindowBolt()
7.//       .withWindow(new Count(6), new Count(2)),1)
8.//       .shuffleGrouping("spout");
9.//滑窗 窗口长度:tuple数, 滑动间隔: tuple数 每收到2条数据统计当前6条数据的总和。  
10.     
11.       builder.setBolt("slidingwindowbolt", new SlidingWindowBolt()
12.       .withWindow(new Duration(6, TimeUnit.SECONDS), 
13.               new Duration(2, TimeUnit.SECONDS)),1)
14.       .shuffleGrouping("spout");//每两秒统计最近6秒的数据       
15.
16.       Config conf = new Config();
17.       conf.setNumWorkers(1);
18.       LocalCluster cluster = new LocalCluster();
19.       cluster.submitTopology("word-count", conf, builder.createTopology());
20.   }
}

这里演示的是bolt节点并发度为1的窗口功能,实际生产中,因为数据量很大,往往将bolt节点的并发度设置为多个,这个时候我们的SlidingWindowBolt就无法统计出一个窗口的数值总和了。因为每一个bolt的并行节点只能统计自己一个窗口接收到数据的总和,无法统计出一个窗口内全局数据的总和,借助redis来实现是可以的,但是必须引入redis的事务机制或者借助分布式锁,否则会出现脏数据的情况。在这里我们介绍另一种实现方式就是灵活的使用storm提供的窗口功能,只是窗口的tuple数。

仍然是使用上面提供的类,只是我们增加一个bolt类,来统计每个SlidingWindowBolt节点发送给它的数值。

1.public class CountWord extends BaseWindowedBolt{
2.    
3.    private static final long serialVersionUID = -5283595260540124273L;
4.    
5.    private OutputCollector collector;
6.    
7.    public void prepare(Map stormConf, TopologyContext context
8.                             , OutputCollector collector) {
9.        this.collector = collector;
10.    }
11.    
12.    public void execute(TupleWindow inputWindow) {
13.         int sum=0;
14.         for(Tuple tuple: inputWindow.get()) {
15.             int i=(Integer) tuple.getValueByField("count");
16.               System.out.println("接收到一个bolt的总和值为:"+i);
17.               sum+=i;
18.          }
19.         System.out.println("一个窗口内的总值为:"+sum);
20.    }
21.
22.    public void declareOutputFields(OutputFieldsDeclarer declarer) {
23.    }
}

然后我们注释RandomSentenceSpout22行代码,取消对23行代码的注释,方便观察结果。去掉SlidingWindowBolt20和25行代码。

topology启动类如下:

1.public class WindowsTopology {
2.
3.    public static void main(String[] args) throws Exception {
4.       TopologyBuilder builder = new TopologyBuilder();
5.       builder.setSpout("spout", new RandomSentenceSpout(), 1);
6.       
7.       builder.setBolt("slidingwindowbolt", new SlidingWindowBolt()
8.       .withWindow(new Duration(6, TimeUnit.SECONDS), 
9.               new Duration(2, TimeUnit.SECONDS)),2)
10.       .shuffleGrouping("spout");//每两秒统计最近6秒的数据
11.       
12.       builder.setBolt("countwordbolt", new CountWord()
13.       .withWindow(new Count(2), new Count(2)),1)
14.       .shuffleGrouping("slidingwindowbolt");
15.       //每收到2条tuple就统计最近两条统的数据
16.       Config conf = new Config();
17.       conf.setNumWorkers(1);
18.       LocalCluster cluster = new LocalCluster();
19.       cluster.submitTopology("word-count", conf, builder.createTopology());
20.   }
}

 

作者: intsmaze(刘洋)
老铁,你的--->推荐,--->关注,--->评论--->是我继续写作的动力。
微信公众号号:Apache技术研究院
由于博主能力有限,文中可能存在描述不正确,欢迎指正、补充!
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
相关文章
|
3月前
|
消息中间件 Java Kafka
Flink-08 Flink Java 3分钟上手 滑动窗口 SlidingWindow 时间驱动 事件驱动 TimeWindow CountWindow GlobalWindow
Flink-08 Flink Java 3分钟上手 滑动窗口 SlidingWindow 时间驱动 事件驱动 TimeWindow CountWindow GlobalWindow
96 7
|
分布式计算 数据处理 流计算
【原理】Flink如何巧用WaterMark机制解决乱序问题
【原理】Flink如何巧用WaterMark机制解决乱序问题
|
流计算
图解flink watermark工作原理
watermark 我们从因为单词拆解翻译,就是水位线。既然是水位线,所以我觉得通过试管(化学实验器皿)来描述这个最合适不过了
293 0
图解flink watermark工作原理
|
BI Apache 流计算
Apache Flink 概念介绍:有状态流式处理引擎的基石(二)| 学习笔记
快速学习 Apache Flink 概念介绍:有状态流式处理引擎的基石。
Apache Flink 概念介绍:有状态流式处理引擎的基石(二)| 学习笔记
|
大数据 数据处理 Apache
Apache Flink 概念介绍:有状态流式处理引擎的基石(一)| 学习笔记
快速学习 Apache Flink 概念介绍:有状态流式处理引擎的基石。
Apache Flink 概念介绍:有状态流式处理引擎的基石(一)| 学习笔记
|
存储 消息中间件 传感器
超越Storm,SparkStreaming——Flink如何实现有状态的计算
超越Storm,SparkStreaming——Flink如何实现有状态的计算
205 0
超越Storm,SparkStreaming——Flink如何实现有状态的计算
|
分布式计算 搜索推荐 NoSQL
|
SQL HIVE
Hive实战—时间滑动窗口计算
今天遇到一个需求大致是这样的,我们有一个业务涉及到用户打卡,用户可以一天多次打卡,我们希望计算出7天内打卡8次以上,且打卡时间分布在4天以上的时间。
765 0
Hive实战—时间滑动窗口计算
|
算法 流计算
Flink 实现自定义滑动窗口
背景 一般情况下 Flink 提供的窗口可以满足我们大部分的场景,但是有时候我们需要计算一个固定时间范围内的数据,比如实时计算每天凌晨到第二天凌晨的数据,或者每天上午 7 点到第二天上午 7 点。类似于这种情况 Flink 默认提供的窗口是不支持的,因为 Flink 计算窗口的开始时间和结束时间是根据数据本身携带的时间戳然后把数据划分到不同的窗口的,所以它不是一个固定的范围。这个时候就需要我们自己实现窗口划分的逻辑。Flink 提供了 WindowAssigner 抽象类,我们只需要实现 assignWindows 方法即可。
|
存储 Java API
Flink 原理与实现:Window 机制
Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理。而窗口(window)就是从 Streaming 到 Batch 的一个桥梁。Flink 提供了非常完善的窗口机制,这是我认为的 Flink 最大的亮点之一(其他的亮点包括消息乱序处理,和 checkpoint 机制)。本文我们将介绍流式处理中的窗口概念,介绍 F
12242 0