计数器:CountDownLatch
CountDownLatch类似于一个计数器,和Atomic类比较相近,操作是原子的,即多个线程同时只能有一个可以去操作。CountDownLatch对象设置一个初始的数字作为计数值,任何调用这个对象上的await()方法都会阻塞,直到这个计数器的计数值被其他的线程调用countDown()减为0为止。典型的应用场景就是:有一个任务想要往下执行,但必须要等到其他的任务执行完毕后才可以继续往下执行。例如在Zookeeper的使用过程中,由于客户端与服务器建立连接是异步调用的,因此主线程需要await()阻塞直至异步回调countDown()完成。
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
public
class
CountDownLatchTest {
public
static
void
main(String[] args) {
final
CountDownLatch countDownLatch =
new
CountDownLatch(
2
);
Thread work1 =
new
Thread(
new
Runnable() {
@Override
public
void
run() {
System.out.println(Thread.currentThread() +
" doing work...start"
);
try
{
Thread.sleep(
200
);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() +
" doing work...end "
);
countDownLatch.countDown();
}
},
"work1"
);
Thread work2 =
new
Thread(
new
Runnable() {
@Override
public
void
run() {
System.out.println(Thread.currentThread().getName() +
" doing work...start"
);
try
{
Thread.sleep(
200
);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() +
" doing work...end "
);
countDownLatch.countDown();
}
},
"work2"
);
work1.start();
work2.start();
try
{
countDownLatch.await();
System.out.println(
"all workers finish "
);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
}
|
齐步走:CyclicBarrier
Barrier的意思是栅栏,就是让一组线程相互等待,直至所有线程都到齐了,那么就可以齐步走。Cyclic是循环的意思,就是说Barrier可以循环使用。CyclicBarrier主要的方法就是await(),较CountDownLatch的await()虽然都是阻塞,但是CyclicBarrier.await()有返回值int,即当前线程是第几个到达这个Barrier的线程。
构造CyclicBarrier时指定计数值,await() 方法每被调用一次,计数便会减少1,并阻塞住当前线程。当计数减至0时,阻塞解除,所有在此 CyclicBarrier 上面阻塞的线程开始运行。在这之后,如果再次调用 await() 方法,计数就又会变成 N-1,新一轮重新开始。在构造方法上还可以传递一个Runnable对象,阻塞解除时这个Runnable会得到运行。
CyclicBarrier有点“不见不散”的味道,想一想,如果某个成员因某种原因来不了Barrier这个地方,那么我们一直等待吗?实际中,如果来不了理应通知其他成员,别等了,回家吧!注意到CyclicBarrier.await()独有的BrokenBarrierException异常
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
public
class
CyclicBarrierTest {
public
static
void
main(String[] args) {
final
CyclicBarrier cyclicBarrier =
new
CyclicBarrier(
2
,
new
Runnable() {
@Override
public
void
run() {
System.out.println(
"都准备好啦!"
);
}
});
Thread runman1 =
new
Thread(
new
Runnable() {
@Override
public
void
run() {
try
{
Thread.sleep(
200
);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
try
{
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() +
"i am ok"
);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
catch
(BrokenBarrierException e) {
e.printStackTrace();
}
}
},
"runman1"
);
Thread runman2 =
new
Thread(
new
Runnable() {
@Override
public
void
run() {
try
{
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() +
"i am ok"
);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
catch
(BrokenBarrierException e) {
e.printStackTrace();
}
}
},
"runman2"
);
runman1.start();
runman2.start();
}
}
|
Callable And Future
在博主以前的博客《Java Future模式实现》中有介绍Future模式,Future模式非常适合在处理耗时很长的业务逻辑,可以有效的减少系统的响应时间,提高系统的吞吐量。JDK其实已经为我们提供了API实现,我们来看一段代码即可:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
public
class
FutureTest {
public
static
void
main(String[] args) {
FutureTask<String> futureTask =
new
FutureTask<String>(
new
Callable<String>() {
@Override
public
String call()
throws
Exception {
Thread.sleep(
2000
);
return
"ok"
;
}
});
ExecutorService es = Executors.newFixedThreadPool(
1
);
es.submit(futureTask);
System.out.println(
"开启线程去异步处理,主线程继续往下执行!"
);
try
{
System.out.println(
"取得异步处理结果:"
+ futureTask.get());
}
catch
(InterruptedException e) {
e.printStackTrace();
}
catch
(ExecutionException e) {
e.printStackTrace();
}
}
}
|
注意到线程池执行任务,可以利用2个方法:
submit和execute有什么区别呢?从入参和结果类型就知道了。
信号量:Semaphore
Semaphore实现的功能就类似厕所有5个坑,假如有10个人要上厕所,那么显然同时只能有5个人占用厕所,当5个人中的任何一个人让开后,其中等待的另外5个人中又有一个人可以占用了。另外等待的5个人中可以是随机获得优先机会,也可以是按照先来后到的顺序获得机会,这取决于构造Semaphore对象时传入的fair参数选项。
Semaphore可以控制某个资源可被同时访问的个数(构造方法传入),通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
public
static
void
main(String[] args) {
final
Semaphore semaphore =
new
Semaphore(
5
);
for
(
int
i =
0
; i <
6
; i++){
new
Thread(
new
Runnable() {
@Override
public
void
run() {
try
{
semaphore.acquire();
System.out.println(Thread.currentThread().getName() +
" 运行..."
);
Thread.sleep(
1000
);
semaphore.release();
System.out.println(Thread.currentThread().getName() +
" 结束..."
);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
},String.valueOf(i)).start();
}
}
|
Condition
JDK由原始的synchronized发展到Lock,以类的方式提供锁机制,发展出重入锁、读写锁,以类的形式存在自然功能更加强大灵活,比如可以tryLock进行锁的嗅探。在synchronized代码块中我们可以使用wait/notify/notifyAll来进行线程的协同工作,那么JDK也发展了这一块,即Condition。Condition.await类似于wait,Condition.signal/signalAll类似于notify/nofityAll。下面我们简单实现一个Condition版的生产者/消费者。
处理核心:Handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
public
class
Handler {
//容器
private
LinkedList<String> linkedList =
new
LinkedList<String>();
//限制
private
int
MAX_SIZE =
3
;
//锁
private
Lock lock =
new
ReentrantLock();
//condition 实际上,可以new多个condition,这里暂且只是用给一个
private
Condition condition = lock.newCondition();
public
void
put(String bread){
try
{
lock.lock();
if
(linkedList.size() == MAX_SIZE){
System.out.println(
"容器已满"
);
condition.await();
}
linkedList.add(bread);
System.out.println(
"放入面包"
+ bread);
condition.signalAll();
}
catch
(Exception e){
e.printStackTrace();
}
finally
{
lock.unlock();
}
}
public
void
eat(){
try
{
lock.lock();
if
(linkedList.size() ==
0
){
System.out.println(
"容器为空"
);
condition.await();
}
String bread = linkedList.removeFirst();
System.out.println(
"吃掉一个面包"
+ bread);
condition.signalAll();
}
catch
(Exception e){
e.printStackTrace();
}
finally
{
lock.unlock();
}
}
}
|
生产者:Produce
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public
class
Produce
implements
Runnable{
private
Handler handler;
public
Produce(Handler handler) {
this
.handler = handler;
}
@Override
public
void
run() {
for
(
int
i =
0
; i <
10
; i++){
try
{
Thread.sleep(
new
Random().nextInt(
1000
));
}
catch
(InterruptedException e) {
e.printStackTrace();
}
handler.put(String.valueOf(i));
}
}
}
|
消费者:Consume
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
public
class
Consume
implements
Runnable{
private
Handler handler;
public
Consume(Handler handler) {
this
.handler = handler;
}
@Override
public
void
run() {
for
(
int
i =
0
; i <
10
; i++){
try
{
Thread.sleep(
new
Random().nextInt(
1000
));
}
catch
(InterruptedException e) {
e.printStackTrace();
}
handler.eat();
}
}
}
|
Main:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
public
class
Main {
public
static
void
main(String[] args) {
Handler handler =
new
Handler();
Produce produce =
new
Produce(handler);
Consume consume =
new
Consume(handler);
new
Thread(consume).start();
new
Thread(produce).start();
new
Thread(produce).start();
}
}
|
本文转自zfz_linux_boy 51CTO博客,原文链接:http://blog.51cto.com/zhangfengzhe/1883655,如需转载请自行联系原作者