2w字 + 41张图带你参透并发编程!(四)

简介: 在计算机最早期的时候,没有操作系统,执行程序只需要一种方式,那就是从头到尾依次执行。任何资源都会为这个程序服务,在计算机使用某些资源时,其他资源就会空闲,就会存在 浪费资源 的情况。

使用线程池来创建线程

首先先来认识一下顶级接口 Executor,Executor 虽然不是传统线程创建的方式之一,但是它却成为了创建线程的替代者,使用线程池的好处如下

  • 利用线程池能够复用线程、控制最大并发数。
  • 实现任务线程队列缓存策略拒绝机制
  • 实现某些与时间相关的功能,如定时执行、周期执行等。
  • 隔离线程环境。比如,交易服务和搜索服务在同一台服务器上,分别开启两个线程池,交易线程的资源消耗明显要大;因此,通过配置独立的线程池,将较慢的交易服务与搜索服务隔开,避免服务线程互相影响。

你可以使用如下操作来替换线程创建

new Thread(new(RunnableTask())).start()
// 替换为
Executor executor = new ExecutorSubClass() // 线程池实现类;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

ExecutorService 是 Executor 的默认实现,也是 Executor 的扩展接口,ThreadPoolExecutor 类提供了线程池的扩展实现。Executors 类为这些 Executor 提供了方便的工厂方法。下面是使用 ExecutorService 创建线程的几种方式

CachedThreadPool

从而简化了并发编程。Executor 在客户端和任务之间提供了一个间接层;与客户端直接执行任务不同,这个中介对象将执行任务。Executor 允许你管理异步任务的执行,而无须显示地管理线程的生命周期。

public static void main(String[] args) {
  ExecutorService service = Executors.newCachedThreadPool();
  for(int i = 0;i < 5;i++){
    service.execute(new TestThread());
  }
  service.shutdown();
}

CachedThreadPool 会为每个任务都创建一个线程。

注意:ExecutorService 对象是使用静态的 Executors 创建的,这个方法可以确定 Executor 类型。对 shutDown 的调用可以防止新任务提交给 ExecutorService ,这个线程在 Executor 中所有任务完成后退出。

FixedThreadPool

FixedThreadPool 使你可以使用有限的线程集来启动多线程

public static void main(String[] args) {
  ExecutorService service = Executors.newFixedThreadPool(5);
  for(int i = 0;i < 5;i++){
    service.execute(new TestThread());
  }
  service.shutdown();
}

有了 FixedThreadPool 使你可以一次性的预先执行高昂的线程分配,因此也就可以限制线程的数量。这可以节省时间,因为你不必为每个任务都固定的付出创建线程的开销。

SingleThreadExecutor

SingleThreadExecutor 就是线程数量为 1的 FixedThreadPool,如果向 SingleThreadPool 一次性提交了多个任务,那么这些任务将会排队,每个任务都会在下一个任务开始前结束,所有的任务都将使用相同的线程。SingleThreadPool 会序列化所有提交给他的任务,并会维护它自己(隐藏)的悬挂队列。

public static void main(String[] args) {
  ExecutorService service = Executors.newSingleThreadExecutor();
  for(int i = 0;i < 5;i++){
    service.execute(new TestThread());
  }
  service.shutdown();
}

从输出的结果就可以看到,任务都是挨着执行的。我为任务分配了五个线程,但是这五个线程不像是我们之前看到的有换进换出的效果,它每次都会先执行完自己的那个线程,然后余下的线程继续走完这条线程的执行路径。你可以用 SingleThreadExecutor 来确保任意时刻都只有唯一一个任务在运行。

休眠

影响任务行为的一种简单方式就是使线程 休眠,选定给定的休眠时间,调用它的 sleep() 方法, 一般使用的TimeUnit 这个时间类替换 Thread.sleep() 方法,示例如下:

