Storm时间窗口

简介: 笔记

(1)时间窗口概念


什么是时间窗口?

按一定的时间或者tuple数量来定义打包处理的大小称之为时间窗口。


使用场景:


计算每隔10分钟的销售额。

计算上1分钟成交金额

计算近1个小时最火爆的微博等。

Storm的时间窗口机制

Storm可同时处理窗口内的所有tuple。窗口可以从时间或数量上来划分,就是说可以根据时间段或 Tuple数量来定窗口大小。


有如下两种时间窗口:


滚动窗口 Tumbling Window

滑动窗口 Sliding Window

注意:时间窗口的计算,通常都是单并发(Executer)。


(1)滚动窗口 Tumbling Window1.png

按照固定的时间间隔或者Tuple数量划分窗口。 每个Touple仅处在一个窗口里,每个窗口之间无交互。

在这里插入代码片........| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
-5      0       5            10          15   -> time
|<------- w1 -->|
        |<---------- w2 ----->|
                |<-------------- w3 ---->|

因素:窗口大小,时间或Tuple数据,通过参数设定。 “步长”就是窗口大小。

BaseWindowedBolt .withTumblingWindow(Duration.of(毫秒数)或者 Count.of(10))

(2)滑动窗口 Tumbling Window

2.png

按照固定的时间间隔或者Tuple数量划分窗口。 每个Touple仅处在一个窗口里,每个窗口之间无交互。

| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
0       5             10         15    -> time
   w1         w2            w3

因素:窗口大小,时间或Tuple数据,通过参数设定。 “步长”就是窗口大小。

BaseWindowedBolt .withTumblingWindow(Duration.of(毫秒数)或者 Count.of(10))

该窗口每5秒评估一次,第一个窗口中的一些元组与第二个窗口重叠。

数据以步长为增量,按照窗口大小从右到左来确定要处理的数据量,并 去掉最左末尾数据。


(2)滚动窗口程序开发及测试


MainTopology:

package com.kfk.window;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 11:58 上午
 */
