前言
文本已收录至我的GitHub仓库,欢迎Star:github.com/bin39232820…
种一棵树最好的时间是十年前,其次是现在
絮叨
昨天把线程这个类的源码,稍微的讲了一下,创建线程啥的大家都懂,就没有讲了,今天我们接着讲线程,下面是前面的章节链接:
- 🔥史上最全的Java并发系列之并发编程的挑战
- 🔥史上最全的Java并发系列之Java并发机制的底层实现原理
- 🔥史上最全的Java并发系列之Java内存模型
- 🔥史上最全的Java并发系列之Java多线程(一)
线程间通信
volatile和synchronized关键字
volatile修饰的变量,程序访问时都需要在共享内存中去读取,对它的改变也必须更新共享内存,保证了线程对变量访问的可见性。
synchronized:对于 同步块 的实现使用了monitorenter和monitorexit指令,而 同步方法 则是依靠方法修饰符上的ACC_SYNCHRONIZED来完成的。无论采用哪种方式,其本质是对一个对象的监视器monitor进行获取,而这个获取过程是排他的,也就是同一时刻只能有一个线程获取到由synchronized所保护对象的监视器。
等待/通知机制——wait和notify
指一个线程A调用了对象O的wait()方法进入等待状态,而另一个线程B调用了对象O的notify()或者notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而执行后续操作。 等待:wait()、wait(long)、wait(long, int) 通知:notify()、notifyAll() 示例:
private static Object object = new Object(); public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { try { TimeUnit.SECONDS.sleep(1); synchronized (object) { System.out.println("t1 start object.wait(), time = " + System.currentTimeMillis() / 1000); object.wait(); System.out.println("t1 after object.wait(), time = " + System.currentTimeMillis() / 1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); Thread t2 = new Thread(() -> { try { TimeUnit.SECONDS.sleep(5); synchronized (object) { System.out.println("t2 start object.notify(), time = " + System.currentTimeMillis() / 1000); object.notify(); System.out.println("t2 after object.notify(), time = " + System.currentTimeMillis() / 1000); } synchronized (object) { System.out.println("t2 hold lock again, time = " + System.currentTimeMillis() / 1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); t1.start(); t2.start(); } 复制代码
输出的结果
t1 start object.wait(), time = 1560138112 t2 start object.notify(), time = 1560138116 t2 after object.notify(), time = 1560138116 t2 hold lock again, time = 1560138116 t1 after object.wait(), time = 1560138116 复制代码
以下结论
- wait()、notify()和notifyAll()时需要先对调用对象加锁,否则会报java.lang.IllegalMonitorStateException异常。
- 调用wait()方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列。
- notify()或notifyAll()方法调用后,等待线程依旧不会从wait()返回,需要调用notify()或notifAll()的线程释放锁之后,等待线程才有机会从wait()返回。
- notify()方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而notifyAll()方法则是将等待队列中所有的线程全部移到同步队列,被移动的线程状态由WAITING变为BLOCKED。
- 从wait()方法返回的前提是获得了调用对象的锁。
等待/通知的经典范式
包括 等待方(消费者)和 通知方(生产者)。 等待方遵循以下原则:
- 获取对象的锁。
- 如果条件不满足,那么调用对象的wait方法,被通知后任要检查条件。
- 条件不满足则执行对应的逻辑。
对应代码如下:
synchronized (对象) { while (条件不满足) { 对象.wait(); } 对应的处理逻辑 } 复制代码
通知方遵循以下原则:
- 获取对象的锁。
- 改变条件。
- 通知所有在等待在对象上的线程。
synchronized (对象) { 改变条件 对象.notifyAll(); } 复制代码
管道输入/输出流
PipedWriter和PipedReader分别是字符管道输出流和字符管道输入流,是字符流中"管道流",可以实现同一个进程中两个线程之间的通信,与PipedOutputStream和PipedInputStream相比,功能类似.区别是前者传输的字符数据,而后者传输的是字节数据.
不同线程之间通信的流程大致是:PipedWriter与PipedReader流进行连接,写入线程通过将字符管道输出流写入数据,实际将数据写到了与PipedWriter连接的PipedReader流中的缓冲区,然后读取线程通过PipedReader流读取之前存储在缓冲区里面的字符数据.
例子
private static PipedWriter writer; private static PipedReader reader; public static void main(String[] args) throws InterruptedException, IOException { writer = new PipedWriter(); reader = new PipedReader(); //绑定输入输出 writer.connect(reader); Thread t = new Thread(() -> { int res; try { while ((res = reader.read()) != -1) { System.out.print((char) res); } } catch (Exception e) { e.printStackTrace(); } }); t.start(); int res; while ((res = System.in.read()) != -1) { System.out.println(res); writer.write(res); //按回车结束 if (res == 10) { break; } } writer.close(); } 复制代码
ThreadLocal
变量值的共享可以使用public static的形式,所有线程都使用同一个变量,如果想实现每一个线程都有自己的共享变量该如何实现呢?JDK中的ThreadLocal类正是为了解决这样的问题。
ThreadLocal类并不是用来解决多线程环境下的共享变量问题,而是用来提供线程内部的共享变量,在多线程环境下,可以保证各个线程之间的变量互相隔离、相互独立。在线程中,可以通过get()/set()方法来访问变量。ThreadLocal实例通常来说都是private static类型的,它们希望将状态与线程进行关联。这种变量在线程的生命周期内起作用,可以减少同一个线程内多个函数或者组件之间一些公共变量的传递的复杂度。
例子
public class ThreadLocalTest { static class MyThread extends Thread { private static ThreadLocal<Integer> threadLocal = new ThreadLocal<>(); @Override public void run() { super.run(); for (int i = 0; i < 3; i++) { threadLocal.set(i); System.out.println(getName() + " threadLocal.get() = " + threadLocal.get()); } } } public static void main(String[] args) { MyThread myThreadA = new MyThread(); myThreadA.setName("ThreadA"); MyThread myThreadB = new MyThread(); myThreadB.setName("ThreadB"); myThreadA.start(); myThreadB.start(); } } 复制代码
ThreadA threadLocal.get() = 0 ThreadB threadLocal.get() = 0 ThreadA threadLocal.get() = 1 ThreadA threadLocal.get() = 2 ThreadB threadLocal.get() = 1 ThreadB threadLocal.get() = 2 复制代码
ThreadLocal最简单的实现方式就是ThreadLocal类内部有一个线程安全的Map,然后用线程的ID作为Map的key,实例对象作为Map的value,这样就能达到各个线程的值隔离的效果。
一个数据库连接池的简单实现
1.连接池创建
package com.atguigu.ct.producer.Test.BB; import java.sql.Connection; import java.util.LinkedList; public class ConnectionPool { private LinkedList<Connection> pool = new LinkedList<>(); public ConnectionPool(int initialSize){ if(initialSize > 0){ for (int i = 0; i < initialSize ; i++) { pool.addLast(ConnectionDriver.createConnection()); } } } public void releaseConnection(Connection connection){ if(connection != null){ synchronized (pool){ //连接释放后需要进行通知,这样其他消费者能够感知到连接池中已归还了一个连接 pool.addLast(connection); pool.notifyAll(); } } } //在 mills内无法获取到连接,将返回 null public Connection fetchConnection(long mills) throws InterruptedException { synchronized (pool){ //完全超时 if(mills <= 0){ while (pool.isEmpty()){ pool.wait(); } return pool.removeFirst(); }else{ long future = System.currentTimeMillis() + mills; long remaining = mills; while (pool.isEmpty() && remaining > 0){ pool.wait(remaining); remaining = future - System.currentTimeMillis(); } Connection result = null; if(!pool.isEmpty()){ result = pool.removeFirst(); } return result; } } } } 复制代码
2.由于java.sql.Connection 是一个接口,最终的实现是由数据库驱动提供方来实现的,博主只是为了娱乐,就用动态代理构造了一个Connection,改Connection的代理实现仅仅是在commit()方法调用时休眠100毫秒,如下
package com.atguigu.ct.producer.Test.BB; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.sql.Connection; import java.util.concurrent.TimeUnit; public class ConnectionDriver { static class ConnectionHandler implements InvocationHandler { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if(method.getName().equals("commit")){ TimeUnit.MILLISECONDS.sleep(100); } return null; } } //创建一个 Connection 代理,在 commit 时休眠 100 毫秒 public static final Connection createConnection(){ return (Connection) Proxy.newProxyInstance(ConnectionDriver.class.getClassLoader(), new Class<?>[]{Connection.class}, new ConnectionHandler()); } } 复制代码