Storm时间窗口

简介: 笔记

(1)时间窗口概念


什么是时间窗口?

按一定的时间或者tuple数量来定义打包处理的大小称之为时间窗口。


使用场景:


计算每隔10分钟的销售额。

计算上1分钟成交金额

计算近1个小时最火爆的微博等。

Storm的时间窗口机制

Storm可同时处理窗口内的所有tuple。窗口可以从时间或数量上来划分,就是说可以根据时间段或 Tuple数量来定窗口大小。


有如下两种时间窗口:


滚动窗口 Tumbling Window

滑动窗口 Sliding Window

注意:时间窗口的计算,通常都是单并发(Executer)。


(1)滚动窗口 Tumbling Window1.png

按照固定的时间间隔或者Tuple数量划分窗口。 每个Touple仅处在一个窗口里,每个窗口之间无交互。

在这里插入代码片........| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
-5      0       5            10          15   -> time
|<------- w1 -->|
        |<---------- w2 ----->|
                |<-------------- w3 ---->|

因素:窗口大小,时间或Tuple数据,通过参数设定。 “步长”就是窗口大小。

BaseWindowedBolt .withTumblingWindow(Duration.of(毫秒数)或者 Count.of(10))

(2)滑动窗口 Tumbling Window

2.png

按照固定的时间间隔或者Tuple数量划分窗口。 每个Touple仅处在一个窗口里,每个窗口之间无交互。

| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
0       5             10         15    -> time
   w1         w2            w3

因素:窗口大小,时间或Tuple数据,通过参数设定。 “步长”就是窗口大小。

BaseWindowedBolt .withTumblingWindow(Duration.of(毫秒数)或者 Count.of(10))

该窗口每5秒评估一次,第一个窗口中的一些元组与第二个窗口重叠。

数据以步长为增量,按照窗口大小从右到左来确定要处理的数据量,并 去掉最左末尾数据。


(2)滚动窗口程序开发及测试


MainTopology:

package com.kfk.window;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 11:58 上午
 */
