Java并发编程高级内容介绍

简介:

计数器:CountDownLatch

CountDownLatch类似于一个计数器,和Atomic类比较相近,操作是原子的,即多个线程同时只能有一个可以去操作。CountDownLatch对象设置一个初始的数字作为计数值,任何调用这个对象上的await()方法都会阻塞,直到这个计数器的计数值被其他的线程调用countDown()减为0为止。典型的应用场景就是:有一个任务想要往下执行,但必须要等到其他的任务执行完毕后才可以继续往下执行。例如在Zookeeper的使用过程中,由于客户端与服务器建立连接是异步调用的,因此主线程需要await()阻塞直至异步回调countDown()完成。


代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public  class  CountDownLatchTest {
 
 
     public  static  void  main(String[] args) {
 
         final  CountDownLatch countDownLatch =  new  CountDownLatch( 2 );
 
         Thread work1 =  new  Thread( new  Runnable() {
             @Override
             public  void  run() {
 
                 System.out.println(Thread.currentThread() +  " doing work...start" );
 
                 try  {
                     Thread.sleep( 200 );
                 catch  (InterruptedException e) {
                     e.printStackTrace();
                 }
 
                 System.out.println(Thread.currentThread() +  " doing work...end " );
                 countDownLatch.countDown();
             }
         }, "work1" );
 
 
         Thread work2 =  new  Thread( new  Runnable() {
             @Override
             public  void  run() {
 
                 System.out.println(Thread.currentThread().getName() +  " doing work...start" );
 
                 try  {
                     Thread.sleep( 200 );
                 catch  (InterruptedException e) {
                     e.printStackTrace();
                 }
 
                 System.out.println(Thread.currentThread().getName() +  " doing work...end " );
                 countDownLatch.countDown();
             }
         }, "work2" );
 
         work1.start();
         work2.start();
 
         try  {
             countDownLatch.await();
             System.out.println( "all workers finish " );
         catch  (InterruptedException e) {
             e.printStackTrace();
         }
 
 
     }
 
 
}



齐步走:CyclicBarrier

Barrier的意思是栅栏,就是让一组线程相互等待,直至所有线程都到齐了,那么就可以齐步走。Cyclic是循环的意思,就是说Barrier可以循环使用。CyclicBarrier主要的方法就是await(),较CountDownLatch的await()虽然都是阻塞,但是CyclicBarrier.await()有返回值int,即当前线程是第几个到达这个Barrier的线程。

构造CyclicBarrier时指定计数值,await() 方法每被调用一次,计数便会减少1,并阻塞住当前线程。当计数减至0时,阻塞解除,所有在此 CyclicBarrier 上面阻塞的线程开始运行。在这之后,如果再次调用 await() 方法,计数就又会变成 N-1,新一轮重新开始。在构造方法上还可以传递一个Runnable对象,阻塞解除时这个Runnable会得到运行。

CyclicBarrier有点“不见不散”的味道,想一想,如果某个成员因某种原因来不了Barrier这个地方,那么我们一直等待吗?实际中,如果来不了理应通知其他成员,别等了,回家吧!注意到CyclicBarrier.await()独有的BrokenBarrierException异常

wKioL1hU62jTIP18AAAgAV0NFVo139.png


代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public  class  CyclicBarrierTest {
 
     public  static  void  main(String[] args) {
 
         final  CyclicBarrier cyclicBarrier =  new  CyclicBarrier( 2 new  Runnable() {
             @Override
             public  void  run() {
                 System.out.println( "都准备好啦!" );
             }
         });
 
         Thread runman1 =  new  Thread( new  Runnable() {
             @Override
             public  void  run() {
 
                 try  {
                     Thread.sleep( 200 );
                 catch  (InterruptedException e) {
                     e.printStackTrace();
                 }
 
                 try  {
                     cyclicBarrier.await();
                     System.out.println(Thread.currentThread().getName() +  "i am ok" );
 
                 catch  (InterruptedException e) {
                     e.printStackTrace();
                 catch  (BrokenBarrierException e) {
                     e.printStackTrace();
                 }
             }
         }, "runman1" );
 
 
         Thread runman2 =  new  Thread( new  Runnable() {
             @Override
             public  void  run() {
 
                 try  {
                     cyclicBarrier.await();
                     System.out.println(Thread.currentThread().getName() +  "i am ok" );
                 catch  (InterruptedException e) {
                     e.printStackTrace();
                 catch  (BrokenBarrierException e) {
                     e.printStackTrace();
                 }
             }
         }, "runman2" );
 
         runman1.start();
         runman2.start();
 
     }
 
}


Callable And Future

在博主以前的博客Java Future模式实现中有介绍Future模式,Future模式非常适合在处理耗时很长的业务逻辑,可以有效的减少系统的响应时间,提高系统的吞吐量。JDK其实已经为我们提供了API实现,我们来看一段代码即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public  class  FutureTest {
 
 
     public  static  void  main(String[] args) {
 
         FutureTask<String> futureTask =  new  FutureTask<String>( new  Callable<String>() {
             @Override
             public  String call()  throws  Exception {
 
                 Thread.sleep( 2000 );
                 return  "ok" ;
             }
         });
 
         ExecutorService es = Executors.newFixedThreadPool( 1 );
         es.submit(futureTask);
         System.out.println( "开启线程去异步处理,主线程继续往下执行!" );
         try  {
 
             System.out.println( "取得异步处理结果:"  + futureTask.get());
 
         catch  (InterruptedException e) {
             e.printStackTrace();
         catch  (ExecutionException e) {
             e.printStackTrace();
         }
 
 
     }
 
}

注意到线程池执行任务,可以利用2个方法:

wKiom1hVO5KTzT65AAAF84kvGAc039.png

wKiom1hVPBaj2Sh-AAAGyfavKEw459.png

wKiom1hVO-jTO8zFAAAGw9T46oo296.png

submit和execute有什么区别呢?从入参和结果类型就知道了。



信号量:Semaphore

Semaphore实现的功能就类似厕所有5个坑,假如有10个人要上厕所,那么显然同时只能有5个人占用厕所,当5个人中的任何一个人让开后,其中等待的另外5个人中又有一个人可以占用了。另外等待的5个人中可以是随机获得优先机会,也可以是按照先来后到的顺序获得机会,这取决于构造Semaphore对象时传入的fair参数选项。

Semaphore可以控制某个资源可被同时访问的个数(构造方法传入),通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。


代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public  static  void  main(String[] args) {
 
     final  Semaphore semaphore =  new  Semaphore( 5 );
 
     for ( int  i =  0  ; i <  6  ; i++){
 
         new  Thread( new  Runnable() {
             @Override
             public  void  run() {
 
                 try  {
                     semaphore.acquire();
 
                     System.out.println(Thread.currentThread().getName() +  " 运行..." );
 
                     Thread.sleep( 1000 );
 
                     semaphore.release();
 
                     System.out.println(Thread.currentThread().getName() +  " 结束..." );
 
 
                 catch  (InterruptedException e) {
                     e.printStackTrace();
                 }
 
             }
         },String.valueOf(i)).start();
 
 
     }
 
}


Condition

JDK由原始的synchronized发展到Lock,以类的方式提供锁机制,发展出重入锁、读写锁,以类的形式存在自然功能更加强大灵活,比如可以tryLock进行锁的嗅探。在synchronized代码块中我们可以使用wait/notify/notifyAll来进行线程的协同工作,那么JDK也发展了这一块,即Condition。Condition.await类似于wait,Condition.signal/signalAll类似于notify/nofityAll。下面我们简单实现一个Condition版的生产者/消费者。


处理核心:Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public  class  Handler {
 
     //容器
     private  LinkedList<String> linkedList =  new  LinkedList<String>();
 
     //限制
     private  int  MAX_SIZE =  3 ;
 
     //锁
     private  Lock lock =  new  ReentrantLock();
 
     //condition  实际上,可以new多个condition,这里暂且只是用给一个
     private  Condition condition = lock.newCondition();
 
     public  void  put(String bread){
 
         try {
             lock.lock();
 
             if (linkedList.size() == MAX_SIZE){
                 System.out.println( "容器已满" );
                 condition.await();
             }
 
             linkedList.add(bread);
             System.out.println( "放入面包"  + bread);
             condition.signalAll();
         } catch  (Exception e){
             e.printStackTrace();
         } finally  {
             lock.unlock();
         }
 
     }
 
     public  void  eat(){
 
         try {
             lock.lock();
 
             if (linkedList.size() ==  0 ){
                 System.out.println( "容器为空" );
                 condition.await();
             }
 
             String bread = linkedList.removeFirst();
             System.out.println( "吃掉一个面包"  + bread);
             condition.signalAll();
 
         } catch (Exception e){
 
             e.printStackTrace();
         } finally  {
             lock.unlock();
         }
 
 
     }
 
}


生产者:Produce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public  class  Produce  implements  Runnable{
 
     private  Handler handler;
 
     public  Produce(Handler handler) {
         this .handler = handler;
     }
 
     @Override
     public  void  run() {
 
         for ( int  i =  0  ; i <  10  ; i++){
             try  {
                 Thread.sleep( new  Random().nextInt( 1000 ));
             catch  (InterruptedException e) {
                 e.printStackTrace();
             }
             handler.put(String.valueOf(i));
         }
 
     }
}


消费者:Consume

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public  class  Consume  implements  Runnable{
 
     private  Handler handler;
 
     public  Consume(Handler handler) {
         this .handler = handler;
     }
 
     @Override
     public  void  run() {
         for  ( int  i =  0  ; i <  10  ; i++){
             try  {
                 Thread.sleep( new  Random().nextInt( 1000 ));
             catch  (InterruptedException e) {
                 e.printStackTrace();
             }
             handler.eat();
         }
     }
}


Main:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public  class  Main {
 
     public  static  void  main(String[] args) {
 
         Handler handler =  new  Handler();
 
         Produce produce =  new  Produce(handler);
         Consume consume =  new  Consume(handler);
 
         new  Thread(consume).start();
         new  Thread(produce).start();
         new  Thread(produce).start();
     }
}


本文转自zfz_linux_boy 51CTO博客,原文链接:http://blog.51cto.com/zhangfengzhe/1883655,如需转载请自行联系原作者

相关文章
|
2月前
|
Java API 微服务
为什么虚拟线程将改变Java并发编程?
为什么虚拟线程将改变Java并发编程?
251 83
|
30天前
|
安全 Java 数据库连接
2025 年最新 Java 学习路线图含实操指南助你高效入门 Java 编程掌握核心技能
2025年最新Java学习路线图,涵盖基础环境搭建、核心特性(如密封类、虚拟线程)、模块化开发、响应式编程、主流框架(Spring Boot 3、Spring Security 6)、数据库操作(JPA + Hibernate 6)及微服务实战,助你掌握企业级开发技能。
215 3
|
2天前
|
Java 开发者
Java并发编程:CountDownLatch实战解析
Java并发编程:CountDownLatch实战解析
|
19天前
|
算法 Java
Java多线程编程:实现线程间数据共享机制
以上就是Java中几种主要处理多线程序列化资源以及协调各自独立运行但需相互配合以完成任务threads 的技术手段与策略。正确应用上述技术将大大增强你程序稳定性与效率同时也降低bug出现率因此深刻理解每项技术背后理论至关重要.
49 16
|
12天前
|
NoSQL Java 关系型数据库
超全 Java 学习路线,帮你系统掌握编程的超详细 Java 学习路线
本文为超全Java学习路线,涵盖基础语法、面向对象编程、数据结构与算法、多线程、JVM原理、主流框架(如Spring Boot)、数据库(MySQL、Redis)及项目实战等内容,助力从零基础到企业级开发高手的进阶之路。
95 1
|
28天前
|
安全 算法 Java
Java泛型编程:类型安全与擦除机制
Java泛型详解:从基础语法到类型擦除机制,深入解析通配符与PECS原则,探讨运行时类型获取技巧及最佳实践,助你掌握泛型精髓,写出更安全、灵活的代码。
|
27天前
|
安全 Java Shell
Java模块化编程(JPMS)简介与实践
本文全面解析Java 9模块化系统(JPMS),帮助开发者解决JAR地狱、类路径冲突等常见问题,提升代码的封装性、性能与可维护性。内容涵盖模块化核心概念、module-info语法、模块声明、实战迁移、多模块项目构建、高级特性及最佳实践,同时提供常见问题和面试高频题解析,助你掌握Java模块化编程精髓,打造更健壮的应用。
|
1月前
|
Java
Java编程:理解while循环的使用
总结而言, 使用 while 迴圈可以有效解决需要多次重复操作直至特定條件被触发才停止執行任务场景下问题; 它简单、灵活、易于实现各种逻辑控制需求但同时也要注意防止因邏各错误导致無限迁璇発生及及時處理可能発生异常以确保程序稳定运作。
174 0
|
1月前
|
安全 Cloud Native Java
Java:历久弥新的企业级编程基石
Java:历久弥新的企业级编程基石