在多线程编程中,有时需要等待一个或多个线程完成它们的任务,然后再继续执行下一步操作。这种场景下,我们可以使用CountDownLatch来实现等待-通知机制。
理解CountDownLatch
CountDownLatch是Java中的一个同步工具,它允许一个或多个线程等待其他线程完成它们的操作后再继续执行。CountDownLatch包含一个计数器,该计数器初始化为一个正整数N。当一个线程完成一个操作时,计数器的值会减1。当计数器的值变为0时,所有等待中的线程将被释放。
CountDownLatch通常用于实现等待-通知机制,其中一个或多个线程等待其他线程完成它们的操作,然后再继续执行。例如,在一个多线程程序中,主线程可以使用CountDownLatch来等待所有工作线程完成它们的任务,然后再继续执行下一步操作。
使用CountDownLatch
下面是一个简单的示例,演示如何使用CountDownLatch:
import java.util.concurrent.CountDownLatch; public class CountDownLatchExample { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(3); Thread t1 = new Thread(() -> { System.out.println("Thread 1 is running"); latch.countDown(); }); Thread t2 = new Thread(() -> { System.out.println("Thread 2 is running"); latch.countDown(); }); Thread t3 = new Thread(() -> { System.out.println("Thread 3 is running"); latch.countDown(); }); t1.start(); t2.start(); t3.start(); latch.await(); System.out.println("All threads have completed their tasks"); } }
在这个例子中,我们创建了一个CountDownLatch对象,计数器的初始值为3。然后,我们创建了三个线程t1、t2和t3,它们各自完成自己的任务,并调用了CountDownLatch的countDown()方法来减少计数器的值。最后,我们调用CountDownLatch的await()方法来等待所有线程完成它们的任务。当计数器的值变为0时,await()方法将返回,主线程将继续执行下一步操作。
实践中的CountDownLatch
最近需要删除公司的S3上的大量文件以及对应的MySQL中存储的索引。
由于要删除的量级比较大,且公司的S3没有开放批量删除的接口,因此一开始引入了多线程:
public void physicallyDelete(List<String> idList) { int pageId = 1; boolean isHasNextPage; do { PageHelper.startPage(pageId, Constants.DEFAULT_PAGE_SIZE); List<Info> infoList = infoDAO.getByIdList(idList); PageInfo<Info> page = new PageInfo<>(infoList); isHasNextPage = page.isHasNextPage(); Stopwatch stopwatch = Stopwatch.createStarted(); infoList.forEach(info -> threadPool.execute(() -> { deleteS3AndMySQL(info); })); LOGGER.info("s3和数据库删除成功, size:{}, cost:{}", infoList.size(), stopwatch.stop().elapsed(TimeUnit.MILLISECONDS)); } while (isHasNextPage); } private void deleteS3AndMySQL(Info info) { String key = getKeyFromS3Url(info.getVideoUrl()); try { Stopwatch stopwatch = Stopwatch.createStarted(); S3Manager.deleteFile(bucketName, key); LOGGER.info("s3删除成功, bucket:{}, key:{}, cost:{}", bucketName, key, stopwatch.stop().elapsed(TimeUnit.MILLISECONDS)); } catch (Exception e) { LOGGER.error("s3删除失败, bucket:{}, key:{}", bucketName, key, e); return; } infoDAO.physicallyDeleteByIdList(Collections.singletonList(info.getId())); }
但是发现了一个问题,从理论上来讲,删除一个S3上的文件,应该对应删除一条MySQL上的记录;但是在日志中发现,每删除一条MySQL上的记录,就多次重复触发了删除S3上的对应文件。
排查发现,由于在physicallyDelete方法中存在分页查询,有可能在deleteS3AndMySQL方法中已经删除了S3,但尚未删除MySQL中的记录时,已经进行了分页查询下一页,线程池中的其他线程又运行了deleteS3AndMySQL方法,导致重复调用了S3的删除接口。
为了解决这个问题,我们可以引入CountDownLatch,代码如下:
public void physicallyDelete(List<String> idList) { int pageId = 1; boolean isHasNextPage; do { PageHelper.startPage(pageId, Constants.DEFAULT_PAGE_SIZE); List<Info> infoList = infoDAO.getByIdList(idList); PageInfo<Info> page = new PageInfo<>(infoList); isHasNextPage = page.isHasNextPage(); CountDownLatch latch = new CountDownLatch(infoList.size()); Stopwatch stopwatch = Stopwatch.createStarted(); infoList.forEach(info -> threadPool.execute(() -> { try { deleteS3AndMySQL(info); } catch (Exception e) { LOGGER.info("deleteS3AndMySQL error", e); } finally { latch.countDown(); } })); LOGGER.info("s3和数据库删除成功, size:{}, cost:{}", infoList.size(), stopwatch.stop().elapsed(TimeUnit.MILLISECONDS)); try { latch.await(); } catch (Exception e) { LOGGER.info("latch.await异常", e); Thread.currentThread().interrupt(); } } while (isHasNextPage); }
这样,由于CountDownLatch的存在,就会等到线程池中的线程将分页查出的全部数据处理完毕后,再去查出下一页数据进行处理,从而避免多次重复调用S3删除接口。
总结
CountDownLatch是Java中的一个同步工具,它允许一个或多个线程等待其他线程完成它们的操作后再继续执行。CountDownLatch通常用于实现等待-通知机制,其中一个或多个线程等待其他线程完成它们的操作,然后再继续执行。在多线程编程中,CountDownLatch是一种非常有用的工具,可以帮助我们实现复杂的同步逻辑。