public class MainTopology {
    public static void main(String[] args) {
        // 创建Topology
        TopologyBuilder builder = new TopologyBuilder();
        // set Spout
        builder.setSpout("spout",new AmtSpout());
        // set 滚动窗口 Tumbling Window
        builder.setBolt("amtBolt",new TumblingBolt()
                .withTumblingWindow(BaseWindowedBolt.Duration.seconds(10)),1)
                .shuffleGrouping("spout");
        // 设置日志等级
        Config conf = new Config();
        conf.setDebug(false);
        try {
            if (args != null && args.length > 0) {
                conf.setNumWorkers(3);
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } else {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("filePrint", conf, builder.createTopology());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

AmtSpout:

package com.kfk.window;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.Map;
import java.util.Random;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 12:24 下午
 */
public class AmtSpout implements IRichSpout {
    Integer[] amt = {10,20,30,40};
    String[] time = {"2020-12-27 12:43","2020-12-25 12:43","2020-12-23 12:43","2020-12-18 12:43"};
    String[] city = {"beijing","nanjing","shenzhen","shanghai","guangzhou"};
    String[] product = {"java","python","c","scala"};
    Random random = new Random();
    SpoutOutputCollector spoutOutputCollector = null;
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            spoutOutputCollector = collector;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    @Override
    public void close() {
    }
    @Override
    public void activate() {
    }
    @Override
    public void deactivate() {
    }
    @Override
    public void nextTuple() {
        try {
            // 模拟数据
            int _amt = amt[random.nextInt(4)];
            String _time = time[random.nextInt(4)];
            String _city = city[random.nextInt(5)];
            String _product = product[random.nextInt(4)];
            // emit给Bolt节点
            spoutOutputCollector.emit(new Values(String.valueOf(_amt),_time,_city,_product));
            Thread.sleep(1000);
        } catch (Exception e){
            e.printStackTrace();
        }
    }
    @Override
    public void ack(Object msgId) {
    }
    @Override
    public void fail(Object msgId) {
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // set Fields
        declarer.declare(new Fields("amt","time","city","product"));
    }
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

TumblingBolt:

package com.kfk.window;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.windowing.TupleWindow;
import java.util.List;
import java.util.Map;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 12:09 下午
 */
public class TumblingBolt extends BaseWindowedBolt {
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        super.prepare(stormConf, context, collector);
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        super.declareOutputFields(declarer);
    }
    @Override
    public void execute(TupleWindow inputWindow) {
        List<Tuple> list = inputWindow.get();
        int amt = 0;
        for (Tuple tuple:list){
            amt += Integer.parseInt(tuple.getStringByField("amt"));
        }
        System.out.println(amt);
    }
}

(2)滑动窗口程序开发及测试


MainTopology:

package com.kfk.window;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 11:58 上午
 */
public class MainTopology {
    public static void main(String[] args) {
        // 创建Topology
        TopologyBuilder builder = new TopologyBuilder();
        // set Spout
        builder.setSpout("spout",new AmtSpout());
        // set 滑动窗口 Sliding Window
        builder.setBolt("amtBolt",new SliderBolt()
                .withWindow(BaseWindowedBolt.Duration.seconds(5),
                        BaseWindowedBolt.Duration.seconds(2)),1)
                .shuffleGrouping("spout");
        // 设置日志等级
        Config conf = new Config();
        conf.setDebug(false);
        try {
            if (args != null && args.length > 0) {
                conf.setNumWorkers(3);
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } else {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("filePrint", conf, builder.createTopology());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

SliderBolt:

package com.kfk.window;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.windowing.TupleWindow;
import java.util.List;
import java.util.Map;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 12:18 下午
 */
public class SliderBolt extends BaseWindowedBolt {
    int amt = 0;
    OutputCollector outputCollector = null;
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        outputCollector = collector;
        super.prepare(stormConf, context, collector);
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        super.declareOutputFields(declarer);
    }
    @Override
    public void execute(TupleWindow inputWindow) {
        List<Tuple> listNew = inputWindow.getNew();
        List<Tuple> listExp = inputWindow.getExpired();
        for (Tuple tuple : listNew){
            amt += Integer.parseInt(tuple.getStringByField("amt"));
        }
        for (Tuple tuple : listExp){
            int expAmt= Integer.parseInt(tuple.getStringByField("amt"));
            amt -=expAmt;
        }
        System.out.println("近3秒订单:"+amt);
    }
}



40.png

相关文章
|
3月前
|
流计算 Windows
Flink窗口与状态编程开发(一)
Flink窗口与状态编程开发(一)
|
4月前
|
流计算
Flink窗口——window
Flink窗口——window
24 0
|
6月前
|
消息中间件 程序员 API
Flink中时间和窗口
Flink中时间和窗口
99 0
|
8月前
|
SQL 数据挖掘 HIVE
Flink的窗口类型详解
Flink的窗口类型详解
|
3月前
|
监控 Java 流计算
Flink中的事件时间和处理时间有什么区别?为什么事件时间在流计算中很重要?
Flink中的事件时间和处理时间有什么区别?为什么事件时间在流计算中很重要?
30 0
|
4月前
|
SQL 存储 缓存
Flink(八)【窗口】
Flink(八)【窗口】
|
8月前
|
消息中间件 并行计算 Java
10分钟了解Flink窗口计算
在有状态流处理中,时间在计算中起着重要的作用。比如,当进行时间序列分析、基于特定时间段进行聚合,或者进行事件时间去处理数据时,都与时间相关。接下来将重点介绍在使用实时Flink应用程序时应该考虑的跟时间相关的一些元素。
10分钟了解Flink窗口计算
|
8月前
|
缓存 API 流计算
Flink之窗口 (Window) 下篇1
Flink之窗口 (Window) 下篇
83 0
|
8月前
|
API 流计算 Windows
Flink之窗口 (Window) 下篇2
Flink之窗口 (Window) 下篇
98 0
|
API Apache 流计算
Flink窗口的生命周期
Flink窗口的生命周期
Flink窗口的生命周期