Storm拓扑并行度与及流分组策略

简介: 笔记

一、Storm拓扑的并行度(parallelism)介绍


(1)运行拓扑的结构

工作进程: Worker Process,也称为Worker

执行器: Executor,即线程Thread

任务: Task

工作进程、执行器、任务三者之间关系如下图:

25.pngTopology由一个或多个Spout/Bolt组件构成。

运行中的Topology由一个或多个Supervisor节点中的Worker构成。默认情况下一个Supervisor节点运行4个Worker,由defaults.yaml/storm.yaml中的属性决定:

supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

在代码中可以使用new Config().setNumWorkers(3),最大数量不能超过配置的supervisor.slots.ports数量。


Executor为特定拓扑的一个或多个组件Spout/Bolt实例运行一个或多个Task。默认情况下一个Executor运行一个Task。


Task执行真正的数据处理,代码中实现的每个Spout/bolt作为很多任务跨集群执行。一个Spout/Bolt组件的Task数量始终贯穿Topology的整个生命周期,但一个Spout/Bolt组件的Executor数量会随着时间而改变。这意味着Threads≤Tasks条件成立。默认情况下Task数量与Executor数量相同,即Storm会使用每个Executor运行一个Task。


(2)配置拓扑的并行度

这里所说的术语“并行度”主要是用于表示所谓的 parallelism_hint,它代表着一个组件的初始 executor (也是线程)数量。在这篇文章里,我们使用这个“并行度”术语来说明在 Storm 拓扑中既可以配置 executor 的数量,也可以配置 worker 和 task 的数量。如果“并行度”的概念需要表示其他的一般情况,我们也会特别指出。


下面的内容里显示了很多可配置选项,以及在代码中配置他们的方法。可以用于配置的方法有很多种,这里列出的只是其中一部分。另外需要注意的是:


Storm 的配置优先级为 defaults.yaml < storm.yaml < 拓扑配置 < 内置型组件信息配置 < 外置型组件信息配置。


(1)工作进程Worker数量

说明:拓扑在集群中运行所需要的工作进程数

配置选项:TOPOLOGY_WORKERS

在代码中使用

Config config = new Config();
//注意此参数不能大于supervisor.slots.ports数量。
config.setNumWorkers(3);    

(2)执行器Executor数量

  • 说明:每个组件需要的执行线程数
  • 配置选项:(没有拓扑级的通用配置项)
  • 在代码中使用
TopologyBuilder builder = new TopologyBuilder();
//设置Spout的Executor数量参数parallelism_hint
builder.setSpout(id, spout, parallelism_hint);
//设置Bolt的Executor数量参数parallelism_hint        
builder.setBolt(id, bolt, parallelism_hint);        

(3)任务Task数量

  • 说明:每个组件需要的执行任务数
  • 配置选项:TOPOLOGY_TASKS
  • 在代码中使用
TopologyBuilder builder = new TopologyBuilder();
//设置Spout的Executor数量参数parallelism_hint,Task数量参数val
builder.setSpout(id, spout, parallelism_hint).setNumTasks(val);  
//设置Bolt的Executor数量参数parallelism_hint,Task数量参数val     
builder.setBolt(id, bolt, parallelism_hint).setNumTasks(val);            


(3)改变运行中拓扑的并行度

Storm一个很好的特性是可以增加或减少工作进程Worker和Executor的数量而不需要重启集群或拓扑,这样的行为成为再平衡(rebalancing)。目前有两种方式可实现拓扑再平衡,如下:


使用Storm的WebUI

使用Storm的命令行工具,如下

# 重新配置拓扑
# “myTopology” 拓扑使用5个Worker进程
# “blue-spout” Spout使用3个Executor
# “yellow-blot” Bolt使用10个Executor


二、Streaming Groupings流分组策略


数据从上游节点发送到下游节点时,当下游节点的并发度大于1时,我们对下 游节基于多并发情况下接受并处理数据的策略称之为分组策略。

30.png

stream grouping就是用来定义一个stream应该如何分配给Bolts上面的多个并发。掌握Shuffle Grouping和 Fields Grouping 即可。


storm里面有6种类型的stream grouping。


1.ShuffleGrouping: 用在非聚合计算,比如过滤、写库等功能性操作 随机派发stream里面的tuple,尽量保证每个bolt并发接收到的tuple数目相同,但不严格相同。0.10之前,shuffleGrouping是轮询分配,即每个bolt得到的数据量相同。


2.FieldsGrouping: 按Field分组进行聚合场景,比如按word来分组, 具有同样word会被分到相同的Bolt。


作用:


过滤,从源端(Spout或上一级Bolt)多输出Fields中选择某些Field

相同的tuple会分发给同一个Executer或task处理

典型场景: 去重操作、Join


2.Non Grouping: 无分组, 这种分组和Shuffle grouping是一样的效果,多线程下不平均分配。


4.All Grouping: 广播发送, 对于每一个tuple, 所有的Bolts都会收到。


5.Global Grouping: 全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。


6.Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者决定由消息接收者的哪个task处理这 个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理 者可以通过TopologyContext来或者处理它的消息的taskid (OutputCollector.emit方法也会返回taskid)


通过Mapreduce来对比FiledsGrouping

SQL:

select count(1), word from tableName group by word;

MR:

<key, value> —— <key, values> 把相同的key进行了聚合 shuffle混淆以后,相同的key发送到同一个reduce进程(线程)里,才能确保该key进行全局聚合。

数据倾斜根本原因:有的key的value少,有的多,两级分化严重。


Storm:

需要确保相同的key(tuple)必须发送给同一个bolt进程(线程),用fiedsGrouping来实现。


三、基于流分组策略应用程序的开发


案例需求:汇总每天的订单的交易量


MainTopology:

package com.kfk.pro1Grouping;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/28
 * @time : 4:47 下午
 */
public class MainTopology {
    public static void main(String[] args) {
        // 创建Topology
        TopologyBuilder builder = new TopologyBuilder();
        // set Spout
        builder.setSpout("spout",new AmtSpout());
        // set Bolt,按Field(time)分组,并设置并行度,是这个组件有几个executor来执行
        builder.setBolt("amtBolt",new AmtBolt(),4).fieldsGrouping("spout",new Fields("time"));
        builder.setBolt("printBolt",new PrintBolt(),2).shuffleGrouping("amtBolt");
        // 设置日志等级
        Config conf = new Config();
        conf.setDebug(false);
        try {
            // 本地模式运行
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("amtTopo", conf, builder.createTopology());
        } catch (Exception e){
            e.printStackTrace();
        }
    }
}

AmtSpout:

package com.kfk.pro1Grouping;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.Map;
import java.util.Random;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/28
 * @time : 4:52 下午
 */
public class AmtSpout implements IRichSpout {
    Integer[] amt = {10,20,30,40};
    String[] time = {"2020-12-27 12:43","2020-12-25 12:43","2020-12-23 12:43","2020-12-18 12:43"};
    String[] city = {"beijing","nanjing","shenzhen","shanghai","guangzhou"};
    String[] product = {"java","python","c","scala"};
    SpoutOutputCollector spoutOutputCollector = null;
    Random random = new Random();
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        spoutOutputCollector = collector;
    }
    @Override
    public void close() {
    }
    @Override
    public void activate() {
    }
    @Override
    public void deactivate() {
    }
    @Override
    public void nextTuple() {
        // 模拟数据
        int _amt = amt[random.nextInt(4)];
        String _time = time[random.nextInt(4)];
        String _city = city[random.nextInt(5)];
        String _product = product[random.nextInt(4)];
        // emit给Bolt节点
        spoutOutputCollector.emit(new Values(String.valueOf(_amt),_time,_city,_product));
    }
    @Override
    public void ack(Object msgId) {
    }
    @Override
    public void fail(Object msgId) {
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // set Fields
        declarer.declare(new Fields("amt","time","city","product"));
    }
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

AmtBolt:

package com.kfk.pro1Grouping;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/28
 * @time : 4:50 下午
 */
public class AmtBolt implements IRichBolt {
    Map<String,Integer> amtMap = new HashMap<String,Integer>();
    OutputCollector outputCollector = null;
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        outputCollector = collector;
    }
    @Override
    public void execute(Tuple input) {
        // Bolt业务逻辑处理
        String time = input.getStringByField("time");
        int amt = Integer.parseInt(input.getStringByField("amt"));
        // 累加amt次数
        if (amtMap.get(time) != null){
            amt += amtMap.get(time);
        }
        amtMap.put(time,amt);
        // emit给Bolt节点
        outputCollector.emit(new Values(amtMap));
    }
    @Override
    public void cleanup() {
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("res"));
    }
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

PrintBolt:

package com.kfk.pro1Grouping;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import java.util.Map;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/28
 * @time : 4:51 下午
 */
public class PrintBolt implements IRichBolt {
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    }
    @Override
    public void execute(Tuple input) {
        System.out.println(input.getValue(0));
    }
    @Override
    public void cleanup() {
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

运行结果:

{2020-12-27 12:43=43471960, 2020-12-23 12:43=43497930}
{2020-12-27 12:43=43472120, 2020-12-23 12:43=43498060}
{2020-12-18 12:43=43479050, 2020-12-25 12:43=43458280}
{2020-12-18 12:43=43480060, 2020-12-25 12:43=43459440}
{2020-12-18 12:43=43480120, 2020-12-25 12:43=43459470}
{2020-12-18 12:43=43480280, 2020-12-25 12:43=43459610}
...








相关文章
|
1月前
|
消息中间件 分布式计算 大数据
大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置与测试
大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置与测试
108 0
|
1月前
|
消息中间件 分布式计算 大数据
大数据-128 - Flink 并行度设置 细节详解 全局、作业、算子、Slot
大数据-128 - Flink 并行度设置 细节详解 全局、作业、算子、Slot
100 0
|
4月前
|
消息中间件 存储 Kafka
微服务分布问题之Kafka分区的副本和分布如何解决
微服务分布问题之Kafka分区的副本和分布如何解决
|
4月前
|
分布式计算 Hadoop 数据挖掘
|
4月前
|
分布式计算 Hadoop 大数据
|
分布式计算 负载均衡 算法
Flink---5、聚合算子、用户自定义函数、物理分区算子、分流、合流
Flink---5、聚合算子、用户自定义函数、物理分区算子、分流、合流
|
资源调度 分布式计算 调度
Fink--3、Flink运行时架构(并行度、算子链、任务槽、作业提交流程)
Fink--3、Flink运行时架构(并行度、算子链、任务槽、作业提交流程)
|
SQL 负载均衡 算法
自适应批作业调度器:为 Flink 批作业自动推导并行度
1.15 版本新引入的调度器,在作业运行时根据每个算子需要处理的实际数据量来自动推导并行度。
自适应批作业调度器:为 Flink 批作业自动推导并行度
|
消息中间件 Kafka RocketMQ
Kafka重平衡机制
当集群中有新成员加入,或者某些主题增加了分区之后,消费者是怎么进行重新分配分区再进行消费的?这里就涉及到重平衡(Rebalance)的概念,下面我就给大家讲解一下什么是 Kafka 重平衡机制,我尽量做到图文并茂通俗易懂。
1296 0
Kafka重平衡机制
|
流计算
Flink DataStream支持的物理分组方式
Flink DataStream支持的物理分组方式
158 0
下一篇
无影云桌面