public class MainTopology {
    public static void main(String[] args) {
        // 创建Topology
        TopologyBuilder builder = new TopologyBuilder();
        // set Spout
        builder.setSpout("spout",new AmtSpout());
        // set 滚动窗口 Tumbling Window
        builder.setBolt("amtBolt",new TumblingBolt()
                .withTumblingWindow(BaseWindowedBolt.Duration.seconds(10)),1)
                .shuffleGrouping("spout");
        // 设置日志等级
        Config conf = new Config();
        conf.setDebug(false);
        try {
            if (args != null && args.length > 0) {
                conf.setNumWorkers(3);
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } else {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("filePrint", conf, builder.createTopology());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

AmtSpout:

package com.kfk.window;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.Map;
import java.util.Random;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 12:24 下午
 */
public class AmtSpout implements IRichSpout {
    Integer[] amt = {10,20,30,40};
    String[] time = {"2020-12-27 12:43","2020-12-25 12:43","2020-12-23 12:43","2020-12-18 12:43"};
    String[] city = {"beijing","nanjing","shenzhen","shanghai","guangzhou"};
    String[] product = {"java","python","c","scala"};
    Random random = new Random();
    SpoutOutputCollector spoutOutputCollector = null;
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            spoutOutputCollector = collector;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    @Override
    public void close() {
    }
    @Override
    public void activate() {
    }
    @Override
    public void deactivate() {
    }
    @Override
    public void nextTuple() {
        try {
            // 模拟数据
            int _amt = amt[random.nextInt(4)];
            String _time = time[random.nextInt(4)];
            String _city = city[random.nextInt(5)];
            String _product = product[random.nextInt(4)];
            // emit给Bolt节点
            spoutOutputCollector.emit(new Values(String.valueOf(_amt),_time,_city,_product));
            Thread.sleep(1000);
        } catch (Exception e){
            e.printStackTrace();
        }
    }
    @Override
    public void ack(Object msgId) {
    }
    @Override
    public void fail(Object msgId) {
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // set Fields
        declarer.declare(new Fields("amt","time","city","product"));
    }
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

TumblingBolt:

package com.kfk.window;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.windowing.TupleWindow;
import java.util.List;
import java.util.Map;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 12:09 下午
 */
public class TumblingBolt extends BaseWindowedBolt {
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        super.prepare(stormConf, context, collector);
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        super.declareOutputFields(declarer);
    }
    @Override
    public void execute(TupleWindow inputWindow) {
        List<Tuple> list = inputWindow.get();
        int amt = 0;
        for (Tuple tuple:list){
            amt += Integer.parseInt(tuple.getStringByField("amt"));
        }
        System.out.println(amt);
    }
}

(2)滑动窗口程序开发及测试


MainTopology:

package com.kfk.window;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 11:58 上午
 */
public class MainTopology {
    public static void main(String[] args) {
        // 创建Topology
        TopologyBuilder builder = new TopologyBuilder();
        // set Spout
        builder.setSpout("spout",new AmtSpout());
        // set 滑动窗口 Sliding Window
        builder.setBolt("amtBolt",new SliderBolt()
                .withWindow(BaseWindowedBolt.Duration.seconds(5),
                        BaseWindowedBolt.Duration.seconds(2)),1)
                .shuffleGrouping("spout");
        // 设置日志等级
        Config conf = new Config();
        conf.setDebug(false);
        try {
            if (args != null && args.length > 0) {
                conf.setNumWorkers(3);
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } else {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("filePrint", conf, builder.createTopology());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

SliderBolt:

package com.kfk.window;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.windowing.TupleWindow;
import java.util.List;
import java.util.Map;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 12:18 下午
 */
public class SliderBolt extends BaseWindowedBolt {
    int amt = 0;
    OutputCollector outputCollector = null;
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        outputCollector = collector;
        super.prepare(stormConf, context, collector);
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        super.declareOutputFields(declarer);
    }
    @Override
    public void execute(TupleWindow inputWindow) {
        List<Tuple> listNew = inputWindow.getNew();
        List<Tuple> listExp = inputWindow.getExpired();
        for (Tuple tuple : listNew){
            amt += Integer.parseInt(tuple.getStringByField("amt"));
        }
        for (Tuple tuple : listExp){
            int expAmt= Integer.parseInt(tuple.getStringByField("amt"));
            amt -=expAmt;
        }
        System.out.println("近3秒订单:"+amt);
    }
}



40.png

相关文章
|
6月前
|
流计算 Windows
Flink窗口与状态编程开发(一)
Flink窗口与状态编程开发(一)
|
消息中间件 程序员 API
Flink中时间和窗口
Flink中时间和窗口
167 0
|
SQL 数据挖掘 HIVE
Flink的窗口类型详解
Flink的窗口类型详解
|
Scala 流计算
Flink / Scala - 使用 CountWindow 实现按条数触发窗口
CountWindow 数量窗口分为滑动窗口与滚动窗口,类似于之前 TimeWindow 的滚动时间与滑动时间,这里滚动窗口不存在元素重复而滑动窗口存在元素重复的情况,下面 demo 场景为非重复场景,所以将采用滚动窗口。......
893 0
Flink / Scala - 使用 CountWindow 实现按条数触发窗口
|
1月前
|
SQL 消息中间件 分布式计算
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
96 0
|
1月前
|
SQL 分布式计算 大数据
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
71 0
|
3月前
|
SQL 缓存 测试技术
实时计算 Flink版产品使用问题之如何实现滚动窗口统计用户不重复的总数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
SQL 数据处理 API
实时计算 Flink版产品使用问题之设置什么可以控制非binlogSource表的轮询时间
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
BI 数据处理 Apache
[AIGC] 深入理解Flink中的窗口、水位线和定时器
[AIGC] 深入理解Flink中的窗口、水位线和定时器
103 2
|
6月前
|
监控 Java 流计算
Flink中的事件时间和处理时间有什么区别?为什么事件时间在流计算中很重要?
Flink中的事件时间和处理时间有什么区别?为什么事件时间在流计算中很重要?
97 0