深入理解线程通信(下)

简介: 开发中不免会遇到需要所有子线程执行完毕通知主线程处理某些逻辑的场景。 或者是线程 A 在执行到某个条件通知线程 B 执行某个操作。

CyclicBarrier 中文名叫做屏障或者是栅栏,也可以用于线程间通信。


它可以等待 N 个线程都达到某个状态后继续运行的效果。


  1. 首先初始化线程参与者。


  1. 调用 await() 将会在所有参与者线程都调用之前等待。


  1. 直到所有参与者都调用了 await() 后,所有线程从 await() 返回继续后续逻辑。


运行结果:


2018-03-18 22:40:00.731 [Thread-0] INFO  c.c.actual.ThreadCommunication - thread run
2018-03-18 22:40:00.731 [Thread-1] INFO  c.c.actual.ThreadCommunication - thread run
2018-03-18 22:40:00.731 [Thread-2] INFO  c.c.actual.ThreadCommunication - thread run
2018-03-18 22:40:00.731 [main] INFO  c.c.actual.ThreadCommunication - main thread
2018-03-18 22:40:05.741 [Thread-0] INFO  c.c.actual.ThreadCommunication - thread end do something
2018-03-18 22:40:05.741 [Thread-1] INFO  c.c.actual.ThreadCommunication - thread end do something
2018-03-18 22:40:05.741 [Thread-2] INFO  c.c.actual.ThreadCommunication - thread end do something


可以看出由于其中一个线程休眠了五秒,所有其余所有的线程都得等待这个线程调用 await()


该工具可以实现 CountDownLatch 同样的功能,但是要更加灵活。甚至可以调用 reset() 方法重置 CyclicBarrier (需要自行捕获 BrokenBarrierException 处理) 然后重新执行。


线程响应中断


public class StopThread implements Runnable {
    @Override
    public void run() {
        while ( !Thread.currentThread().isInterrupted()) {
            // 线程执行具体逻辑
            System.out.println(Thread.currentThread().getName() + "运行中。。");
        }
        System.out.println(Thread.currentThread().getName() + "退出。。");
    }
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new StopThread(), "thread A");
        thread.start();
        System.out.println("main 线程正在运行") ;
        TimeUnit.MILLISECONDS.sleep(10) ;
        thread.interrupt();
    }
}


输出结果:


thread A运行中。。
thread A运行中。。
thread A退出。。


可以采用中断线程的方式来通信,调用了 thread.interrupt() 方法其实就是将 thread 中的一个标志属性置为了 true。


并不是说调用了该方法就可以中断线程,如果不对这个标志进行响应其实是没有什么作用(这里对这个标志进行了判断)。


但是如果抛出了 InterruptedException 异常,该标志就会被 JVM 重置为 false。


线程池 awaitTermination() 方法


如果是用线程池来管理线程,可以使用以下方式来让主线程等待线程池中所有任务执行完毕:


private static void executorService() throws Exception{
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(10) ;
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5,5,1, TimeUnit.MILLISECONDS,queue) ;
        poolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("running");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        poolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("running2");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        poolExecutor.shutdown();
        while (!poolExecutor.awaitTermination(1,TimeUnit.SECONDS)){
            LOGGER.info("线程还在执行。。。");
        }
        LOGGER.info("main over");
    }


输出结果:


2018-03-16 20:18:01.273 [pool-1-thread-2] INFO  c.c.actual.ThreadCommunication - running2
2018-03-16 20:18:01.273 [pool-1-thread-1] INFO  c.c.actual.ThreadCommunication - running
2018-03-16 20:18:02.273 [main] INFO  c.c.actual.ThreadCommunication - 线程还在执行。。。
2018-03-16 20:18:03.278 [main] INFO  c.c.actual.ThreadCommunication - 线程还在执行。。。
2018-03-16 20:18:04.278 [main] INFO  c.c.actual.ThreadCommunication - main over


使用这个 awaitTermination() 方法的前提需要关闭线程池,如调用了 shutdown() 方法。


调用了 shutdown() 之后线程池会停止接受新任务,并且会平滑的关闭线程池中现有的任务。


管道通信


public static void piped() throws IOException {
        //面向于字符 PipedInputStream 面向于字节
        PipedWriter writer = new PipedWriter();
        PipedReader reader = new PipedReader();
        //输入输出流建立连接
        writer.connect(reader);
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("running");
                try {
                    for (int i = 0; i < 10; i++) {
                        writer.write(i+"");
                        Thread.sleep(10);
                    }
                } catch (Exception e) {
                } finally {
                    try {
                        writer.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("running2");
                int msg = 0;
                try {
                    while ((msg = reader.read()) != -1) {
                        LOGGER.info("msg={}", (char) msg);
                    }
                } catch (Exception e) {
                }
            }
        });
        t1.start();
        t2.start();
    }


