Java并发编程学习10-任务执行与Executor框架

简介: 【4月更文挑战第12天】本篇 重点讲解任务执行和 Executor框架的基础知识

java-concurrency-logo.png

任务执行

何为任务? 任务通常是一些抽象且离散的工作单元。

大多数并发应用程序都是围绕着 “任务执行” 来构造的。而围绕着 “任务执行” 来设计应用程序结构时,首先要做的就是要找出清晰的任务边界。大多数服务器应用程序都提供了一种自然的任务边界选择方式:以独立的客户请求为边界。将独立的请求作为任务边界,既可以实现任务的独立性,又可以实现合理的任务规模。

1. 串行地执行任务

在应用程序中可以通过多种策略来调度任务,其中最简单的策略就是在单个线程中串行地执行各项任务。

下面我们来看如下的代码示例【SingleThreadWebServer 将串行地处理它的任务(即通过 80 端口接收到的 HTTP 请求)】:

public class SingleThreadWebServer {
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            Socket connection = socket.accept();
            handleRequest(connection);
        }
    }
}

在实际执行中,上述示例执行性能非常糟糕,因为它每次只能处理一个请求。在服务器应用程序中,串行处理机制通常都无法提供高吞吐率或快速响应性。

在某些情况下,串行处理方式能带来简单性或安全性。大多数 GUI 框架都通过单一的线程来串行地处理任务。后面的博文我们会再次介绍串行模型。

2. 显式地为任务创建线程

下面我们来看如下的代码示例【通过为每个请求创建一个新的线程来提供服务,从而实现更高的响应性】:

public class ThreadPerTaskWebServer {
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            new Thread(task).start();
        }
    }
}

上述 ThreadPerTaskWebServer 与前面的 单线程版本 SingleThreadWebServer 在结构上类似,主线程仍然不断地交替执行 “接受外部连接” 与 “处理相关请求” 等操作。区别在于,ThreadPerTaskWebServer 对于每个连接,主循环都将创建一个新线程来处理请求,而不是在主循环中进行处理。

在正常负载情况下,“为每个任务分配一个线程” 的方法能提升串行执行的性能【即请求的到达速率不超出服务器的请求处理能力】。

3. 无限制创建线程的不足

当需要创建大量的线程时,“为每个任务分配一个线程” 就存在如下的问题了:

  • 线程生命周期的开销非常高。创建线程需要时间,又会延迟请求的处理。如果请求的到达率非常高且请求的处理过程是轻量级的,那么为每个请求创建一个新线程将消耗大量的计算资源。
  • 资源消耗。活跃的线程会消耗系统资源,尤其是内存。如果可运行的线程数量多于可用处理器的数量,那么有些线程将闲置。大量空闲的线程会占用许多内存,给垃圾回收器带来压力,而且大量线程在竞争 CPU 资源时还将产生其他的性能开销。
  • 稳定性。可创建线程的数量存在着一个限制,该限制随着平台不同而不同,并且受多个因素制约,包括JVM的启动参数、Thread 构造函数中请求的栈大小,以及底层操作系统对线程的限制等。如果破坏了这些限制,应用可能就会抛出 OutOfMemoryError 异常。

在一定的范围内,增加线程可以提高系统的吞吐率,但如果超出了这个范围,再创建再多的线程也只会降低程序的执行速度,甚至最后整个应用程序都将崩溃。

Executor框架

任务是一组逻辑工作单元,而线程则是使任务异步执行的机制。

前面我们已经分析了两种通过线程来执行任务的策略:

  • 把所有任务放在单个线程中串行执行【它的问题在于有着糟糕的响应性和吞吐量】
  • 将每个任务放在各自的线程中执行【它的问题在于资源管理的复杂性】

Java 类库中,任务执行的接口是 Executor,如下:

public interface Executor {
    void execute(Runnable command);
}

上述 Executor 虽然简单,但它却为 java.util.concurrent 下的异步任务执行框架提供了基础,该框架有着如下的特点:

  • 支持多种不同类型的任务执行策略。
  • 提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用 Runnable 来表示任务。

