剑指JUC原理-20.并发编程实践(上)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 剑指JUC原理-20.并发编程实践

并发编程实践


定时任务


其实使用Thread是能够模拟定时任务的,其中一些定时任务框架的底层源码中,最后也会使用到Thread

实现这种定时任务的具体代码如下:

public static void init() {
    new Thread(() -> {
        while (true) {
            try {
                System.out.println("下载文件");
                Thread.sleep(1000 * 60 * 5);
            } catch (Exception e) {
                log.error(e);
            }
        }
    }).start();
}

使用Thread类可以做最简单的定时任务,在run方法中有个while的死循环(当然还有其他方式),执行我们自己的任务。有个需要特别注意的地方是,需要用try...catch捕获异常,否则如果出现异常,就直接退出循环,下次将无法继续执行了。


但这种方式做的定时任务,只能周期性执行,不能支持定时在某个时间点执行。


特别提醒一下,该线程建议定义成守护线程,可以通过setDaemon方法设置,让它在后台默默执行就好。


使用场景:比如项目中有时需要每隔5分钟去下载某个文件,或者每隔10分钟去读取模板文件生成静态html页面等等,一些简单的周期性任务场景。


使用Thread类做定时任务的优缺点:


  • 优点:这种定时任务非常简单,学习成本低,容易入手,对于那些简单的周期性任务,是个不错的选择。
  • 缺点:不支持指定某个时间点执行任务,不支持延迟执行等操作,功能过于单一,无法应对一些较为复杂的场景。


因为为了尽可能满足延迟执行 和 在某个时间点执行任务,比如:如果用户下单后,超过30分钟还未完成支付,则系统自动将该订单取消。


这里需求就可以使用延迟定时任务实现。


ScheduledExecutorService是JDK1.5+版本引进的定时任务,该类位于java.util.concurrent并发包下。


ScheduledExecutorService是基于多线程的,设计的初衷是为了解决Timer单线程执行,多个任务之间会互相影响的问题。


它主要包含4个方法:


  • schedule(Runnable command,long delay,TimeUnit unit),带延迟时间的调度,只执行一次,调度之后可通过Future.get()阻塞直至任务执行完毕。
  • schedule(Callable callable,long delay,TimeUnit unit),带延迟时间的调度,只执行一次,调度之后可通过Future.get()阻塞直至任务执行完毕,并且可以获取执行结果。
  • scheduleAtFixedRate,表示以固定频率执行的任务,如果当前任务耗时较多,超过定时周期period,则当前任务结束后会立即执行。
  • scheduleWithFixedDelay,表示以固定延时执行任务,延时是相对当前任务结束为起点计算开始时间。


实现这种定时任务的具体代码如下:

public class ScheduleExecutorTest {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            System.out.println("doSomething");
        },1000,1000, TimeUnit.MILLISECONDS);
    }
}

调用ScheduledExecutorService类的scheduleAtFixedRate方法实现周期性任务,每隔1秒钟执行一次,每次延迟1秒再执行。


当然也可以配置 定时执行任务,比如说如何让每周四 18:00:00 定时执行任务?

// 获得当前时间
        LocalDateTime now = LocalDateTime.now();
        // 获取本周四 18:00:00.000
        LocalDateTime thursday =
                    now.with(DayOfWeek.THURSDAY).withHour(18).withMinute(0).withSecond(0).withNano(0);
        // 如果当前时间已经超过 本周四 18:00:00.000, 那么找下周四 18:00:00.000
        if(now.compareTo(thursday) >= 0) {
            thursday = thursday.plusWeeks(1);
        }
        // 计算时间差,即延时执行时间
        long initialDelay = Duration.between(now, thursday).toMillis();
        // 计算间隔时间,即 1 周的毫秒值
        long oneWeek = 7 * 24 * 3600 * 1000;
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
        System.out.println("开始时间:" + new Date());
        executor.scheduleAtFixedRate(() -> {
            System.out.println("执行时间:" + new Date());
        }, initialDelay, oneWeek, TimeUnit.MILLISECONDS);


模拟监听器


有时候,我们需要写个监听器,去监听某些数据的变化。


比如:我们在使用canal的时候,需要监听binlog的变化,能够及时把数据库中的数据,同步到另外一个业务数据库中。

如果直接写一个监听器去监听数据就太没意思了,我们想实现这样一个功能:在配置中心有个开关,配置监听器是否开启,如果开启了使用单线程异步执行。


主要代码如下:

@Service
public CanalService {
    private volatile boolean running = false;
    private Thread thread;
    @Autowired
    private CanalConnector canalConnector;
    public void handle() {
        //连接canal
        while(running) {
           //业务处理
        }
    }
    public void start() {
       thread = new Thread(this::handle, "name");
       running = true;
       thread.start();
    }
    public void stop() {
       if(!running) {
          return;
       }
       running = false;
    }
}

