Java并发编程学习10-任务执行与Executor框架

简介: 【4月更文挑战第12天】本篇 重点讲解任务执行和 Executor框架的基础知识

java-concurrency-logo.png

任务执行

何为任务? 任务通常是一些抽象且离散的工作单元。

大多数并发应用程序都是围绕着 “任务执行” 来构造的。而围绕着 “任务执行” 来设计应用程序结构时,首先要做的就是要找出清晰的任务边界。大多数服务器应用程序都提供了一种自然的任务边界选择方式:以独立的客户请求为边界。将独立的请求作为任务边界,既可以实现任务的独立性,又可以实现合理的任务规模。

1. 串行地执行任务

在应用程序中可以通过多种策略来调度任务,其中最简单的策略就是在单个线程中串行地执行各项任务。

下面我们来看如下的代码示例【SingleThreadWebServer 将串行地处理它的任务(即通过 80 端口接收到的 HTTP 请求)】:

public class SingleThreadWebServer {
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            Socket connection = socket.accept();
            handleRequest(connection);
        }
    }
}

在实际执行中,上述示例执行性能非常糟糕,因为它每次只能处理一个请求。在服务器应用程序中,串行处理机制通常都无法提供高吞吐率或快速响应性。

在某些情况下,串行处理方式能带来简单性或安全性。大多数 GUI 框架都通过单一的线程来串行地处理任务。后面的博文我们会再次介绍串行模型。

2. 显式地为任务创建线程

下面我们来看如下的代码示例【通过为每个请求创建一个新的线程来提供服务,从而实现更高的响应性】:

public class ThreadPerTaskWebServer {
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            new Thread(task).start();
        }
    }
}

上述 ThreadPerTaskWebServer 与前面的 单线程版本 SingleThreadWebServer 在结构上类似,主线程仍然不断地交替执行 “接受外部连接” 与 “处理相关请求” 等操作。区别在于,ThreadPerTaskWebServer 对于每个连接,主循环都将创建一个新线程来处理请求,而不是在主循环中进行处理。

在正常负载情况下,“为每个任务分配一个线程” 的方法能提升串行执行的性能【即请求的到达速率不超出服务器的请求处理能力】。

3. 无限制创建线程的不足

当需要创建大量的线程时,“为每个任务分配一个线程” 就存在如下的问题了:

  • 线程生命周期的开销非常高。创建线程需要时间,又会延迟请求的处理。如果请求的到达率非常高且请求的处理过程是轻量级的,那么为每个请求创建一个新线程将消耗大量的计算资源。
  • 资源消耗。活跃的线程会消耗系统资源,尤其是内存。如果可运行的线程数量多于可用处理器的数量,那么有些线程将闲置。大量空闲的线程会占用许多内存,给垃圾回收器带来压力,而且大量线程在竞争 CPU 资源时还将产生其他的性能开销。
  • 稳定性。可创建线程的数量存在着一个限制,该限制随着平台不同而不同,并且受多个因素制约,包括JVM的启动参数、Thread 构造函数中请求的栈大小,以及底层操作系统对线程的限制等。如果破坏了这些限制,应用可能就会抛出 OutOfMemoryError 异常。

在一定的范围内,增加线程可以提高系统的吞吐率,但如果超出了这个范围,再创建再多的线程也只会降低程序的执行速度,甚至最后整个应用程序都将崩溃。

Executor框架

任务是一组逻辑工作单元,而线程则是使任务异步执行的机制。

前面我们已经分析了两种通过线程来执行任务的策略:

  • 把所有任务放在单个线程中串行执行【它的问题在于有着糟糕的响应性和吞吐量】
  • 将每个任务放在各自的线程中执行【它的问题在于资源管理的复杂性】

Java 类库中,任务执行的接口是 Executor,如下:

public interface Executor {
    void execute(Runnable command);
}

上述 Executor 虽然简单,但它却为 java.util.concurrent 下的异步任务执行框架提供了基础,该框架有着如下的特点:

  • 支持多种不同类型的任务执行策略。
  • 提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用 Runnable 来表示任务。

