Executor框架-阿里云开发者社区

开发者社区> javaedge> 正文

Executor框架

简介: Executor框架
+关注继续查看

0 执行者Executor的由来

在介绍具体的工具之前,先讲讲设计者的思路。在Java1.4之前,已经提供了Runnable接口、Thread类、Timer类和synchronize关键字,它们已经足以完成各种各样的多线程编程任务,为什么还要提供执行者这样的概念呢?

这是因为Java的设计者想把线程的创建、执行和调度分离。在Concurrency包出现之前,线程的创建基本上靠new一个Thread对象,执行靠start()方法,而线程的调度则完全依赖程序员在具体代码中自己写出来。

而在Concurrency包出现之后,线程的创建还是依靠Thread、Runnable和Callable(新加入)对象的实例化;而线程的执行则靠Executor、ExecutorService的对象执行execute()方法或submit()方法;线程的调度则被固化为几个具体的线程池类,如ThreadPoolExecutor、ScheduledThreadPoolExecutor、ExecutorCompletionService等等。这样表面上增加了复杂度,而实际上成功将线程的创建、执行和调度的业务逻辑分离,使程序员能够将精力集中在线程中业务逻辑的编写,大大提高了编码效率,降低了出错的概率,而且大大提高了性能。

在Java中,使用线程来异步执行任务。Java线程的创建与销毁需要一定的开销,如果我们为每一个任务创建一个新线程来执行,这些线程的创建与销毁将消耗大量的计算资源。同时,为每一个任务创建一个新线程来执行,这种策略可能会使处于高负荷状态的应用最终崩溃。

Java的线程既是工作单元,也是执行机制。从JDK

5开始,把工作单元与执行机制分离开来。工作单元包括Runnable和Callable,而执行机制由Executor框架提供。

工具介绍

1 线程执行者

这个功能主要由三个接口和类提供,分别是:

Executor:执行者接口,所有执行者的父类;

ExecutorService:执行者服务接口,具体的执行者类都继承自此接口;

Executors:执行者工具类,大部分执行者的实例以及线程池都由它的工厂方法创建。

先看一个例子:

public class ExecutorExam {
    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        service.execute(new Task("task1"));
        service.execute(new Task("task2"));
        service.execute(new Task("task3"));
        service.shutdown();
    }
}

class Task implements Runnable {
    private final String name;

    Task(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 5; i++) {
                TimeUnit.SECONDS.sleep(1);
                System.out.println(name + "-[" + i + "]");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Executors工具类的newCachedThreadPool方法创建了一个执行者的对象,该对象是ExecutorService的实现类(后面我们会知道它是一个ThreadPoolExecutor对象)。

Task是Runnable接口的实现类,代码中new出了三个Task对象,它们被提交给ExecutorService的execute方法作为参数,即三个线程被ExecutorService执行了,其调度机制隐含在执行者的具体类中。最后ExecutorService调用shutdown方法,阻止继续提交其他线程,并等待执行中的线程结束。

通过这种方式,线程的创建、执行和调度被分离了,程序员可以将精力集中在线程的内部业务中。

2 得到异步执行的结果

在Java1.4之前,如果要得到一个线程运行后产生的值,没有一套现成的机制,程序员可以通过Thread类的成员变量、程序的全局变量等方式来得到一个线程运行后产生的某个值。但是这样的话,你必须不断探测线程是否已经成功结束,或者运用同步技术来等待线程执行完成,再去获取异步执行的结果。

在Java Concurrency中,得到异步结果有了一套固定的机制,即通过Callable接口、Future接口和ExecutorService的submit方法来得到异步执行的结果,相关工具的介绍如下:

Callable:泛型接口,与Runnable接口类似,它的实例可以被另一个线程执行,内部有一个call方法,返回一个泛型变量V

Future:泛型接口,代表依次异步执行的结果值,调用其get方法可以得到一次异步执行的结果,如果运算未完成,则阻塞直到完成;调用其cancel方法可以取消一次异步执行

CompletionService:一种执行者,可将submit的多个任务的结果按照完成的先后顺序存入一个内部队列,然后可以使用take方法从队列中依次取出结果并移除,如果调用take时计算未完成则会阻塞

先看一个简单的例子,得到一个异步执行的整形值

public class CallableExam {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool();
        Future<Integer> future = service.submit(() -> {
            System.out.println("Callable is running");
            TimeUnit.SECONDS.sleep(2);
            return 47;
        });
        service.shutdown();
        System.out.println("future.get = " + future.get());
    }
}

接下来是一个使用CompletionService的例子,创建5个线程计算一些值,执行完成后使用CompletionService依次取出结果并打印

public class CompletionServiceExam {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newCachedThreadPool();
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(service);
        for (int i = 0; i < 5; i++) {
            completionService.submit(new TaskInteger(i));
        }
        service.shutdown();
        //will block
        for (int i = 0; i < 5; i++) {
            Future<Integer> future = completionService.take();
            System.out.println(future.get());
        }

    }
}

class TaskInteger implements Callable<Integer> {
    private final int sum;

    TaskInteger(int sum)  {
        this.sum = sum;
    }

    @Override
    public Integer call() throws Exception {
        TimeUnit.SECONDS.sleep(sum);
        return sum * sum;
    }
}

需要注意的是CompletionService的创建方法,它的构造函数需要一个ExecutorService的对象作为参数

3 重复执行和延期执行

在Java1.4之前,一般使用Timer来重复或者延期执行任务。Java Concurrency为了使之与Executor理念协调,引入了ScheduledExecutorService来完成同样的工作。

  • ScheduledExecutorService:另一种执行者,可以将提交的任务延期执行,也可以将提交的任务反复执行
  • ScheduledFuture:与Future接口类似,代表一个被调度执行的异步任务的返回值

