1:异步化
什么时候使用异步化?
调用的服务处理能力有限,或者处理时间(或返回时)较长时,应该考虑异步化。
1.1:什么是同步?什么是异步?
同步:一件事情做完,再做另一件事情。(烧水后处理工作)
异步:不用等一件事做完,就可以做另一件事,等第一件事情完成时,可以收到一个通知,可以进行后续处理。(烧水的时候,烧水人可以同时处理工作,当烧水完成时,可以听到水壶的蜂鸣声,就知道水烧好了)
1.2:异步化业务流程分析
标准异步化业务流程:
1:当用户要进行耗时很长的操作时,点击提交后,不需要在界面进行等待,而是把这个任务保存到数据库中,记录下来。
2:用户要执行新任务时:
3:线程从任务队列中取出任务依次执行,每完成一个任务需要修改任务的状态。
4:用户可以查询任务的状态,或者在任务执行或失败的时候能够得到通知。
5:如果执行的任务非常复杂,包括很多环节,在每完成一个任务时,要在程序(数据库中)记录一下任务的执行流程。
本次项目业务异步化的执行流程:
1:用户点击智能分析的提交按钮时,先把图表立刻保存到数据库中(作为一个任务)
2:用户可以在图表管理页面查看所有图表(已生成的,生成中的,生成失败)的信息和状态。
1:将用户从前端传过来的数据保存到数据库中
2:生成图表的任务放到任务队列中
3:任务处理模块再去从任务队列中取出图表任务,交给AI进行分析,一个一个进行分析。
4:AI处理完一个项目,给任务处理模块一个通知,任务处理模块更新数据库中的状态。
1.3:异步化的问题
1.3.1:任务队列的最大容量
1.3.2:怎样从任务队列中取出任务去执行?任务队列的流程怎样实现?
以上两个问题可以利用线程池来进行解决。
2:线程池的理论和实战
2.1:为啥需要线程池?
1:线程的管理比较复杂(eg:比如什么时候新增线程,什么时候减少空闲线程)
2:任务存取比较复杂(eg:什么时候接收任务,什么时候拒绝任务,怎么保证大家不抢到同一个任务)
线程池的作用:帮助我们轻松管理线程,协调任务的执行进程。
2.2:线程池的实现
在Spring中,可以用ThreadPoolTaskExecutor配合@Async注解来实现。(不太建议)
在Java中,可以使用JUC并发编程包中的ThreadPoolExecutor来实现。(建议)
2.2.1:线程池参数
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
2.2.2:线程池的工作机制
当已经达到了最大线程数,此时,任务队列中也没有新的任务要去执行,那么等keepAlive时间达到后,就可以将这个线程释放。
线程池参数如何设置?
首先,根据自己当前的业务或者系统进行执行。
一般情况下,任务分为IO密集型和计算密集型两种。
计算密集型:吃CPU ,比如音视频处理,图像处理,数学计算等,一般设置为corePoolSize为CPU核数+1,可以让每个线程都利用CPU的每个核,而且线程之间不用频繁切换
IO密集型:吃带宽/内存/硬盘的读写资源,corePoolSize可以设置大一点(2*n+1)
考虑导入百万数据到数据库,属于IO密集型任务,还是计算密集型任务?
属于IO密集型任务,因为在这个工作中,吃的是带宽/内存/硬盘的资源。
代码校验线程池的工作机制:
a:相关参数设置,我们用的是一个config的目录下的类,相当于公共的类。
@Configuration public class ThreadPoolExecutorConfig { @Bean public ThreadPoolExecutor threadPoolExecutor(){ ThreadFactory threadFactory=new ThreadFactory() { private int count=1; @Override public Thread newThread(@NotNull Runnable r) { Thread thread=new Thread(r); thread.setName("线程"+count); return thread; } }; ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(2,4,100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(4),threadFactory); return threadPoolExecutor; } }
b:我们在controller层新建一个类用来提交任务和检验任务。
@RestController @RequestMapping("/queue") @Slf4j public class QueueController { @Autowired private ThreadPoolExecutor threadPoolExecutor; //添加任务的接口,提交任务 @GetMapping("/add")//添加任务的接口 public void add(String name){ CompletableFuture.runAsync(()->{ log.info("任务执行中:"+name+". 执行人 "+ Thread.currentThread().getName()); try { Thread.sleep(600000); } catch (InterruptedException e) { throw new RuntimeException(e); } },threadPoolExecutor); }; //查看线程池的状态 @GetMapping("/get") public String get(){ Map<String,Object> map=new HashMap<>(); int size=threadPoolExecutor.getQueue().size(); map.put("队列长度",size); long taskCount = threadPoolExecutor.getTaskCount();//任务总数 map.put("任务的总数量",taskCount); long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();//已经成功的任务数 map.put("已经完成的任务数",completedTaskCount); int activeCount = threadPoolExecutor.getActiveCount();//正在工作的员工数 map.put("正在工作的线程数",activeCount); return JSONUtil.toJsonStr(map); } }
c:利用 swagger生成的接口文档进行校验:
a):当发送任务1时,我们设置的核心线程数为2
利用接口文档来进行发送和校验:
b):当发送任务2时,我们设置的核心线程数为2
利用接口文档来进行发送和校验:
c):当发送任务3时,我们设置的核心线程数为2,此时,正式的线程数已经用完了,根据线程池的工作机制应该加到任务队列中,我们来进行查看。任务3此时并没有线程来进行执行。而是加到了任务队列中。
d):当任务队列满时,此时正式线程数也已经满了,看看啥情况,即任务数到达6了,继续再发一个任务。
此时,就会调用临时的线程来执行任务。
当达到整个maxpoolSize且阻塞队列满时,就会进行报错。