多线程设计模式 : Master-Worker模式

简介: 多线程设计模式 : Master-Worker模式

概念剖析


Master-Worker是常用的并行计算模式。它的核心思想是系统由两类进程协作工作:Master进程和Worker进程。Master负责接收和分配任务,Worker负责处理子任务。当各个Worker子进程处理完成后,会将结果返回给Master,由Master作归纳总结。其好处就是能将一个大任务分解成若干个小任务,并行执行,从而提高系统的吞吐量。处理过程如下图所示:

Master进程为主要进程,它维护一个Worker进程队列、子任务队列和子结果集。Worker进程队列中的Worker进程不停从任务队列中提取要处理的子任务,并将结果写入结果集。

根据上面的思想,我们来模拟一下这种经典设计模式的实现。


过程分析


  1. 既然Worker是具体的执行任务,那么Worker一定要实现Runnable接口
  2. Matser作为接受和分配任务,得先有个容器来装载用户发出的请求,在不考虑阻塞的情况下我们选择ConcurrentLinkedQueue作为装载容器
  3. Worker对象需要能从Master接收任务,它也得有Master ConcurrentLinkedQueue容器的引用
  4. Master还得有个容器需要能够装载所有的Worker,可以使用HashMap<String,Thread>
  5. Worker处理完后需要将数据返回给Master,那么Master需要有个容器能够装载所有worker并发处理任务的结果集。此容器需要能够支持高并发,所以最好采用ConcurrentHashMap<String,Object>
  6. 同理由于Worker处理完成后将数据填充进MasterConcurrentHashMap,那么它也得有一份ConcurrentHashMap的引用

代码实现


Task任务对象

public class Task {
    private int id;
    private String name;
    private int price;
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getPrice() {
        return price;
    }
    public void setPrice(int price) {
        this.price = price;
    }
}


Master对象:

public class Master {
    //任务集合
    private ConcurrentLinkedQueue<Task> taskQueue = new ConcurrentLinkedQueue<>();
    //所有的处理结果
    private ConcurrentHashMap<String,Object> resultMap = new ConcurrentHashMap<>();
    //所有的Worker集合
    private HashMap<String,Thread> workerMap = Maps.newHashMap();
    //构造方法,初始化Worker
    public Master(Worker worker,int workerCount){
        //每一个worker对象都需要有Master的引用,taskQueue用于任务的提取,resultMap用于任务的提交
        worker.setTaskQueue(this.taskQueue);
        worker.setResultMap(this.resultMap);
        for(int i = 0 ;i < workerCount; i++){
            //key表示worker的名字,value表示线程执行对象
            workerMap.put("worker"+i,new Thread(worker));
        }
    }
    //用于提交任务
    public void submit(Task task){
        this.taskQueue.add(task);
    }
    //执行方法,启动应用程序让所有的Worker工作
    public void execute(){
        for(Map.Entry<String,Thread> me : workerMap.entrySet()){
            me.getValue().start();
        }
    }
    //判断所有的线程是否都完成任务
    public boolean isComplete() {
        for(Map.Entry<String,Thread> me : workerMap.entrySet()){
           if(me.getValue().getState() != Thread.State.TERMINATED){
               return false;
           }
        }
        return true;
    }
    //总结归纳
    public int getResult(){
        int ret = 0;
        for (Map.Entry<String, Object> entry : resultMap.entrySet()) {
            ret+=(Integer) entry.getValue();
        }
        return ret;
    }
}


Worker对象:

public class Worker implements Runnable{
    private ConcurrentLinkedQueue<Task> taskQueue;
    private ConcurrentHashMap<String, Object> resultMap;
    public void setTaskQueue(ConcurrentLinkedQueue<Task> taskQueue) {
        this.taskQueue = taskQueue;
    }
    public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
        this.resultMap = resultMap;
    }
    @Override
    public void run() {
        while(true){
            Task executeTask = this.taskQueue.poll();
            if(executeTask == null) break;
            //真正的任务处理
            Object result = handle(executeTask);
            this.resultMap.put(executeTask.getName(),result);
        }
    }
    //核心处理逻辑,可以抽离出来由具体子类实现
    private Object handle(Task executeTask) {
        Object result = null;
        try {
            //表示处理任务的耗时....
            Thread.sleep(500);
            result = executeTask.getPrice();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return result;
    }
}


客户端调用

