Storm时间窗口

简介: 笔记

(1)时间窗口概念


什么是时间窗口?

按一定的时间或者tuple数量来定义打包处理的大小称之为时间窗口。


使用场景:


计算每隔10分钟的销售额。

计算上1分钟成交金额

计算近1个小时最火爆的微博等。

Storm的时间窗口机制

Storm可同时处理窗口内的所有tuple。窗口可以从时间或数量上来划分,就是说可以根据时间段或 Tuple数量来定窗口大小。


有如下两种时间窗口:


滚动窗口 Tumbling Window

滑动窗口 Sliding Window

注意:时间窗口的计算,通常都是单并发(Executer)。


(1)滚动窗口 Tumbling Window1.png

按照固定的时间间隔或者Tuple数量划分窗口。 每个Touple仅处在一个窗口里,每个窗口之间无交互。

在这里插入代码片........| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
-5      0       5            10          15   -> time
|<------- w1 -->|
        |<---------- w2 ----->|
                |<-------------- w3 ---->|

因素:窗口大小,时间或Tuple数据,通过参数设定。 “步长”就是窗口大小。

BaseWindowedBolt .withTumblingWindow(Duration.of(毫秒数)或者 Count.of(10))

(2)滑动窗口 Tumbling Window

2.png

按照固定的时间间隔或者Tuple数量划分窗口。 每个Touple仅处在一个窗口里,每个窗口之间无交互。

| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
0       5             10         15    -> time
   w1         w2            w3

因素:窗口大小,时间或Tuple数据,通过参数设定。 “步长”就是窗口大小。

BaseWindowedBolt .withTumblingWindow(Duration.of(毫秒数)或者 Count.of(10))

该窗口每5秒评估一次,第一个窗口中的一些元组与第二个窗口重叠。

数据以步长为增量,按照窗口大小从右到左来确定要处理的数据量,并 去掉最左末尾数据。


(2)滚动窗口程序开发及测试


MainTopology:

package com.kfk.window;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 11:58 上午
 */
