解决方案
问题根源找到了,解决的方法其实就非常的简单了,采取了自定义线程池参数。
在我们的修复方案中,选择的就是有界队列,虽然会有部分任务被丢失,但是我们线上是排序日志搜集任务,所以对部分对丢失是可以容忍的。
Java提供的四种常用线程池解析 Executors
既然楼主踩坑就是使用了 JDK 的默认实现,那么再来看看这些默认实现到底干了什么,封装了哪些参数。简而言之 Executors 工厂方法Executors.newCachedThreadPool() 提供了无界线程池,可以进行自动线程回收;Executors.newFixedThreadPool(int) 提供了固定大小线程池,内部使用无界队列;Executors.newSingleThreadExecutor() 提供了单个后台线程。
newCachedThreadPool:可缓存线程池
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
这种类型的线程池特点是:
工作线程的创建数量几乎没有限制(其实也有限制的,数目为Interger. MAX_VALUE), 这样可灵活的往线程池中添加线程。
如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为1分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。
在使用CachedThreadPool时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统瘫痪。
public class Main { public static void main(String[] args) { ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int index = i; try { Thread.sleep(index * 100); } catch (Exception e) { e.printStackTrace(); } cachedThreadPool.execute(() -> System.out.println(index + "当前线程" + Thread.currentThread().getName())); } } } 输出: 0当前线程pool-1-thread-1 1当前线程pool-1-thread-1 2当前线程pool-1-thread-1 3当前线程pool-1-thread-1 4当前线程pool-1-thread-1 5当前线程pool-1-thread-1 6当前线程pool-1-thread-1 7当前线程pool-1-thread-1 8当前线程pool-1-thread-1 9当前线程pool-1-thread-1
发现10个线程都是使用的线程1,线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
看代码一目了然了,线程数量固定,使用无限大的队列。再次强调,楼主就是踩的这个无限大队列的坑。
newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
在来看看ScheduledThreadPoolExecutor()的构造函数:
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
ScheduledThreadPoolExecutor的父类即ThreadPoolExecutor,因此这里各参数含义和上面一样。值得关心的是DelayedWorkQueue这个阻塞对列。
它作为静态内部类就在ScheduledThreadPoolExecutor中进行了实现。简单的说,DelayedWorkQueue是一个无界队列,它能按一定的顺序对工作队列中的元素进行排列。
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
注意:该静态方法,禁止使用,因为里面有不少坑,这里不做过多解释
关于线程池的阻塞队列的各种用法,请参见博文:
【小家java】BlockingQueue阻塞队列详解以及5大实现(ArrayBlockingQueue、DelayQueue、LinkedBlockingQueue…)
结束语
虽然之前学习了不少相关知识,但是只有在实践中踩坑才能印象深刻吧
可以通过Executors静态工厂构建线程池,但一般不建议这样使用。
附:ThreadFactory简单介绍
ThreadFactory是一个线程工厂。用来创建线程。这里为什么要使用线程工厂呢?其实就是为了统一在创建线程时设置一些参数,如是否守护线程。线程一些特性等,如优先级。通过这个TreadFactory创建出来的线程能保证有相同的特性。它首先是一个接口类,而且方法只有一个。就是创建一个线程。
public interface ThreadFactory { Thread newThread(Runnable r); }
所以我们可以自己实现这个工厂,然后定制属于我们自己的一类线程
class MyThreadFactory implements ThreadFactory { private int counter; private String name; private List<String> stats; public MyThreadFactory(String name) { counter = 0; this.name = name; stats = new ArrayList<String>(); } @Override public Thread newThread(Runnable run) { Thread t = new Thread(run, name + "-Thread-" + counter); counter++; stats.add(String.format("Created thread %d with name %s on%s\n",t.getId(), t.getName(), new Date())); return t; } public String getStas() { StringBuffer buffer = new StringBuffer(); Iterator<String> it = stats.iterator(); while (it.hasNext()) { buffer.append(it.next()); buffer.append("\n"); } return buffer.toString(); } } //使用: MyThreadFactory factory = new MyThreadFactory("MyThreadFactory"); Thread thread = factory.newThread(new MyTask(i)); thread.start();