线程通信,线程之间的消息传递;
多个线程在操作同一个资源,但对共享资源的操作动作不同;它们共享同一个资源,互为条件,相互依赖,相互通信让任务向前推进。
线程的同步,可以解决并发更新同一个资源,实现线程同步;但不能用来实现线程间的消息传递。
线程通信生产者和消费者和仓库是个典型模型:
生产者:没有生产之前通知消费者等待,生产产品结束之后,马上通知消费者消费
消费者:没有消费之前通知生产者等待,消费产品结束之后,通知生产者继续生产产品以供消费
线程通信:使用java中Object中提供的:
public final void wait(); 注:long timeout=0 表示线程一直等待,直到其它线程通知
public final native void wait(long timeout); 线程等待指定毫秒参数的时间
public final void wait(long timeout, int nanos); 线程等待指定毫秒、微妙的时间timeout最大等待时间,以毫秒为单位,nanos额外的时间,在纳秒范围0-999999。
public final native void notify(); 唤醒一个处于等待状态的线程
public final native void notifyAll(); 唤醒同一个对象上所有调用wait()方法的线程,优先级别高的线程优先运行
这些方法只能在同步方法或者同步代码块中使用,否则会抛出异常。
Exception in thread "Thread-0"java.lang.IllegalMonitorStateException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:485)
at ca.bb.ShareCached.getShareCachedData(ShareCached.java:41) //wait();
以及使用,回调方法,实现线程通信。
此处使用wait()和notify()方法实现。
此例:生产A-D个商品,放入仓库,消费生产。
仓库类:
package ca.bb.threadcommunication; /** * 共享资源缓存和操作类 * */ public class ShareCached { /**产品:此处使用char字符,作为存储共享数据存储类型*/ private char cache; //产品消费标识:线程间通信信号,true未消费(生产),false未生产(消费) private boolean flag = false; /** * 生产操作(生产者):共享数据添加方法 * */ public synchronized void addShareCachedData(char data){ //产品未消费,则生产者(生产操作)等待 if(flag){ System.out.println("产品未消费,生产者生产操作等待"); try { //生产者等待 wait(); } catch (InterruptedException e) { System.out.println("Thread Interrupted Exception,"+e.getMessage()); } } //产品已消费,则生产者继续生产 this.cache = data; //标记已生产 flag = true; //通知消费者已生产 notify(); System.out.println("产品"+data+",已生产,通知消费者消费"); } /** * 消费操作(消费者):共享数据获取方法 * */ public synchronized char getShareCachedData(){ //产品未生产,则消费者(消费操作)等待 if(!flag){ System.out.println("产品未生产,消费者消费操作等待"); try { //消费者等待 wait(); } catch (InterruptedException e) { System.out.println("Thread Interrupted Exception,"+e.getMessage()); } } //标记已消费 flag = false; //通知生产者已消费 notify(); System.out.println("产品"+this.cache+",已消费,通知生产者生产"); //产品已生产,则消费者继续消费 return this.cache; } }
注:如果此处,使用synchronized方法,则内置锁(对象监视锁)为this,则不能多new不同对象,如果为多线程同步,必定多个线程共享同一个对象监视锁。
使用synchronized(obj)代码块,则1.锁使用涉及当前类对象外的其他类对象;2.锁使用静态当前类对象,确保同一个锁。
生产者线程类:
package ca.bb.threadcommunication; /** * 生产者线程类 * */ public class Producer extends Thread{ //共享资源缓存类对象 private ShareCached sCached; /** * 构造加入共享资源操作类 * */ public Producer(ShareCached shareCached){ this.sCached = shareCached; } /** * 生产者生产产品,放入共享数据缓存类(仓库) * 生产A-D类型的产品 * */ @Override public void run() { for(char product = 'A';product <='D';product++){ try { sleep((int)(Math.random()*3000)); } catch (InterruptedException e) { System.out.println("Thread Interrupted Exception,"+e.getMessage()); } //生产产品,放入共享数据缓存类(仓库) sCached.addShareCachedData(product); } } }消费者线程类:
package ca.bb.threadcommunication; /** * 消费者线程类 * */ public class Consumer extends Thread{ //共享资源缓存类对象 private ShareCached sCached; /** * 构造加入共享资源操作类 * */ public Consumer(ShareCached sharedCached){ this.sCached = sharedCached; } /** * 消费者消费产品,获取共享数据缓存类(仓库) * 消费D类型的产品停止消费 * */ @Override public void run() { char product = '\u0000'; do { try { Thread.sleep((int)(Math.random()*3000)); } catch (InterruptedException e) { System.out.println("Consumer thread InterruptedException from run method!"); } //消费,从仓库取走商品 product = sCached.getShareCachedData(); } while (product != 'D'); } }线程通信测试类:
package ca.bb.threadcommunication; public class CPTest { public static void main(String[] args) { //共享同一个共享资源 ShareCached shareCached = new ShareCached(); //启动消费者线程 new Consumer(shareCached).start(); //启动生产者线程 new Producer(shareCached).start(); } }测试结果:
产品未生产,消费者消费操作等待
产品A,已生产,通知消费者消费
产品A,已消费,通知生产者生产
产品B,已生产,通知消费者消费
产品未消费,生产者生产操作等待
产品B,已消费,通知生产者生产
产品C,已生产,通知消费者消费
产品未消费,生产者生产操作等待
产品C,已消费,通知生产者生产
产品D,已生产,通知消费者消费
产品D,已消费,通知生产者生产