public class SuperclassThread extends TestThread{
    @Override
    public void run() {
        System.out.println(Thread.currentThread() + "starting ..." );
        try {
            for(int i = 0;i < 5;i++){
                if(i == 3){
                    System.out.println(Thread.currentThread() + "sleeping ...");
                    TimeUnit.MILLISECONDS.sleep(1000);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread() + "wakeup and end ...");
    }
    public static void main(String[] args) {
        ExecutorService executors = Executors.newCachedThreadPool();
        for(int i = 0;i < 5;i++){
            executors.execute(new SuperclassThread());
        }
        executors.shutdown();
    }
}

关于 TimeUnit 中的 sleep() 方法和 Thread.sleep() 方法的比较,请参考下面这篇博客

(https://www.cnblogs.com/xiadongqing/p/9925567.html)

优先级

上面提到线程调度器对每个线程的执行都是不可预知的,随机执行的,那么有没有办法告诉线程调度器哪个任务想要优先被执行呢?你可以通过设置线程的优先级状态,告诉线程调度器哪个线程的执行优先级比较高,「请给这个骑手马上派单」,线程调度器倾向于让优先级较高的线程优先执行,然而,这并不意味着优先级低的线程得不到执行,也就是说,优先级不会导致死锁的问题。优先级较低的线程只是执行频率较低。

public class SimplePriorities implements Runnable{
    private int priority;
    public SimplePriorities(int priority) {
        this.priority = priority;
    }
    @Override
    public void run() {
        Thread.currentThread().setPriority(priority);
        for(int i = 0;i < 100;i++){
            System.out.println(this);
            if(i % 10 == 0){
                Thread.yield();
            }
        }
    }
    @Override
    public String toString() {
        return Thread.currentThread() + " " + priority;
    }
    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        for(int i = 0;i < 5;i++){
            service.execute(new SimplePriorities(Thread.MAX_PRIORITY));
        }
        service.execute(new SimplePriorities(Thread.MIN_PRIORITY));
    }
}

toString() 方法被覆盖,以便通过使用 Thread.toString() 方法来打印线程的名称。你可以改写线程的默认输出,这里采用了 「Thread[pool-1-thread-1,10,main]」 这种形式的输出。

通过输出,你可以看到,最后一个线程的优先级最低,其余的线程优先级最高。注意,优先级是在 run 开头设置的,在构造器中设置它们不会有任何好处,因为这个时候线程还没有执行任务。

尽管 JDK 有 10 个优先级,但是一般只有「MAX_PRIORITY,NORM_PRIORITY,MIN_PRIORITY」 三种级别。

作出让步

我们上面提过,如果知道一个线程已经在 run() 方法中运行的差不多了,那么它就可以给线程调度器一个提示:我已经完成了任务中最重要的部分,可以让给别的线程使用 CPU 了。这个暗示将通过 yield() 方法作出。

有一个很重要的点就是,Thread.yield() 是建议执行切换CPU,而不是强制执行CPU切换。

对于任何重要的控制或者在调用应用时,都不能依赖于 yield()方法,实际上, yield() 方法经常被滥用。

后台线程

后台(daemon)线程,是指运行时在后台提供的一种服务线程,这种线程不是属于必须的。当所有非后台线程结束时,程序也就停止了,**同时会终止所有的后台线程。**反过来说,只要有任何非后台线程还在运行,程序就不会终止。

public class SimpleDaemons implements Runnable{
    @Override
    public void run() {
        while (true){
            try {
                TimeUnit.MILLISECONDS.sleep(100);
                System.out.println(Thread.currentThread() + " " + this);
            } catch (InterruptedException e) {
                System.out.println("sleep() interrupted");
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        for(int i = 0;i < 10;i++){
            Thread daemon = new Thread(new SimpleDaemons());
            daemon.setDaemon(true);
            daemon.start();
        }
        System.out.println("All Daemons started");
        TimeUnit.MILLISECONDS.sleep(175);
    }
}

在每次的循环中会创建 10 个线程,并把每个线程设置为后台线程,然后开始运行,for 循环会进行十次,然后输出信息,随后主线程睡眠一段时间后停止运行。在每次 run 循环中,都会打印当前线程的信息,主线程运行完毕,程序就执行完毕了。因为 daemon 是后台线程,无法影响主线程的执行。

但是当你把 daemon.setDaemon(true) 去掉时,while(true) 会进行无限循环,那么主线程一直在执行最重要的任务,所以会一直循环下去无法停止。

ThreadFactory

按需要创建线程的对象。使用线程工厂替换了 Thread 或者 Runnable 接口的硬连接,使程序能够使用特殊的线程子类,优先级等。一般的创建方式为

class SimpleThreadFactory implements ThreadFactory {
  public Thread newThread(Runnable r) {
    return new Thread(r);
  }
}

Executors.defaultThreadFactory 方法提供了一个更有用的简单实现,它在返回之前将创建的线程上下文设置为已知值

ThreadFactory是一个接口,它只有一个方法就是创建线程的方法

public interface ThreadFactory {
    // 构建一个新的线程。实现类可能初始化优先级,名称,后台线程状态和 线程组等
    Thread newThread(Runnable r);
}

下面来看一个 ThreadFactory 的例子

public class DaemonThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    }
}
public class DaemonFromFactory implements Runnable{
    @Override
    public void run() {
        while (true){
            try {
                TimeUnit.MILLISECONDS.sleep(100);
                System.out.println(Thread.currentThread() + " " + this);
            } catch (InterruptedException e) {
                System.out.println("Interrupted");
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool(new DaemonThreadFactory());
        for(int i = 0;i < 10;i++){
            service.execute(new DaemonFromFactory());
        }
        System.out.println("All daemons started");
        TimeUnit.MILLISECONDS.sleep(500);
    }
}

Executors.newCachedThreadPool 可以接受一个线程池对象,创建一个根据需要创建新线程的线程池,但会在它们可用时重用先前构造的线程,并在需要时使用提供的 ThreadFactory 创建新线程。

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                60L, TimeUnit.SECONDS,
                                new SynchronousQueue<Runnable>(),
                                threadFactory);
}

加入一个线程

一个线程可以在其他线程上调用 join() 方法,其效果是等待一段时间直到第二个线程结束才正常执行。如果某个线程在另一个线程 t 上调用 t.join() 方法,此线程将被挂起,直到目标线程 t 结束才回复(可以用 t.isAlive() 返回为真假判断)。

也可以在调用 join 时带上一个超时参数,来设置到期时间,时间到期,join方法自动返回。

对 join 的调用也可以被中断,做法是在线程上调用 interrupted 方法,这时需要用到 try...catch 子句

public class TestJoinMethod extends Thread{
    @Override
    public void run() {
        for(int i = 0;i < 5;i++){
            try {
                TimeUnit.MILLISECONDS.sleep(1000);
            } catch (InterruptedException e) {
                System.out.println("Interrupted sleep");
            }
            System.out.println(Thread.currentThread() + " " + i);
        }
    }
    public static void main(String[] args) throws InterruptedException {
        TestJoinMethod join1 = new TestJoinMethod();
        TestJoinMethod join2 = new TestJoinMethod();
        TestJoinMethod join3 = new TestJoinMethod();
        join1.start();
//        join1.join();
        join2.start();
        join3.start();
    }
}

join() 方法等待线程死亡。换句话说,它会导致当前运行的线程停止执行,直到它加入的线程完成其任务。

线程异常捕获

由于线程的本质,使你不能捕获从线程中逃逸的异常,一旦异常逃出任务的 run 方法,它就会向外传播到控制台,除非你采取特殊的步骤捕获这种错误的异常,在 Java5 之前,你可以通过线程组来捕获,但是在 Java 5 之后,就需要用 Executor 来解决问题,因为线程组不是一次好的尝试。

下面的任务会在 run 方法的执行期间抛出一个异常,并且这个异常会抛到 run 方法的外面,而且 main 方法无法对它进行捕获

public class ExceptionThread implements Runnable{
    @Override
    public void run() {
        throw new RuntimeException();
    }
    public static void main(String[] args) {
        try {
            ExecutorService service = Executors.newCachedThreadPool();
            service.execute(new ExceptionThread());
        }catch (Exception e){
            System.out.println("eeeee");
        }
    }
}

为了解决这个问题,我们需要修改 Executor 产生线程的方式,Java5 提供了一个新的接口 Thread.UncaughtExceptionHandler ,它允许你在每个 Thread 上都附着一个异常处理器。Thread.UncaughtExceptionHandler.uncaughtException() 会在线程因未捕获临近死亡时被调用。

public class ExceptionThread2 implements Runnable{
    @Override
    public void run() {
        Thread t = Thread.currentThread();
        System.out.println("run() by " + t);
        System.out.println("eh = " + t.getUncaughtExceptionHandler());
       // 手动抛出异常
        throw new RuntimeException();
    }
}
// 实现Thread.UncaughtExceptionHandler 接口,创建异常处理器
public class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler{
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        System.out.println("caught " + e);
    }
}
public class HandlerThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        System.out.println(this + " creating new Thread");
        Thread t = new Thread(r);
        System.out.println("created " + t);
        t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
        System.out.println("ex = " + t.getUncaughtExceptionHandler());
        return t;
    }
}
public class CaptureUncaughtException {
    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool(new HandlerThreadFactory());
        service.execute(new ExceptionThread2());
    }
}

