1 从打车开始说起
我们把滴滴打车的流程简化下
- 登录app后点击打车开始进行打车
- 打车服务开始为司机派单
- 司机接单后开始给来接驾
- 上车乘客后处于行程中
- 行程结束后完成本次打车服务
1.1 需要解决的问题
我们需要实现派单服务,用户发送打车订单后需要进行进行派单,如果在指定时间内没有找到司机就会收到派单超时的通知,并且能够实时查看当前排队的抢单人数
下面我们来介绍下涉打车涉及到的一些问题
1.1.1 打车超时
主要讲解打车服务在超时后的处理,比如打车后等待多长时间没有打到车后会通知等待超时
2 延时任务
我们先看下如何来解决打车超时的问题
2.1 什么是延时任务
在开发中,往往会遇到一些关于延时任务的需求,例如
- 生成订单30分钟未支付,则自动取消
- 生成订单60秒后,给用户发短信
- 滴滴打车订单完成后,如果用户一直不评价,48小时后会将自动评价为5星。
2.1.1 和定时任务区别
对上述的任务,我们给一个专业的名字来形容,那就是 延时任务,那么这里就会产生一个问题,这个 延时任务和 定时任务的区别究竟在哪里呢?一共有如下几点区别
- 定时任务有明确的触发时间,延时任务没有
- 定时任务有执行周期,而延时任务在某事件触发后一段时间内执行,没有执行周期
- 定时任务一般执行的是批处理操作是多个任务,而延时任务一般是单个任务
2.2 延时队列使用场景
那么什么时候需要用延时队列呢?考虑一下以下场景:
- 订单在十分钟之内未支付则自动取消。
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
- 账单在一周内未支付,则自动结算。
- 用户注册成功后,如果三天内没有登陆则进行短信提醒。
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
可以想一下美团点餐,超时时间
2.3 常见方案
下面我们来介绍下常见的延时任务的解决方案
2.3.1 数据库轮询
该方案通常是在小型项目中使用,即通过一个线程定时的去扫描数据库,通过订单时间来判断是否有超时的订单,然后进行update或delete等操作
优点
简单易行,支持集群操作
缺点
- 对服务器内存消耗大
- 存在延迟,比如你每隔3分钟扫描一次,那最坏的延迟时间就是3分钟
- 假设你的订单有几千万条,每隔几分钟这样扫描一次,数据库损耗极大
2.3.1 JDK的延迟队列
该方案是利用JDK自带的DelayQueue来实现,这是一个无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素,放入DelayQueue中的对象,是必须实现Delayed接口的。
优点
效率高,任务触发时间延迟低。
缺点
- 服务器重启后,数据全部消失,怕宕机
- 集群扩展相当麻烦
- 因为内存条件限制的原因,比如下单未付款的订单数太多,那么很容易就出现OOM异常
- 代码复杂度较高
2.3.3 netty时间轮算法
时间轮算法可以类比于时钟,如上图箭头(指针)按某一个方向按固定频率轮动,每一次跳动称为一个 tick
这样可以看出定时轮由个3个重要的属性参数,ticksPerWheel(一轮的tick数),tickDuration(一个tick的持续时间)以及 timeUnit(时间单位),例如当ticksPerWheel=60,tickDuration=1,timeUnit=秒,这就和现实中的始终的秒针走动完全类似了。
如果当前指针指在1上面,我有一个任务需要4秒以后执行,那么这个执行的线程回调或者消息将会被放在5上。那如果需要在20秒之后执行怎么办,由于这个环形结构槽数只到8,如果要20秒,指针需要多转2圈,位置是在2圈之后的5上面(20 % 8 + 1)
优点
效率高,任务触发时间延迟时间比delayQueue低,代码复杂度比delayQueue低。
缺点
服务器重启后,数据全部消失,怕宕机
集群扩展相当麻烦
因为内存条件限制的原因,比如下单未付款的订单数太多,那么很容易就出现OOM异常
2.3.4 使用消息队列
我们可以采用RabbitMQ的延时队列,RabbitMQ具有以下两个特性,可以实现延迟队列
RabbitMQ可以针对Queue和Message设置 x-message-ttl,来控制消息的生存时间,如果超时,则消息变为dead letter
RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,用来控制队列内出现了dead letter,则按照这两个参数重新路由。
优点
高效,可以利用rabbitmq的分布式特性轻易的进行横向扩展,消息支持持久化增加了可靠性。
缺点
本身的易用度要依赖于RabbitMq的运维,因为要引用RabbitMq,所以复杂度和成本变高
2.4 延时队列
RabbitMQ中没有对消息延迟进行实现,但是我们可以通过TTL以及死信路由来实现消息延迟。
2.4.1 TTL(消息过期时间)
在介绍延时队列之前,还需要先介绍一下RabbitMQ中的一个高级特性——TTL(Time To Live)。
TTL是RabbitMQ中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒,换句话说,如果一条消息设置了TTL属性或者进入了设置TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为“死信”,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。
2.4.1.1 配置队列TTL
一种是在创建队列的时候设置队列的“x-message-ttl”属性
@Bean public Queue taxiOverQueue() { Map<String, Object> args = new HashMap<>(2); args.put("x-message-ttl", 30000); return QueueBuilder.durable(TAXI_OVER_QUEUE).withArguments(args).build(); }
这样所有被投递到该队列的消息都最多不会存活超过30s,但是消息会到哪里呢,如果没有任何处理,消息会被丢弃,如果配置有死信队列,超时的消息会被投递到死信队列
2.5 死信队列
讲到延时消息就不能不讲死信队列
2.5.1 什么是死信队列
先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解
一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列;
2.5.2 死信队列使用场景
RabbitMQ中的死信交换器(dead letter exchange)可以接收下面三种场景中的消息:
- 消费者对消息使用了
basicReject
或者basicNack
回复,并且requeue
参数设置为false
,即不再将该消息重新在消费者间进行投递 - 消息在队列中超时,RabbitMQ可以在单个消息或者队列中设置
TTL
属性 - 队列中的消息已经超过其设置的最大消息个数
2.5.3 死信队列如何使用
死信交换器不是默认的设置,这里是被投递消息被拒绝后的一个可选行为,是在创建队列的时进行声明的,往往用在对问题消息的诊断上。
死信交换器仍然只是一个普通的交换器,创建时并没有特别要求和操作,在创建队列的时候,声明该交换器将用作保存被拒绝的消息即可,相关的参数是x-dead-letter-exchange
。
2.5.4 相关代码
@Bean public Queue taxiOverQueue() { Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", TAXI_DEAD_QUEUE_EXCHANGE); // x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key", TAXI_DEAD_KEY); return QueueBuilder.durable(TAXI_OVER_QUEUE).withArguments(args).build(); }
2.6 打车超时处理
用户通过调用打车服务将数据放进RabbitMQ的死信队列进行延时操作,等待一段时间后,正常的业务处理还没有处理到我们发起的数据,将会进行超时处理,通过通知服务将我们的处理结构通过websocket方式推送到我们的客户端。
2.6.1 打车超时实现
在创建队列的时候配置死信交换器并设置队列的“x-message-ttl”属性
@Bean public Queue taxiDeadQueue() { return new Queue(TAXI_DEAD_QUEUE,true); } @Bean public Queue taxiOverQueue() { Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", TAXI_DEAD_QUEUE_EXCHANGE); // x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key", TAXI_DEAD_KEY); // x-message-ttl 声明队列的TTL args.put("x-message-ttl", 30000); return QueueBuilder.durable(TAXI_OVER_QUEUE).withArguments(args).build(); }
这样所有被投递到该队列的消息都最多不会存活超过30s,超时后的消息会被投递到死信交换器