一、前言
流水线,就工厂而言,组装一个汽车;冲骨架到最后的座椅等,每一步都是有顺序。看来这个是串行的程序,但是在实际工作中,我们看到的工厂其实都不是串行的,都是一个部门甚至一个公司专业生产组装一样零件。这个情况下,每个公司其实已经并行的运行工作了。在程序开发中,也 存在这样的实际情况,例如一些基本的计算也是如此;
例子:(B+C)*B/2 这样的一个简单运算,我们仍然可以将其并行处理。(现实中这种计算不是需要并行,但是你想象航天数据的计算呢??)
二、流水线并行模式pipeline
2.1 首先我们需要一个中间仓库存放我们的计算中间值等等
package pattern.pipeline; /** * Created by ycy on 16/1/16. * 定义一个计算中间体 * 计算(B+C)*B/2=? * 适用场景:将我们串行的程序分为一步步流水 */ public class Msg { public double i; public double j; public String orgStr=null; }
2.2首先计算括号里面加法
package pattern.pipeline; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; /** * Created by ycy on 16/1/16. */ public class Plus implements Runnable { public static BlockingQueue<Msg> bq = new LinkedBlockingDeque<Msg>(); public void run() { while (true){ try { Msg msg=bq.take(); msg.j=msg.i+msg.j; Multiply.bq.add(msg); }catch (InterruptedException e){ e.printStackTrace(); } } } }
2.3 再次计算乘法
package pattern.pipeline; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; /** * Created by ycy on 16/1/16. */ public class Multiply implements Runnable{ public static BlockingQueue<Msg> bq=new LinkedBlockingDeque<Msg>(); public void run() { while (true){ try { Msg msg=bq.take(); msg.i=msg.i*msg.j; Div.bq.add(msg); }catch (InterruptedException e){ e.printStackTrace(); } } } }
2.4最后计算除法
package pattern.pipeline; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; /** * Created by ycy on 16/1/16. */ public class Div implements Runnable{ public static BlockingQueue<Msg> bq=new LinkedBlockingDeque<Msg>(); public void run() { while (true) { try { Msg msg = bq.take(); msg.i = msg.i / 2; System.out.println("计算结果" + msg.orgStr + "=" + msg.i); } catch (InterruptedException e) { e.printStackTrace(); } } } }
2.5 最后我们开启三个并发计算这个公式的结果
package pattern.pipeline; /** * Created by ycy on 16/1/16. */ public class PStreamMain { public static void main(String[] args) { new Thread(new Plus()).start(); new Thread(new Multiply()).start(); new Thread(new Div()).start(); for (int i = 0; i <10 ; i++) { for (int j = 0; j <10 ; j++) { Msg msg=new Msg(); msg.i=i; msg.j=j; msg.orgStr="(("+i+"+"+j+")*"+i+")/2"; Plus.bq.add(msg); } } } }