一、前言
并行模式与算法是并发编程中常见的话题,其中一些常用的设计模式我们平时也会接触到。在如今的多核环境下,如何合理安排并发、保证数据安全,这些基本常识我们还是需要了解的。
二、并行模式与算法
2.1单例模式
借助静态内部类实现单例:实例只会在内部类第一次被加载(初始化)时创建一次,并由JVM的类初始化机制保证线程安全,从而实现延迟加载的单例模式。
适用场景:工具类,实体初始化等
/**
 * Created by ycy on 16/1/15.
 *
 * Singleton built on a private static nested holder class.
 *
 * <p>The single instance lives in a static field of {@code SingletonHolder}.
 * The JVM initializes a class lazily, on first use, and guarantees that the
 * initialization happens exactly once even under concurrent access, so
 * {@link #getInstance()} needs no explicit locking.
 */
public class SingletonDemo {

    /** Private so that the only instance is the one created by the holder. */
    private SingletonDemo() {
        System.out.println("singleton is create");
    }

    /**
     * Holds the single instance. It is not initialized until
     * {@link SingletonDemo#getInstance()} references it for the first time.
     */
    private static class SingletonHolder {
        private static final SingletonDemo INSTANCE = new SingletonDemo();
    }

    /**
     * Returns the shared instance, creating it on the first call.
     *
     * <p>Deliberately not {@code synchronized}: class initialization of the
     * holder already provides the required thread safety, and a lock here
     * would only serialize every caller for no benefit.
     *
     * @return the one and only {@code SingletonDemo}
     */
    public static SingletonDemo getInstance() {
        return SingletonHolder.INSTANCE;
    }

    /** Prints the singleton to show that it is created on first access. */
    public static void main(String[] args) {
        System.out.println(SingletonDemo.getInstance());
    }
}
2.2 不变模式
对象一旦创建,其内部状态(所有字段均为final)就不再改变,因此无需任何同步即可在多线程环境下安全共享;
适用场景:Java中的String以及基本类型的包装类(如Integer、Long等)都采用了这种模式
/**
 * Immutable description of a product.
 *
 * <p>The class is {@code final} and every field is {@code private final} and
 * assigned exactly once in the constructor; there are no setters. An instance
 * can therefore be shared between threads without synchronization.
 */
public final class Product {

    /** Product number. */
    private final String no;

    /** Product name. */
    private final String name;

    /** Product price. */
    private final double price;

    /**
     * Creates a product whose state is fixed from this point on.
     *
     * @param no    product number
     * @param name  product name
     * @param price product price
     */
    public Product(String no, String name, double price) {
        this.no = no;
        this.name = name;
        this.price = price;
    }

    /** @return the product number */
    public String getNo() {
        return this.no;
    }

    /** @return the product name */
    public String getName() {
        return this.name;
    }

