短频快task的java解决方案

简介: 本文探讨了Java自带WorkStealingPool的缺陷,特别是在任务中断方面的不足。普通线程池在处理短频快任务时存在锁竞争问题,导致性能损耗。文章提出了一种基于任务窃取机制的优化方案,通过设计合理的窃取逻辑和减少性能损耗,实现了任务的高效执行和资源的充分利用。最后总结了不同场景下应选择的线程池类型。

看到这个标题,大家一定很好奇,感觉这是一个重复造轮子的事情。java明明已经提供了WorkStealingPool,本身是带窃取能力的。这里就需要讲一下背景。这里主要来自WorkStealingPool的能力缺陷。

WorkStealingPool的能力缺陷

java

代码解读

复制代码


public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool
        (Runtime.getRuntime().availableProcessors(),
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

java的WorkStealingPool本质是forkjoinpool。他是利用了forkjoinpool的窃取任务的能力,来达到窃取的效果。forkjoinpool在实现复杂场景的fork,join的过程,也带来了一个问题,就是他无法做到中断程序。例如task1,拆出了task2,task3,task4。现在task2正在被执行,task3还在队列里,task4已经被窃取走了。现在对整体任务1发起中断。正确的中断是task2中断,task3取消,task4中断。最终对于task1而言是被中断了。这里就有了一个问题,task4被窃取走了,怎么知道那个线程需要被中断呢?这个问题forkjoinpool也没实现。 在forkjointask的实现过程中也体现了这点。

java

代码解读

复制代码

public boolean cancel(boolean mayInterruptIfRunning) {
    return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
}

在取消任务的方法中mayInterruptIfRunning直接没有被使用。注释中直接写了这个参数不生效。

arduino

代码解读

复制代码

Params:
mayInterruptIfRunning – this value has no effect in the default implementation because interrupts are not used to control cancellation.

所以自带的WorkStealingPool是无法发起中断的。只能发起取消。

普通线程池为什么满足不了场景

java自带的线程数是一个生产消费模型,生产者就是我们提交的任务,消费者线程池的线程,用来执行任务。这个从模型上看还是一个公平模型,那个线程执行完了就从队列里获取任务,这样算力能力也不会被浪费。但是这个地方有个前提,就是任务执行的时间一定是得大于从队列获取的时间。对于短频快的任务,获取任务的损耗就会变得特别明显。线程池的队列必须是一个阻塞队列。这里就涉及到锁的竞争,如下的代码是线程池获取任务的地方workQueue就是阻塞队列,poll,take都会涉及到锁的竞争。

java

代码解读

复制代码

try {
    Runnable r = timed ?
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
        workQueue.take();
    if (r != null)
        return r;
    timedOut = true;
} catch (InterruptedException retry) {
    timedOut = false;
}

了解了场景之后,我们想这个场景的优化方式,其实之一就是批处理,反正任务执行快,把多个任务合并成一个,这样每个线程执行的时候就是按照一批来的。锁竞争的的损耗被降低了。

上面的批量的方案运行良好的情况每个任务的执行时间都差不多。但是现实情况是无法保证的,可能因为一些不合理的操作,导致某一个任务的执行时间变长,或者某一个批次的每个任务都比其他的多一些,最终导致了整体的执行时间变长。在整体的观测线程池的线程运行情况,就会发现某一个线程一直在执行,但是其他的线程还有空闲,造成了线程算力的浪费。

这里在看窃取的能力,似乎更好的满足了这里的诉求。

基于场景的窃取

上面讲述了场景其实根因来自,批处理可能会触发不均衡的情况,我们只要在不均衡的时候,其他的空闲线程过来拿任务来跑。就满足的我们的场景。这里就有2个点,需要我们设计。

  1. 窃取的逻辑。
  2. 窃取的损耗

任务窃取的逻辑

这里我们只要在执行任务的时候,把执行的线程和同一批的task做关联即可。在创建任务的时候,把拆分好的任务和当前任务都设置在task里。线程执行完任务把task的状态设置为完成。在当前执行完成之后,在任务列表进行过滤,找出未完成的任务,窃取任务在自己的线程里执行。直到没有任务可以被执行为止。

窃取的损耗

这里我们可以参考WorkStealingPool,把任务队列设置为双端队列。当前线程的任务从队列的头获取,其他任务过来窃取的过程,从队列尾部窃取。这里优化不用引入阻塞队列,否则自身的任务执行也要过一遍锁。我们用更轻量级的方式。cas获取。这里就得借用unsafe的能力,对数组对象做cas。

例如下面的展示

自身任务的执行

|1|2|3|4|5|

执行的过程就是设置为null

|null|2|3|4|5|

其他线程来窃取就变成了

|null|2|3|null|null|

任务线程和窃取线程就避免了重量的锁操作,大家都是cas获取,当cas失败,证明已经没有可以窃取的任务了。

通过上面2个设计,就可以完成了任务的快速执行,还能线程的算力不浪费。而且在取消的任务的时候,保证更多的任务再被执行。

总结

如果任务要全部不中断的执行完,可以使用 WorkStealingPool。

如果任务执行时间长(大于锁的损耗),可以使用threadpool。

如果任务执行短,并且还需要被中断,可以在threadpool之上,进行二次的封装。设计任务窃取的逻辑。重点设计是窃取方式以及性能损耗。


转载来源:https://juejin.cn/post/7395962823501234210

相关文章
|
2月前
|
关系型数据库 MySQL Java
【IDEA】java后台操作mysql数据库驱动常见错误解决方案
【IDEA】java后台操作mysql数据库驱动常见错误解决方案
76 0
|
13天前
|
安全 Java 开发者
Java多线程编程中的常见问题与解决方案
本文深入探讨了Java多线程编程中常见的问题,包括线程安全问题、死锁、竞态条件等,并提供了相应的解决策略。文章首先介绍了多线程的基础知识,随后详细分析了每个问题的产生原因和典型场景,最后提出了实用的解决方案,旨在帮助开发者提高多线程程序的稳定性和性能。
|
19天前
|
人工智能 监控 数据可视化
Java智慧工地信息管理平台源码 智慧工地信息化解决方案SaaS源码 支持二次开发
智慧工地系统是依托物联网、互联网、AI、可视化建立的大数据管理平台,是一种全新的管理模式,能够实现劳务管理、安全施工、绿色施工的智能化和互联网化。围绕施工现场管理的人、机、料、法、环五大维度,以及施工过程管理的进度、质量、安全三大体系为基础应用,实现全面高效的工程管理需求,满足工地多角色、多视角的有效监管,实现工程建设管理的降本增效,为监管平台提供数据支撑。
34 3
|
25天前
|
Java API Apache
|
2月前
|
小程序 Java
小程序访问java后台失败解决方案
小程序访问java后台失败解决方案
47 2
|
3月前
|
传感器 监控 数据可视化
【Java】智慧工地解决方案源码和所需关键技术
智慧工地解决方案是一种新的工程全生命周期管理理念。它通过使用各种传感器、数传终端等物联网手段获取工程施工过程信息,并上传到云平台,以保障数据安全。
83 7
|
2月前
|
存储 前端开发 Java
浅谈Java中文乱码浅析及解决方案
浅谈Java中文乱码浅析及解决方案
78 0
|
2月前
|
Java
Error:java: 无效的目标发行版: 11解决方案
Error:java: 无效的目标发行版: 11解决方案
81 0
|
2月前
|
Java Maven Spring
用Spring导致的无法运行Java文件的问题的解决方案
本文提供了解决在IntelliJ IDEA社区版中使用Spring Initializr插件创建Spring项目后,Java文件无法运行的问题的方法,主要是通过加载Maven项目来解决。
78 0
|
4月前
|
存储 NoSQL Java
一天五道Java面试题----第十一天(分布式架构下,Session共享有什么方案--------->分布式事务解决方案)
这篇文章是关于Java面试中的分布式架构问题的笔记,包括分布式架构下的Session共享方案、RPC和RMI的理解、分布式ID生成方案、分布式锁解决方案以及分布式事务解决方案。
一天五道Java面试题----第十一天(分布式架构下,Session共享有什么方案--------->分布式事务解决方案)