public class Main {
    public static void main(String[] args) {
        //实际开发中多少个线程最好写成Runtime.getRuntime().availableProcessors()
        Master master = new Master(new Worker(), 10);
        Random random = new Random();
        for(int i = 0 ;i <= 100 ;i++){
            Task task = new Task();
            task.setId(i);
            task.setName("任务"+i);
            task.setPrice(random.nextInt(1000));
            master.submit(task);
        }
        master.execute();
        long start = System.currentTimeMillis();
        while(true){
            if(master.isComplete()){
                long end  = System.currentTimeMillis() - start;
                int ret = master.getResult();
                System.out.println("计算结果:"+ret+",执行耗时:"+end);
                break;
            }
        }
    }
}


在Worker对象中的核心处理业务逻辑handle()方法最好抽象成公共方法,具体实现由子类覆写。

了,各位朋友们,本期的内容到此就全部结束啦,能看到这里的都是最优秀的小伙子!如果觉得这篇文章对你有帮助的话,请动动你的小手指点个 “在看” 或者“转发,这个对我很重要。当然如果你还没看过瘾的话可以看看下面的热文推荐,咱们下期见!

目录
相关文章
|
6月前
|
负载均衡 算法 安全
基于Reactor模式的高性能网络库之线程池组件设计篇
EventLoopThreadPool 是 Reactor 模式中实现“一个主线程 + 多个工作线程”的关键组件,用于高效管理多个 EventLoop 并在多核 CPU 上分担高并发 I/O 压力。通过封装 Thread 类和 EventLoopThread,实现线程创建、管理和事件循环的调度,形成线程池结构。每个 EventLoopThread 管理一个子线程与对应的 EventLoop(subloop),主线程(base loop)通过负载均衡算法将任务派发至各 subloop,从而提升系统性能与并发处理能力。
329 3
|
8月前
|
设计模式 Java 数据库连接
【设计模式】【创建型模式】工厂方法模式(Factory Methods)
一、入门 什么是工厂方法模式? 工厂方法模式(Factory Method Pattern)是一种创建型设计模式,它定义了一个用于创建对象的接口,但由子类决定实例化哪个类。工厂方法模式使类的实例化延迟
236 16
|
3月前
|
设计模式 缓存 安全
【JUC】(6)带你了解共享模型之 享元和不可变 模型并初步带你了解并发工具 线程池Pool,文章内还有饥饿问题、设计模式之工作线程的解决于实现
JUC专栏第六篇,本文带你了解两个共享模型:享元和不可变 模型,并初步带你了解并发工具 线程池Pool,文章中还有解决饥饿问题、设计模式之工作线程的实现
213 2
|
3月前
|
设计模式 消息中间件 安全
【JUC】(3)常见的设计模式概念分析与多把锁使用场景!!理解线程状态转换条件!带你深入JUC!!文章全程笔记干货!!
JUC专栏第三篇,带你继续深入JUC! 本篇文章涵盖内容:保护性暂停、生产者与消费者、Park&unPark、线程转换条件、多把锁情况分析、可重入锁、顺序控制 笔记共享!!文章全程干货!
342 1
|
8月前
|
设计模式 负载均衡 监控
并发设计模式实战系列(2):领导者/追随者模式
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发设计模式实战系列,第二章领导者/追随者(Leader/Followers)模式,废话不多说直接开始~
238 0
|
8月前
|
设计模式 监控 Java
并发设计模式实战系列(1):半同步/半异步模式
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发设计模式实战系列,第一章半同步/半异步(Half-Sync/Half-Async)模式,废话不多说直接开始~
222 0
|
8月前
|
设计模式 运维 监控
并发设计模式实战系列(4):线程池
需要建立持续的性能剖析(Profiling)和调优机制。通过以上十二个维度的系统化扩展,构建了一个从。设置合理队列容量/拒绝策略。动态扩容/优化任务处理速度。检查线程栈定位热点代码。调整最大用户进程数限制。CPU占用率100%
518 0
|
8月前
|
设计模式 安全 Java
并发设计模式实战系列(12):不变模式(Immutable Object)
🌟 大家好,我是摘星!🌟今天为大家带来的是并发设计模式实战系列,第十二章,废话不多说直接开始~
192 0
|
8月前
|
设计模式 算法 Java
设计模式觉醒系列(04)策略模式|简单工厂模式的升级版
本文介绍了简单工厂模式与策略模式的概念及其融合实践。简单工厂模式用于对象创建,通过隐藏实现细节简化代码;策略模式关注行为封装与切换,支持动态替换算法,增强灵活性。两者结合形成“策略工厂”,既简化对象创建又保持低耦合。文章通过支付案例演示了模式的应用,并强调实际开发中应根据需求选择合适的设计模式,避免生搬硬套。最后推荐了JVM调优、并发编程等技术专题,助力开发者提升技能。