JAVA多线程 | 实现用户任务排队 | 预估排队时长

简介: JAVA多线程 | 实现用户任务排队 | 预估排队时长

 image.gif编辑

实现流程

image.gif编辑

 初始化一定数量的任务处理线程和缓存线程池,用户每次调用接口,开启一个线程处理。

 假设初始化5个处理器,代码执行 BlockingQueue.take 时候,每次take都会处理器队列就会减少一个,当处理器队列为空时,take就是阻塞线程,当用户处理某某任务完成时候,调用资源释放接口,在处理器队列put 一个处理器对象,原来阻塞的take ,就继续执行。

排队论简介

     排队论是研究系统随机聚散现象和随机系统工作工程的数学理论和方法,又称随机服务系统理论,为运筹学的一个分支。我们下面对排队论做下简化处理,先看下图:

image.gif编辑

代码具体实现

任务队列初始化 TaskQueue

import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * 初始化队列及线程池
 * @author tarzan
 *
 */
@Component
public class TaskQueue {
    //处理器队列
    public static BlockingQueue<TaskProcessor> taskProcessors;
    //等待任务队列
    public static BlockingQueue<CompileTask> waitTasks;
    //处理任务队列
    public static BlockingQueue<CompileTask> executeTasks;
    //线程池
    public static ExecutorService exec;
    //初始处理器数(计算机cpu可用线程数)
    public static Integer processorNum=Runtime.getRuntime().availableProcessors();
    /**
     * 初始化处理器、等待任务、处理任务队列及线程池
     */
    @PostConstruct
    public static void initEquipmentAndUsersQueue(){
        exec = Executors.newCachedThreadPool();
        taskProcessors =new LinkedBlockingQueue<TaskProcessor>(processorNum);
        //将空闲的设备放入设备队列中
        setFreeDevices(processorNum);
        waitTasks =new LinkedBlockingQueue<CompileTask>();
        executeTasks=new LinkedBlockingQueue<CompileTask>(processorNum);
    }
    /**
     * 将空闲的处理器放入处理器队列中
     */
    private static void setFreeDevices(int num) {
        //获取可用的设备
        for (int i = 0; i < num; i++) {
            TaskProcessor dc=new TaskProcessor();
            try {
                taskProcessors.put(dc);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static CompileTask getWaitTask(Long clazzId) {
        return get(TaskQueue.waitTasks,clazzId);
    }
    public static CompileTask getExecuteTask(Long clazzId) {
        return get(TaskQueue.executeTasks,clazzId);
    }
    private static CompileTask get(BlockingQueue<CompileTask> users, Long clazzId) {
        CompileTask compileTask =null;
        if (CollectionUtils.isNotEmpty(users)){
            Optional<CompileTask> optional=users.stream().filter(e->e.getClazzId().longValue()==clazzId.longValue()).findFirst();
            if(optional.isPresent()){
                compileTask =  optional.get();
            }
        }
        return compileTask;
    }
    public static Integer getSort(Long clazzId) {
        AtomicInteger index = new AtomicInteger(-1);
        BlockingQueue<CompileTask> compileTasks = TaskQueue.waitTasks;
        if (CollectionUtils.isNotEmpty(compileTasks)){
            compileTasks.stream()
                    .filter(e -> {
                        index.getAndIncrement();
                        return e.getClazzId().longValue() == clazzId.longValue();
                    })
                    .findFirst();
        }
        return index.get();
    }
    //单位秒
    public static int estimatedTime(Long clazzId){
        return  estimatedTime(60,getSort(clazzId)+1);
    }
    //单位秒
    public static int estimatedTime(int cellMs,int num){
         int a= (num-1)/processorNum;
         int b= cellMs*(a+1);
        return  b;
    }

image.gif

编译任务类 CompileTask

import lombok.Data;
import org.springblade.core.tool.utils.SpringUtil;
import org.springblade.gis.common.enums.DataScheduleEnum;
import org.springblade.gis.dynamicds.service.DynamicDataSourceService;
import org.springblade.gis.modules.feature.schedule.service.DataScheduleService;
import java.util.Date;
@Data
public class CompileTask implements Runnable {
    //当前请求的线程对象
    private Long clazzId;
    //用户id
    private Long userId;
    //当前请求的线程对象
    private Thread thread;
    //绑定处理器
    private TaskProcessor taskProcessor;
    //任务状态
    private Integer status;
    //开始时间
    private Date startTime;
    //结束时间
    private Date endTime;
    private DataScheduleService dataScheduleService= SpringUtil.getBean(DataScheduleService.class);
    private DynamicDataSourceService dataSourceService= SpringUtil.getBean(DynamicDataSourceService.class);
    @Override
    public void run() {
        compile();
    }
    /**
     * 编译
     */
    public void compile() {
        try {
            //取出一个设备
            TaskProcessor taskProcessor = TaskQueue.taskProcessors.take();
            //取出一个任务
            CompileTask compileTask = TaskQueue.waitTasks.take();
            //任务和设备绑定
            compileTask.setTaskProcessor(taskProcessor);
            //放入
            TaskQueue.executeTasks.put(compileTask);
            System.out.println(DataScheduleEnum.DEAL_WITH.getName()+" "+userId);
            //切换用户数据源
            dataSourceService.switchDataSource(userId);
            //添加进度
            dataScheduleService.addSchedule(clazzId, DataScheduleEnum.DEAL_WITH.getState());
        } catch (InterruptedException e) {
            System.err.println( e.getMessage());
        }
    }
}

image.gif

任务处理器 TaskProcessor

import lombok.Data;
import java.util.Date;
@Data
public class TaskProcessor {
    /**
     * 释放
     */
    public  static Boolean release(CompileTask task)  {
        Boolean flag=false;
        Thread thread=task.getThread();
        synchronized (thread) {
            try {
                if(null!=task.getTaskProcessor()){
                    TaskQueue.taskProcessors.put(task.getTaskProcessor());
                    TaskQueue.executeTasks.remove(task);
                    task.setEndTime(new Date());
                    long intervalMilli = task.getEndTime().getTime() - task.getStartTime().getTime();
                    flag=true;
                    System.out.println("用户"+task.getClazzId()+"耗时"+intervalMilli+"ms");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return flag;
        }
    }
}

image.gif

Controller控制器接口实现

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springblade.core.tool.api.R;
import org.springblade.gis.multithread.TaskProcessor;
import org.springblade.gis.multithread.TaskQueue;
import org.springblade.gis.multithread.CompileTask;
import org.springframework.web.bind.annotation.*;
import java.util.Date;
@RestController
@RequestMapping("task")
@Api(value = "数据编译任务", tags = "数据编译任务")
public class CompileTaskController {
    @ApiOperation(value = "添加等待请求 @author Tarzan Liu")
    @PostMapping("compile/{clazzId}")
    public R<Integer> compile(@PathVariable("clazzId") Long clazzId) {
        CompileTask checkUser=TaskQueue.getWaitTask(clazzId);
        if(checkUser!=null){
            return  R.fail("已经正在排队!");
        }
        checkUser=TaskQueue.getExecuteTask(clazzId);
        if(checkUser!=null){
            return  R.fail("正在执行编译!");
        }
        //获取当前的线程
        Thread thread=Thread.currentThread();
        //创建当前的用户请求对象
        CompileTask compileTask =new CompileTask();
        compileTask.setThread(thread);
        compileTask.setClazzId(clazzId);
        compileTask.setStartTime(new Date());
        //将当前用户请求对象放入队列中
        try {
            TaskQueue.waitTasks.put(compileTask);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        TaskQueue.exec.execute(compileTask);
        return R.data(TaskQueue.waitTasks.size()-1);
    }
    @ApiOperation(value = "查询当前任务前还有多少任务等待 @author Tarzan Liu")
    @PostMapping("sort/{clazzId}")
    public R<Integer> sort(@PathVariable("clazzId") Long clazzId) {
        return R.data(TaskQueue.getSort(clazzId));
    }
    @ApiOperation(value = "查询当前任务预估时长 @author Tarzan Liu")
    @PostMapping("estimate/time/{clazzId}")
    public R<Integer> estimatedTime(@PathVariable("clazzId") Long clazzId) {
        return R.data(TaskQueue.estimatedTime(clazzId));
    }
    @ApiOperation(value = "任务释放 @author Tarzan Liu")
    @PostMapping("release/{clazzId}")
    public R<Boolean> release(@PathVariable("clazzId") Long clazzId) {
        CompileTask task=TaskQueue.getExecuteTask(clazzId);
        if(task==null){
            return  R.fail("资源释放异常");
        }
        return R.status(TaskProcessor.release(task));
    }
    @ApiOperation(value = "执行 @author Tarzan Liu")
    @PostMapping("exec")
    public R exec() {
        Long start=System.currentTimeMillis();
        for (Long i = 1L; i < 100; i++) {
            compile(i);
        }
        System.out.println("消耗时间:"+(System.currentTimeMillis()-start)+"ms");
        return R.status(true);
    }
}

image.gif

接口测试

根据任务id查询该任务前还有多少个任务待执行

image.gif编辑

根据任务id查询该任务预估执行完成的剩余时间,单位秒

image.gif编辑

补充知识

BlockingQueue即阻塞队列,它是基于ReentrantLock,依据它的基本原理,我们可以实现Web中的长连接聊天功能,当然其最常用的还是用于实现生产者与消费者模式,大致如下图所示:

image.gif编辑

在Java中,BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理,却是类似的。

2. 阻塞与非阻塞

入队

offer(E e):如果队列没满,立即返回true; 如果队列满了,立即返回false-->不阻塞

put(E e):如果队列满了,一直阻塞,直到队列不满了或者线程被中断-->阻塞

offer(E e, long timeout, TimeUnit unit):在队尾插入一个元素,,如果队列已满,则进入等待,直到出现以下三种情况:-->阻塞

被唤醒

等待时间超时

当前线程被中断

出队

poll():如果没有元素,直接返回null;如果有元素,出队

take():如果队列空了,一直阻塞,直到队列不为空或者线程被中断-->阻塞

poll(long timeout, TimeUnit unit):如果队列不空,出队;如果队列已空且已经超时,返回null;如果队列已空且时间未超时,则进入等待,直到出现以下三种情况:

被唤醒

等待时间超时

当前线程被中断

相关文章
|
2天前
|
Java 调度 开发者
Java并发编程:深入理解线程池
在Java的世界中,线程池是提升应用性能、实现高效并发处理的关键工具。本文将深入浅出地介绍线程池的核心概念、工作原理以及如何在实际应用中有效利用线程池来优化资源管理和任务调度。通过本文的学习,读者能够掌握线程池的基本使用技巧,并理解其背后的设计哲学。
|
1天前
|
缓存 监控 Java
Java中的并发编程:理解并应用线程池
在Java的并发编程中,线程池是提高应用程序性能的关键工具。本文将深入探讨如何有效利用线程池来管理资源、提升效率和简化代码结构。我们将从基础概念出发,逐步介绍线程池的配置、使用场景以及最佳实践,帮助开发者更好地掌握并发编程的核心技巧。
|
3天前
|
缓存 监控 Java
java中线程池的使用
java中线程池的使用
|
2天前
|
算法 Java 数据处理
Java并发编程:解锁多线程的力量
在Java的世界里,掌握并发编程是提升应用性能和响应能力的关键。本文将深入浅出地探讨如何利用Java的多线程特性来优化程序执行效率,从基础的线程创建到高级的并发工具类使用,带领读者一步步解锁Java并发编程的奥秘。你将学习到如何避免常见的并发陷阱,并实际应用这些知识来解决现实世界的问题。让我们一起开启高效编码的旅程吧!
|
7天前
|
存储 Java 程序员
优化Java多线程应用:是创建Thread对象直接调用start()方法?还是用个变量调用?
这篇文章探讨了Java中两种创建和启动线程的方法,并分析了它们的区别。作者建议直接调用 `Thread` 对象的 `start()` 方法,而非保持强引用,以避免内存泄漏、简化线程生命周期管理,并减少不必要的线程控制。文章详细解释了这种方法在使用 `ThreadLocal` 时的优势,并提供了代码示例。作者洛小豆,文章来源于稀土掘金。
|
4天前
|
Java 开发者
Java中的多线程编程基础与实战
【9月更文挑战第6天】本文将通过深入浅出的方式,带领读者了解并掌握Java中的多线程编程。我们将从基础概念出发,逐步深入到代码实践,最后探讨多线程在实际应用中的优势和注意事项。无论你是初学者还是有一定经验的开发者,这篇文章都能让你对Java多线程有更全面的认识。
14 1
|
2天前
|
安全 Java UED
Java并发编程:解锁多线程的潜力
在Java的世界里,并发编程如同一场精心编排的交响乐,每个线程扮演着不同的乐手,共同奏响性能与效率的和声。本文将引导你走进Java并发编程的大门,探索如何在多核处理器上优雅地舞动多线程,从而提升应用的性能和响应性。我们将从基础概念出发,逐步深入到高级技巧,让你的代码在并行处理的海洋中乘风破浪。
|
8天前
|
监控 Java 调度
【Java学习】多线程&JUC万字超详解
本文详细介绍了多线程的概念和三种实现方式,还有一些常见的成员方法,CPU的调动方式,多线程的生命周期,还有线程安全问题,锁和死锁的概念,以及等待唤醒机制,阻塞队列,多线程的六种状态,线程池等
64 6
【Java学习】多线程&JUC万字超详解
|
12天前
|
安全 Java 程序员
Java编程中实现线程安全的策略
【8月更文挑战第31天】在多线程环境下,保证数据一致性和程序的正确运行是每个程序员的挑战。本文将通过浅显易懂的语言和实际代码示例,带你了解并掌握在Java编程中确保线程安全的几种策略。让我们一起探索如何用同步机制、锁和原子变量等工具来保护我们的数据,就像保护自己的眼睛一样重要。
|
12天前
|
安全 Java 开发者
深入浅出Java多线程编程
【8月更文挑战第31天】本文旨在通过浅显易懂的语言和实例,为初学者揭开Java多线程编程的神秘面纱。我们将从基础概念出发,逐步深入到多线程的创建、同步机制及实际应用,帮助读者构建起完整的多线程知识体系。文章不仅包含理论介绍,还提供代码示例,让读者能够动手实践,加深理解。无论你是编程新手还是希望巩固多线程知识的开发者,这篇文章都将是你不可多得的学习资源。