Executor 的实现还提供了对生命周期的支持,以及统计信息收集、应用程序管理机制和性能监视等机制。

Executor 基于生产者-消费者模式,提交任务的操作相当于生产者(生成待完成的工作单元),执行任务的线程则相当于消费者(执行完这些工作单元)。

1. 基于 Executor 的 Web 服务器

下面我们来看下如下的示例【用 Executor 代替了硬编码的线程,采用了一个固定长度的线程池,可以容纳 100 个线程】:

public class TaskExecutionWebServer {
    private static final int NTHREADS = 100;
    
    private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);
    
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            exec.execute(task);
        }
    }
}

上述 TaskExecutionWebServer 采用 Executor,将请求处理任务的提交与任务的实际执行解耦开来,并且只需采用另一种不同的 Executor 实现,就可以改变服务器的行为。

下面我们来将 TaskExecutionWebServer 修改为类似前面 ThreadPerTaskWebServer 的行为,只需要使用一个为每个请求都创建新线程的 Executor

public class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
        new Thread(r).start();
    }
}

public class TaskExecutionWebServer {
    
    private static final Executor exec = new ThreadPerTaskExecutor();
    
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            exec.execute(task);
        }
    }
}

同样,还可以编写一个 Executor,使得 TaskExecutionWebServer 的行为类似于单线程的行为,即以同步的方式执行每个任务,然后再返回。

public class WithInThreadExecutor implements Executor {
    public void execute(Runnable r) {
        r.run();
    }
}

public class TaskExecutionWebServer {
    
    private static final Executor exec = new WithInThreadExecutor();
    
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            exec.execute(task);
        }
    }
}

从上面可以了解到,将任务的提交与执行解耦开来,通过简单的改动就可以为某种类型的任务指定和修改执行策略。

2. 执行策略

各种执行策略本质上都是一种资源管理工具,最佳策略还得取决于可用的计算资源以及对服务质量的需求。

  • 通过限制并发任务的数量,可以确保应用程序不会由于资源耗尽而失败,或者由于在稀缺资源上发生竞争而严重影响性能。
  • 通过将任务的提交与任务的执行策略分离开来,有助于在部署阶段选择与可用硬件资源最匹配的执行策略。

3. 线程池

线程池,是指管理一组同构工作线程的资源池。线程池与工作队列密切相关,其在工作队列中保存了所有等待执行的任务。

工作者线程是如何工作的呢?

它从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。

采用线程池的好处有哪些呢?

  • 通过重用现有的线程而不是创建新线程,可以在处理多个请求时分摊在线程创建和销毁过程中产生的巨大开销。
  • 当请求到达时,工作线程通常已经存在了,因此不会由于等待创建线程而延迟任务的执行,从而提高了响应性。
  • 通过适当调整线程池的大小,可以创建足够多的线程以便使处理器保持忙碌状态,同时还可以防止过多线程相互竞争资源而使应用程序耗尽内存而失败。

Java 类库中,可以通过调用 Executors 中的静态工厂方法来创建一个线程池:

  • newFixedThreadPool。它将创建一个固定长度的线程池,每提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化(如果某个线程由于发生了未预期的 Exception 而结束,那么线程池会补充一个新的线程)。
  • newCachedThreadPool。它将创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。
  • newSingleThreadExecutor。它是一个单线程的 Executor,它创建单个工作者线程来执行任务,如果这个线程异常结束,会创建另一个线程来替代。它能确保依照任务在队列中的顺序来串行执行(例如 FIFOLIFO优先级)。
  • newScheduledThreadPool。它创建了一个固定长度的线程池,而且以延迟或定时的方式来执行任务,类似于 Timer

4. Executor 的生命周期

由于 Executor 以异步方式来执行任务,因此在任何时刻,之前提交的任务的状态并不是立即可见的。这些任务可能的状态如下:

  • 已经完成
  • 正在运行
  • 在队列中等待执行

为了管理任务的不同状态,Executor 扩展了 ExecutorService 接口,添加了一些用于生命周期管理的方法,如下所示:

public interface ExecutorService extends Executor {
    void shutdown();
    
    List<Runnable> shutdownNow();
    