public class MainTopology {
    public static void main(String[] args) {
        // 创建Topology
        TopologyBuilder builder = new TopologyBuilder();
        // set Spout
        builder.setSpout("spout",new AmtSpout());
        // set 滚动窗口 Tumbling Window
        builder.setBolt("amtBolt",new TumblingBolt()
                .withTumblingWindow(BaseWindowedBolt.Duration.seconds(10)),1)
                .shuffleGrouping("spout");
        // 设置日志等级
        Config conf = new Config();
        conf.setDebug(false);
        try {
            if (args != null && args.length > 0) {
                conf.setNumWorkers(3);
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } else {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("filePrint", conf, builder.createTopology());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

AmtSpout:

package com.kfk.window;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.Map;
import java.util.Random;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 12:24 下午
 */
public class AmtSpout implements IRichSpout {
    Integer[] amt = {10,20,30,40};
    String[] time = {"2020-12-27 12:43","2020-12-25 12:43","2020-12-23 12:43","2020-12-18 12:43"};
    String[] city = {"beijing","nanjing","shenzhen","shanghai","guangzhou"};
    String[] product = {"java","python","c","scala"};
    Random random = new Random();
    SpoutOutputCollector spoutOutputCollector = null;
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            spoutOutputCollector = collector;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    @Override
    public void close() {
    }
    @Override
    public void activate() {
    }
    @Override
    public void deactivate() {
    }
    @Override
    public void nextTuple() {
        try {
            // 模拟数据
            int _amt = amt[random.nextInt(4)];
            String _time = time[random.nextInt(4)];
            String _city = city[random.nextInt(5)];
            String _product = product[random.nextInt(4)];
            // emit给Bolt节点
            spoutOutputCollector.emit(new Values(String.valueOf(_amt),_time,_city,_product));
            Thread.sleep(1000);
        } catch (Exception e){
            e.printStackTrace();
        }
    }
    @Override
    public void ack(Object msgId) {
    }
    @Override
    public void fail(Object msgId) {
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // set Fields
        declarer.declare(new Fields("amt","time","city","product"));
    }
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

TumblingBolt:

package com.kfk.window;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.windowing.TupleWindow;
import java.util.List;
import java.util.Map;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 12:09 下午
 */
public class TumblingBolt extends BaseWindowedBolt {
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        super.prepare(stormConf, context, collector);
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        super.declareOutputFields(declarer);
    }
    @Override
    public void execute(TupleWindow inputWindow) {
        List<Tuple> list = inputWindow.get();
        int amt = 0;
        for (Tuple tuple:list){
            amt += Integer.parseInt(tuple.getStringByField("amt"));
        }
        System.out.println(amt);
    }
}

(2)滑动窗口程序开发及测试


MainTopology:

package com.kfk.window;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 11:58 上午
 */
public class MainTopology {
    public static void main(String[] args) {
        // 创建Topology
        TopologyBuilder builder = new TopologyBuilder();
        // set Spout
        builder.setSpout("spout",new AmtSpout());
        // set 滑动窗口 Sliding Window
        builder.setBolt("amtBolt",new SliderBolt()
                .withWindow(BaseWindowedBolt.Duration.seconds(5),
                        BaseWindowedBolt.Duration.seconds(2)),1)
                .shuffleGrouping("spout");
        // 设置日志等级
        Config conf = new Config();
        conf.setDebug(false);
        try {
            if (args != null && args.length > 0) {
                conf.setNumWorkers(3);
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } else {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("filePrint", conf, builder.createTopology());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

SliderBolt:

package com.kfk.window;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.windowing.TupleWindow;
import java.util.List;
import java.util.Map;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 12:18 下午
 */
public class SliderBolt extends BaseWindowedBolt {
    int amt = 0;
    OutputCollector outputCollector = null;
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        outputCollector = collector;
        super.prepare(stormConf, context, collector);
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        super.declareOutputFields(declarer);
    }
    @Override
    public void execute(TupleWindow inputWindow) {
        List<Tuple> listNew = inputWindow.getNew();
        List<Tuple> listExp = inputWindow.getExpired();
        for (Tuple tuple : listNew){
            amt += Integer.parseInt(tuple.getStringByField("amt"));
        }
        for (Tuple tuple : listExp){
            int expAmt= Integer.parseInt(tuple.getStringByField("amt"));
            amt -=expAmt;
        }
        System.out.println("近3秒订单:"+amt);
    }
}



40.png

相关文章
|
Docker 容器
Star 1.8k! 推荐一款更优雅的微信文章订阅工具,让阅读更便捷!
Star 1.8k! 推荐一款更优雅的微信文章订阅工具,让阅读更便捷!
227 3
|
测试技术 C++ iOS开发
c++IO库详细介绍
前言 简单分享一下c++ IO相关的一些知识点,希望对大家有用
217 0
|
5月前
|
Ubuntu 关系型数据库 Linux
Linux数据库安装
本文介绍了在CentOS 8.0和Ubuntu 22.04系统上安装、配置和启动MariaDB数据库服务器的详细步骤。包括通过`yum`和`apt`包管理器安装MariaDB服务,启动并检查服务运行状态,设置root用户密码以及连接数据库的基本操作。此外,还展示了如何在Ubuntu上更新软件包列表、安装依赖项,并验证MariaDB的版本和运行状态。通过这些步骤,用户可以成功部署并初始化MariaDB环境,为后续数据库管理与应用开发奠定基础。
247 61
|
7月前
|
缓存 Ubuntu Linux
Linux中yum、rpm、apt-get、wget的区别,yum、rpm、apt-get常用命令,CentOS、Ubuntu中安装wget
通过本文,我们详细了解了 `yum`、`rpm`、`apt-get`和 `wget`的区别、常用命令以及在CentOS和Ubuntu中安装 `wget`的方法。`yum`和 `apt-get`是高层次的包管理器,分别用于RPM系和Debian系发行版,能够自动解决依赖问题;而 `rpm`是低层次的包管理工具,适合处理单个包;`wget`则是一个功能强大的下载工具,适用于各种下载任务。在实际使用中,根据系统类型和任务需求选择合适的工具,可以大大提高工作效率和系统管理的便利性。
694 25
|
9月前
|
存储 关系型数据库 数据库
极简开发,极速上线:构建端到端大模型应用
本文将以一个经典的 RAG(检索增强生成)知识问答系统为例,详细介绍从智能体设计到最终应用部署的全流程。
1397 82
|
11月前
|
弹性计算
新手必看,阿里云国际购买服务器带宽如何选择
新手必看,阿里云国际购买服务器带宽如何选择
|
Web App开发 监控 前端开发
《吐血整理》保姆级系列教程-玩转Fiddler抓包教程(2)-初识Fiddler让你理性认识一下
【7月更文挑战第17天】Fiddler是一款强大的HTTP(S)抓包与调试工具,适用于Windows,免费且跨平台,可用于查看、分析、修改客户端与服务器之间的数据包。它在开发和测试中尤其有用,帮助定位bug、进行接口测试、模拟数据、分析性能。Fiddler支持HTTPS解密和移动设备抓包,与其他抓包工具如Wireshark、Charles相比,它更易用且支持移动应用。通过设置代理,Fiddler能监控所有通过的HTTP流量,包括请求和响应,允许设置断点和篡改数据。对于初学者,它是理解HTTP协议和解决网络问题的利器。
331 3
|
11月前
|
机器学习/深度学习 算法 搜索推荐
深度学习之差分隐私
基于深度学习的差分隐私是一种在保护用户隐私的同时使用数据进行模型训练的技术。它的核心理念是通过加入随机噪声来隐藏个体数据的影响,防止在分析或模型训练过程中泄露个人信息。
1063 1
|
10月前
|
调度 Docker 容器
【赵渝强老师】Docker Swarm集群的体系架构
Docker Swarm自1.12.0版本起集成至Docker引擎,无需单独安装。它内置服务发现功能,支持跨多服务器或宿主机创建容器,形成集群提供服务。相比之下,Docker Compose仅限于单个宿主机。Docker Swarm采用主从架构,Swarm Manager负责管理和调度集群中的容器资源,用户通过其接口发送指令,Swarm Node根据指令创建容器运行应用。
156 0
|
Web App开发 Android开发
FFmpeg开发笔记(四十六)利用SRT协议构建手机APP的直播Demo
实时数据传输在互联网中至关重要,不仅支持即时通讯如QQ、微信的文字与图片传输,还包括音视频通信。一对一通信常采用WebRTC技术,如《Android Studio开发实战》中的App集成示例;而一对多的在线直播则需部署独立的流媒体服务器,使用如SRT等协议。SRT因其优越的直播质量正逐渐成为主流。本文档概述了SRT协议的使用,包括通过OBS Studio和SRT Streamer进行SRT直播推流的方法,并展示了推流与拉流的成功实例。更多细节参见《FFmpeg开发实战》一书。
274 1
FFmpeg开发笔记(四十六)利用SRT协议构建手机APP的直播Demo