1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package  com.sohu.hot.vis.servlet;
 
import  java.util.concurrent.*;
 
/**
  * 多线程学习之Callable
  *
  * @author liweihan
  * @time 2016-12-29 14:44
  */
public  class  TestCallableAndFuture {
     /**
      * Callable 和 Future接口
 
      * Callable是类似于Runnable的接口,实现Callable接口的类和实现Runnable的类都是可被其它线程执行的任务。
 
      * Callable和Runnable有几点不同:
      * (1)Callable规定的方法是call(),而Runnable规定的方法是run().
      * (2)Callable的任务执行后可返回值,而Runnable的任务是不能返回值的。
      * (3)call()方法可抛出异常,而run()方法是不能抛出异常的。
      * (4)运行Callable任务可拿到一个Future对象,
 
      * Future 表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果。
      * 通过Future对象可了解任务执行情况,可取消任务的执行,还可获取任务执行的结果。
      * Future的cancel方法可以取消任务的执行,它有一布尔参数,参数为 true 表示立即中断任务的执行,
      * 参数为 false 表示允许正在运行的任务运行完成。Future的 get 方法等待计算完成,获取计算结果
      */
 
     public  static  class  MyCallable  implements  Callable {
         private  int  flag =  0 ;
         public  MyCallable( int  flag) {
             this .flag = flag;
         }
 