输出结果:


2018-03-16 19:56:43.014 [Thread-0] INFO  c.c.actual.ThreadCommunication - running
2018-03-16 19:56:43.014 [Thread-1] INFO  c.c.actual.ThreadCommunication - running2
2018-03-16 19:56:43.130 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=0
2018-03-16 19:56:43.132 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=1
2018-03-16 19:56:43.132 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=2
2018-03-16 19:56:43.133 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=3
2018-03-16 19:56:43.133 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=4
2018-03-16 19:56:43.133 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=5
2018-03-16 19:56:43.133 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=6
2018-03-16 19:56:43.134 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=7
2018-03-16 19:56:43.134 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=8
2018-03-16 19:56:43.134 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=9


Java 虽说是基于内存通信的,但也可以使用管道通信。


需要注意的是,输入流和输出流需要首先建立连接。这样线程 B 就可以收到线程 A 发出的消息了。


实际开发中可以灵活根据需求选择最适合的线程通信方式。


相关文章
|
14天前
|
Java 调度
[Java]线程生命周期与线程通信
本文详细探讨了线程生命周期与线程通信。文章首先分析了线程的五个基本状态及其转换过程,结合JDK1.8版本的特点进行了深入讲解。接着,通过多个实例介绍了线程通信的几种实现方式,包括使用`volatile`关键字、`Object`类的`wait()`和`notify()`方法、`CountDownLatch`、`ReentrantLock`结合`Condition`以及`LockSupport`等工具。全文旨在帮助读者理解线程管理的核心概念和技术细节。
31 1
[Java]线程生命周期与线程通信
|
4月前
|
Java
实现Java多线程中的线程间通信
实现Java多线程中的线程间通信
|
5月前
|
Java 程序员
从菜鸟到大神:JAVA多线程通信的wait()、notify()、notifyAll()之旅
【6月更文挑战第21天】Java多线程核心在于wait(), notify(), notifyAll(),它们用于线程间通信与同步,确保数据一致性。wait()让线程释放锁并等待,notify()唤醒一个等待线程,notifyAll()唤醒所有线程。这些方法在解决生产者-消费者问题等场景中扮演关键角色,是程序员从新手到专家进阶的必经之路。通过学习和实践,每个程序员都能在多线程编程的挑战中成长。
53 6
|
15天前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
12 1
|
15天前
|
安全 Java 开发者
Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用
本文深入解析了Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用。通过示例代码展示了如何正确使用这些方法,并分享了最佳实践,帮助开发者避免常见陷阱,提高多线程程序的稳定性和效率。
26 1
|
15天前
|
Java
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是线程间通信的核心机制。
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件成立时被唤醒,从而有效解决数据一致性和同步问题。本文通过对比其他通信机制,展示了 `wait()` 和 `notify()` 的优势,并通过生产者-消费者模型的示例代码,详细说明了其使用方法和重要性。
21 1
|
25天前
|
Java
|
5月前
|
Java
Java Socket编程与多线程:提升客户端-服务器通信的并发性能
【6月更文挑战第21天】Java网络编程中,Socket结合多线程提升并发性能,服务器对每个客户端连接启动新线程处理,如示例所示,实现每个客户端的独立操作。多线程利用多核处理器能力,避免串行等待,提升响应速度。防止死锁需减少共享资源,统一锁定顺序,使用超时和重试策略。使用synchronized、ReentrantLock等维持数据一致性。多线程带来性能提升的同时,也伴随复杂性和挑战。
97 0
|
5月前
|
安全 Java
JAVA多线程通信新解:wait()、notify()、notifyAll()的实用技巧
【6月更文挑战第20天】Java多线程中,`wait()`, `notify()`和`notifyAll()`用于线程通信。在生产者-消费者模型示例中,它们确保线程同步。`synchronized`保证安全,`wait()`在循环内防止虚假唤醒,`notifyAll()`避免唤醒单一线程问题。关键技巧包括:循环内调用`wait()`,优先使用`notifyAll()`以保证可靠性,以及确保线程安全和正确处理`InterruptedException`。
50 0
|
25天前
多线程通信和同步的方式有哪些?
【10月更文挑战第6天】
82 0