Java利用线程工厂监控线程池

简介: Java利用线程工厂监控线程池

ThreadFactory

线程池中的线程从哪里来呢?就是ThreadFoctory

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

Threadfactory里面有个接口,当线程池中需要创建线程就会调用该方法,也可以自定义线程工厂

public class ThreadfactoryText {
    public static void main(String[] args) {
        Runnable runnable=new Runnable() {
            @Override
            public void run() {
                int num=new Random().nextInt(10);
                System.out.println(Thread.currentThread().getId()+"--"+System.currentTimeMillis()+"--睡眠"+num);
                try {
                    TimeUnit.SECONDS.sleep(num);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //创建线程池 使用自定义线程工厂 采用默认的拒绝策略
        ExecutorService executorService=new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t=new Thread(r);
                t.setDaemon(true);//设置为守护线程,当主线程运行结束,线程池中线程也会被释放
                System.out.println("创建了线程"+t);
                return t;
            }
        });
        //提交五个任务
        for (int i = 0; i < 5; i++) {
            executorService.submit(runnable);
        }
    }
}

当线程提交超过五个任务时,线程池会默认抛出异常

监控线程池

ThreadPoolExcutor提供了一组方法用于监控线程池

int getActiveCount()//获得线程池只当前的获得线程数量
long getCompletedTaskCount()//返回线程池完成任务数量
int getCorePoolSize()//线程池中核心任务数量
int getLargestPoolSize() //返回线程池中曾经达到线程的最大数
int getMaximumPoolSize()//返回线程池的最大容量
int getPoolSize()//返回线程大小
BlockingQueue<Runnable> getQueue()//返回阻塞队列
long getTaskCount()//返回线程池收到任务总数
public class Text {
    public static void main(String[] args) throws InterruptedException {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getId() + "线程开始执行--" + System.currentTimeMillis());
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //创建线程池 使用默认线程工厂 有界队列  采用DiscardPolicy策略
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardPolicy());
        //提交五个任务
        for (int i = 0; i < 30; i++) {
            executorService.submit(runnable);
            System.out.println("当前线程核心线程数"+executorService.getCorePoolSize()+",最大线程数:"+executorService.getMaximumPoolSize()+",当前线程池大小:"+executorService.getPoolSize()+"活动线程数:"+executorService.getActiveCount()+",收到任务:"+executorService.getTaskCount()+"完成任务数:"+executorService.getCompletedTaskCount()+"等待任务数:"+executorService.getQueue().size());
            TimeUnit.MILLISECONDS.sleep(500);
        }
        System.out.println("-------------------");
        while (executorService.getActiveCount()>=0)//继续对线程池进行检测
        {
          System.out.println("当前线程核心线程数"+executorService.getCorePoolSize()+",最大线程数:"+executorService.getMaximumPoolSize()+",当前线程池大小:"+executorService.getPoolSize()+"活动线程数:"+executorService.getActiveCount()+",收到任务:"+executorService.getTaskCount()+"完成任务数:"+executorService.getCompletedTaskCount()+"等待任务数:"+executorService.getQueue().size());
            Thread.sleep(1000);//每1秒检测一次
        }
    }
}

当线程池大小达到了核心线程数,线程会被放在等待队列。当线程池等待队列已满会开启新的线程。当当前线程大小达到最大线程数,等待队列也满了,再提交的话会执行DiscardPolicy策略,直接丢弃这个无法处理的任务,最后30个任务只剩下15个了。

原理如图:

扩展线程池

有时候需要对线程池进行扩展,如在监控每个任务开始和结束时间,或者自定义其他增强功能。

ThreadPoolExecutor线程池提供了两个方法:

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }

线程池执行某个任务前会执行beforeExecute()方法,执行后会调用afterExecute()方法

查看ThreadPoolExecutor源码,在该类中定义了一个内部类Worker,ThreadPoolExecutor线程池的工作线程就是Worker类的实例,Worker实例在执行时会调用beforeExecute与afterExecute方法。

