今晚总结一下关于开发中常用的多线程设计模式。
Future设计模式
future相信大家对这个东西都不陌生,那么我们就开门见山的来讲吧。
首先来思考几个问题
future是用来做什么的?
当我们做一些计算机串行化计算的时候,在等待响应的过程中,接收方的线程是处于堵塞状态的,比较浪费资源。
这个时候如果采用future来帮我们异步接收参数,避免同步等待的问题,那么这种方式我们称之为future。
future比较常用功能的方式是futuretask,案例如下:
public static void main(String[] args) throws InterruptedException, ExecutionException { FutureTask<String> futureTask=new FutureTask<String>(new RealData("*")); ExecutorService executorService= Executors.newFixedThreadPool(1); executorService.submit(futureTask); System.out.println("请求完毕了"); Thread.sleep(200); System.out.println("真实数据"+futureTask.get()); } 复制代码
这段代码在工作中如果有需要可以拿去使用。
如何自己来实现一个future的模型?
future的实现实际上离不开两个东西,一个是futuredata,就是用于get真实数据的一个引用句柄,还有一个就是realdata,真实响应数据。
由于获取真实数据的过程可能会比较耗时,所以通常的做法是在异步线程中去获取响应的数据。
案例代码如下:
首先定义一个用于给future执行的task
/** * @author idea * @data 2020/2/15 */ public interface CallableTask<T> { T run(); } 复制代码
然后是futureData
/** * @author idea * @data 2020/2/15 */ public class FutureData { private RealData realData; private boolean isReady=false; public synchronized void setCallableTask(CallableTask callableTask){ realData=new RealData(callableTask); isReady=false; notifyAll(); } public synchronized Object get() throws InterruptedException { while (!isReady){ wait(); } return realData.getResult(); } } 复制代码
接下来是真实接收参数的RealData
/** * @author idea * @data 2020/2/15 */ public class RealData { private CallableTask callableTask; private Object result; public RealData(CallableTask callableTask) { this.callableTask = callableTask; this.result = callableTask.run(); } public Object getResult() { return result; } } 复制代码
接下来便是client客户端
/** * @author idea * @data 2020/2/15 */ public class Client { private FutureData futureData; public void request(CallableTask callableTask){ futureData=new FutureData(); futureData.setCallableTask(callableTask); } public Object get() throws InterruptedException { if(futureData==null){ throw new RuntimeException("futureData can not be null!"); } return futureData.get(); } static class job implements CallableTask<String>{ @Override public String run() { System.out.println("this is run"); return "ten"; } } public static void main(String[] args) throws InterruptedException { Client client=new Client(); client.request(new job()); for(int i=0;i<10;i++){ Thread.sleep(10); System.out.println("======="); } Object result = client.get(); System.out.println(result); } } 复制代码
最后执行client里面的main方法便可以验证了。
根据这样的案例,相信你也可以看懂future是怎么运作的了。但是在实际的jdk实现中,future的设计要比这个复杂很多,所以这里就不做具体的讲解了。
Master-Worker模型
如果有接触过大数据的同学,应该就会对这种模式比较熟悉,常见的hadoop就是基于这种模式设计的。
Master-Worker模式是常用的并行模式之一,它的核心思想是:系统由两类进程协同工作,即Master进程和Worker进程,Master负责接收和分配任务,Wroker负责处理子任务。当各个Worker进程将子任务处理完成后,将结果返回给Master进程,由Master进程进行汇总,从而得到最终的结果,其具体处理过程如下图所示。
Master进程为主要进程,它维护一个Worker进程队列、子任务队列和子结果集。Worker进程队列中的Worker进程不停从任务队列中提取要处理的子任务,并将结果写入结果集。
这里我写了一个简单版的master-worker模型供大家学习:
首先是Master
import 重温_Java并发基础.并行程序的设计模式.code.master_worker.Worker; import java.util.HashMap; import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; /** * @author idea * @data 2020/2/15 */ public class Master { protected Queue<Object> workerQueue = new ConcurrentLinkedDeque<>(); protected Map<String, Thread> threadMap = new HashMap<>(); protected Map<String, Object> resultMap = new ConcurrentHashMap<>(); public Master(Worker worker, int countWorker){ worker.setWorkerQueue(workerQueue); worker.setResultMap(resultMap); for(int i=0;i<countWorker;i++){ threadMap.put(Integer.toString(i),new Thread(worker,"worker"+i)); } } public boolean isComplete(){ for(String key:threadMap.keySet()){ if(threadMap.get(key).getState()!=Thread.State.TERMINATED){ return false; } } return true; } public void submit(Object job){ workerQueue.add(job); } /** * 执行任务 */ public void execute(){ for (Map.Entry<String,Thread> stringThreadEntry : threadMap.entrySet()) { stringThreadEntry.getValue().start(); } } } 复制代码
master主要是负责工作的分发和结果的合并
接下来是worker部分的设计:
import java.util.Map; import java.util.Queue; import java.util.Set; /** * @author idea * @data 2020/2/15 */ public class Worker implements Runnable { protected Queue<Object> workerQueue; protected Map<String, Object> resultMap; public Worker setWorkerQueue(Queue<Object> workerQueue) { this.workerQueue = workerQueue; return this; } public Worker setResultMap(Map<String, Object> resultMap) { this.resultMap = resultMap; return this; } public Object handle(Object input) throws InterruptedException { return input; } @Override public void run() { while (true) { Object input = workerQueue.poll(); if (input == null) { break; } Object result = null; try { result = handle(input); } catch (InterruptedException e) { e.printStackTrace(); } resultMap.put(String.valueOf(result.hashCode()), result); } } private void handle_1(){ Master master = new Master(new PlusWorker(), 10); for (int i = 1; i <= 100; i++) { master.submit(i); } int result = 0; master.execute(); Map<String, Object> resultMap = master.resultMap; while (resultMap.size() != 0 || !master.isComplete()) { Set<String> keySet = resultMap.keySet(); String key = null; for (String s : keySet) { key = s; break; } Integer i = null; if (key != null) { i = (Integer) resultMap.get(key); } if (i != null) { result += i; } if (key != null) { resultMap.remove(key); } } System.out.println(result); } //计算1+2+3+4+***+10000 public static void main(String[] args) { Worker w=new Worker(); w.handle_1(); } } 复制代码
给worker设计一个PlusWorker
/** * @author idea * @data 2020/2/15 */ public class PlusWorker extends Worker{ @Override public Object handle(Object input) throws InterruptedException { Integer val= (Integer) input; Thread.sleep(1000); return val; } } 复制代码
此时一个基本的模型也就完成了。