作者:闲鱼技术-码宝
前景概要
在我们日常的开发过程中经常会碰到无血缘关系的流水账逻辑
(数据补全, 通知逻辑等), 这个时候我们通常会采用异步化的方式去处理从而加快响应速度. 与此同时, 伴随着上下游依赖的服务变多, 对应的可能也会产生一系列的问题包括不限于问题难以排查
, 性能难以保证
等. 在闲鱼也不例外:
- 补齐的
串行操作
(这里指串行去实现多次网络io)存在性能瓶颈, 严重影响到了接口rt. - 逻辑单元化程度不好, 逻辑"自由飞翔"导致代码可读性较差, 单元逻辑相关指标也难以衡量.
基于上述的描述, 我们想就此抽象出一个异步化组件去解决对应的问题. 那先给自己定几个小目标吧!
- 可监控: 每个单元逻辑生命周期内的所有状态和行为(成功, 失败, rt等关键指标)都在监控范围内.
- 容灾性: 有fallback机制, 当逻辑单元内出现大量异常(通常指的是超时)的时候能堪大用.
- 零成本接入: 接入使用方便, 做到拆箱即用.
举个栗子
不超时场景
假设逻辑单元L有a, b, c三个任务, 执行时长分别为t1=1, t2=2, t3=3, 超时时间为4. 如图所示:
现在希望的结果是:
- 该逻辑单元L执行时间为3
- a任务执行成功1次耗时为1, b任务执行成功1次耗时为2, c任务执行成功1次耗时为3
超时场景
假设假设逻辑单元L有a, b, c三个任务, 执行时长分别为t1=3, t2=5, t3=6, 超时时间为4. 如图所示:
现在希望的结果是:
- 该逻辑单元L执行时间为4
- a任务执行成功1次耗时为3, b任务执行失败, c任务执行失败
- b和c线程在失败后线程停止, 不继续占用线程池资源
方案
注意下面所有的方案都是围绕超时场景
展开
方案之akka
- 当前服务获取对应的LogicActor
- LogicActor获取当前的所有UnitActor(a, b, c)
- 使用
tell
方式分别向a, b, c发消息, 注意这个过程是异步的 - a正常返回, b和c均超时
- 不管超时与否LogicActor都会去countDown当前的任务
- 当所有任务都完成的时候就回去combine数据
- 最后返回结果数据
LogicActor-逻辑单元actor, 负责分发任务和合并数据
UnitActor-具体任务的actor
优点
- 消息驱动.
- 不需要额外的线程池管理 & 异常容错.
- 优雅停止正在执行的worker(PoinsonPill).
- 熟悉akka的人可以较为容易的上手.
缺点
- 为了达到目标需要一层封装, 并且也不容易.
- 大大增加系统复杂度.
- 对于不熟悉scala/akka的人来说简直是灾难.
scala/akka的学习成本直接导致了"零成本接入"的目标无法达成, 故放弃.
方案之rxjava
使用rxjava来实现的逻辑
- 定义一个CountDownLatch来做一个超时控制
- 定义三个task
- 通过rxjava去实现异步化处理
- 给一个固定大小为10的线程池去处理
- reduce中去做累加操作
- latch做一个统一卡点
这边的timeout起到了整体卡点的效果, 但是并不知道timeout是哪个业务导致的
优点
- 响应式编程
- 代码量精简
- 屏蔽线程池的操作
- 封装了
#timeout
和#onErrorReturn
方法, 已存在超时处理模块无需二度封装
缺点
- 为了达到目标需要一层封装
- 存在超时处理模块, 但是业务单元的植入会破坏rxjava原有的封装且同时改造难度较大
虽然满足"容灾性"(
onErrorReturn
), 但是无法做到"可监控", 因为超时之后我们没有办法知道是哪个业务超时了, 故放弃.
完整方案-基于JUC包下的封装
难点
如何实现逻辑单元的监控
- 集团内部针对全局的
traceId
进行封装(ThreadLocal), 使用线程池会丢失上下文
, 导致对应的日志无法被追踪到. - 存在
bizCode
概念, 方便监控逻辑单元以及逻辑单元内的每一个操作.
如何清理超时线程, 使其不占用线程池资源
上述Callable
和Runnable
执行超时时, 需要将其停止, 让其不继续占用线程池资源资源.
并发(超时)如何控制
控制超时的工具类有很多
- CountDownLatch: java.util.concurrent.CountDownLatch#await(long, java.util.concurrent.TimeUnit)超时不会抛出异常只能通过boolean去判断是否超时, 意味着不能通过短路的异常流方式去处理.
CyclicBarrier: java.util.concurrent.CyclicBarrier#await(long, java.util.concurrent.TimeUnit)对于异常的封装并不友好, 例
现在有四个线程a, b, c, d. 超时时间均2s. 其中d线程执行了3s, 此时会发生下面的情况
- 第一个完成任务进入barrier.await的会抛出timeout异常, 同时其他四个任务都会抛出broken barrier
- 超时任务最后会完成后续的动作, 持续占用资源
- Semaphore: 不满足使用条件, 不过多赘述.
- Future: java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit), futureList会串行读取, 导致T>=Max(t1,t2,t3)
一个典型的错误例子
- 提交future到对应的list中
- 通过遍历去获取对应的结果
- 获取到结果之后去做累加的操作
- 最后输出结果和执行的时间
虽然上面三个任务是并发执行, 但由于
future.get(long, TimeUnit)
是阻塞, timeout会在此处失效. 如上图, 最后耗时可能是3+2+1=6s
.
整体方案
核心类图如下:
- ConcurrentCallable是对外使用的类
- BizBaseCallable继承了Callable, 封装了内部的上下文和对应的业务单元
- BizCountDownLatch为了解决业务超时监控的问题, 加上了业务单元池的属性
- BizBaseCallable封装了ctx表示上下文, 保证在多线程的情况下上下文也不会丢失(针对集团内部的traceId的兼容)
CountDownLatch的改造-可监控
继承CountDownLatch重写#await()和countDown()方法
- 补充业务单元概念(bizCode)
- 对应的bizSet实际上是一个未完成的业务池子, 完成一个业务去掉对应的业务单元
- 重写countDown方法, 通过移除业务单元去表示
已完成
- await超时的时候抛出异常, 模拟短路环境, 上层统一处理
- 超时异常中丢出还未完成的任务
上层监控逻辑封装
ConcurrentCallable的超时处理-容灾性
监控逻辑
中包含了超时逻辑的处理- Callable获取返回值的时候, get超时时长给
0
: 该完成的都完成了, 没完成的就超时了. - 通过future.cancel去中断正在执行的callable
- 默认返回fallback(兜底操作), fallback默认实现为
renturn null;
. - 这里需要塞一个默认值, 这样可以方便外层调用的时候通过
list.forEach(l -> deal(l))
去处理列表逻辑
再看上面那个例子-零成本接入
- 模拟上下文的置入, TestCtx中封装了一个ThreadLocal
- 4000L表示该任务的超时时间为4000ms, 该逻辑单元取名为"l"
- 往组件中添加任务a, b, c
- 最后展示了获取到数据之后的处理
- a结束后会在
countDownLatch
中移除对应的单元 - 超时的时候会强行结束b和c
至此就完成了一个可监控
, 容灾性
, 零成本接入
的异步化组件.
小结
综上所述最后选择了"JUC(升级版)"作为我们的异步化组件.
本文主要介绍了闲鱼内部使用的异步化调度的组件. 业务场景衍生出的业务组件可能不具备很强的普适性, 但是希望从文中的背景, 目标, 方案选型中给出值得借鉴的思考.