    /** @return the product price */
    public double getPrice() {
        return this.price;
    }
}
2.3 生产者消费者模式
想深入研究可以参考LMAX的Disruptor框架,它是专门用于处理高并发的无锁框架。这里给出基于阻塞队列(BlockingQueue)的实现。(阻塞队列的性能不如Disruptor)
2.3.1 生产者与消费者共用的数据模型
package pattern; import javax.print.attribute.standard.RequestingUserName; /** * Created by ycy on 16/1/15. */ public final class PCData { private final int intDate;//数据 public PCData(int d){ intDate=d; } public PCData(String d){ intDate=Integer.valueOf(d); } public int getIntDate() { return intDate; } @Override public String toString(){ return "date:"+intDate; } }
2.3.2 生产者:通过BlockingQueue的offer方法将数据插入队列
package pattern;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by ycy on 16/1/15.
 *
 * Producer side of the producer/consumer example. Each producer repeatedly
 * sleeps for a random interval, builds a {@link PCData} carrying the next
 * value of a counter shared by all producers, and offers it to the queue.
 * The loop runs until {@link #stop()} is called or the thread is interrupted.
 */
public class Producer implements Runnable {

    /** Cleared by {@link #stop()}; volatile so the worker thread sees the change. */
    private volatile boolean isRunning = true;

    /** In-memory buffer that receives the produced items. */
    private final BlockingQueue<PCData> queue;

    /** Counter shared by all producers; incremented atomically. */
    private static final AtomicInteger count = new AtomicInteger();

    /** Exclusive upper bound, in milliseconds, of the random pause between items. */
    private static final int SLEEPTIME = 1000;

    /**
     * @param queue buffer that produced items are offered to
     */
    public Producer(BlockingQueue<PCData> queue) {
        this.queue = queue;
    }

    /**
     * Produces items until {@link #stop()} is called or the thread is
     * interrupted.
     */
    @Override
    public void run() {
        Random r = new Random();
        System.out.println("start produer id=" + Thread.currentThread().getId());
        try {
            // Loop on the flag (not "true") so that stop() actually ends the producer.
            while (isRunning) {
                Thread.sleep(r.nextInt(SLEEPTIME));
                // Build the next work item.
                PCData data = new PCData(count.incrementAndGet());
                System.out.println(data + " is put into queue");
                // Submit to the buffer, waiting at most 2 seconds for free space.
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
                    System.out.println("failed to put data:" + data);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Asks the producer to finish. The loop exits after the iteration that is
     * currently in progress completes.
     */
    public void stop() {
        isRunning = false;
    }
}
2.3.3 消费者:通过BlockingQueue的take方法从队列中取出数据并进行处理
package pattern;

import java.text.MessageFormat;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.locks.Condition;

/**
 * Created by ycy on 16/1/15.
 *
 * Consumer side of the producer/consumer example. Each consumer blocks on the
 * queue, takes the next {@link PCData}, and prints the square of its value.
 * It runs until its thread is interrupted.
 */
public class Consumer implements Runnable {

    /** Buffer that items are taken from. */
    private final BlockingQueue<PCData> queue;

    /**
     * @param queue buffer that items are taken from
     */
    public Consumer(BlockingQueue<PCData> queue) {
        this.queue = queue;
    }

    /**
     * Takes items from the queue and prints their squares until the thread is
     * interrupted.
     */
    @Override
    public void run() {
        System.out.println("start Consume id =" + Thread.currentThread().getId());
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // take() blocks until an item is available and never returns null.
                PCData data = queue.take();
                int value = data.getIntDate();
                // Widen before multiplying so that large values cannot overflow int.
                long re = (long) value * value;
                System.out.println(MessageFormat.format("{0}*{1}={2}", value, value, re));
            }
        } catch (InterruptedException e) {
            // Interruption is the normal stop signal (see main), so no stack
            // trace is printed; just restore the status and let run() return.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Demo driver: runs three producers and three consumers on a bounded
     * queue for about ten seconds, then stops everything.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        // Bounded buffer shared by all producers and consumers.
        BlockingQueue<PCData> queue = new LinkedBlockingDeque<PCData>(10);

        Producer p1 = new Producer(queue);
        Producer p2 = new Producer(queue);
        Producer p3 = new Producer(queue);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);
        Consumer consumer3 = new Consumer(queue);

        // Thread pool that runs all six tasks.
        ExecutorService service = Executors.newCachedThreadPool();
        service.execute(p1);
        service.execute(p2);
        service.execute(p3);
        service.execute(consumer1);
        service.execute(consumer2);
        service.execute(consumer3);

        Thread.sleep(10 * 1000);
        p1.stop();
        p2.stop();
        p3.stop();

        // Give the consumers time to drain what is left in the queue.
        Thread.sleep(3000);

        // shutdown() alone only rejects new tasks; consumers blocked in take()
        // would keep the JVM alive forever. shutdownNow() interrupts the
        // running tasks so that every worker thread can exit.
        service.shutdownNow();
    }
}