在程序中添加了额外的追踪机制,用来验证工厂创建的线程会传递给UncaughtExceptionHandler,你可以看到,未捕获的异常是通过 uncaughtException 来捕获的。

相关文章
|
设计模式
110.【十万字带你深入学习23种设计模式】(二十二)
110.【十万字带你深入学习23种设计模式】
63 1
110.【十万字带你深入学习23种设计模式】(二十二)
|
设计模式 Java
110.【十万字带你深入学习23种设计模式】(二十六)
110.【十万字带你深入学习23种设计模式】
83 1
|
设计模式 应用服务中间件 智能硬件
110.【十万字带你深入学习23种设计模式】(十七)
110.【十万字带你深入学习23种设计模式】
67 0
|
设计模式 存储 Java
110.【十万字带你深入学习23种设计模式】(十一)
110.【十万字带你深入学习23种设计模式】
65 0
|
设计模式 存储 Java
110.【十万字带你深入学习23种设计模式】(二十五)
110.【十万字带你深入学习23种设计模式】
83 1
|
设计模式 Java 程序员
110.【十万字带你深入学习23种设计模式】(十四)
110.【十万字带你深入学习23种设计模式】
72 0
110.【十万字带你深入学习23种设计模式】(十四)
|
存储 缓存 监控
|
存储 算法 安全
终于有人用1.6W字把分布式算法的系统模型给彻底讲明白了!
系统模型-进程 在真实的分布式系统中,可能存在服务器(节点)、处理器、进程、线程等并发执行的实体。 在分布式算法中,这些实体都被抽象为进程。 注意,这里的进程与操作系统中的进程不完全是一个概念,后者侧重于描述一组资源的集合,例如文件句柄、地址空间、数据、代码等,还可以有多个线程,而前者是一个有状态的自动机。
957 3
|
设计模式 Java
110.【十万字带你深入学习23种设计模式】(二十三)
110.【十万字带你深入学习23种设计模式】
59 0
|
消息中间件 缓存 监控
7000字+24张图带你彻底弄懂线程池(1)
7000字+24张图带你彻底弄懂线程池(1)

热门文章

最新文章