5.线程通信
应用场景:生产者和消费者问题
- 假设仓库中只能存放一件产品,生产者将生产出来的产品放入仓库,消费者将仓库中产品取走消费。
- 如果仓库中没有产品,则将生产者将产品放入仓库,否则停止生产并等待,直到仓库中的产品被消费者取走为止。
- 如果仓库中放有产品,则消费者可以将产品取走消费,否则停止消费,直到仓库中再次放入产品为止。
这是一个线程同步问题,生产者和消费者共享同一个资源,并且生产者和消费者之间相互依赖,互为条件。
- 对于生产者,没有生产产品之前,要通知消费者等待.而生产了产品之后,又需要马上通知消费者消费
- 对于消费者,在消费之后,要通知生产者已经结束消费,需要生产新的产品以供消费.
- 在生产者消费者问题中,仅有synchronized是不够的
synchronized 可阻止并发更新同一个共享资源,实现了同步
synchronized 不能用来实现不同线程之间的消息传递(通信)
5.1 解决线程之间通信问题的几个方法
注意:均是 Object 类的方法,都只能在同步方法或者同步代码块中使用,否则会抛出异常 llegalMonitorStateException
\
5.2 解决线程之间通信的方式1:管程法
并发写作模型“生产者/消费者模式”–>管程法
- 生产者:负责生产数据的模块(可能是方法,对象,线程,进程);
- 消费者:负责处理数据的模块(可能是方法,对象,线程,进程)
- 缓冲区:消费者不能直接使用生产者的数据,他们之间有个缓冲区
生产者将生产好的数据放入缓冲区,消费者从缓冲区拿出数据
package com.sjmp.advanced; // 生产者,消费者,产品,缓冲区 public class TestPC { public static void main(String[] args) { SynContainer container = new SynContainer(); new Productor(container).start(); new Consumer(container).start(); } } // 生产者 class Productor extends Thread{ SynContainer container; public Productor(SynContainer container){ this.container = container; } @Override public void run() { for (int i = 0; i < 100; i++) { System.out.println("生产了"+i+"只鸡"); container.push(new Chicken(i)); } } } class SynContainer{ // 需要一个容器的大小 Chicken[] chickens = new Chicken[10]; // 容器计数器 int count = 0; // 生产者放入产品 public synchronized void push(Chicken chicken){ // 如果容器满了,就需要等待消费者消费 if (count == chickens.length){ // 通知消费者消费,生产等待 try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } chickens[count] = chicken; count++; this.notifyAll(); } // 消费者消费产品 public synchronized Chicken pop(){ // 判断能否消费 if(count==0){ // 等待生产者生产,消费者等待 try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // 如果可以消费 count--; Chicken chicken = chickens[count]; // 可以通知消费了 this.notifyAll(); return chicken; } } class Consumer extends Thread{ SynContainer container; public Consumer(SynContainer container){ this.container = container; } @Override public void run() { for (int i = 0; i < 100; i++) { System.out.println("消费了-->"+container.pop().id+"只鸡"); } } } // 产品 class Chicken{ int id; //产品编号 public Chicken(int id){ this.id = id; } }
5.3 解决线程之间通信的方式1:信号灯法
这段代码是一个简单的生产者-消费者模型,其中演员(Player)作为生产者生产节目(voice),观众(Wathcher)作为消费者观看节目。TV 类中的 flag 变量用于控制生产者和消费者之间的配合,flag 为 true 表示可以生产,观众需要等待;flag 为 false 表示可以消费,演员需要等待。
package com.example.democrud.democurd.test01; public class TestSync { public static void main(String[] args) { TV tv = new TV(); // 创建一个 TV 实例 Player player = new Player(tv); // 创建演员线程 Watcher watcher = new Watcher(tv); // 创建观众线程 player.start(); // 启动演员线程 watcher.start(); // 启动观众线程 } } // 生产者 - 演员 class Player extends Thread{ private final TV tv; // TV 对象 public Player(TV tv){ this.tv = tv; } @Override public void run() { for (int i = 0; i < 20; i++) { // 生产 20 个节目 if(i % 2 == 0){ this.tv.play("快乐大本营"); // 生产“快乐大本营”节目 } else { this.tv.play("天天向上"); // 生产“天天向上”节目 } } } } // 消费者 - 观众 class Watcher extends Thread{ private final TV tv; // TV 对象 public Watcher(TV tv){ this.tv = tv; } @Override public void run() { for (int i = 0; i < 20; i++) { // 观看 20 个节目 tv.watch(); // 观众观看 TV 节目 } } } // 产品 - 节目 class TV{ // 标记,用于控制生产者和消费者之间的配合 // flag 为 true 表示可以生产,观众需要等待;flag 为 false 表示可以消费,演员需要等待。 private boolean flag = true; private String voice; // 节目名称 // 生产者生产节目 public synchronized void play(String voice){ while(!flag){ // 如果 flag 为 false,则说明已经有其他生产者在生产,当前线程需要等待 try { this.wait(); // 等待其他线程唤醒 } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("演员表演了: "+voice); // 生产完毕,唤醒其他被阻塞的消费者线程 this.notifyAll(); this.voice = voice; // 存储节目名称 this.flag = false; // 修改标记,指示可以消费 } // 消费者观看节目 public synchronized void watch(){ while(flag){ // 如果 flag 为 true,则说明已经有其他消费者在观看,当前线程需要等待 try { this.wait(); // 等待其他线程唤醒 } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("观看了: "+voice); // 消费完毕,唤醒其他被阻塞的生产者线程 this.notifyAll(); this.flag = true; // 修改标记,指示可以生产 } }
这段代码实现了生产者-消费者模型,使用线程同步机制 synchronized 和 wait/notifyAll 方法来控制线程安全和数据共享。其中,演员是生产者,观众是消费者,TV 节目是产品。
为了更好地理解和优化该代码,建议从以下几个方面入手:
添加注释:为代码添加必要的注释,解释类、方法和变量的作用和功能。
简化命名:对于类、方法和变量的命名应简洁明了,能快速表达其含义。
优化代码结构:将多余的空格、括号、换行符等去掉,使代码更加紧凑易读。
增加异常处理:在代码中添加必要的异常处理,防止程序因为异常而崩溃或出错。
尝试使用更简洁的方式:可以使用 Java 并发包中的 Lock 和 Condition 类来替代 synchronized 和 wait/notifyAll 方法,实现更加简洁和高效的线程同步和互斥。
增加日志输出:在代码中增加必要的日志输出,记录程序执行过程中的关键信息,方便调试和排查问题。
引入线程池:当需要创建大量线程时,可以考虑使用线程池来避免频繁创建和销毁线程的开销,提高程序的性能和效率。
通过对代码进行优化和升级,可以使其更加易读易懂、性能更加高效。
5.4 使用线程池
背景:经常创建和销毁、使用量特别大的资源,比如并发情况下的线程,对性能影响很大。
思路:提前创建好多个线程,放入线程池中,使用时直接获取,使用完放回池中。
可以避免频繁创建销毁、实现重复利用。类似生活中的公共交通工具。
优点:
提高响应速度(减少了创建新线程的时间)
降低资源消耗(重复利用线程池中线程,不需要每次都创建)
便于线程管理…
- corePoolSize:核心池的大小
- maximumPoolSize:最大线程数
- keepAliveTime:线程没有任务时最多保持多长时间会终止
package com.example.democrud.democurd.test01; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Testpool { public static void main(String[] args) { //创建线程池 大小为10 ExecutorService service= Executors.newFixedThreadPool(10); service.execute(new MyThread()); service.execute(new MyThread()); service.execute(new MyThread()); service.execute(new MyThread()); service.execute(new MyThread()); //2.关闭链接 service.shutdownNow(); } } class MyThread implements Runnable{ @Override public void run() { System.out.println(Thread.currentThread().getName()); } }
5.5 补充内容
package com.sjmp.Thread01; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class ThreadFutureTest { public static class CallerTask implements Callable<String>{ @Override public String call() throws Exception { return "Thread-Callable- hello"; } } public static void main(String[] args) { // 创建异步任务 FutureTask<String> futureTask = new FutureTask<>(new CallerTask()); new Thread(futureTask).start(); try { String result = futureTask.get(); System.out.println(result); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
如上代码中的 CallerTask 类实现了 Callable 接口的 call() 方法。在 main 函数内首先创建了一个 FutrueTask 对象(构造函数为 CallerTask 的实例),然后使用创建的 FutrueTask 对象作为任务创建了一个线程并且启动它,最后通过futureTask.get() 等待任务执行完毕并返回结果。
小结:使用继承方式的好处是方便传参,你可以在子类里面添加成员变量,通过 set 方法设置参数或者通过构造函数进行传递,而如果使用 Runnable 方式,则只能使用主线程里面被声明为 final 的变量。不好的地方是 Java 不支持多继承,如果继承了 Thread 类,那么子类不能再继承其他类,而 Runable 则没有这个限制。前两种方式都没办法拿到任务的返回结果,但是 Futuretask 方式可以。
5.6wait() 方法
7.回顾总结
package com.example.democrud.democurd.test01; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; import java.util.function.Function; public class ThreadNew { public static void main(String[] args) { //1. new MyThread1().start(); //2. new Thread(new MyThread2()).start(); //3. FutureTask<Integer> futureTask = new FutureTask<Integer>(new MyThread3()); new Thread(futureTask).start(); try { Integer integer=futureTask.get(); System.out.println(integer); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } //1.继承Thread class MyThread1 extends Thread { @Override public void run() { System.out.println("我是继承Thread的方法"); } } //2.实现Runnable的接口 class MyThread2 implements Runnable { @Override public void run() { System.out.println("我是实现Runnable的接口"); } } //3.实现callable接口 //Callable<Integer> 中的Integer和重写的call返回值同步的 class MyThread3 implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("我是实现callable接口"); return 10000; } }