概念剖析
Master-Worker是常用的并行计算模式。它的核心思想是系统由两类进程协作工作:Master进程和Worker进程。
Master负责接收和分配任务,Worker负责处理子任务。当各个Worker子进程处理完成后,会将结果返回给Master,由Master作归纳总结。
其好处就是能将一个大任务分解成若干个小任务,并行执行,从而提高系统的吞吐量。
处理过程如下图所示:
Master进程为主要进程,它维护一个Worker进程队列、子任务队列和子结果集。Worker进程队列中的Worker进程不停从任务队列中提取要处理的子任务,并将结果写入结果集。
根据上面的思想,我们来模拟一下这种经典设计模式的实现。
过程分析
- 既然Worker是具体的执行任务,那么Worker一定要实现Runnable接口
- Matser作为接受和分配任务,得先有个容器来装载用户发出的请求,在不考虑阻塞的情况下我们选择ConcurrentLinkedQueue作为装载容器
- Worker对象需要能从Master接收任务,它也得有Master ConcurrentLinkedQueue容器的引用
- Master还得有个容器需要能够装载所有的Worker,可以使用HashMap<String,Thread>
- Worker处理完后需要将数据返回给Master,那么Master需要有个容器能够装载所有worker并发处理任务的结果集。此容器需要能够支持高并发,所以最好采用ConcurrentHashMap<String,Object>
- 同理由于Worker处理完成后将数据填充进Master的ConcurrentHashMap,那么它也得有一份ConcurrentHashMap的引用
代码实现
Task任务对象
publicclassTask { privateintid; privateStringname; privateintprice; publicintgetId() { returnid; } publicvoidsetId(intid) { this.id=id; } publicStringgetName() { returnname; } publicvoidsetName(Stringname) { this.name=name; } publicintgetPrice() { returnprice; } publicvoidsetPrice(intprice) { this.price=price; } }
Master对象:
publicclassMaster { //任务集合privateConcurrentLinkedQueue<Task>taskQueue=newConcurrentLinkedQueue<>(); //所有的处理结果privateConcurrentHashMap<String,Object>resultMap=newConcurrentHashMap<>(); //所有的Worker集合privateHashMap<String,Thread>workerMap=Maps.newHashMap(); //构造方法,初始化WorkerpublicMaster(Workerworker,intworkerCount){ //每一个worker对象都需要有Master的引用,taskQueue用于任务的提取,resultMap用于任务的提交worker.setTaskQueue(this.taskQueue); worker.setResultMap(this.resultMap); for(inti=0 ;i<workerCount; i ){ //key表示worker的名字,value表示线程执行对象workerMap.put("worker"i,newThread(worker)); } } //用于提交任务publicvoidsubmit(Tasktask){ this.taskQueue.add(task); } //执行方法,启动应用程序让所有的Worker工作publicvoidexecute(){ for(Map.Entry<String,Thread>me : workerMap.entrySet()){ me.getValue().start(); } } //判断所有的线程是否都完成任务publicbooleanisComplete() { for(Map.Entry<String,Thread>me : workerMap.entrySet()){ if(me.getValue().getState() !=Thread.State.TERMINATED){ returnfalse; } } returntrue; } //总结归纳 publicintgetResult(){ intret=0; for (Map.Entry<String, Object>entry : resultMap.entrySet()) { ret=(Integer) entry.getValue(); } returnret; } }
Worker对象:
publicclassWorkerimplementsRunnable{ privateConcurrentLinkedQueue<Task>taskQueue; privateConcurrentHashMap<String, Object>resultMap; publicvoidsetTaskQueue(ConcurrentLinkedQueue<Task>taskQueue) { this.taskQueue=taskQueue; } publicvoidsetResultMap(ConcurrentHashMap<String, Object>resultMap) { this.resultMap=resultMap; } publicvoidrun() { while(true){ TaskexecuteTask=this.taskQueue.poll(); if(executeTask==null) break; //真正的任务处理Objectresult=handle(executeTask); this.resultMap.put(executeTask.getName(),result); } } //核心处理逻辑,可以抽离出来由具体子类实现privateObjecthandle(TaskexecuteTask) { Objectresult=null; try { //表示处理任务的耗时....Thread.sleep(500); result=executeTask.getPrice(); } catch (InterruptedExceptione) { e.printStackTrace(); } returnresult; } }
客户端调用
publicclassMain { publicstaticvoidmain(String[] args) { //实际开发中多少个线程最好写成Runtime.getRuntime().availableProcessors()Mastermaster=newMaster(newWorker(), 10); Randomrandom=newRandom(); for(inti=0 ;i<=100 ;i ){ Tasktask=newTask(); task.setId(i); task.setName("任务"i); task.setPrice(random.nextInt(1000)); master.submit(task); } master.execute(); longstart=System.currentTimeMillis(); while(true){ if(master.isComplete()){ longend=System.currentTimeMillis() -start; intret=master.getResult(); System.out.println("计算结果:"ret",执行耗时:"end); break; } } } }
在Worker对象中的核心处理业务逻辑handle()方法最好抽象成公共方法,具体实现由子类覆写。