(1)时间窗口概念
什么是时间窗口?
按一定的时间或者tuple数量来定义打包处理的大小称之为时间窗口。
使用场景:
计算每隔10分钟的销售额。
计算上1分钟成交金额
计算近1个小时最火爆的微博等。
Storm的时间窗口机制
Storm可同时处理窗口内的所有tuple。窗口可以从时间或数量上来划分,就是说可以根据时间段或 Tuple数量来定窗口大小。
有如下两种时间窗口:
滚动窗口 Tumbling Window
滑动窗口 Sliding Window
注意:时间窗口的计算通常都是单并发的,即窗口Bolt只使用一个Executor(并行度设为1),以保证同一个窗口内的数据都由同一个实例处理。
(1)滚动窗口 Tumbling Window
按照固定的时间间隔或者Tuple数量划分窗口。每个Tuple仅处在一个窗口里,每个窗口之间无交互(不重叠)。
示例:长度为5秒的时间滚动窗口(每5秒一个窗口,窗口之间不重叠):
```
| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
0       5             10         15    -> time
   w1         w2            w3
```
因素:窗口大小,可以按时间或Tuple数量通过参数设定。“步长”就是窗口大小。
BaseWindowedBolt.withTumblingWindow(Duration.of(毫秒数)) 或者 BaseWindowedBolt.withTumblingWindow(Count.of(10))
(2)滑动窗口 Sliding Window
按照窗口大小(时间长度或Tuple数量)划分窗口,窗口每隔一个滑动间隔向前滑动一次。一个Tuple可能同时处在多个窗口里,相邻窗口之间有重叠。例如:窗口大小为10秒、滑动间隔为5秒的时间滑动窗口。
示例:窗口大小10秒、滑动间隔5秒的时间滑动窗口:
```
| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
-5      0             5            10          15   -> time
|<------- w1 -->|
        |<---------- w2 ----->|
                |<-------------- w3 ---->|
```
因素:窗口大小和滑动间隔,均可按时间或Tuple数量通过参数设定。“步长”就是滑动间隔,通常小于窗口大小。
BaseWindowedBolt.withWindow(窗口大小, 滑动间隔),参数可以是 Duration.of(毫秒数) 或者 Count.of(n),例如 withWindow(Duration.seconds(10), Duration.seconds(5))
该窗口每5秒评估一次,第一个窗口中的一些元组与第二个窗口重叠。
窗口每次以步长(滑动间隔)为增量向前滑动:以最新到达的数据为右边界,按照窗口大小从右向左确定要处理的数据范围,并去掉最左侧已移出窗口的旧数据。
(2)滚动窗口程序开发及测试
MainTopology:
package com.kfk.window; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseWindowedBolt; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/29 * @time : 11:58 上午 */ public class MainTopology { public static void main(String[] args) { // 创建Topology TopologyBuilder builder = new TopologyBuilder(); // set Spout builder.setSpout("spout",new AmtSpout()); // set 滚动窗口 Tumbling Window builder.setBolt("amtBolt",new TumblingBolt() .withTumblingWindow(BaseWindowedBolt.Duration.seconds(10)),1) .shuffleGrouping("spout"); // 设置日志等级 Config conf = new Config(); conf.setDebug(false); try { if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("filePrint", conf, builder.createTopology()); } }catch (Exception e){ e.printStackTrace(); } } }
AmtSpout:
package com.kfk.window;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;
import java.util.Random;

/**
 * Demo spout that emits one randomly generated order tuple, then pauses for one second.
 *
 * <p>Each tuple carries the fields {@code amt}, {@code time}, {@code city} and {@code product},
 * each chosen uniformly at random from the sample arrays below. All values, including the amount,
 * are emitted as {@code String}s. Tuples are emitted without a message id, so this spout does not
 * track them for reliability ({@link #ack}/{@link #fail} are no-ops).
 *
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 12:24 PM
 */
public class AmtSpout implements IRichSpout {

    /** Pause after each emitted tuple, in milliseconds. */
    private static final long EMIT_INTERVAL_MS = 1000L;

    // Sample values the simulated orders are drawn from.
    Integer[] amt = {10, 20, 30, 40};
    String[] time = {"2020-12-27 12:43", "2020-12-25 12:43", "2020-12-23 12:43", "2020-12-18 12:43"};
    String[] city = {"beijing", "nanjing", "shenzhen", "shanghai", "guangzhou"};
    String[] product = {"java", "python", "c", "scala"};
    Random random = new Random();
    SpoutOutputCollector spoutOutputCollector = null;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        spoutOutputCollector = collector;
    }

    @Override
    public void close() {
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    /**
     * Emits one simulated order, then sleeps {@link #EMIT_INTERVAL_MS} ms.
     *
     * <p>If the sleep is interrupted, the thread's interrupt flag is restored instead of being
     * swallowed, so the executor thread can still observe the interruption.
     */
    @Override
    public void nextTuple() {
        try {
            // Draw order matters for reproducibility with a seeded Random: amt, time, city, product.
            String amtValue = String.valueOf(pick(amt));
            String timeValue = pick(time);
            String cityValue = pick(city);
            String productValue = pick(product);
            // emit to the downstream bolt
            spoutOutputCollector.emit(new Values(amtValue, timeValue, cityValue, productValue));
            Thread.sleep(EMIT_INTERVAL_MS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Field names must match what the windowed bolts read (e.g. "amt").
        declarer.declare(new Fields("amt", "time", "city", "product"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /** Returns a uniformly random element of {@code values}; bounded by the array's real length. */
    private <T> T pick(T[] values) {
        return values[random.nextInt(values.length)];
    }
}
TumblingBolt:
package com.kfk.window;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.windowing.TupleWindow;

import java.util.List;
import java.util.Map;

/**
 * Windowed bolt that prints, for each tumbling window, the sum of the {@code amt} field over all
 * tuples in that window.
 *
 * <p>{@code amt} is read with {@code getStringByField} and parsed as an {@code int}, so the
 * upstream spout is expected to emit the amount as a numeric string.
 *
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 12:09 PM
 */
public class TumblingBolt extends BaseWindowedBolt {

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        super.prepare(stormConf, context, collector);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        super.declareOutputFields(declarer);
    }

    /** Sums {@code amt} over every tuple of the current window and prints the total. */
    @Override
    public void execute(TupleWindow inputWindow) {
        List<Tuple> windowTuples = inputWindow.get();
        int windowTotal = 0;
        for (int i = 0; i < windowTuples.size(); i++) {
            String rawAmount = windowTuples.get(i).getStringByField("amt");
            windowTotal += Integer.parseInt(rawAmount);
        }
        System.out.println(windowTotal);
    }
}
(3)滑动窗口程序开发及测试
MainTopology:
package com.kfk.window; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseWindowedBolt; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/29 * @time : 11:58 上午 */ public class MainTopology { public static void main(String[] args) { // 创建Topology TopologyBuilder builder = new TopologyBuilder(); // set Spout builder.setSpout("spout",new AmtSpout()); // set 滑动窗口 Sliding Window builder.setBolt("amtBolt",new SliderBolt() .withWindow(BaseWindowedBolt.Duration.seconds(5), BaseWindowedBolt.Duration.seconds(2)),1) .shuffleGrouping("spout"); // 设置日志等级 Config conf = new Config(); conf.setDebug(false); try { if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("filePrint", conf, builder.createTopology()); } }catch (Exception e){ e.printStackTrace(); } } }
SliderBolt:
package com.kfk.window;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.windowing.TupleWindow;

import java.util.List;
import java.util.Map;

/**
 * Windowed bolt that keeps a running total of the {@code amt} field over the current sliding
 * window and prints it on every window evaluation.
 *
 * <p>Instead of re-summing the whole window each time, the total is updated incrementally: the
 * amounts of tuples that entered the window since the last evaluation are added, and the amounts
 * of tuples that slid out are subtracted.
 *
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 12:18 PM
 */
public class SliderBolt extends BaseWindowedBolt {

    /** Running sum of {@code amt} over the tuples currently inside the window. */
    int amt = 0;
    /** Collector captured in {@link #prepare}; this demo only prints and does not emit. */
    OutputCollector outputCollector = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        outputCollector = collector;
        super.prepare(stormConf, context, collector);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        super.declareOutputFields(declarer);
    }

    /**
     * Updates the running total with the tuples that entered and left the window, then prints it.
     *
     * <p>The printed label states a 5-second window, matching the {@code Duration.seconds(5)}
     * window length used when this bolt is wired in MainTopology; update both together.
     */
    @Override
    public void execute(TupleWindow inputWindow) {
        int added = sumAmt(inputWindow.getNew());
        int expired = sumAmt(inputWindow.getExpired());
        amt += added - expired;
        System.out.println("近5秒订单:" + amt);
    }

    /** Sums the {@code amt} field (a numeric string) over {@code tuples}. */
    private static int sumAmt(List<Tuple> tuples) {
        int total = 0;
        for (Tuple tuple : tuples) {
            total += Integer.parseInt(tuple.getStringByField("amt"));
        }
        return total;
    }
}