         @Override
         public  Object call()  throws  Exception {
             if  ( this .flag ==  0 ) {
                 return  "flag = 0" ;
             } else  if  ( this .flag ==  1 ) {
                 try  {
                     while  ( true ) {
                         System.out.println( "循环。。。" );
                         Thread.sleep( 2000 );
                     }
                 catch  (InterruptedException e) {
                     System.out.println( "Interruptered" );
                 }
                 return  false ;
             else  {
                 throw  new  Exception( "Error flag value!!" );
             }
         }
     }
 
     public  static  void  main(String[] args) {
         //定义三个Callable类型的任务
         MyCallable task1 =  new  MyCallable( 0 );
         MyCallable task2 =  new  MyCallable( 1 );
         MyCallable task3 =  new  MyCallable( 2 );
 
         //定义一个执行任务的服务
         ExecutorService es = Executors.newFixedThreadPool( 3 );
 
         try  {
             /**
              * 提交并执行任务,任务启动时返回了一个Future对象。
              * 如果想得到任务执行的结果或者是异常可对这个Future对象进行操作
              */
             Future future1 = es.submit(task1);
             //获得第一个任务的结果,如果调用get方法,当前线程会等待任务执行完毕后才往下执行
             System.out.println( "task1:"  + future1.get());
 
             Future future2 = es.submit(task2);
             //等待5秒后,再停止第二个任务。因为第二个任务进行的是无限循环
             Thread.sleep( 5000 );
             System.out.println( "task2 cancel:"  + future2.cancel( true ));
 
             //获取第三个任务的输出,因为执行第三个任务会引起异常
             //所以下面的语句将引起异常
             Future future3 = es.submit(task3);
             System.out.println( "task3:"  + future3.get());
 
         catch  (Exception e) {
             System.out.println(e.toString());
         }
         //停止任务执行服务
         es.shutdownNow();
     }
}


例子2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package  com.sohu.hot.vis.servlet;
 
 
import  java.util.ArrayList;
import  java.util.HashMap;
import  java.util.List;
import  java.util.Map;
import  java.util.concurrent.Callable;
import  java.util.concurrent.ExecutorService;
import  java.util.concurrent.Executors;
import  java.util.concurrent.Future;
 
/**
  * 多线程学习之Callable
  *
  * @author liweihan
  * @time 2016-12-29 15:44
  */
public  class  TestCallable2 {
 
 
     static  class  StarRelationThread  implements  Callable<Boolean> {
 
         private  Map<String, String> mapThread ;
         private  Map<String, String> map ;
         private  int  threadNum;
 
         public  StarRelationThread(Map<String, String> mapThread ,Map<String, String> map, int  threadNum) {
             this .map = map;
             this .threadNum = threadNum;
             this .mapThread = mapThread;
         }
 
         @Override
         public  Boolean call()  throws  Exception {
             System.out.println( " 第  "  + threadNum +  " 个线程处理-开始 ,此线程处理的数量 "
              + mapThread.size() +  ",总的数量为:" +map.size());
             System.out.println( "处理数据 ,并写入redis中" );
 
             if  (threadNum >  3 ) {
                 try  {
                     Thread.sleep( 20000 );
                 catch  (InterruptedException e) {
                     e.printStackTrace();
                 }
             }
 
             int  sync =  0 ;
             for  (Map.Entry<String, String> en : mapThread.entrySet()) {
                 sync++;
                 if  (sync <  2 ) {
                     System.out.println( "key :"  + en.getKey() +  ", value :"  + en.getValue());
                 }
             }
             System.out.println( " 第 "  + threadNum +  " 个线程执行完毕!" );
             return  true //true和flase,可以根据具体业务再做处理
         }
     }
 
 
     public  static  void  main(String[] args) {
         Map<String, String> map =  new  HashMap<String, String>();
         //测试数据
         for  ( int  i =  0 ; i <  300000 ; i++) {
             map.put( "key"  + i,  "value" +i);
         }
 
         //5.分割map+多线程
         int  totalSize = map.size();
         System.out.println( "Map totalSize : "  + totalSize);
 
         //线程的数量
         int  threadNum =  16 ;
         //每个线程处理的数量
         int  threadSize = totalSize / threadNum;
         System.out.println( "每个线程处理的数量:"  + threadSize);
 
         List<StarRelationThread> threadList =  new  ArrayList<StarRelationThread>();
         for  ( int  i =  0 ; i < threadNum; i++) {
             int  end ;
             if  (i == threadNum -  1 ) {
                 //最后一个线程
                 end = threadSize + totalSize % threadNum;
             else  {
                 end = threadSize;
             }
             int  beginNum = i * threadSize;
             int  endNum = i * threadSize + end;
             System.out.println(i +  " begin : "  + beginNum +  " , "  + endNum);
 
             int  sync =  0 ;
             //分割map
             Map<String, String> mapThread =  new  HashMap<String, String>();
             for (Map.Entry<String, String> entry : map.entrySet()) {
                 sync++;
                 if  (sync > beginNum && sync <= endNum) {
                     mapThread.put(entry.getKey(), entry.getValue());
                 }
             }
             StarRelationThread st =  new  StarRelationThread(mapThread,map,i);
             threadList.add(st);
         }
 
         //执行任务
         try  {
             /**
              * 线程池的了解:http://blog.csdn.net/coding_or_coded/article/details/6856014
              *        http://www.cnblogs.com/yezhenhan/archive/2012/01/07/2315645.html
              *        http://hbiao68.iteye.com/blog/1929245
              *
              *        https://my.oschina.net/u/1419751/blog/359263
              *        http://blog.csdn.net/linghu_java/article/details/17123057
              */
             ExecutorService executorService = Executors.newFixedThreadPool(  4  );
             List<Future<Boolean>> threadFutureList = executorService.invokeAll( threadList );
             executorService.shutdownNow();
             boolean  hasError =  false ;
             for  ( Future<Boolean> threadFuture : threadFutureList ) {
                 boolean  optSuccess = threadFuture.get();
                 if  ( !optSuccess ) {
                     hasError =  true ;
                 }
             }
 
             if  (hasError) {
                 System.out.println( " FAIL---------------" );
             else  {
                 System.out.println( " SUCCESS ------------------" );
             }
         catch  (Exception e) {
             e.printStackTrace();
         }
 
     }
}

当用完一个线程池后,应该调用该线程池的shutdown()方法,该方法将启动线程池的关闭序列,调用shutdown()方法后的线程池不再接受新任务,但将以前所有已提交任务执行完。当线程池中的所有任务都执行完成后,池中的所有线程都会死亡;

 void shutdown();


另外也可以调用线程池中的shutdownNow()方法来关闭线程池,该方法试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行任务列表。

List<Runnable> shutdownNow();


例子3:Semaphore

一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。拿到信号量的线程可以进入代码,否则就等待。通过acquire()和release()获取和释放访问许可。

<1.>

public void acquire()
             throws InterruptedException
  • 从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。获取一个许可(如果提供了一个)并立即返回,将可用的许可数减 1。

<2.>

public void release()
  • 释放一个许可,将其返回给信号量。释放一个许可,将可用的许可数增加 1。如果任意线程试图获取许可,则选中一个线程并将刚刚释放的许可给予它。然后针对线程安排目的启用(或再启用)该线程。


  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    package  com.book.admin.test;
     
    import  java.util.concurrent.ExecutorService;
    import  java.util.concurrent.Executors;
    import  java.util.concurrent.Semaphore;
     
    public  class  SemaphoreTest {
     
         public  static  void  main(String[] args) {
             
             //线程池
             ExecutorService exec = Executors.newCachedThreadPool();
             //只能5个线程同时访问
             final  Semaphore semp =  new  Semaphore( 5 );
             
             for  ( int  i =  0 ; i <  20 ; i++) {
                 final  int  no = i;
                 Runnable runnable =  new  Runnable() {
                     @Override
                     public  void  run() {
                         try  {
                             //获取许可
                             semp.acquire();
                             System.out.println( "Accessing: "  + no);
                             Thread.sleep(( long ) Math.random() *  10000 );
                             //访问完后,释放许可,如果注释掉下面的语句,则控制台只能打印5条记录,之后线程一直阻塞
                             semp.release();
                         catch  (InterruptedException e) {
                             e.printStackTrace();
                         }
                         
                     }
                 };
                 //执行线程
                 exec.execute(runnable);
             }
             //退出线程池
             exec.shutdown();
         }
         
    }