Executor 的实现还提供了对生命周期的支持,以及统计信息收集、应用程序管理机制和性能监视等机制。

Executor 基于生产者-消费者模式,提交任务的操作相当于生产者(生成待完成的工作单元),执行任务的线程则相当于消费者(执行完这些工作单元)。

1. 基于 Executor 的 Web 服务器

下面我们来看下如下的示例【用 Executor 代替了硬编码的线程,采用了一个固定长度的线程池,可以容纳 100 个线程】:

public class TaskExecutionWebServer {
    private static final int NTHREADS = 100;
    
    private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);
    
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            exec.execute(task);
        }
    }
}

上述 TaskExecutionWebServer 采用 Executor,将请求处理任务的提交与任务的实际执行解耦开来,并且只需采用另一种不同的 Executor 实现,就可以改变服务器的行为。

下面我们来将 TaskExecutionWebServer 修改为类似前面 ThreadPerTaskWebServer 的行为,只需要使用一个为每个请求都创建新线程的 Executor

public class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
        new Thread(r).start();
    }
}

public class TaskExecutionWebServer {
    
    private static final Executor exec = new ThreadPerTaskExecutor();
    
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            exec.execute(task);
        }
    }
}

同样,还可以编写一个 Executor,使得 TaskExecutionWebServer 的行为类似于单线程的行为,即以同步的方式执行每个任务,然后再返回。

public class WithInThreadExecutor implements Executor {
    public void execute(Runnable r) {
        r.run();
    }
}

public class TaskExecutionWebServer {
    
    private static final Executor exec = new WithInThreadExecutor();
    
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            exec.execute(task);
        }
    }
}

从上面可以了解到,将任务的提交与执行解耦开来,通过简单的改动就可以为某种类型的任务指定和修改执行策略。

2. 执行策略

各种执行策略本质上都是一种资源管理工具,最佳策略还得取决于可用的计算资源以及对服务质量的需求。

  • 通过限制并发任务的数量,可以确保应用程序不会由于资源耗尽而失败,或者由于在稀缺资源上发生竞争而严重影响性能。
  • 通过将任务的提交与任务的执行策略分离开来,有助于在部署阶段选择与可用硬件资源最匹配的执行策略。

3. 线程池

线程池,是指管理一组同构工作线程的资源池。线程池与工作队列密切相关,其在工作队列中保存了所有等待执行的任务。

工作者线程是如何工作的呢?

它从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。

采用线程池的好处有哪些呢?

  • 通过重用现有的线程而不是创建新线程,可以在处理多个请求时分摊在线程创建和销毁过程中产生的巨大开销。
  • 当请求到达时,工作线程通常已经存在了,因此不会由于等待创建线程而延迟任务的执行,从而提高了响应性。
  • 通过适当调整线程池的大小,可以创建足够多的线程以便使处理器保持忙碌状态,同时还可以防止过多线程相互竞争资源而使应用程序耗尽内存而失败。

Java 类库中,可以通过调用 Executors 中的静态工厂方法来创建一个线程池:

  • newFixedThreadPool。它将创建一个固定长度的线程池,每提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化(如果某个线程由于发生了未预期的 Exception 而结束,那么线程池会补充一个新的线程)。
  • newCachedThreadPool。它将创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。
  • newSingleThreadExecutor。它是一个单线程的 Executor,它创建单个工作者线程来执行任务,如果这个线程异常结束,会创建另一个线程来替代。它能确保依照任务在队列中的顺序来串行执行(例如 FIFOLIFO优先级)。
  • newScheduledThreadPool。它创建了一个固定长度的线程池,而且以延迟或定时的方式来执行任务,类似于 Timer

4. Executor 的生命周期

由于 Executor 以异步方式来执行任务,因此在任何时刻,之前提交的任务的状态并不是立即可见的。这些任务可能的状态如下:

  • 已经完成
  • 正在运行
  • 在队列中等待执行

为了管理任务的不同状态,Executor 扩展了 ExecutorService 接口,添加了一些用于生命周期管理的方法,如下所示:

public interface ExecutorService extends Executor {
    void shutdown();
    
    List<Runnable> shutdownNow();
    
