学习java很久很久了,得有个5年了,但是从来都没有真正的走进java世界,希望从这篇文章开始,把自己对java的点点滴滴都记录下来。
从java5开始,java就提供了名叫Executor framework的机制,主要是围绕着Executor接口, 它的接口 ExecutorService, 以及实现了这两个接口的ThreadPoolExecutor类来展开,这种机制把线程的执行和创建分离开了,你只需要创建一个线程,然后把线程丢给Executor,让它执行去吧。使用这个机制的另外一个好处是可以使用Callable接口,它类似于Runnable接口,但是有两个不一样的特性。
- 它的主要方法是call(), 它可以携带一个返回值。
- 当你发送了一个Callable对象给executor之后,你可以拿到一个实现了Future接口的对象,通过这个对象,你可以控制对象的状态以及Callable对象的结果。
public Server(){ executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(5); System.out.printf("Server: Task Count: %d\n",executor. getTaskCount()); } //提交带返回值的任务。 public class FactorialCalculator implements Callable<Integer> { private Integer number; public FactorialCalculator(Integer number){ this.number=number; } @Override public Integer call() throws Exception { int result = 1; if ((num==0)||(num==1)) { result=1; } else { for (int i=2; i<=number; i++) { result*=i; TimeUnit.MILLISECONDS.sleep(20); } } System.out.printf("%s: %d\n",Thread.currentThread(). getName(),result); return result; } public class Main { public static void main(String[] args) { ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors. newFixedThreadPool(2); List<Future<Integer>> resultList=new ArrayList<>(); Random random=new Random(); for (int i=0; i<10; i++){ Integer number= random.nextInt(10); FactorialCalculator calculator=new FactorialCalculator(number); Future<Integer> result=executor.submit(calculator); resultList.add(result); } do { for (int i=0; i<resultList.size(); i++) { Future<Integer> result=resultList.get(i); System.out.printf("Main: Task %d: %s\n",i,result. isDone()); } try { TimeUnit.MILLISECONDS.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } } while (executor.getCompletedTaskCount()<resultList.size()); System.out.printf("Main: Results\n"); for (int i=0; i<resultList.size(); i++) { Future<Integer> result=resultList.get(i); Integer number=null; try { number=result.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.printf("Main: Task %d: %d\n",i,number); } executor.shutdown(); } } }
AI 代码解读
在Future对象里面,我们可以通过result.isDone()方法来判断线程是否计算完毕。
执行一堆任务,只返回第一个完成的任务。result = executor.invokeAny(taskList);
执行全部,resultList=executor.invokeAll(taskList);
推迟执行,executor.awaitTermination(1, TimeUnit.DAYS);
把任务的执行和结果的处理分开,需要用到CompletionService, CompletionServicei有两个方法,take和poll,poll是如果没有,它就会立刻返回一个null,take是没有的话,会一直等待。
当线程池调用了关闭之后,它需要等待当前所有进行中的线程结束才会完全关闭,在这个过程当中提交的线程,需要拒绝处理,我们需要实现一个RejectedExecutionHandler,重写它的rejectedExecution方法,然后听过executor的setRejectedExecutionHandler()方法来设置。
public class RejectedTaskController implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.printf("RejectedTaskController: The task %s has been rejected\n",r.toString()); System.out.printf("RejectedTaskController: %s\n",executor. toString()); System.out.printf("RejectedTaskController: Terminating: %s\n",executor.isTerminating()); System.out.printf("RejectedTaksController: Terminated: %s\n",executor.isTerminated()); }
AI 代码解读
Fork/Join Framework
Fork/Join Framwork的诞生是为了解决如下图这样的问题的
我们可以利用ForkJoinPool,它是一个特殊的Executors。
ForkJoin 框架主要是有两个操作:
Fork:把一个任务分成几个小任务,然后执行
Join:等待小任务的完成,并生成一个新任务。
这里我把ForkJoin称为刀叉框架,刀叉框架和Executor框架不一样的地方在于work-stealing算法,不同于Executor框架,刀叉框架在等待子任务的完成之前就已经创建并开始运行Join方法,Join方法一直在检测任务是否完成并且开始运行。通过这样的方式,可以很好的利用runtime的优势,提高性能。
这样就有一些限制:
任务只能采用Fork和Join来作为同步机制,而不能采用别的同步机制,如果采用其他的机制,他们在同步操作的时候,不能执行别的任务。比如:当你在刀叉框架里面一个任务sleep的时候,别的正在执行的任务也会停止,直到该线程sleep结束。
刀叉框架的核心是两个类:
(1)ForkJoinPool,它实现了ExecutorService接口和work-stealing算法,通过它可以很好的管理正在运行的任务以及了解任务的信息。
(2)ForkJoinTask,任务的基类,它提供了fork()方法和join()方法来控制任务的状态。不同通常,你会实现他们的两个子类,不带返回结果的RecursiveAction和带返回结果的RecursiveTask。