蚂蚁:实现异步的8种方式,你知道几个? 下

简介: 蚂蚁:实现异步的8种方式,你知道几个? 下

4.5 Spring ApplicationEvent事件实现异步

4.5.1 定义事件

public class AsyncSendEmailEvent extends ApplicationEvent {
    /**
     * 邮箱
     **/
    private String email;
   /**
     * 主题
     **/
    private String subject;
    /**
     * 内容
     **/
    private String content;
    /**
     * 接收者
     **/
    private String targetUserId;
}

4.5.2 定义事件处理器

@Slf4j
@Component
public class AsyncSendEmailEventHandler implements ApplicationListener<AsyncSendEmailEvent> {
    @Autowired
    private IMessageHandler mesageHandler;
    @Async("taskExecutor")
    @Override
    public void onApplicationEvent(AsyncSendEmailEvent event) {
        if (event == null) {
            return;
        }
        String email = event.getEmail();
        String subject = event.getSubject();
        String content = event.getContent();
        String targetUserId = event.getTargetUserId();
        mesageHandler.sendsendEmailSms(email, subject, content, targerUserId);
      }
}

另外,可能有些时候采用ApplicationEvent实现异步的使用,当程序出现异常错误的时候,需要考虑补偿机制,那么这时候可以结合Spring Retry重试来帮助我们避免这种异常造成数据不一致问题。

4.6 消息队列

4.6.1 回调事件消息生产者

@Slf4j
@Component
public class CallbackProducer {
    @Autowired
    AmqpTemplate amqpTemplate;
    public void sendCallbackMessage(CallbackDTO allbackDTO, final long delayTimes) {
        log.info("生产者发送消息,callbackDTO,{}", callbackDTO);
        amqpTemplate.convertAndSend(CallbackQueueEnum.QUEUE_GENSEE_CALLBACK.getExchange(), CallbackQueueEnum.QUEUE_GENSEE_CALLBACK.getRoutingKey(), JsonMapper.getInstance().toJson(genseeCallbackDTO), new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //给消息设置延迟毫秒值,通过给消息设置x-delay头来设置消息从交换机发送到队列的延迟时间
                message.getMessageProperties().setHeader("x-delay", delayTimes);
                message.getMessageProperties().setCorrelationId(callbackDTO.getSdkId());
                return message;
            }
        });
    }
}

4.6.2 回调事件消息消费者

@Slf4j
@Component
@RabbitListener(queues = "message.callback", containerFactory = "rabbitListenerContainerFactory")
public class CallbackConsumer {
    @Autowired
    private IGlobalUserService globalUserService;
    @RabbitHandler
    public void handle(String json, Channel channel, @Headers Map<String, Object> map) throws Exception {
        if (map.get("error") != null) {
            //否认消息
            channel.basicNack((Long) map.get(AmqpHeaders.DELIVERY_TAG), false, true);
            return;
        }
        try {
            CallbackDTO callbackDTO = JsonMapper.getInstance().fromJson(json, CallbackDTO.class);
            //执行业务逻辑
            globalUserService.execute(callbackDTO);
            //消息消息成功手动确认,对应消息确认模式acknowledge-mode: manual
            channel.basicAck((Long) map.get(AmqpHeaders.DELIVERY_TAG), false);
        } catch (Exception e) {
            log.error("回调失败 -> {}", e);
        }
    }
}

4.7 ThreadUtil异步工具类

@Slf4j
public class ThreadUtils {
    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            ThreadUtil.execAsync(() -> {
                ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current();
                int number = threadLocalRandom.nextInt(20) + 1;
                System.out.println(number);
            });
            log.info("当前第:" + i + "个线程");
        }
        log.info("task finish!");
    }
}

4.8 Guava异步

GuavaListenableFuture顾名思义就是可以监听的Future,是对java原生Future的扩展增强。我们知道Future表示一个异步计算任务,当任务完成时可以得到计算结果。如果我们希望一旦计算完成就拿到结果展示给用户或者做另外的计算,就必须使用另一个线程不断的查询计算状态。这样做,代码复杂,而且效率低下。使用「Guava ListenableFuture」 可以帮我们检测Future是否完成了,不需要再通过get()方法苦苦等待异步的计算结果,如果完成就自动调用回调函数,这样可以减少并发程序的复杂度。

ListenableFuture是一个接口,它从jdkFuture接口继承,添加了void addListener(Runnable listener, Executor executor)方法。

我们看下如何使用ListenableFuture。首先需要定义ListenableFuture的实例:

 ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
        final ListenableFuture<Integer> listenableFuture = executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.info("callable execute...")
                TimeUnit.SECONDS.sleep(1);
                return 1;
            }
        });

首先通过MoreExecutors类的静态方法listeningDecorator方法初始化一个ListeningExecutorService的方法,然后使用此实例的submit方法即可初始化ListenableFuture对象。

ListenableFuture要做的工作,在Callable接口的实现类中定义,这里只是休眠了1秒钟然后返回一个数字1,有了ListenableFuture实例,可以执行此Future并执行Future完成之后的回调函数。

 Futures.addCallback(listenableFuture, new FutureCallback<Integer>() {
    @Override
    public void onSuccess(Integer result) {
        //成功执行...
        System.out.println("Get listenable future's result with callback " + result);
    }
    @Override
    public void onFailure(Throwable t) {
        //异常情况处理...
        t.printStackTrace();
    }
});

那么,以上就是本期介绍的实现异步的8种方式了。




相关文章
|
2月前
|
设计模式 前端开发 JavaScript
前端编程的异步解决方案有哪些?
本文首发于微信公众号“前端徐徐”,介绍了异步编程的背景和几种常见方案,包括回调、事件监听、发布订阅、Promise、Generator、async/await和响应式编程。每种方案都有详细的例子和优缺点分析,帮助开发者根据具体需求选择最合适的异步编程方式。
83 1
|
5月前
|
消息中间件 存储 中间件
云原生异步问题之消息队列中的异步如何解决
云原生异步问题之消息队列中的异步如何解决
|
消息中间件 JavaScript 小程序
蚂蚁:实现异步的8种方式,你知道几个? 上
蚂蚁:实现异步的8种方式,你知道几个?上
|
前端开发 测试技术
深入解析价值25k的蚂蚁金服异步串行面试题
注:代码有错,请阅读原文。朋友去面试蚂蚁金服,遇到了一道面试题,乍一看感觉挺简单的,但是实现起来发现内部值得一提的点还是挺多的。
|
前端开发 JavaScript Java
前端异步解决方案大全(2021版)(四)
前端异步解决方案大全(2021版)
111 0
前端异步解决方案大全(2021版)(四)
|
前端开发 JavaScript
前端异步解决方案大全(2021版)(一)
前端异步解决方案大全(2021版)
162 0
前端异步解决方案大全(2021版)(一)
|
前端开发
前端异步解决方案大全(2021版)(三)
前端异步解决方案大全(2021版)
104 0
前端异步解决方案大全(2021版)(三)
|
前端开发 JavaScript API
前端异步(async)解决方案(所有方案)(一)
前端异步(async)解决方案(所有方案)
196 0
前端异步(async)解决方案(所有方案)(一)
|
前端开发 JavaScript Java
前端异步(async)解决方案(所有方案)(三)
前端异步(async)解决方案(所有方案)
332 0
前端异步(async)解决方案(所有方案)(三)
|
缓存 前端开发 API
异步的发展,顺手学会怎么处理多请求
异步的发展,顺手学会怎么处理多请求
125 0