1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
package
com.sohu.hot.vis.servlet;
import
java.util.concurrent.*;
/**
* 多线程学习之Callable
*
* @author liweihan
* @time 2016-12-29 14:44
*/
public
class
TestCallableAndFuture {
/**
* Callable 和 Future接口
* Callable是类似于Runnable的接口,实现Callable接口的类和实现Runnable的类都是可被其它线程执行的任务。
* Callable和Runnable有几点不同:
* (1)Callable规定的方法是call(),而Runnable规定的方法是run().
* (2)Callable的任务执行后可返回值,而Runnable的任务是不能返回值的。
* (3)call()方法可抛出异常,而run()方法是不能抛出异常的。
* (4)运行Callable任务可拿到一个Future对象,
* Future 表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果。
* 通过Future对象可了解任务执行情况,可取消任务的执行,还可获取任务执行的结果。
* Future的cancel方法可以取消任务的执行,它有一布尔参数,参数为 true 表示立即中断任务的执行,
* 参数为 false 表示允许正在运行的任务运行完成。Future的 get 方法等待计算完成,获取计算结果
*/
public
static
class
MyCallable
implements
Callable {
private
int
flag =
0
;
public
MyCallable(
int
flag) {
this
.flag = flag;
}
@Override
public
Object call()
throws
Exception {
if
(
this
.flag ==
0
) {
return
"flag = 0"
;
}
else
if
(
this
.flag ==
1
) {
try
{
while
(
true
) {
System.out.println(
"循环。。。"
);
Thread.sleep(
2000
);
}
}
catch
(InterruptedException e) {
System.out.println(
"Interruptered"
);
}
return
false
;
}
else
{
throw
new
Exception(
"Error flag value!!"
);
}
}
}
public
static
void
main(String[] args) {
//定义三个Callable类型的任务
MyCallable task1 =
new
MyCallable(
0
);
MyCallable task2 =
new
MyCallable(
1
);
MyCallable task3 =
new
MyCallable(
2
);
//定义一个执行任务的服务
ExecutorService es = Executors.newFixedThreadPool(
3
);
try
{
/**
* 提交并执行任务,任务启动时返回了一个Future对象。
* 如果想得到任务执行的结果或者是异常可对这个Future对象进行操作
*/
Future future1 = es.submit(task1);
//获得第一个任务的结果,如果调用get方法,当前线程会等待任务执行完毕后才往下执行
System.out.println(
"task1:"
+ future1.get());
Future future2 = es.submit(task2);
//等待5秒后,再停止第二个任务。因为第二个任务进行的是无限循环
Thread.sleep(
5000
);
System.out.println(
"task2 cancel:"
+ future2.cancel(
true
));
//获取第三个任务的输出,因为执行第三个任务会引起异常
//所以下面的语句将引起异常
Future future3 = es.submit(task3);
System.out.println(
"task3:"
+ future3.get());
}
catch
(Exception e) {
System.out.println(e.toString());
}
//停止任务执行服务
es.shutdownNow();
}
}
|
例子2:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
package
com.sohu.hot.vis.servlet;
import
java.util.ArrayList;
import
java.util.HashMap;
import
java.util.List;
import
java.util.Map;
import
java.util.concurrent.Callable;
import
java.util.concurrent.ExecutorService;
import
java.util.concurrent.Executors;
import
java.util.concurrent.Future;
/**
* 多线程学习之Callable
*
* @author liweihan
* @time 2016-12-29 15:44
*/
public
class
TestCallable2 {
static
class
StarRelationThread
implements
Callable<Boolean> {
private
Map<String, String> mapThread ;
private
Map<String, String> map ;
private
int
threadNum;
public
StarRelationThread(Map<String, String> mapThread ,Map<String, String> map,
int
threadNum) {
this
.map = map;
this
.threadNum = threadNum;
this
.mapThread = mapThread;
}
@Override
public
Boolean call()
throws
Exception {
System.out.println(
" 第 "
+ threadNum +
" 个线程处理-开始 ,此线程处理的数量 "
+ mapThread.size() +
",总的数量为:"
+map.size());
System.out.println(
"处理数据 ,并写入redis中"
);
if
(threadNum >
3
) {
try
{
Thread.sleep(
20000
);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
int
sync =
0
;
for
(Map.Entry<String, String> en : mapThread.entrySet()) {
sync++;
if
(sync <
2
) {
System.out.println(
"key :"
+ en.getKey() +
", value :"
+ en.getValue());
}
}
System.out.println(
" 第 "
+ threadNum +
" 个线程执行完毕!"
);
return
true
;
//true和flase,可以根据具体业务再做处理
}
}
public
static
void
main(String[] args) {
Map<String, String> map =
new
HashMap<String, String>();
//测试数据
for
(
int
i =
0
; i <
300000
; i++) {
map.put(
"key"
+ i,
"value"
+i);
}
//5.分割map+多线程
int
totalSize = map.size();
System.out.println(
"Map totalSize : "
+ totalSize);
//线程的数量
int
threadNum =
16
;
//每个线程处理的数量
int
threadSize = totalSize / threadNum;
System.out.println(
"每个线程处理的数量:"
+ threadSize);
List<StarRelationThread> threadList =
new
ArrayList<StarRelationThread>();
for
(
int
i =
0
; i < threadNum; i++) {
int
end ;
if
(i == threadNum -
1
) {
//最后一个线程
end = threadSize + totalSize % threadNum;
}
else
{
end = threadSize;
}
int
beginNum = i * threadSize;
int
endNum = i * threadSize + end;
System.out.println(i +
" begin : "
+ beginNum +
" , "
+ endNum);
int
sync =
0
;
//分割map
Map<String, String> mapThread =
new
HashMap<String, String>();
for
(Map.Entry<String, String> entry : map.entrySet()) {
sync++;
if
(sync > beginNum && sync <= endNum) {
mapThread.put(entry.getKey(), entry.getValue());
}
}
StarRelationThread st =
new
StarRelationThread(mapThread,map,i);
threadList.add(st);
}
//执行任务
try
{
/**
* 线程池的了解:http://blog.csdn.net/coding_or_coded/article/details/6856014
* http://www.cnblogs.com/yezhenhan/archive/2012/01/07/2315645.html
* http://hbiao68.iteye.com/blog/1929245
*
* https://my.oschina.net/u/1419751/blog/359263
* http://blog.csdn.net/linghu_java/article/details/17123057
*/
ExecutorService executorService = Executors.newFixedThreadPool(
4
);
List<Future<Boolean>> threadFutureList = executorService.invokeAll( threadList );
executorService.shutdownNow();
boolean
hasError =
false
;
for
( Future<Boolean> threadFuture : threadFutureList ) {
boolean
optSuccess = threadFuture.get();
if
( !optSuccess ) {
hasError =
true
;
}
}
if
(hasError) {
System.out.println(
" FAIL---------------"
);
}
else
{
System.out.println(
" SUCCESS ------------------"
);
}
}
catch
(Exception e) {
e.printStackTrace();
}
}
}
|
当用完一个线程池后,应该调用该线程池的shutdown()方法,该方法将启动线程池的关闭序列,调用shutdown()方法后的线程池不再接受新任务,但将以前所有已提交任务执行完。当线程池中的所有任务都执行完成后,池中的所有线程都会死亡;
void shutdown();
另外也可以调用线程池中的shutdownNow()方法来关闭线程池,该方法试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行任务列表。
List<Runnable> shutdownNow();
例子3:Semaphore
一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。拿到信号量的线程可以进入代码,否则就等待。通过acquire()和release()获取和释放访问许可。
<1.>
public void acquire() throws InterruptedException
-
从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。获取一个许可(如果提供了一个)并立即返回,将可用的许可数减 1。
<2.>
public void release()
-
释放一个许可,将其返回给信号量。释放一个许可,将可用的许可数增加 1。如果任意线程试图获取许可,则选中一个线程并将刚刚释放的许可给予它。然后针对线程安排目的启用(或再启用)该线程。
-
1234567891011121314151617181920212223242526272829303132333435363738394041
package
com.book.admin.test;
import
java.util.concurrent.ExecutorService;
import
java.util.concurrent.Executors;
import
java.util.concurrent.Semaphore;
public
class
SemaphoreTest {
public
static
void
main(String[] args) {
//线程池
ExecutorService exec = Executors.newCachedThreadPool();
//只能5个线程同时访问
final
Semaphore semp =
new
Semaphore(
5
);
for
(
int
i =
0
; i <
20
; i++) {
final
int
no = i;
Runnable runnable =
new
Runnable() {
@Override
public
void
run() {
try
{
//获取许可
semp.acquire();
System.out.println(
"Accessing: "
+ no);
Thread.sleep((
long
) Math.random() *
10000
);
//访问完后,释放许可,如果注释掉下面的语句,则控制台只能打印5条记录,之后线程一直阻塞
semp.release();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
};
//执行线程
exec.execute(runnable);
}
//退出线程池
exec.shutdown();
}
}
本文转自韩立伟 51CTO博客,原文链接:http://blog.51cto.com/hanchaohan/1888811
,如需转载请自行联系原作者