多线程设计模式 : Master-Worker模式

简介: Master-Worker是常用的并行计算模式。

概念剖析

Master-Worker是常用的并行计算模式。它的核心思想是系统由两类进程协作工作:Master进程和Worker进程。


Master负责接收和分配任务,Worker负责处理子任务。当各个Worker子进程处理完成后,会将结果返回给Master,由Master作归纳总结。


其好处就是能将一个大任务分解成若干个小任务,并行执行,从而提高系统的吞吐量。


处理过程如下图所示:


1.png


Master进程为主要进程,它维护一个Worker进程队列、子任务队列和子结果集。Worker进程队列中的Worker进程不停从任务队列中提取要处理的子任务,并将结果写入结果集。


根据上面的思想,我们来模拟一下这种经典设计模式的实现。


过程分析


  1. 既然Worker是具体的执行任务,那么Worker一定要实现Runnable接口


  1. Matser作为接受和分配任务,得先有个容器来装载用户发出的请求,在不考虑阻塞的情况下我们选择ConcurrentLinkedQueue作为装载容器


  1. Worker对象需要能从Master接收任务,它也得有Master ConcurrentLinkedQueue容器的引用


  1. Master还得有个容器需要能够装载所有的Worker,可以使用HashMap<String,Thread>


  1. Worker处理完后需要将数据返回给Master,那么Master需要有个容器能够装载所有worker并发处理任务的结果集。此容器需要能够支持高并发,所以最好采用ConcurrentHashMap<String,Object>


  1. 同理由于Worker处理完成后将数据填充进Master的ConcurrentHashMap,那么它也得有一份ConcurrentHashMap的引用


代码实现

Task任务对象


publicclassTask {
privateintid;
privateStringname;
privateintprice;
publicintgetId() {
returnid;
    }
publicvoidsetId(intid) {
this.id=id;
    }
publicStringgetName() {
returnname;
    }
publicvoidsetName(Stringname) {
this.name=name;
    }
publicintgetPrice() {
returnprice;
    }
publicvoidsetPrice(intprice) {
this.price=price;
    }
}


Master对象:


publicclassMaster {
//任务集合privateConcurrentLinkedQueue<Task>taskQueue=newConcurrentLinkedQueue<>();
//所有的处理结果privateConcurrentHashMap<String,Object>resultMap=newConcurrentHashMap<>();
//所有的Worker集合privateHashMap<String,Thread>workerMap=Maps.newHashMap();
//构造方法,初始化WorkerpublicMaster(Workerworker,intworkerCount){
//每一个worker对象都需要有Master的引用,taskQueue用于任务的提取,resultMap用于任务的提交worker.setTaskQueue(this.taskQueue);
worker.setResultMap(this.resultMap);
for(inti=0 ;i<workerCount; i  ){
//key表示worker的名字,value表示线程执行对象workerMap.put("worker"i,newThread(worker));
        }
    }
//用于提交任务publicvoidsubmit(Tasktask){
this.taskQueue.add(task);
    }
//执行方法,启动应用程序让所有的Worker工作publicvoidexecute(){
for(Map.Entry<String,Thread>me : workerMap.entrySet()){
me.getValue().start();
        }
    }
//判断所有的线程是否都完成任务publicbooleanisComplete() {
for(Map.Entry<String,Thread>me : workerMap.entrySet()){
if(me.getValue().getState() !=Thread.State.TERMINATED){
returnfalse;
           }
        }
returntrue;
    }
//总结归纳 publicintgetResult(){
intret=0;
for (Map.Entry<String, Object>entry : resultMap.entrySet()) {
ret=(Integer) entry.getValue();
        }
returnret;
    }
}


Worker对象:


publicclassWorkerimplementsRunnable{
privateConcurrentLinkedQueue<Task>taskQueue;
privateConcurrentHashMap<String, Object>resultMap;
publicvoidsetTaskQueue(ConcurrentLinkedQueue<Task>taskQueue) {
this.taskQueue=taskQueue;
    }
publicvoidsetResultMap(ConcurrentHashMap<String, Object>resultMap) {
this.resultMap=resultMap;
    }
@Overridepublicvoidrun() {
while(true){
TaskexecuteTask=this.taskQueue.poll();
if(executeTask==null) break;
//真正的任务处理Objectresult=handle(executeTask);
this.resultMap.put(executeTask.getName(),result);
        }
    }
//核心处理逻辑,可以抽离出来由具体子类实现privateObjecthandle(TaskexecuteTask) {
Objectresult=null;
try {
//表示处理任务的耗时....Thread.sleep(500);
result=executeTask.getPrice();
        } catch (InterruptedExceptione) {
e.printStackTrace();
        }
returnresult;
    }
}


客户端调用


publicclassMain {
publicstaticvoidmain(String[] args) {
//实际开发中多少个线程最好写成Runtime.getRuntime().availableProcessors()Mastermaster=newMaster(newWorker(), 10);
Randomrandom=newRandom();
for(inti=0 ;i<=100 ;i  ){
Tasktask=newTask();
task.setId(i);
task.setName("任务"i);
task.setPrice(random.nextInt(1000));
master.submit(task);
        }
master.execute();
longstart=System.currentTimeMillis();
while(true){
if(master.isComplete()){
longend=System.currentTimeMillis() -start;
intret=master.getResult();
System.out.println("计算结果:"ret",执行耗时:"end);
break;
            }
        }
    }
}


在Worker对象中的核心处理业务逻辑handle()方法最好抽象成公共方法,具体实现由子类覆写。

目录
相关文章
|
5月前
|
存储 容灾 Java
分布式系统详解--基础知识(线程)
分布式系统详解--基础知识(线程)
52 0
|
6月前
|
设计模式 BI
访问器模式--设计模式
访问器模式--设计模式
38 0
|
6月前
|
设计模式 安全 Java
多线程设计模式【线程安全、 Future 设计模式、Master-Worker 设计模式 】(一)-全面详解(学习总结---从入门到深化)
多线程设计模式【线程安全、 Future 设计模式、Master-Worker 设计模式 】(一)-全面详解(学习总结---从入门到深化)
79 0
|
设计模式 Java
状态切换:解析Java设计模式中的状态模式
在软件开发领域,设计模式是一组经过验证的最佳实践方法,用于解决各种常见问题。状态模式是一种行为型设计模式,其目标是将对象的不同状态抽象成不同的状态类,并允许对象在不同状态间进行切换。在本文中,我们将深入了解状态模式的核心思想、应用场景以及它在Java中的实际运用。
231 0
|
设计模式 并行计算 安全
并发编程模式(future,Master-Worker,生产者消费者模式)
在网上购物时,提交订单后,在收货的这段时间里无需一直在家里等候,可以先干别的事情。类推到程序设计中时,当提交请求时,期望得到答复时,如果这个答复可能很慢。传统的是一直等待到这个答复收到时再去做别的事情,但如果利用Future设计模式就无需等待答复的到来,在等待答复的过程中可以干其他事情。
|
设计模式 并行计算 容器
多线程设计模式 : Master-Worker模式
多线程设计模式 : Master-Worker模式
268 0
|
设计模式 前端开发 Java
多线程设计模式 - Future模式
多线程设计模式 - Future模式
61 0
|
Java
【多线程:Monitor 概念】
【多线程:Monitor 概念】
146 0
|
存储 算法 Unix
bthread源码剖析(四): 通过ParkingLot实现Worker间任务状态同步
通过之前的文章我们知道TaskGroup(以下简称TG)是在死循环等待任务,然后切换栈去执行任务。在当前TG没有任务的时候会进行“工作窃取”窃取其他TG的任务。在没有任务的时候TG会“休眠”,当任务出现的时候被唤醒然后消费。
318 0
|
设计模式 Java 程序员
【并发编程神器】,Worker Thread模式
【并发编程神器】,Worker Thread模式
149 0
【并发编程神器】,Worker Thread模式