一、Storm拓扑的并行度(parallelism)介绍
(1)运行拓扑的结构
工作进程: Worker Process,也称为Worker
执行器: Executor,即线程Thread
任务: Task
工作进程、执行器、任务三者之间关系如下图:
Topology由一个或多个Spout/Bolt组件构成。
运行中的Topology由一个或多个Supervisor节点中的Worker构成。默认情况下一个Supervisor节点运行4个Worker,由defaults.yaml/storm.yaml
中的属性决定:
supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703
在代码中可以使用new Config().setNumWorkers(3),最大数量不能超过配置的supervisor.slots.ports数量。
Executor为特定拓扑的一个或多个组件Spout/Bolt实例运行一个或多个Task。默认情况下一个Executor运行一个Task。
Task执行真正的数据处理,代码中实现的每个Spout/bolt作为很多任务跨集群执行。一个Spout/Bolt组件的Task数量始终贯穿Topology的整个生命周期,但一个Spout/Bolt组件的Executor数量会随着时间而改变。这意味着Threads≤Tasks条件成立。默认情况下Task数量与Executor数量相同,即Storm会使用每个Executor运行一个Task。
(2)配置拓扑的并行度
这里所说的术语“并行度”主要是用于表示所谓的 parallelism_hint,它代表着一个组件的初始 executor (也是线程)数量。在这篇文章里,我们使用这个“并行度”术语来说明在 Storm 拓扑中既可以配置 executor 的数量,也可以配置 worker 和 task 的数量。如果“并行度”的概念需要表示其他的一般情况,我们也会特别指出。
下面的内容里显示了很多可配置选项,以及在代码中配置他们的方法。可以用于配置的方法有很多种,这里列出的只是其中一部分。另外需要注意的是:
Storm 的配置优先级为 defaults.yaml < storm.yaml < 拓扑配置 < 内置型组件信息配置 < 外置型组件信息配置。
(1)工作进程Worker数量
说明:拓扑在集群中运行所需要的工作进程数
配置选项:TOPOLOGY_WORKERS
在代码中使用
Config config = new Config(); //注意此参数不能大于supervisor.slots.ports数量。 config.setNumWorkers(3);
(2)执行器Executor数量
- 说明:每个组件需要的执行线程数
- 配置选项:(没有拓扑级的通用配置项)
- 在代码中使用
TopologyBuilder builder = new TopologyBuilder(); //设置Spout的Executor数量参数parallelism_hint builder.setSpout(id, spout, parallelism_hint); //设置Bolt的Executor数量参数parallelism_hint builder.setBolt(id, bolt, parallelism_hint);
(3)任务Task数量
- 说明:每个组件需要的执行任务数
- 配置选项:TOPOLOGY_TASKS
- 在代码中使用
TopologyBuilder builder = new TopologyBuilder(); //设置Spout的Executor数量参数parallelism_hint,Task数量参数val builder.setSpout(id, spout, parallelism_hint).setNumTasks(val); //设置Bolt的Executor数量参数parallelism_hint,Task数量参数val builder.setBolt(id, bolt, parallelism_hint).setNumTasks(val);
(3)改变运行中拓扑的并行度
Storm一个很好的特性是可以增加或减少工作进程Worker和Executor的数量而不需要重启集群或拓扑,这样的行为成为再平衡(rebalancing)。目前有两种方式可实现拓扑再平衡,如下:
使用Storm的WebUI
使用Storm的命令行工具,如下
# 重新配置拓扑 # “myTopology” 拓扑使用5个Worker进程 # “blue-spout” Spout使用3个Executor # “yellow-blot” Bolt使用10个Executor
二、Streaming Groupings流分组策略
数据从上游节点发送到下游节点时,当下游节点的并发度大于1时,我们对下 游节基于多并发情况下接受并处理数据的策略称之为分组策略。
stream grouping就是用来定义一个stream应该如何分配给Bolts上面的多个并发。掌握Shuffle Grouping和 Fields Grouping 即可。
storm里面有6种类型的stream grouping。
1.ShuffleGrouping: 用在非聚合计算,比如过滤、写库等功能性操作 随机派发stream里面的tuple,尽量保证每个bolt并发接收到的tuple数目相同,但不严格相同。0.10之前,shuffleGrouping是轮询分配,即每个bolt得到的数据量相同。
2.FieldsGrouping: 按Field分组进行聚合场景,比如按word来分组, 具有同样word会被分到相同的Bolt。
作用:
过滤,从源端(Spout或上一级Bolt)多输出Fields中选择某些Field
相同的tuple会分发给同一个Executer或task处理
典型场景: 去重操作、Join
2.Non Grouping: 无分组, 这种分组和Shuffle grouping是一样的效果,多线程下不平均分配。
4.All Grouping: 广播发送, 对于每一个tuple, 所有的Bolts都会收到。
5.Global Grouping: 全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
6.Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者决定由消息接收者的哪个task处理这 个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理 者可以通过TopologyContext来或者处理它的消息的taskid (OutputCollector.emit方法也会返回taskid)
通过Mapreduce来对比FiledsGrouping
SQL:
select count(1), word from tableName group by word;
MR:
<key, value> —— <key, values> 把相同的key进行了聚合 shuffle混淆以后,相同的key发送到同一个reduce进程(线程)里,才能确保该key进行全局聚合。
数据倾斜根本原因:有的key的value少,有的多,两级分化严重。
Storm:
需要确保相同的key(tuple)必须发送给同一个bolt进程(线程),用fiedsGrouping来实现。
三、基于流分组策略应用程序的开发
案例需求:汇总每天的订单的交易量
MainTopology:
package com.kfk.pro1Grouping; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/28 * @time : 4:47 下午 */ public class MainTopology { public static void main(String[] args) { // 创建Topology TopologyBuilder builder = new TopologyBuilder(); // set Spout builder.setSpout("spout",new AmtSpout()); // set Bolt,按Field(time)分组,并设置并行度,是这个组件有几个executor来执行 builder.setBolt("amtBolt",new AmtBolt(),4).fieldsGrouping("spout",new Fields("time")); builder.setBolt("printBolt",new PrintBolt(),2).shuffleGrouping("amtBolt"); // 设置日志等级 Config conf = new Config(); conf.setDebug(false); try { // 本地模式运行 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("amtTopo", conf, builder.createTopology()); } catch (Exception e){ e.printStackTrace(); } } }
AmtSpout:
package com.kfk.pro1Grouping; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichSpout; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import java.util.Map; import java.util.Random; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/28 * @time : 4:52 下午 */ public class AmtSpout implements IRichSpout { Integer[] amt = {10,20,30,40}; String[] time = {"2020-12-27 12:43","2020-12-25 12:43","2020-12-23 12:43","2020-12-18 12:43"}; String[] city = {"beijing","nanjing","shenzhen","shanghai","guangzhou"}; String[] product = {"java","python","c","scala"}; SpoutOutputCollector spoutOutputCollector = null; Random random = new Random(); @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { spoutOutputCollector = collector; } @Override public void close() { } @Override public void activate() { } @Override public void deactivate() { } @Override public void nextTuple() { // 模拟数据 int _amt = amt[random.nextInt(4)]; String _time = time[random.nextInt(4)]; String _city = city[random.nextInt(5)]; String _product = product[random.nextInt(4)]; // emit给Bolt节点 spoutOutputCollector.emit(new Values(String.valueOf(_amt),_time,_city,_product)); } @Override public void ack(Object msgId) { } @Override public void fail(Object msgId) { } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // set Fields declarer.declare(new Fields("amt","time","city","product")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
AmtBolt:
package com.kfk.pro1Grouping; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.HashMap; import java.util.Map; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/28 * @time : 4:50 下午 */ public class AmtBolt implements IRichBolt { Map<String,Integer> amtMap = new HashMap<String,Integer>(); OutputCollector outputCollector = null; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { outputCollector = collector; } @Override public void execute(Tuple input) { // Bolt业务逻辑处理 String time = input.getStringByField("time"); int amt = Integer.parseInt(input.getStringByField("amt")); // 累加amt次数 if (amtMap.get(time) != null){ amt += amtMap.get(time); } amtMap.put(time,amt); // emit给Bolt节点 outputCollector.emit(new Values(amtMap)); } @Override public void cleanup() { } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("res")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
PrintBolt:
package com.kfk.pro1Grouping; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple; import java.util.Map; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/28 * @time : 4:51 下午 */ public class PrintBolt implements IRichBolt { @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { } @Override public void execute(Tuple input) { System.out.println(input.getValue(0)); } @Override public void cleanup() { } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
运行结果:
{2020-12-27 12:43=43471960, 2020-12-23 12:43=43497930} {2020-12-27 12:43=43472120, 2020-12-23 12:43=43498060} {2020-12-18 12:43=43479050, 2020-12-25 12:43=43458280} {2020-12-18 12:43=43480060, 2020-12-25 12:43=43459440} {2020-12-18 12:43=43480120, 2020-12-25 12:43=43459470} {2020-12-18 12:43=43480280, 2020-12-25 12:43=43459610} ...