下面的例子中ScheduledExecutorService的实例scheduler调度了两个任务,

  • 第一个任务使用scheduleAtFixedRate()方法每隔一秒重复打印“beep”
  • 第二个任务使用schedule()方法在10秒后延迟执行,它的作用是取消第一个任务

代码如下

public class ScheduledExecutorServiceExam {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(2);
        ScheduledFuture<?> scheduledFuture = scheduler.scheduleAtFixedRate(() -> System.out.println("beep"), 1, 1, TimeUnit.SECONDS);

        scheduler.schedule(() -> {
            System.out.println("cancel beep");
            scheduledFuture.cancel(true);
            scheduler.shutdown();
        }, 10, TimeUnit.SECONDS);
    }
}

4 TimeUnit

上面的例子中用到了TimeUnit,因此这里就先介绍一下。TimeUnit是Java Concurrency包引入的新式的表达时间间隔或延迟的单位。在JDK1.5后面引入的新类中,都使用TimeUnit作为时间的表达方式。例如:

lock.tryLock(50L, TimeUnit.MILLISECONDS)
        public ScheduledFuture<?> schedule (Runnable command,long delay, TimeUnit unit);

还有,使用最多的,替代Thread.sleep的这种用法:

TimeUnit.SECONDS.sleep(5);
TimeUnit有以下6个时间单位:
NANOSECONDS:纳秒,1000纳秒为一微秒
MICROSECONDS:微秒,1000微秒为一毫秒
MILLISECONDS:毫秒,1000毫秒为一秒
SECONDS:秒,60秒为1分钟
MINUTES:分钟,60分钟为1小时
HOURS:小时,24小时为1天
DAYS:天

原来Java的时间单位默认为毫秒,引入TimeUnit后增加了纳秒和微秒,精度更高了,同时引入了很多转换方法,便于使用。

1 Executor框架简介

1.1 Executor框架的两级调度模型

在HotSpot VM的线程模型中,Java线程(java.lang.Thread)被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程也会被回收。操作系统会调度所有线程并将它们分配给可用的CPU。


在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。这种两级调度模型的示意图如下图所示。


从图中可以看出,应用程序通过Executor框架控制上层的调度;而下层的调度由操作系统内核控制,下层的调度不受应用程序的控制。


1.png

1.2 Executor框架的结构与成员

1.2.1 Executor框架的结构

Executor框架主要由3大部分组成如下。

1.任务。包括被执行任务需要实现的接口:Runnable接口或Callable接口。

2.任务的执行。包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。

3.异步计算的结果。包括接口Future和实现Future接口的FutureTask类。

Executor框架包含的主要的类与接口如图

1.png

下面是这些类和接口的简介。

Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。

ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。

ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。

Future接口和实现Future接口的FutureTask类,代表异步计算的结果。

Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或Scheduled-ThreadPoolExecutor执行。

Executor框架的使用示意图如图

1.png



版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
使用注解开发| 学习笔记
快速学习 使用注解开发
23 0
案例_点菜| 学习笔记
快速学习案例_点菜
15 0
我为什么要放弃 RESTful,选择拥抱 GraphQL
REST作为一种现代网络应用非常流行的软件架构风格,自从Roy Fielding博士在2000年他的博士论文中提出来到现在已经有了20年的历史。它的简单易用性,可扩展性,伸缩性受到广大Web开发者的喜爱。
9 0
这样统计代码执行耗时,才足够优雅!
代码耗时统计在日常开发中算是一个十分常见的需求,特别是在需要找出代码性能瓶颈时。 可能也是受限于 Java 的语言特性,总觉得代码写起来不够优雅,大量的耗时统计代码,干扰了业务逻辑。特别是开发功能的时候,有个感受就是刚刚开发完代码很清爽优雅,结果加了一大堆辅助代码后,整个代码就变得臃肿了,自己看着都挺难受。因此总想着能不能把这块写的更优雅一点,今天本文就尝试探讨下“代码耗时统计”这一块。
10 0
Slf4j 包老冲突,每次排查半天,是什么原因?怎么解决?
一、前言 在进行 Java 开发时,通常我们会选择 Slf4j 作为日志门面,但日志实现却不尽相同。如果系统运行中同时存在多个日志实现,就会出现类似下图的 Warning。
10 0
Google 开源的依赖注入库,比 Spring 更小更快!
Guice是Google开源的一个依赖注入类库,相比于Spring IoC来说更小更快。Elasticsearch大量使用了Guice,本文简单的介绍下Guice的基本概念和使用方式。
10 0
听说 Spring AOP 有坑?那就来踩一踩
前言 前几日,有朋友分享了这样一个案例: 原来的项目一直都正常运行,突然
10 0
我为什么要放弃 RESTful,选择拥抱 GraphQL
REST作为一种现代网络应用非常流行的软件架构风格,自从Roy Fielding博士在2000年他的博士论文中提出来到现在已经有了20年的历史。它的简单易用性,可扩展性,伸缩性受到广大Web开发者的喜爱。
10 0
【保姆级教程】Spring Boot 单元测试
【保姆级教程】Spring Boot 单元测试 一、 单元测试的概念 概念: \1. 单元测试(unit testing),是指对软件中的最小可测试单元进行检查和验证。在Java中单元测试的最小单元是类。
8 0
+关注
javaedge
关注公众号:JavaEdge,后台回复面试,领取更多大厂求职资源。曾在百度、携程、华为等大厂搬砖,专注Java生态各种中间件原理、框架源码、微服务、中台等架构设计及落地实战,只生产硬核干货!
2316
文章
1
问答
文章排行榜
最热
最新
相关电子书
更多
《2021云上架构与运维峰会演讲合集》
立即下载
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载