    boolean isShutdown();
    
    boolean isTerminated();
    
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    // ...其他用于任务提交的方法
}

ExecutorService 的生命周期有 3 种状态:

  • 运行
  • 关闭
  • 已终止

ExecutorService 在初始创建时处于运行状态。shutdown 方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成--包括那些还未开始执行的任务。shutdownNow 方法将执行粗暴的关闭过程:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。

ExecutorService 关闭后提交的任务将由 “拒绝执行处理器(Rejected Execution Handler)” 来处理,它会抛弃任务,或者让 execute 方法抛出一个未检查的 RejectedExecutionException。等所有任务都完成后,ExecutorService 将转入终止状态。可以调用 awaitTermination 来等待 ExecutorService 到达终止状态,或者通过调用 isTerminated 来轮询 ExecutorService 是否已经终止。通常在调用 awaitTermination 之后会立即调用 shutdown,从而产生同步地关闭 ExecutorService 的效果。

下面我们来看下如下的示例【通过增加生命周期支持来扩展 Web 服务器的功能】;

public class LifecycleWebServer {
    private final Executor exec = Executors.newCachedThreadPool();
    
    public void start() throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (!exec.isShutdown()) {
            try {
                final Socket connection = socket.accept();
                exec.execute(new Runnable() {
                    public void run() {
                        handleRequest(connection);
                    }
                });
            } catch (RejectedExecutionException e) {
            if (!exec.isShutdown()) 
            
            }
        }
    }
    
    public void stop() {
        exec.shutdown();
    }
    
    void handleRequest(Socket connection) {
        Request request = readRequest(connection);
        if (isShutdownRequest(request))
            stop();
        else
            dispatchRequest(request);
    }
}

上述 LifecycleWebServer 可以通过两种方式来关闭 Web 服务器:

  • 在程序中直接调用 stop 方法。
  • 以客户端请求形式向 Web 服务器发送一个特定格式的HTTP请求。

5. 延迟任务与周期任务

Java 类库中,Timer 类负责管理延迟任务【指定时间后执行该任务】以及周期任务【指定周期执行一次该任务】。

不过,Timer 使用上存在着如下的缺陷 :

  • Timer 在执行所有的定时任务时只会创建一个线程。如果某个任务的执行时间过长,那么将破坏其他 TimerTask 的定时准确性。
  • Timer 在执行任务时并不捕获异常。因此当 TimerTask 抛出了一个未检查的异常时,将会终止定时线程。这时,Timer 也不会恢复线程的执行,而是会错误地认为整个 Timer 都被取消了。已经被调度但尚未执行的 TimerTask 将不会再执行,新的任务也不能被调度。【这个问题也称之为 “线程泄漏”,后续的博文将会介绍】
Timer 支持基于绝对时间而不是相对时间的调度机制,因此任务的执行对系统时钟变化很敏感,而 ScheduledThreadPoolExecutor 只支持基于相对时间的调度。

使用 线程池 就可以弥补上述缺陷,它可以提供多个线程来执行延时任务和周期任务。

下面我们来看一个演示 Timer 问题的示例:

public class OutOfTime {
    public static void main(String[] args) throws Exception {
        Timer timer = new Timer();
        timer.schedule(new ThrowTask(), 1);
        SECONDS.sleep(1);
        timer.schedule(new ThrowTask(), 1);
        SECONDS.sleep(5);
    }
    
    static class ThrowTask extends TimerTask {
        public void run() {
            throw new RuntimeException();
        }
    }
}

上述程序,你也许会认为运行 6 秒后退出,但实际情况是运行 1 秒就结束了,并抛出了一个异常消息 “Timer already cancelled”。 如下图所示:

image.png

注意:在 Java 5.0 或 更高的 JDK 中,将很少再使用 Timer。如果需要, ScheduledThreadPoolExecutor 是个不错的选择。

总结

