一,线程间的通信、协调和协作
前面几篇谈到了单线程的各个属性,接下来在谈线程与线程之间是如何进行通信和协调的
1,通道
在进程中,可以通过管道的方式进行通信,在java线程中,也是可以通过管道的方式进行通信的。如一些文件的上传,可以直接在内存中通过管道的方式进行文件的上传,而不需要先将文件落盘到本地,再将文件上传到ftp服务器上,通过减少写入磁盘这一步骤,从而提高文件的上传效率,减少硬件和资源等的成本。
java中实现管道输入和输出的方式主要有四种,分别是PipedOutputStream、PipedInputStream、PipedReader 和 PipedWriter 。前面两种主要是针对二进制的字节流,后面两种主要是针对文本的字符流。
//构建输入流 PipedReader pipedReader = new PipedReader(); //构建输出流 PipedWriter pipedWriter = new PipedWriter(); try { //建立连接 pipedReader.connect(pipedWriter); } catch (IOException e) { e.printStackTrace(); }
并且在高并发中,这些管道流的操作都是属于线程安全的。
2,join
在日常开发中,比如说存在三个线程,分别是t1,t2,t3这三个线程,需求是想让t2在t1执行完后再执行,t3想再t2执行完后再执行。由于java中采用的是抢占式的线程调度方式,即不能手动的去操控线程的执行状况,因此在后面就出现了这个join 的方式。如下面的代码
public static void main(String[] args){ Thread t1 = new Thread(new Runnable() { @Override public void run() { try { //休眠2s Thread.sleep(2000); System.out.println(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } },"t1"); Thread t2 = new Thread(new Runnable() { @Override public void run() { try { //加入join t1.join(); System.out.println(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } },"t2"); t1.start(); t2.start(); }
可以通过执行结果发现线程t2需要在t1执行完成之后,才会继续执行。
3,synchronized
在jvm中,每个线程都会对应一个虚拟机栈中的栈帧,里面存在操作数栈,局部变量表等,每个线程中的内部参数是线程安全的,但是如果存在一个全局变量,会通过多个线程去操作,那么就会出现不可预料的后果,就是所谓的线程安全的问题。
因此在java中引入了一种阻塞状态,就是通过这个synchronized关键字,来解决多个线程操作一个全局变量可能出现的不可预料的问题。这种关键字可以加载方法上,也可以加在对象的同步块上面。这种方式相当于把并发的访问变成了串行的访问。
//对象锁加在方法上 public synchronized void countAdd(){count++;} //对象锁加在同步块上 synchronized(this){count++} //类锁加在静态方法上 public synchronized static void countAdd(){count++;} //类锁加在其他对象上 synchronized(object){count++}
因此在日常开发中,对号使用对象锁。因为类锁锁的不是同一个对象,因此可能出现锁失效的情况。
synchronized这种方式主要是确保在多线程的同一时刻情况下,有且仅有一个线程可以处于方法或者同步块中,并且同时保证线程对变量访问的可见性和排他性,使得多个线程在操作同一个变量的时候让结果正确。因此在java线程的阻塞状态,就是通过这个关键字来实现的。后续的文章会详细的讲述这个关键字的底层原理
4,volatile
上面谈到了这个synchronized关键字,但是在java中该关键字属于重量级操作,有可能要对操作系统进行调度,从用户态到内核态之间来回的切换等,因此就出现了一个轻量级操作的关键字 volatile。
该关键字可以保证不同线程对某个变量操作的即时可见性,即某一个线程一旦将某个变量的值给改了,那么其他线程是立马可以感应到的。
如下面这段代码,同时开启一个main主线程和一个子线程,在主线程中将变量的值改了,如果变量不加这个volatile关键字,那么子线程将会一直卡住进入死循环,如果变量加了这个volatile关键字,子线程就可以立马的感受到其他线程修改了这个变量的值,从而获取到修改后的值,跳出循环
/** * @author zhenghuisheng * @date : 2023/7/28 */ public class VolatileTest { private static volatile boolean flag = false; private static volatile int count = 0; public static class InnerTest extends Thread{ @Override public void run() { while (!flag){ } System.out.println("当前线程不阻塞了,count值为:" + count); } } public static void main(String[] args) throws InterruptedException { //开启线程 new InnerTest().start(); //休眠一段时间,让子线程空转一会 Thread.sleep(2000); //主线程将数据替换 flag = true; count = 100; //替换完成之后,子线程立马感知看到这个修改的数据 Thread.sleep(2000); System.out.println("main主线程执行完毕"); } }
volatile不是一把真正的锁,所以不能保证数据的原子性,即如果在高并发情况下在变量上使用该关键字,会出现安全问题。该关键字主要适用的场景是:一个线程读、多个线程写
5,等待通知机制
在上篇线程的生命周期中讲过,线程有等待和超时等待这两种状态,线程的等待停止机制就是通过这两种状态来实现的,如典型的生产者消费者模式。
等待和唤醒有着其标准的规范,无论是任何,都需要遵循以下的范式:不管是等待方还是通知方,都需要在锁的范围内使用,如果不在锁的范围内使用,则会抛出异常
等待方的范式如下
加锁(对象){ while(条件不满足){ 对象.wait方法 } 进入后面的业务逻辑 }
通知方的范式如下
加锁(对象){ 业务逻辑,改变消费者不满足的条件 对象.notify方法(通知方法) }
上面的等待方和通知方都需要拿同一把锁,但是当等待方调用wait的时候,等待方所持有的锁将会释放,那么通知方自然而然的就可以拿到这把锁,去做业务逻辑,从而改变这个条件。
接下来针对于这个范式写一个案例,就是简单的生产者与消费者之间的案例,生产者每2s生产100个产品,满了500个产品的时候,通知消费者消费,代码逻辑实现就是如果此时产品的数量小于500,那么消费者线程则处于等待状态,当生产者满了500个,就通过notify或者notifyAll唤醒处于等待的线程。
首先定义一个Product的产品类,其代码如下
/** * @Author: zhenghuisheng * @Date: 2023/8/1 23:19 */ public class Product { //唤醒的数量 public static final int finalCount = 500; //成员变量 public int count; public Product(int count){ this.count = count; } //给生产者改变数量 public void addCount(){ count = count + 100; } //消费者线程 public synchronized void waitCount() throws InterruptedException { //不满足条件 while (this.count < finalCount){ System.out.println("线程即将等待"); //等待 wait(); //业务逻辑 System.out.println("消费者消费了" + this.count + "公里数"); } } }
随后定义一个测试类TestP,里面定义一个线程消费类consum,然后使用主线程作为生产者的线程,生产者将产品生产好了,就去通知消费线程去消费
/** * @Author: zhenghuisheng * @Date: 2023/8/1 23:56 */ public class TestP { private static Product product = new Product(0); //定义一个消费线程类 private static class consum extends Thread{ @Override public void run() { try { product.waitCount(); } catch (InterruptedException e) { e.printStackTrace(); } } } //主线程作为生产线程 public static void main(String[] args) throws InterruptedException { //开启线程 new consum().start(); Thread.sleep(500); //累加 synchronized (product){ for (int i = 0; i < ; i++) { product.addCount(); } product.notifyAll(); } } }
需要注意的是,在使用wait和notify时,都需要在代码的同步块或者成员的同步方法里面,并且在一般场景中,可以使用的notifyAll的就不用notify,因为notify只唤醒一个线程,不利于在多线程中的操作。notify不支持唤醒某个指定的线程,可以通过显示锁来实现。
notify或者notifyAll尽量写在同步块的最后面,因为在调用该方法之后,会直接释放锁。