    boolean isShutdown();
    
    boolean isTerminated();
    
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    // ...其他用于任务提交的方法
}

ExecutorService 的生命周期有 3 种状态:

  • 运行
  • 关闭
  • 已终止

ExecutorService 在初始创建时处于运行状态。shutdown 方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成--包括那些还未开始执行的任务。shutdownNow 方法将执行粗暴的关闭过程:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。

ExecutorService 关闭后提交的任务将由 “拒绝执行处理器(Rejected Execution Handler)” 来处理,它会抛弃任务,或者让 execute 方法抛出一个未检查的 RejectedExecutionException。等所有任务都完成后,ExecutorService 将转入终止状态。可以调用 awaitTermination 来等待 ExecutorService 到达终止状态,或者通过调用 isTerminated 来轮询 ExecutorService 是否已经终止。通常在调用 awaitTermination 之后会立即调用 shutdown,从而产生同步地关闭 ExecutorService 的效果。

下面我们来看下如下的示例【通过增加生命周期支持来扩展 Web 服务器的功能】;

public class LifecycleWebServer {
    private final Executor exec = Executors.newCachedThreadPool();
    
    public void start() throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (!exec.isShutdown()) {
            try {
                final Socket connection = socket.accept();
                exec.execute(new Runnable() {
                    public void run() {
                        handleRequest(connection);
                    }
                });
            } catch (RejectedExecutionException e) {
            if (!exec.isShutdown()) 
            
            }
        }
    }
    
    public void stop() {
        exec.shutdown();
    }
    
    void handleRequest(Socket connection) {
        Request request = readRequest(connection);
        if (isShutdownRequest(request))
            stop();
        else
            dispatchRequest(request);
    }
}

上述 LifecycleWebServer 可以通过两种方式来关闭 Web 服务器:

  • 在程序中直接调用 stop 方法。
  • 以客户端请求形式向 Web 服务器发送一个特定格式的HTTP请求。

5. 延迟任务与周期任务

Java 类库中,Timer 类负责管理延迟任务【指定时间后执行该任务】以及周期任务【指定周期执行一次该任务】。

不过,Timer 使用上存在着如下的缺陷 :

  • Timer 在执行所有的定时任务时只会创建一个线程。如果某个任务的执行时间过长,那么将破坏其他 TimerTask 的定时准确性。
  • Timer 在执行任务时并不捕获异常。因此当 TimerTask 抛出了一个未检查的异常时,将会终止定时线程。这时,Timer 也不会恢复线程的执行,而是会错误地认为整个 Timer 都被取消了。已经被调度但尚未执行的 TimerTask 将不会再执行,新的任务也不能被调度。【这个问题也称之为 “线程泄漏”,后续的博文将会介绍】
Timer 支持基于绝对时间而不是相对时间的调度机制,因此任务的执行对系统时钟变化很敏感,而 ScheduledThreadPoolExecutor 只支持基于相对时间的调度。

使用 线程池 就可以弥补上述缺陷,它可以提供多个线程来执行延时任务和周期任务。

下面我们来看一个演示 Timer 问题的示例:

public class OutOfTime {
    public static void main(String[] args) throws Exception {
        Timer timer = new Timer();
        timer.schedule(new ThrowTask(), 1);
        SECONDS.sleep(1);
        timer.schedule(new ThrowTask(), 1);
        SECONDS.sleep(5);
    }
    
    static class ThrowTask extends TimerTask {
        public void run() {
            throw new RuntimeException();
        }
    }
}

上述程序,你也许会认为运行 6 秒后退出,但实际情况是运行 1 秒就结束了,并抛出了一个异常消息 “Timer already cancelled”。 如下图所示:

image.png

注意:在 Java 5.0 或 更高的 JDK 中,将很少再使用 Timer。如果需要, ScheduledThreadPoolExecutor 是个不错的选择。

总结

本篇我们重点讲了任务执行与 Executor 框架的基础知识。Executor 帮助指定执行策略,但如果要使用 Executor,必须将任务表述为一个 Runnable【即必须定义一个清晰的任务边界】。大多数服务器应用程序中都存在一个明显的任务边界:单个客户请求。但有时候,任务边界并非是显而易见的,需要进一步的揭示其粒度更细的并发性。说到这里,那么下一遍博文就将通过 Demo 演示如何去找出可利用的并行性需求,敬请期待!!!