本篇我们重点讲了任务执行与 Executor 框架的基础知识。Executor 帮助指定执行策略,但如果要使用 Executor,必须将任务表述为一个 Runnable【即必须定义一个清晰的任务边界】。大多数服务器应用程序中都存在一个明显的任务边界:单个客户请求。但有时候,任务边界并非是显而易见的,需要进一步的揭示其粒度更细的并发性。说到这里,那么下一遍博文就将通过 Demo 演示如何去找出可利用的并行性需求,敬请期待!!!

目录
相关文章
|
5天前
|
安全 Java 调度
Java编程时多线程操作单核服务器可以不加锁吗?
Java编程时多线程操作单核服务器可以不加锁吗?
18 2
|
7天前
|
消息中间件 分布式计算 Java
Linux环境下 java程序提交spark任务到Yarn报错
Linux环境下 java程序提交spark任务到Yarn报错
18 5
|
9天前
|
算法 安全 Java
JAVA并发编程系列(12)ThreadLocal就是这么简单|建议收藏
很多人都以为TreadLocal很难很深奥,尤其被问到ThreadLocal数据结构、以及如何发生的内存泄漏问题,候选人容易谈虎色变。 日常大家用这个的很少,甚至很多近10年资深研发人员,都没有用过ThreadLocal。本文由浅入深、并且才有通俗易懂方式全面分析ThreadLocal的应用场景、数据结构、内存泄漏问题。降低大家学习啃骨头的心理压力,希望可以帮助大家彻底掌握并应用这个核心技术到工作当中。
|
9天前
|
Java 程序员 编译器
死磕-高效的Java编程(二)
死磕-高效的Java编程(二)
|
3天前
|
Java
JAVA并发编程系列(13)Future、FutureTask异步小王子
本文详细解析了Future及其相关类FutureTask的工作原理与应用场景。首先介绍了Future的基本概念和接口方法,强调其异步计算特性。接着通过FutureTask实现了一个模拟外卖订单处理的示例,展示了如何并发查询外卖信息并汇总结果。最后深入分析了FutureTask的源码,包括其内部状态转换机制及关键方法的实现原理。通过本文,读者可以全面理解Future在并发编程中的作用及其实现细节。
|
7天前
|
Java 数据处理 调度
Java中的多线程编程:从基础到实践
本文深入探讨了Java中多线程编程的基本概念、实现方式及其在实际项目中的应用。首先,我们将了解什么是线程以及为何需要多线程编程。接着,文章将详细介绍如何在Java中创建和管理线程,包括继承Thread类、实现Runnable接口以及使用Executor框架等方法。此外,我们还将讨论线程同步和通信的问题,如互斥锁、信号量、条件变量等。最后,通过具体的示例展示了如何在实际项目中有效地利用多线程提高程序的性能和响应能力。
|
7天前
|
安全 算法 Java
Java中的多线程编程:从基础到高级应用
本文深入探讨了Java中的多线程编程,从最基础的概念入手,逐步引导读者了解并掌握多线程开发的核心技术。无论是初学者还是有一定经验的开发者,都能从中获益。通过实例和代码示例,本文详细讲解了线程的创建与管理、同步与锁机制、线程间通信以及高级并发工具等主题。此外,还讨论了多线程编程中常见的问题及其解决方案,帮助读者编写出高效、安全的多线程应用程序。
|
Java API
java并发编程笔记--Executor相关API整理
Executor框架是concurrent包提供的用于执行线程任务的框架,它基于生产者-消费者模式实现,将提交任务的线程和执行任务的线程解耦。提交任务的线程视作生产者,执行任务的线程视作消费者。任务的执行策略可以通过定制不同的消费者实现,比如:任务可以同步执行,也可以异步执行;任务可以按照编排优先级,高优先级的任务可以优先执行;任务可以延迟执行或者按周期执行...这些实现对于生产者而言透明,生产者无需关注消费者的具体实现,仅需要按照业务需求提交任务即可。
3420 0
|
9天前
|
存储 缓存 Java
java线程内存模型底层实现原理
java线程内存模型底层实现原理
java线程内存模型底层实现原理
|
13天前
|
缓存 Java 应用服务中间件
Java虚拟线程探究与性能解析
本文主要介绍了阿里云在Java-虚拟-线程任务中的新进展和技术细节。
下一篇
无影云桌面