1.线程间通信之wait notify
public class Demo3 { private volatile int signal; public synchronized void set() { this.signal = 1; notifyAll(); } public synchronized int get() { System.out.println(Thread.currentThread().getName() + "方法执行了..." ); if (signal != 1) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + "方法执行完毕..." ); return signal; } public static void main(String[] args) { Demo3 d = new Demo3(); Target1 t1 = new Target1(d); Target2 t2 = new Target2(d); new Thread(t2).start(); new Thread(t2).start(); new Thread(t2).start(); new Thread(t2).start(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(t1).start(); } }
public class Target1 implements Runnable{ private Demo3 demo3; public Target1(Demo3 demo3) { this.demo3 = demo3; } @Override public void run() { demo3.set(); } }
public class Target2 implements Runnable { private Demo3 demo3; public Target2(Demo3 demo3) { this.demo3 = demo3; } @Override public void run() { demo3.get(); } }
运行结果:
2.通过生产者和消费者理解等待唤醒机制
public class Tmall { private int count; private final int MAX_COUNT = 10; public synchronized void push() { while (count >= MAX_COUNT) { try { System.out.println(Thread.currentThread().getName() + " 库存数量达到上限,生产者停止时生产"); wait(); } catch (InterruptedException e) { e.printStackTrace(); } } count++; System.out.println(Thread.currentThread().getName() + "生产者生产,当前库存为:" + count); notifyAll(); } public synchronized void take() { while (count <= 0) { try { System.out.println(Thread.currentThread().getName() + "库存数量为0,消费者等待" ); wait(); } catch (InterruptedException e) { e.printStackTrace(); } } count--; System.out.println(Thread.currentThread().getName() + "消费者消费,当前库存为:"+count ); notifyAll(); } }
public class PushTarget implements Runnable { private Tmall tmall; public PushTarget(Tmall tmall) { this.tmall = tmall; } @Override public void run() { while (true){ tmall.push(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
public class TakeTarget implements Runnable { private Tmall tmall; public TakeTarget(Tmall tmall) { this.tmall = tmall; } @Override public void run() { while (true){ tmall.take(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
public class Test { public static void main(String[] args) { Tmall tmall = new Tmall(); PushTarget p = new PushTarget(tmall); TakeTarget t = new TakeTarget(tmall); new Thread(p).start(); new Thread(p).start(); new Thread(p).start(); new Thread(p).start(); new Thread(p).start(); new Thread(t).start(); new Thread(t).start(); new Thread(t).start(); new Thread(t).start(); new Thread(t).start(); } }
运行结果:
3. condition的使用
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Demo { private int signal; Lock lock = new ReentrantLock(); Condition a = lock.newCondition(); Condition b = lock.newCondition(); Condition c = lock.newCondition(); public void a() { lock.lock(); while (signal != 0) { try { a.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("a"); signal++; b.signal(); lock.unlock(); } public void b() { lock.lock(); while (signal != 1) { try { b.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("b"); signal++; c.signal(); lock.unlock(); } public void c() { lock.lock(); while (signal != 2) { try { c.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("c"); signal = 0; a.signal(); lock.unlock(); } public static void main(String[] args) { Demo d = new Demo(); A a = new A(d); B b = new B(d); C c = new C(d); new Thread(a).start(); new Thread(b).start(); new Thread(c).start(); } } class A implements Runnable { private Demo demo; public A(Demo demo) { this.demo = demo; } @Override public void run() { while (true) { demo.a(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } class B implements Runnable { private Demo demo; public B(Demo demo) { this.demo = demo; } @Override public void run() { while (true) { demo.b(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } class C implements Runnable { private Demo demo; public C(Demo demo) { this.demo = demo; } @Override public void run() { while (true) { demo.c(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
运行结果:
a b c方法依次运行。
4.简易连接池实现
①wait notifyAll方式
public class MyDataSource { private LinkedList<Connection> pool = new LinkedList<>(); private static final int INIT_CONNECTIONS = 10; private static final String DRIVER_CLASS = "com.mysql.jdbc.Driver"; private static final String USER = ""; private static final String PWD = ""; private static final String URL = ""; static { try { Class.forName(DRIVER_CLASS); } catch (ClassNotFoundException e) { e.printStackTrace(); } } public MyDataSource() { for (int i=0;i<INIT_CONNECTIONS;i++){ try { Connection conn = DriverManager.getConnection(URL, USER, PWD); pool.addLast(conn); } catch (SQLException e) { e.printStackTrace(); } } } public Connection getConnection(){ Connection result = null; synchronized (pool){ while (pool.size()<=0){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } if (!pool.isEmpty()){ result = pool.removeFirst(); } } return result; } public void release(Connection conn){ if (conn !=null){ synchronized (pool){ pool.addLast(conn); notifyAll(); } } } }
②lock condition方式
import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.LinkedList; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class MyDataSource { private LinkedList<Connection> pool = new LinkedList<>(); private static final int INIT_CONNECTIONS = 10; private static final String DRIVER_CLASS = "com.mysql.jdbc.Driver"; private static final String USER = ""; private static final String PWD = ""; private static final String URL = ""; private Lock lock = new ReentrantLock(); Condition c1 = lock.newCondition(); static { try { Class.forName(DRIVER_CLASS); } catch (ClassNotFoundException e) { e.printStackTrace(); } } public MyDataSource() { for (int i = 0; i < INIT_CONNECTIONS; i++) { try { Connection conn = DriverManager.getConnection(URL, USER, PWD); pool.addLast(conn); } catch (SQLException e) { e.printStackTrace(); } } } public Connection getConnection() { Connection result = null; lock.lock(); try { while (pool.size() <= 0) { try { c1.await(); } catch (InterruptedException e) { e.printStackTrace(); } } if (!pool.isEmpty()) { result = pool.removeFirst(); } return result; } finally { lock.unlock(); } } public void release(Connection conn) { try { lock.lock(); if (conn != null) { pool.addLast(conn); c1.signal(); } } finally { lock.unlock(); } } }
5.线程之间的通信–join
当一个线程执行的过程中,想要调用另外一个线程就可以使用join。
① 使用案例:
public class Demo { public void a(Thread joinThread) { System.out.println("方法a执行了..."); joinThread.start(); try { joinThread.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("方法a执行完毕..."); } public void b() { System.out.println("加塞线程开始执行..."); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("加塞线程执行完毕..."); } public static void main(String[] args) { Demo demo = new Demo(); Thread joinThread = new Thread(new Runnable() { @Override public void run() { demo.b(); } }); new Thread(new Runnable() { @Override public void run() { demo.a(joinThread); } }).start(); } }
运行结果:
②原理
查看join的核心源码如下:
public final void join() throws InterruptedException { join(0); } public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } if (millis == 0) { //如果时执行的join(0) while (isAlive()) { //如果线程是运行状态,就会执行下面的等待 wait(0); } } else { //如果是执行的join(time) while (isAlive()) { //如果线程时运行状态 long delay = millis - now; if (delay <= 0) { break; } wait(delay); //等待delay时间后自动返回继续执行 now = System.currentTimeMillis() - base; } } }
其中核心代码如下:
while (isAlive()) { //调用join方法线程是运行时状态 wait(0); //进入等待 }
isAlive()方法下面会做详细的介绍,先看wait(0),wait(0)是什么意思呢,查看下面wait()方法源码,其实wait()方法就是调用了wait(0)方法实现的,wait(0)就是让其一直等待。到这里会发现,其实join方法本质就是利用上面的线程实例作为对象锁的原理,当线程终止时,会调用线程自身的notifyAll()方法,通知所有等待在该线程对象上的线程的特征。
public final void wait() throws InterruptedException { wait(0); }
6.ThreadLocal原理和使用
线程的局部变量。
使用
public class Demo { private ThreadLocal<Integer> count = new ThreadLocal<Integer>(){ @Override protected Integer initialValue() { return 0; } }; public int getNext() { Integer value = count.get(); value++; count.set(value); return value; } public static void main(String[] args) { Demo d = new Demo(); new Thread(new Runnable() { @Override public void run() { while (true) { System.out.println(Thread.currentThread().getName() + " " + d.getNext()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); new Thread(new Runnable() { @Override public void run() { while (true) { System.out.println(Thread.currentThread().getName() + " " + d.getNext()); try { Thread.sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); new Thread(new Runnable() { @Override public void run() { while (true) { System.out.println(Thread.currentThread().getName() + " " + d.getNext()); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } }
运行结果,各自的线程维护自己的变量:
原理
ThreadLoal 变量,线程局部变量,同一个 ThreadLocal 所包含的对象,在不同的 Thread 中有不同的副本。这里有几点需要注意:
因为每个 Thread 内有自己的实例副本,且该副本只能由当前 Thread 使用。这是也是 ThreadLocal 命名的由来。
既然每个 Thread 有自己的实例副本,且其它 Thread 不可访问,那就不存在多线程间共享的问题。
ThreadLocal 提供了线程本地的实例。它与普通变量的区别在于,每个使用该变量的线程都会初始化一个完全独立的实例副本。ThreadLocal 变量通常被private static修饰。当一个线程结束时,它所使用的所有 ThreadLocal 相对的实例副本都可被回收。
总的来说,ThreadLocal 适用于每个线程需要自己独立的实例且该实例需要在多个方法中被使用,也即变量在线程间隔离而在方法或类间共享的场景。
ThreadLocal实现原理
首先 ThreadLocal 是一个泛型类,保证可以接受任何类型的对象。
因为一个线程内可以存在多个 ThreadLocal 对象,所以其实是 ThreadLocal 内部维护了一个 Map ,这个 Map 不是直接使用的 HashMap ,而是 ThreadLocal 实现的一个叫做 ThreadLocalMap 的静态内部类。而我们使用的 get()、set() 方法其实都是调用了这个ThreadLocalMap类对应的 get()、set() 方法。
7.并发工具类:等待多线程完成的CountDownLatch
CountDownLatch介绍
CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程执行完后再执行。
CountDownLatch是在java1.5被引入的,跟它一起被引入的并发工具类还有CyclicBarrier、Semaphore、ConcurrentHashMap和BlockingQueue,它们都存在于java.util.concurrent包下。
CountDownLatch原理
CountDownLatch是通过一个计数器来实现的,计数器的初始化值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就相应得减1。当计数器到达0时,表示所有的线程都已完成任务,然后在闭锁上等待的线程就可以恢复执行任务。
原文链接:https://blog.csdn.net/qq_26368063/article/details/82386581
案例: 对一个文本中的所有数字并行求和
10,20,30,33,12,23 21,12,18,45,11 23,45,67,78,89 34,41,52,61,79
代码:
import java.io.BufferedReader; import java.io.FileReader; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; public class Demo { private int[] nums; public Demo(int line) { this.nums = new int[line]; } public void calc(String line, int index, CountDownLatch latch) { String[] nus = line.split(",");//切分每个值 int total = 0; for (String num : nus) { total += Integer.parseInt(num); } nums[index] = total;//把计算的结果放到数组中指定的位置 System.out.println(Thread.currentThread().getName() + " 执行计算任务..." + line + " 结果为:" + total); latch.countDown(); } public void sum() { System.out.println("汇总线程开始执行..."); int total = 0; for (int i = 0; i < nums.length; i++) { total += nums[i]; } System.out.println("最终的结果为: " + total); } public static void main(String[] args) { List<String> contents = readFile(); int lineCount = contents.size(); CountDownLatch latch = new CountDownLatch(lineCount); Demo d = new Demo(lineCount); for (int i = 0; i < lineCount; i++) { final int j = i; new Thread(new Runnable() { @Override public void run() { d.calc(contents.get(j), j, latch); } }).start(); } try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } d.sum(); } private static List<String> readFile() { ArrayList<String> contents = new ArrayList<>(); String line = null; try (BufferedReader br = new BufferedReader(new FileReader("E:\\workspace\\nums.txt"))) { while ((line = br.readLine()) != null) { contents.add(line); } } catch (Exception e) { e.printStackTrace(); } return contents; } }
运行结果:
8.并发工具类:同步屏障CyclicBarrier
①介绍
CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。
例如,用一个Excel保存了用户所有银行流水,每个Sheet保存一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。
②CyclicBarrier和CountDownLatch的区别
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次。
CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得Cyclic-Barrier阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断。
③使用
假如有个场景,大家一起去开会,人到齐了才开会。代码如下:
import java.util.Random; import java.util.concurrent.CyclicBarrier; public class Demo { Random random = new Random(); public void meeting(CyclicBarrier barrier){ try { Thread.sleep(random.nextInt(4000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 到达会议室,等待开会.."); try { barrier.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 发言"); } public static void main(String[] args) { Demo demo = new Demo(); CyclicBarrier barrier = new CyclicBarrier(10, new Runnable() { @Override public void run() { System.out.println("好!我们开始开会..."); } }); for (int i = 0; i < 10; i++) { new Thread(new Runnable() { @Override public void run() { demo.meeting(barrier); } }).start(); } } }
运行结果:人到齐了才开会(满足设定的到达10个线程数就开始)。
9.并发工具类:控制并发线程数的semaphore
①简介
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
多年以来,我都觉得从字面上很难理解Semaphore所表达的含义,只能把它比作是控制流量的红绿灯。比如××马路要限制流量,只允许同时有一百辆车在这条路上行使,其他的都必须在路口等待,所以前一百辆车会看到绿灯,可以开进这条马路,后面的车会看到红灯,不能驶入××马路,但是如果前一百辆中有5辆车已经离开了××马路,那么后面就允许有5辆车驶入马路,这个例子里说的车就是线程,驶入马路就表示线程在执行,离开马路就表示线程执行完成,看见红灯就表示线程被阻塞,不能执行。
②使用
import java.util.concurrent.Semaphore; public class Demo { public void method(Semaphore semaphore){ try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " is run ..."); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } semaphore.release(); } public static void main(String[] args) { Demo d = new Demo(); Semaphore semaphore = new Semaphore(10); while(true){ new Thread(new Runnable() { @Override public void run() { d.method(semaphore); } }).start(); } } }
运行结果:虽然创建了很多线程,但是只有10个线程在使用,其他的都在等待。
③ Semaphore和线程池各自是干什么?
信号量Semaphore是一个并发工具类,用来控制可同时并发的线程数,其内部维护了一组虚拟许可,通过构造器指定许可的数量,每次线程执行操作时先通过acquire方法获得许可,执行完毕再通过release方法释放许可。如果无可用许可,那么acquire方法将一直阻塞,直到其它线程释放许可。
线程池用来控制实际工作的线程数量,通过线程复用的方式来减小内存开销。线程池可同时工作的线程数量是一定的,超过该数量的线程需进入线程队列等待,直到有可用的工作线程来执行任务。
使用Seamphore,你创建了多少线程,实际就会有多少线程进行执行,只是可同时执行的线程数量会受到限制。但使用线程池,你创建的线程只是作为任务提交给线程池执行,实际工作的线程由线程池创建,并且实际工作的线程数量由线程池自己管理。
简单来说,线程池实际工作的线程是work线程,不是你自己创建的,是由线程池创建的,并由线程池自动控制实际并发的work线程数量。而Seamphore相当于一个信号灯,作用是对线程做限流,Seamphore可以对你自己创建的的线程做限流(也可以对线程池的work线程做限流),Seamphore的限流必须通过手动acquire和release来实现。
区别就是两点:
1、实际工作的线程是谁创建的?
使用线程池,实际工作线程由线程池创建;使用Seamphore,实际工作的线程由你自己创建。
2、限流是否自动实现?
线程池自动,Seamphore手动。
10.并发工具类:线程转换类exchanger
①介绍
Exchanger可以在两个线程之间交换数据,只能是2个线程,他不支持更多的线程之间互换数据。
当线程A调用Exchange对象的exchange()方法后,他会陷入阻塞状态,直到线程B也调用了exchange()方法,然后以线程安全的方式交换数据,之后线程A和B继续运行 Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据, 如果第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。因此使用Exchanger的重点是成对的线程使用exchange()方法,当有一对线程达到了同步点,就会进行交换数据。因此该工具类的线程对象是成对的。
Exchanger类提供了两个方法,String exchange(V x):用于交换,启动交换并等待另一个线程调用exchange;String exchange(V x,long timeout,TimeUnit unit):用于交换,启动交换并等待另一个线程调用exchange,并且设置最大等待时间,当等待时间超过timeout便停止等待
②示例代码:
import java.util.concurrent.Exchanger; public class Demo { public void a(Exchanger<String> exch){ System.out.println("a方法执行..."); try { System.out.println("a线程正在抓取数据"); Thread.sleep(2000); System.out.println("a线程抓取到数据"); } catch (InterruptedException e) { e.printStackTrace(); } String res = "12345"; try { System.out.println("a等待对比结果..."); exch.exchange(res); } catch (InterruptedException e) { e.printStackTrace(); } } public void b(Exchanger<String> exch){ System.out.println("b方法执行..."); try { System.out.println("b线程正在抓取数据"); Thread.sleep(4000); System.out.println("b线程抓取数据结束"); } catch (InterruptedException e) { e.printStackTrace(); } String res = "12345"; try { String value = exch.exchange(res); System.out.println("b开始进行比对..."); System.out.println("比对结果为: " + value.equals(res)); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { Demo d = new Demo(); Exchanger<String> exch = new Exchanger<>(); new Thread(new Runnable() { @Override public void run() { d.a(exch); } } ).start(); new Thread(new Runnable() { @Override public void run() { d.b(exch); } } ).start(); } }
运行结果:
11.提前完成任务之FutureTask
①使用小案例
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class Demo { public static void main(String[] args) throws ExecutionException, InterruptedException { Callable<Integer> call = new Callable<Integer>() { @Override public Integer call() throws Exception { System.out.println("正在计算结果。。。"); Thread.sleep(3000); return 1; } }; FutureTask<Integer> task = new FutureTask<>(call); Thread thread = new Thread(task); thread.start(); System.out.println("干点别的..."); Integer result = task.get(); System.out.println("拿到的结果是: " + result); } }
运行结果
②future设计模式实现
自己实现实现future的功能。
public class Product { private int id; private String name; public Product(int id, String name) { System.out.println("开始生产" + name); new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); this.id = id; this.name = name; System.out.println(name+ " 生产完毕"); } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return "Product{" + "id=" + id + ", name='" + name + '\'' + '}'; } }
/** * 订单 */ public class Future { private Product product; private boolean down; public synchronized void setProduct(Product product) { if (down){ return; } this.product = product; this.down = true; notifyAll(); } public synchronized Product get() { while (!down){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return product; } }
public class ProductFactory { public Future createProduct(String name) { Future f = new Future();//创建一个订单 System.out.println("下单成功,你可以去上班了..."); new Thread(new Runnable() { @Override public void run() { //生产产品 Product p = new Product(new Random().nextInt(), name); f.setProduct(p); } }).start(); return f; } }
public class Demo { public static void main(String[] args) { ProductFactory pf = new ProductFactory(); //下单 交钱 Future f = pf.createProduct("蛋糕"); System.out.println("我去上班了, 下了班我来取蛋糕..."); //拿着订单获取产品 System.out.println("我拿着蛋糕回家." + f.get()); } }
运行结果:
callable和runnable的区别:
Runnable中的run方法是被线程调用的,在run方法是异步调用的。
Callable的call方法,不是异步执行的,是由future的run方法调用的。
完









