并发编程实践
定时任务
其实使用Thread是能够模拟定时任务的,其中一些定时任务框架的底层源码中,最后也会使用到Thread
实现这种定时任务的具体代码如下:
public static void init() { new Thread(() -> { while (true) { try { System.out.println("下载文件"); Thread.sleep(1000 * 60 * 5); } catch (Exception e) { log.error(e); } } }).start(); }
使用Thread类可以做最简单的定时任务,在run方法中有个while的死循环(当然还有其他方式),执行我们自己的任务。有个需要特别注意的地方是,需要用try...catch捕获异常,否则如果出现异常,就直接退出循环,下次将无法继续执行了。
但这种方式做的定时任务,只能周期性执行,不能支持定时在某个时间点执行。
特别提醒一下,该线程建议定义成守护线程,可以通过setDaemon方法设置,让它在后台默默执行就好。
使用场景:比如项目中有时需要每隔5分钟去下载某个文件,或者每隔10分钟去读取模板文件生成静态html页面等等,一些简单的周期性任务场景。
使用Thread类做定时任务的优缺点:
- 优点:这种定时任务非常简单,学习成本低,容易入手,对于那些简单的周期性任务,是个不错的选择。
- 缺点:不支持指定某个时间点执行任务,不支持延迟执行等操作,功能过于单一,无法应对一些较为复杂的场景。
因为为了尽可能满足延迟执行 和 在某个时间点执行任务,比如:如果用户下单后,超过30分钟还未完成支付,则系统自动将该订单取消。
这里需求就可以使用延迟定时任务实现。
ScheduledExecutorService是JDK1.5+版本引进的定时任务,该类位于java.util.concurrent并发包下。
ScheduledExecutorService是基于多线程的,设计的初衷是为了解决Timer单线程执行,多个任务之间会互相影响的问题。
它主要包含4个方法:
- schedule(Runnable command,long delay,TimeUnit unit),带延迟时间的调度,只执行一次,调度之后可通过Future.get()阻塞直至任务执行完毕。
- schedule(Callable callable,long delay,TimeUnit unit),带延迟时间的调度,只执行一次,调度之后可通过Future.get()阻塞直至任务执行完毕,并且可以获取执行结果。
- scheduleAtFixedRate,表示以固定频率执行的任务,如果当前任务耗时较多,超过定时周期period,则当前任务结束后会立即执行。
- scheduleWithFixedDelay,表示以固定延时执行任务,延时是相对当前任务结束为起点计算开始时间。
实现这种定时任务的具体代码如下:
public class ScheduleExecutorTest { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5); scheduledExecutorService.scheduleAtFixedRate(() -> { System.out.println("doSomething"); },1000,1000, TimeUnit.MILLISECONDS); } }
调用ScheduledExecutorService
类的scheduleAtFixedRate
方法实现周期性任务,每隔1秒钟执行一次,每次延迟1秒再执行。
当然也可以配置 定时执行任务,比如说如何让每周四 18:00:00 定时执行任务?
// 获得当前时间 LocalDateTime now = LocalDateTime.now(); // 获取本周四 18:00:00.000 LocalDateTime thursday = now.with(DayOfWeek.THURSDAY).withHour(18).withMinute(0).withSecond(0).withNano(0); // 如果当前时间已经超过 本周四 18:00:00.000, 那么找下周四 18:00:00.000 if(now.compareTo(thursday) >= 0) { thursday = thursday.plusWeeks(1); } // 计算时间差,即延时执行时间 long initialDelay = Duration.between(now, thursday).toMillis(); // 计算间隔时间,即 1 周的毫秒值 long oneWeek = 7 * 24 * 3600 * 1000; ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); System.out.println("开始时间:" + new Date()); executor.scheduleAtFixedRate(() -> { System.out.println("执行时间:" + new Date()); }, initialDelay, oneWeek, TimeUnit.MILLISECONDS);
模拟监听器
有时候,我们需要写个监听器,去监听某些数据的变化。
比如:我们在使用canal
的时候,需要监听binlog
的变化,能够及时把数据库中的数据,同步到另外一个业务数据库中。
如果直接写一个监听器去监听数据就太没意思了,我们想实现这样一个功能:在配置中心有个开关,配置监听器是否开启,如果开启了使用单线程异步执行。
主要代码如下:
@Service public CanalService { private volatile boolean running = false; private Thread thread; @Autowired private CanalConnector canalConnector; public void handle() { //连接canal while(running) { //业务处理 } } public void start() { thread = new Thread(this::handle, "name"); running = true; thread.start(); } public void stop() { if(!running) { return; } running = false; } }
在start方法中开启了一个线程,在该线程中异步执行handle方法的具体任务。然后通过调用stop方法,可以停止该线程。
其中,使用volatile关键字控制的running变量作为开关,它可以控制线程中的状态。
接下来,有个比较关键的点是:如何通过配置中心的配置,控制这个开关呢?
以apollo配置为例,我们在配置中心的后台,修改配置之后,自动获取最新配置的核心代码如下:
public class CanalConfig { @Autowired private CanalService canalService; @ApolloConfigChangeListener public void change(ConfigChangeEvent event) { String value = event.getChange("test.canal.enable").getNewValue(); if(BooleanUtils.toBoolean(value)) { canalService.start(); } else { canalService.stop(); } } }
通过apollo的ApolloConfigChangeListener注解,可以监听配置参数的变化。
如果test.canal.enable开关配置的true,则调用canalService类的start方法开启canal数据同步功能。如果开关配置的false,则调用canalService类的stop方法,自动停止canal数据同步功能。
文件中转暂存数据
举个例子,在某些高并发的场景中,我们需要收集部分用户的日志(比如:用户登录的日志),写到数据库中,以便于做分析。
但由于项目中,还没有引入消息中间件,比如:kafka、rocketmq等。
如果直接将日志同步写入数据库,可能会影响接口性能。
所以,大家很自然想到了异步处理。
实现这个需求最简单的做法是,开启一个线程,异步写入数据到数据库即可。
这样做,可以是可以。
但如果用户登录操作的耗时,比异步写入数据库的时间要少得多。这样导致的结果是:生产日志的速度,比消费日志的速度要快得多,最终的性能瓶颈在消费端。
其实,还有更优雅的处理方式,虽说没有使用消息中间件,但借用了它的思想。
这套记录登录日志的功能,分为:日志生产端、日志存储端和日志消费端。
如下图所示:
先定义了一个阻塞队列。
@Component public class LoginLogQueue { private static final int QUEUE_MAX_SIZE = 1000; private BlockingQueueblockingQueue queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE); //生成消息 public boolean push(LoginLog loginLog) { return this.queue.add(loginLog); } //消费消息 public LoginLog poll() { LoginLog loginLog = null; try { loginLog = this.queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } return result; } }
然后定义了一个日志的生产者。
@Service public class LoginSerivce { @Autowired private LoginLogQueue loginLogQueue; public int login(UserInfo userInfo) { //业务处理 LoginLog loginLog = convert(userInfo); loginLogQueue.push(loginLog); } }
接下来,定义了日志的消费者。
@Service public class LoginInfoConsumer { @Autowired private LoginLogQueue queue; @PostConstruct public voit init { new Thread(() -> { while (true) { LoginLog loginLog = queue.take(); //写入数据库 } }).start(); } }
当然,这个例子中使用单线程接收登录日志,为了提升性能,也可以使用线程池来处理业务逻辑(比如:写入数据库)等。
其实这种思想,总的来说,就是我们耗时的瓶颈是在数据库插入操作这里,有的时候哪怕我们使用批量操作,可能效果还不是很理想,那么就可以考虑别的方式,比如放入文件或者MQ中,可能会比直接插入效率高。下面再举一个例子。
比如说一个转账接口,如果是并发开启,10个并发度,每个批次1000笔转账明细数据,数据库插入会特别耗时,大概6秒左右;这个跟我们公司的数据库同步机制有关,并发情况下,因为优先保证同步,所以并行的插入变成串行啦,就很耗时。
数据库同步机制可能导致并行的插入变成串行的原因有很多,下面列举了一些可能的情况:
- 锁竞争:当多个事务同时尝试向相同的数据页或数据行插入数据时,数据库系统可能会使用锁来确保数据的一致性。如果同步机制导致大量的锁竞争,那么并行插入操作可能会被迫等待其他事务释放锁,从而导致串行化。
- 同步点阻塞:某些数据库同步机制可能会引入同步点,要求所有的写操作都必须在这些同步点进行同步,这样就会导致并行的写操作变成串行化。
- 冲突检测与重试:在数据库同步的过程中,可能会发生数据冲突,系统需要检测并解决这些冲突。这种检测和解决过程可能会导致并行插入变成串行化,因为某些操作需要等待其他操作完成后才能执行。
- 数据复制延迟:如果数据库采用了主从复制或者集群复制的机制,数据同步可能会引入一定的延迟。在这种情况下,并行的插入操作可能会因为数据尚未完全同步而变成串行化。
优化前,1000
笔明细转账数据,先落地DB
数据库,返回处理中给用户,再异步转账。如图:
记得当时压测的时候,高并发情况,这1000
笔明细入库,耗时都比较大。所以我转换了一下思路,把批量的明细转账记录保存的文件服务器,然后记录一笔转账总记录到数据库即可。接着异步再把明细下载下来,进行转账和明细入库。最后优化后,性能提升了十几倍。
优化后,流程图如下:
模拟大数据导入解析
我们可能会经常收到运营同学提过来的excel数据导入需求,比如:将某一大类下的所有子类一次性导入系统,或者导入一批新的供应商数据等等。
我们以导入供应商数据为例,它所涉及的业务流程很长,比如:
- 调用天眼查接口校验企业名称和统一社会信用代码。
- 写入供应商基本表
- 写入组织表
- 给供应商自动创建一个用户
- 给该用户分配权限
- 自定义域名
- 发站内通知
如果在程序中,解析完excel,读取了所有数据之后。用单线程一条条处理业务逻辑,可能耗时会非常长。
为了提升excel数据导入效率,非常有必要使用多线程来处理。
当然在java中实现多线程的手段有很多种,下面重点聊聊java8中最简单的实现方式:parallelStream
。
伪代码如下:
supplierList.parallelStream().forEach(x -> importSupplier(x));
parallelStream
是一个并行执行的流,它默认通过ForkJoinPool
实现的,能提高你的多线程任务的速度。
ForkJoinPool
处理的过程会分而治之,它的核心思想是:将一个大任务切分成多个小任务
。每个小任务都能单独执行,最后它会把所用任务的执行结果进行汇总。
下面用一张图简单介绍一下ForkJoinPool的原理:
当然除了excel导入之外,还有类似的读取文本文件,也可以用类似的方法处理。
温馨的提醒一下,如果一次性导入的数据非常多,用多线程处理,可能会使系统的cpu使用率飙升,需要特别关注。
查询接口优化 - 串行改并行
假设我们设计一个APP首页的接口,它需要查用户信息、需要查banner信息、需要查弹窗信息等等。如果是串行一个一个查,比如查用户信息200ms
,查banner信息100ms
、查弹窗信息50ms
,那一共就耗时350ms
了,如果还查其他信息,那耗时就更大了。
其实我们可以改为并行调用,即查用户信息、查banner信息、查弹窗信息,可以同时并行发起。
最后接口耗时将大大降低。
public UserInfo getUserInfo(Long id) throws InterruptedException, ExecutionException { final UserInfo userInfo = new UserInfo(); CompletableFuture userFuture = CompletableFuture.supplyAsync(() -> { getRemoteUserAndFill(id, userInfo); return Boolean.TRUE; }, executor); CompletableFuture bonusFuture = CompletableFuture.supplyAsync(() -> { getRemoteBonusAndFill(id, userInfo); return Boolean.TRUE; }, executor); CompletableFuture growthFuture = CompletableFuture.supplyAsync(() -> { getRemoteGrowthAndFill(id, userInfo); return Boolean.TRUE; }, executor); CompletableFuture.allOf(userFuture, bonusFuture, growthFuture).join(); userFuture.get(); bonusFuture.get(); growthFuture.get(); return userInfo; }
剑指JUC原理-20.并发编程实践(中):https://developer.aliyun.com/article/1413700