目录
相关文章
|
5天前
|
存储 缓存 Java
Java 并发编程——volatile 关键字解析
本文介绍了Java线程中的`volatile`关键字及其与`synchronized`锁的区别。`volatile`保证了变量的可见性和一定的有序性,但不能保证原子性。它通过内存屏障实现,避免指令重排序,确保线程间数据一致。相比`synchronized`,`volatile`性能更优,适用于简单状态标记和某些特定场景,如单例模式中的双重检查锁定。文中还解释了Java内存模型的基本概念,包括主内存、工作内存及并发编程中的原子性、可见性和有序性。
Java 并发编程——volatile 关键字解析
|
9天前
|
算法 Java 调度
java并发编程中Monitor里的waitSet和EntryList都是做什么的
在Java并发编程中,Monitor内部包含两个重要队列:等待集(Wait Set)和入口列表(Entry List)。Wait Set用于线程的条件等待和协作,线程调用`wait()`后进入此集合,通过`notify()`或`notifyAll()`唤醒。Entry List则管理锁的竞争,未能获取锁的线程在此排队,等待锁释放后重新竞争。理解两者区别有助于设计高效的多线程程序。 - **Wait Set**:线程调用`wait()`后进入,等待条件满足被唤醒,需重新竞争锁。 - **Entry List**:多个线程竞争锁时,未获锁的线程在此排队,等待锁释放后获取锁继续执行。
39 12
|
6天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
65 2
|
22天前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
1月前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
1月前
|
安全 Java 调度
Java中的多线程编程入门
【10月更文挑战第29天】在Java的世界中,多线程就像是一场精心编排的交响乐。每个线程都是乐团中的一个乐手,他们各自演奏着自己的部分,却又和谐地共同完成整场演出。本文将带你走进Java多线程的世界,让你从零基础到能够编写基本的多线程程序。
37 1
|
1月前
|
Java 数据处理 开发者
Java多线程编程的艺术:从入门到精通####
【10月更文挑战第21天】 本文将深入探讨Java多线程编程的核心概念,通过生动实例和实用技巧,引导读者从基础认知迈向高效并发编程的殿堂。我们将一起揭开线程管理的神秘面纱,掌握同步机制的精髓,并学习如何在实际项目中灵活运用这些知识,以提升应用性能与响应速度。 ####
51 3
|
2月前
|
Java
Java中的多线程编程:从入门到精通
本文将带你深入了解Java中的多线程编程。我们将从基础概念开始,逐步深入探讨线程的创建、启动、同步和通信等关键知识点。通过阅读本文,你将能够掌握Java多线程编程的基本技能,为进一步学习和应用打下坚实的基础。
|
4月前
|
算法 Java 开发者
Java 编程入门:从零到一的旅程
本文将带领读者开启Java编程之旅,从最基础的语法入手,逐步深入到面向对象的核心概念。通过实例代码演示,我们将一起探索如何定义类和对象、实现继承与多态,并解决常见的编程挑战。无论你是编程新手还是希望巩固基础的开发者,这篇文章都将为你提供有价值的指导和灵感。
|
4月前
|
机器学习/深度学习 Java TensorFlow
深度学习中的图像识别:从理论到实践Java中的多线程编程入门指南
【8月更文挑战第29天】本文将深入探讨深度学习在图像识别领域的应用,从基础理论到实际应用案例,带领读者一步步理解如何利用深度学习技术进行图像识别。我们将通过一个简单的代码示例,展示如何使用Python和TensorFlow库实现一个基本的图像识别模型。无论你是初学者还是有一定经验的开发者,都能从中获得启发和学习。 【8月更文挑战第29天】在Java世界里,线程是程序执行的最小单元,而多线程则是提高程序效率和响应性的关键武器。本文将深入浅出地引导你理解Java多线程的核心概念、创建方法以及同步机制,帮助你解锁并发编程的大门。