public void run() {
            runWorker(this);
}
final void runWorker(Worker w) {
                try {
                    beforeExecute(wt, task);
                    try {
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
    }

部分代码已省略,线程执行前会调用beforeExecute,执行后会调用afterExecute方法。

扩展线程池示例

package com;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Text07 {
    public static void main(String[] args) {
        //定义扩展线程池 定义线程池类继承ThreadPoolExecutor,然后重写其他方法
        ExecutorService threadPoolExecutor=
 new ThreadPoolExecutor(5,5,0, TimeUnit.SECONDS,new LinkedBlockingDeque<>()){
     //在内部类重写开始方法
     @Override
     protected void beforeExecute(Thread t, Runnable r) {
         System.out.println(t.getId()+"线程准备执行任务"+((Mytask)r).name);
     }
     //在内部类重写结束方法
     @Override
     protected void afterExecute(Runnable r, Throwable t) {
         System.out.println(((Mytask)r).name+"执行完成");
     }
     //线程池退出
     @Override
     protected void terminated() {
         System.out.println("线程池退出");
     }
 };
        for (int i = 0; i < 5; i++) {
            Mytask mytask=new Mytask("Thread"+i);
            threadPoolExecutor.execute(mytask);
        }
    }
    private  static  class  Mytask implements Runnable
    {
        private  String name;
        public  Mytask(String name)
        {
            this.name=name;
        }
        @Override
        public void run() {
            System.out.println(name+"正在被执行"+Thread.currentThread().getId());
            try {
                Thread.sleep(1000);//模拟任务时长
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

优化线程池大小

线程池大小对系统性能有一定影响,过大或者过小都无法方法发挥系统最佳性能,不需要非常精确,只要避免极大或者极小就可以了,一般来说线程池大小大姚考虑CPU数量

线程池大小=CPU数量 * 目标CPU使用率*(1+等待时间与计算时间的比)

线程池死锁

如果线程池执行中,任务A在执行过程中提交了任务B,任务B添加到线程池中的等待队列,如果A的结束需要B的执行结果,而B线程需要等待A线程执行完毕,就可能会使其他所有工作线程都处于等待状态,待这些任务在阻塞队列中执行。线程池中没有可以对阻塞队列进行处理的线程,就会一直等待下去照成死锁。

适合给线程池提交相互独立的任务,而不是彼此依赖的任务,对于彼此依赖的任务,可以考虑分别提交给不同的线程池来处理。

线程池异常信息捕获

import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Text09 {
    public static void main(String[] args) {
        //创建线程池
        ExecutorService executorService=new ThreadPoolExecutor(5,5,0, TimeUnit.SECONDS,new SynchronousQueue<>());
        //向线程池中添加两个数相处计算的任务
        for (int i = 0; i <5 ; i++) {
            executorService.submit(new Text(10,i));
        }
    }
    private  static class  Text implements  Runnable
    {
        private  int x;
        private  int y;
        public  Text(int x,int y)
        {
            this.x=x;
            this.y=y;
        }
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+"线程x/y结果的为"+x+"/"+y+"="+(x/y));
        }
    }
}

可以看到只有四条结果,实际向线程池提交了五个任务,但是当i==0时,产生了算术异常,线程池把该异常吃掉了,导致我们对该异常一无所知

解决办法:

1.把submit改为execute

2.对线程池进行扩展,对submit进行包装

package com;
import java.util.concurrent.*;
public class Text09 {
    public static void main(String[] args) {
        //创建线程池  使用自定义的线程池
        ExecutorService executorService=new TranceThreadPoorExcuter(5,5,0, TimeUnit.SECONDS,new SynchronousQueue<>());
        //向线程池中添加两个数相处计算的任务
        for (int i = 0; i <5 ; i++) {
            executorService.submit(new Text(10,i));
        }
    }
    public  static class  Text implements  Runnable
    {
        public  int x;
        public  int y;
        public  Text(int x,int y)
        {
            this.x=x;
            this.y=y;
        }
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+"线程x/y结果的为"+x+"/"+y+"="+(x/y));
        }
    }
    //自定义线程池类 对TranceThreadPoorExcuter进行扩展
    private  static  class  TranceThreadPoorExcuter extends  ThreadPoolExecutor
    {
        public TranceThreadPoorExcuter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
        //定义一个方法用于传入两个参数 第一个是要接受的任务 第二个是Exception
        public  Runnable warp(Runnable r,Exception e)
        {
            return new Runnable() {
                @Override
                public void run() {
                    try {
                        r.run();
                    }
                    catch (Exception e1)
                    {
                        e.printStackTrace();
                        throw e1;
                    }
                }
            };
        }
        //重写submit方法
        @Override
        public Future<?> submit(Runnable task) {
            return super.submit(warp(task,new Exception("客户跟踪异常")));
        }
        //还可以重写excute方法
    }
}

此方法使用了自定义的线程池,重写线程池中的submit方法,在submit方法中,把要传入的任务参数带一个捕获异常信息的功能就可以捕获线程池异常。

相关文章
|
11天前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
13天前
|
安全 Java Kotlin
Java多线程——synchronized、volatile 保障可见性
Java多线程中,`synchronized` 和 `volatile` 关键字用于保障可见性。`synchronized` 保证原子性、可见性和有序性,通过锁机制确保线程安全;`volatile` 仅保证可见性和有序性,不保证原子性。代码示例展示了如何使用 `synchronized` 和 `volatile` 解决主线程无法感知子线程修改共享变量的问题。总结:`volatile` 确保不同线程对共享变量操作的可见性,使一个线程修改后,其他线程能立即看到最新值。
|
13天前
|
消息中间件 缓存 安全
Java多线程是什么
Java多线程简介:本文介绍了Java中常见的线程池类型,包括`newCachedThreadPool`(适用于短期异步任务)、`newFixedThreadPool`(适用于固定数量的长期任务)、`newScheduledThreadPool`(支持定时和周期性任务)以及`newSingleThreadExecutor`(保证任务顺序执行)。同时,文章还讲解了Java中的锁机制,如`synchronized`关键字、CAS操作及其实现方式,并详细描述了可重入锁`ReentrantLock`和读写锁`ReadWriteLock`的工作原理与应用场景。
|
14天前
|
安全 Java 编译器
深入理解Java中synchronized三种使用方式:助您写出线程安全的代码
`synchronized` 是 Java 中的关键字,用于实现线程同步,确保多个线程互斥访问共享资源。它通过内置的监视器锁机制,防止多个线程同时执行被 `synchronized` 修饰的方法或代码块。`synchronized` 可以修饰非静态方法、静态方法和代码块,分别锁定实例对象、类对象或指定的对象。其底层原理基于 JVM 的指令和对象的监视器,JDK 1.6 后引入了偏向锁、轻量级锁等优化措施,提高了性能。
37 3
|
14天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
96 2
|
22天前
|
安全 Java API
java如何请求接口然后终止某个线程
通过本文的介绍,您应该能够理解如何在Java中请求接口并根据返回结果终止某个线程。合理使用标志位或 `interrupt`方法可以确保线程的安全终止,而处理好网络请求中的各种异常情况,可以提高程序的稳定性和可靠性。
46 6
|
30天前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
2月前
|
设计模式 Java 开发者
Java多线程编程的陷阱与解决方案####
本文深入探讨了Java多线程编程中常见的问题及其解决策略。通过分析竞态条件、死锁、活锁等典型场景,并结合代码示例和实用技巧,帮助开发者有效避免这些陷阱,提升并发程序的稳定性和性能。 ####