在start方法中开启了一个线程,在该线程中异步执行handle方法的具体任务。然后通过调用stop方法,可以停止该线程。


其中,使用volatile关键字控制的running变量作为开关,它可以控制线程中的状态。


接下来,有个比较关键的点是:如何通过配置中心的配置,控制这个开关呢?


以apollo配置为例,我们在配置中心的后台,修改配置之后,自动获取最新配置的核心代码如下:

public class CanalConfig {
    @Autowired
    private CanalService canalService;
    @ApolloConfigChangeListener
    public void change(ConfigChangeEvent event) {
        String value = event.getChange("test.canal.enable").getNewValue();
        if(BooleanUtils.toBoolean(value)) {
            canalService.start();
        } else {
            canalService.stop();
        }
    }
}

通过apollo的ApolloConfigChangeListener注解,可以监听配置参数的变化。


如果test.canal.enable开关配置的true,则调用canalService类的start方法开启canal数据同步功能。如果开关配置的false,则调用canalService类的stop方法,自动停止canal数据同步功能。


文件中转暂存数据


举个例子,在某些高并发的场景中,我们需要收集部分用户的日志(比如:用户登录的日志),写到数据库中,以便于做分析。


但由于项目中,还没有引入消息中间件,比如:kafka、rocketmq等。


如果直接将日志同步写入数据库,可能会影响接口性能。


所以,大家很自然想到了异步处理。


实现这个需求最简单的做法是,开启一个线程,异步写入数据到数据库即可。


这样做,可以是可以。


但如果用户登录操作的耗时,比异步写入数据库的时间要少得多。这样导致的结果是:生产日志的速度,比消费日志的速度要快得多,最终的性能瓶颈在消费端。


其实,还有更优雅的处理方式,虽说没有使用消息中间件,但借用了它的思想。


这套记录登录日志的功能,分为:日志生产端、日志存储端和日志消费端。


如下图所示:

先定义了一个阻塞队列。

@Component
public class LoginLogQueue {
    private static final int QUEUE_MAX_SIZE    = 1000;
    private BlockingQueueblockingQueue queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);
    //生成消息
    public boolean push(LoginLog loginLog) {
        return this.queue.add(loginLog);
    } 
    //消费消息
    public LoginLog poll() {
        LoginLog loginLog = null;
        try {
            loginLog = this.queue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return result;
    }
}

然后定义了一个日志的生产者。

@Service
public class LoginSerivce {
    @Autowired
    private LoginLogQueue loginLogQueue;
    public int login(UserInfo userInfo) {
        //业务处理
        LoginLog loginLog = convert(userInfo);
        loginLogQueue.push(loginLog);
    }  
}

接下来,定义了日志的消费者。

@Service
public class LoginInfoConsumer {
    @Autowired
    private LoginLogQueue queue;
    @PostConstruct
    public voit init {
       new Thread(() -> {
          while (true) {
              LoginLog loginLog = queue.take();
              //写入数据库
          }
        }).start();
    }
}

当然,这个例子中使用单线程接收登录日志,为了提升性能,也可以使用线程池来处理业务逻辑(比如:写入数据库)等。


其实这种思想,总的来说,就是我们耗时的瓶颈是在数据库插入操作这里,有的时候哪怕我们使用批量操作,可能效果还不是很理想,那么就可以考虑别的方式,比如放入文件或者MQ中,可能会比直接插入效率高。下面再举一个例子。


比如说一个转账接口,如果是并发开启,10个并发度,每个批次1000笔转账明细数据,数据库插入会特别耗时,大概6秒左右;这个跟我们公司的数据库同步机制有关,并发情况下,因为优先保证同步,所以并行的插入变成串行啦,就很耗时。


数据库同步机制可能导致并行的插入变成串行的原因有很多,下面列举了一些可能的情况:


  • 锁竞争:当多个事务同时尝试向相同的数据页或数据行插入数据时,数据库系统可能会使用锁来确保数据的一致性。如果同步机制导致大量的锁竞争,那么并行插入操作可能会被迫等待其他事务释放锁,从而导致串行化。
  • 同步点阻塞:某些数据库同步机制可能会引入同步点,要求所有的写操作都必须在这些同步点进行同步,这样就会导致并行的写操作变成串行化。
  • 冲突检测与重试:在数据库同步的过程中,可能会发生数据冲突,系统需要检测并解决这些冲突。这种检测和解决过程可能会导致并行插入变成串行化,因为某些操作需要等待其他操作完成后才能执行。
  • 数据复制延迟:如果数据库采用了主从复制或者集群复制的机制,数据同步可能会引入一定的延迟。在这种情况下,并行的插入操作可能会因为数据尚未完全同步而变成串行化。


优化前1000笔明细转账数据,先落地DB数据库,返回处理中给用户,再异步转账。如图:

记得当时压测的时候,高并发情况,这1000笔明细入库,耗时都比较大。所以我转换了一下思路,把批量的明细转账记录保存的文件服务器,然后记录一笔转账总记录到数据库即可。接着异步再把明细下载下来,进行转账和明细入库。最后优化后,性能提升了十几倍


优化后,流程图如下:


模拟大数据导入解析


我们可能会经常收到运营同学提过来的excel数据导入需求,比如:将某一大类下的所有子类一次性导入系统,或者导入一批新的供应商数据等等。


我们以导入供应商数据为例,它所涉及的业务流程很长,比如:


  1. 调用天眼查接口校验企业名称和统一社会信用代码。
  2. 写入供应商基本表
  3. 写入组织表
  4. 给供应商自动创建一个用户
  5. 给该用户分配权限
  6. 自定义域名
  7. 发站内通知


如果在程序中,解析完excel,读取了所有数据之后。用单线程一条条处理业务逻辑,可能耗时会非常长。


为了提升excel数据导入效率,非常有必要使用多线程来处理。


当然在java中实现多线程的手段有很多种,下面重点聊聊java8中最简单的实现方式:parallelStream


伪代码如下:

supplierList.parallelStream().forEach(x -> importSupplier(x));

parallelStream是一个并行执行的流,它默认通过ForkJoinPool实现的,能提高你的多线程任务的速度。


ForkJoinPool处理的过程会分而治之,它的核心思想是:将一个大任务切分成多个小任务。每个小任务都能单独执行,最后它会把所用任务的执行结果进行汇总。


下面用一张图简单介绍一下ForkJoinPool的原理:

当然除了excel导入之外,还有类似的读取文本文件,也可以用类似的方法处理。


温馨的提醒一下,如果一次性导入的数据非常多,用多线程处理,可能会使系统的cpu使用率飙升,需要特别关注。


查询接口优化 - 串行改并行


假设我们设计一个APP首页的接口,它需要查用户信息、需要查banner信息、需要查弹窗信息等等。如果是串行一个一个查,比如查用户信息200ms,查banner信息100ms、查弹窗信息50ms,那一共就耗时350ms了,如果还查其他信息,那耗时就更大了。

其实我们可以改为并行调用,即查用户信息、查banner信息、查弹窗信息,可以同时并行发起

最后接口耗时将大大降低

public UserInfo getUserInfo(Long id) throws InterruptedException, ExecutionException {
    final UserInfo userInfo = new UserInfo();
    CompletableFuture userFuture = CompletableFuture.supplyAsync(() -> {
        getRemoteUserAndFill(id, userInfo);
        return Boolean.TRUE;
    }, executor);
    CompletableFuture bonusFuture = CompletableFuture.supplyAsync(() -> {
        getRemoteBonusAndFill(id, userInfo);
        return Boolean.TRUE;
    }, executor);
    CompletableFuture growthFuture = CompletableFuture.supplyAsync(() -> {
        getRemoteGrowthAndFill(id, userInfo);
        return Boolean.TRUE;
    }, executor);
    CompletableFuture.allOf(userFuture, bonusFuture, growthFuture).join();
    userFuture.get();
    bonusFuture.get();
    growthFuture.get();
    return userInfo;
}



剑指JUC原理-20.并发编程实践(中):https://developer.aliyun.com/article/1413700

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
6月前
|
Java
剑指JUC原理-14.ReentrantLock原理(下)
剑指JUC原理-14.ReentrantLock原理
38 1
|
6月前
|
消息中间件 存储 Java
剑指JUC原理-20.并发编程实践(下)
剑指JUC原理-20.并发编程实践
64 0
|
6月前
|
NoSQL 前端开发 Java
剑指JUC原理-20.并发编程实践(中)
剑指JUC原理-20.并发编程实践
67 0
|
6月前
|
存储 算法 安全
剑指JUC原理-5.synchronized底层原理(上)
剑指JUC原理-5.synchronized底层原理
57 0
|
6月前
|
安全 Java 程序员
剑指JUC原理-14.ReentrantLock原理(上)
剑指JUC原理-14.ReentrantLock原理
45 0
|
6月前
|
存储 Java 编译器
剑指JUC原理-5.synchronized底层原理(下)
剑指JUC原理-5.synchronized底层原理
52 0
|
6月前
|
缓存 安全 前端开发
剑指JUC原理-8.Java内存模型(上)
剑指JUC原理-8.Java内存模型
69 0
|
6月前
|
SQL 安全 Java
剑指JUC原理-8.Java内存模型(下)
剑指JUC原理-8.Java内存模型
48 0
|
6月前
|
Java 编译器 测试技术
剑指JUC原理-8.Java内存模型(中)
剑指JUC原理-8.Java内存模型
57 0
|
6月前
|
存储 缓存 安全
剑指JUC原理-11.不可变设计
剑指JUC原理-11.不可变设计
31 0