模型背景
在传统的MVC的三层架构模式下,业务请求执行路径一般是外部 → web → service → dao → 数据库。
通常,我们在最外的视图层(view)处理日志打印、接口方法性能监控、全局异常处理、参数校验等;在业务层(service)处理复杂的业务逻辑;在持久层(dao)处理与数据库交互的增删改查操作。
优点
经典的MVC三层架构在大多数单体应用的开发中是最普适、最合理的一种架构设计。职责分明地剥离各模块的处理,使得项目可工程化控制和配置,降低了模块间的耦合度,能够快速搭建应用并且相对灵活地进行工程模块及代码的调整。
问题
在三层架构模式下,它是典型的贫血模型,在编码实践中不难发现一个问题:一旦遇到复杂逻辑业务,业务层(service)的代码会呈现指数级膨胀,最终为了封装业务创造了业务层,却无法控制和梳理清楚业务逻辑,导致代码难以组织和维护,变更风险飙升,最终失控。
// 糟糕的代码实践1:填鸭式声明接口与实现接口,不适合复杂逻辑,面向过程,无前瞻性思考和设计
├─Interface
│ AService.java
│ BService.java
│ CService.java
├─implement
│ AServiceImpl.java
│ BServiceImpl.java
│ CServiceImpl.java
大量的项目实践经验是,如果没有从一开始对业务需求有很好的前瞻性设计,只是填鸭式进行接口声明,接口实现,不去解析逻辑背后的拆解和治理之道,编码的过程的后续是非常痛苦的,维护代码的时间成本、对过去凌乱的代码逻辑的理解成本需要付出太多太多。
// 糟糕的代码实践2:一个包罗万象的万能方法
private void method(){
// all logics
// 此处省略成百上千行
}
代码完全面向业务过程组织的话,编码就成了杂乱无章似的缠电线。我们在开发过程中应该竭力避免这种偷懒行为,一旦后续业务逻辑复杂或者持续变更调整,这种“简易编程”由于没有打好基础,后续也只能在浮沙之上搭筑,可维护性差,测试难度大,变更对已有业务影响严重。简易的代码实践无非是面向业务方法过程,所有的函数方法串连完成了这个业务逻辑。
// 糟糕的代码实践3:尽管有意识的包含大量私有方法去分隔范围边界保持方法的单一原则,但是类代码膨胀,堆积方法体的串联
private void method(){
// all logics
method1();
method2();
method3();
}
private void method1(){
//logics 1
}
private void method2(){
//logics 2
}
private void method3(){
//logics 3
}
除此之外,还有一些朴素的代码实践是不断在某个类中进行需求迭代,尽管可以通过一些重构技巧进行方法拆分,但这仅仅是方法层级的优化,所有业务逻辑都封装在了service某个独立类中,最终导致类代码膨胀,大量的私有方法虽然闭合了短小的逻辑边界,但也随之膨胀起来。最终需求迭代到一定程度逻辑已经无法在代码中梳理清楚,最终不得不割肉般推倒重做。
// 糟糕的代码实践4:随意地见缝插针穿插逻辑,逻辑分散,没有核心线条和链路
private void method(){
// all logics
method1();
//赋值
setField(A);
method2();
//运算
calculate(B,C,D);
method3();
//其他逻辑
other(S);
}
private void method1(){
//logics 1
}
private void method2(){
//logics 2
}
private void method3(){
//logics 3
}
我有过一次印象深刻的重构,是一个非常复杂的查询列表功能,里面要针对不同type类型进行数据检索,不同type的数据构建不同,除了查询DB,还要进行缓存过滤,之后还要根据特定type进行数据封装和返回。由于需求迭代很快,代码评审和走查略有缺失,每次调整需求都是见缝插针,业务逻辑没有故事主线,后续维护人员也没有进行改善和优化,最终导致这个列表的逻辑处理非常蹩脚,变更产生风险很大。重构可能时间很短,但是消耗这段业务逻辑和逻辑知识背景,以及回归成本及风险都是难以想象。
分析
实践问题 |
解决方向 |
方法体冗长 |
以类为维度对方法模块化拆分、插拔式编程 |
类代码膨胀 |
拆解类方法,方法边界提炼为类边界,提高类内聚性 |
逻辑分散 、缺少业务流程主线 |
构建业务处理信息流,明细逻辑链路 |
在大量项目实践的捶打中,我一直在探索解决复杂业务逻辑的破解之道,试图找到一种比较好的方案去对这种失控的业务层进行治理,抽象出一个较为普适的模型,让代码实践得更优雅,让维护工作更灵活,真正地支持可扩展,面向对象编程。
模型发展
简易版:朴素责任链(Responsible Chain)
起初,在做一个商品佣金试算的业务场景,需求入参是商品原价,通过运营已配置的价格策略来试算商品价格经过折扣或补贴后的价格返回。
这个业务逻辑不是很复杂,业务逻辑处理主线是单一的,通过责任链模式构建流程主线,保持对业务逻辑数据流向的控制即可。
实现较为简单,是典型的朴素责任链设计模式构建,汇总如下:
- 构建责任链执行顺序[参数校验→策略拉取→过滤试算→返回结果]
- 入参传入开始,在责任链的执行链条中执行,出参返回结束
增强版:责任链(Responsible Chain) + 线程变量(ThreadLocal)
之后,接手了一波大促运营活动需求,业务需求和场景处理变得复杂起来,任务列表展示、抽奖逻辑、奖励结算、活动秒杀抢等。这里用活动抽奖的逻辑来阐述下业务流程:
随着业务复杂度的提升,发现如下问题:
复杂度问题 |
解决方向 |
解决手段 |
处理链条不再单一制,出现分叉逻辑 |
编排处理引擎执行体节点构建不同执行链分而治之 |
Handler构建Processor处理器节点,Processor构建执行链Chain,一到多Chain构建执行环境Context供业务方法调用 |
参与处理的业务体增多,各种异常捕获和处理 |
全局异常捕获、运行时异常定义 |
方法入口、出口环绕 |
业务节点之前信息孤岛,消息传递及线程安全问题 |
引入线程变量,抽象入参、出参、临时参及自定义参数 线程级生命周期驻留,随拿随用,用完即清除,对内存和GC友好 |
基于ThreadLocal构建线程级通信载体,驻留、传递、清除统一管理 |
业务流程越复杂,处理链条越长 |
需要支持Fast-Fail机制 |
执行节点Check,可以尽早结束 |
成型版:执行环境(Context)+ 处理引擎(Processor)+ 执行器(Executor)+ 执行单元(Handler)
经历了简易版、增强版的实践和不断优化提炼,逐步有了现在的成型版。
模型组成
组成 |
描述 |
功能 |
Context |
运行环境 |
维护运行环境的构建、执行,业务方法的总载体 |
ContextHolder |
环境变量 |
存储业务线程环境变量,如入参、出参、中间变量 |
Process |
处理器 |
业务处理节点,通过一个执行链存在 |
Executor |
执行器 |
Process处理器的内核,决定和影响并驱动Handler的执行 |
Handler |
执行体 |
具体业务处理的执行体 |
模型初始化
- Processor进行构建各自Handler模型,是责任链中的节点,串联起来形成业务逻辑处理流程,这里是单向流转的,因为业务逻辑处理本身就是流向单一的,不是Netty处理网络请求那样需要入站、出站。这里Processor执行链定义了业务逻辑的顺序。
- Executor是Processor内核负责驱动Handler执行,Executor就像CPU一样,指挥Handler执行,一般可以选择顺序执行、批量执行等。这里Executor定义了单体Processor中驱动Handler执行的方式。比如批量查询可以选择并行驱动Handler,如果Handler之间存在顺序依赖则顺序执行,顺序执行的话又像是Processor责任链中再嵌套一层Handler级别责任链,如果是多级责任链的话,要根据业务场景识别领域范围进行分级,更广泛的语义领域可以提炼Handler到Processor,需要具体处理的建议驻留在Handler进行处理。
- Handler是最原子的组成部分,负责具体业务逻辑处理,由于Handler是最小的处理单元,因此它是可插拔的,同理Processor被视为Handler的集合体,如果业务逻辑足够内聚,Processor也是可以可插拔复用的
- Context是运行环境,负责构建Processor并执行它
- ContextHolder是整个线程变量的持有者,它的底层实现是ThreadLocal,它是天然线程安全的,这里封装一个ThreadLocal<Map<Object,Object>>,一个Map绑定一个线程,Map的key可以供当前线程定义唯一的索引值,Map的value则可以存放多样化且具体的传值对象,这里可以根据具体使用场景决定是否需要序列化
模型运行时
- Request进入Context执行环境前,绑定ContextHolder
- Processor是整个执行链,从Processor-1、Processor-2、Processor-3,直到Processor-N。terminate是一个执行链之间共同持有线程级变量,用来控制Processor的执行判断,引入terminate的原因是执行链可能很长,但是可能在执行链队首因为异常或不合法等原因得到结果而不必继续执行后续Processor,这里使用volatile进行了修饰,保持terminate的可见性以保证可以在第一时间发现信号变化,执行链next方法会询问terminate状态以确保是否需要提前结束或继续执行。
- Executor根据顺序执行或并发执行驱动Handler
- Handler被驱动后执行封装的业务逻辑
- ContextHolder中的线程变量仅对当前线程执行有效,Handler、Processor都可以持有它并进行线程变量的存放、获取,在每层业务方法执行完会进行remove回收以防止内存泄露
模型实现
模型结构
│ BaseProcessor.java 基础处理器,可以作为Processor父类继承使用
│ IContext.java 执行环境定义,包含环境构建、环境运行、环境销毁
│ IHandler.java 无入参和出参Handler定义
│ IProcessor.java 定义执行器Processor,默认是支持入参、出参
│ IRequestHandler.java 定义支持入参Handler
│ IRequestResponseContext.java 定义支持入参、出参上下文执行环境Context
│ IRequestResponseFutureHandler.java 定义支持入参,异步出参Handler
│ IRequestResponseHandler.java 定义支持入参,出参Handler
│ IResponseFutureHandler.java 定义支持异步出参Handler
│ IResponseHandler.java 定义支持出参Handler
│
├─handler 所有的Handler构建成RequestCheckProcessor,内聚所有校验业务的逻辑
│ RequestAuthCheckHandler.java 校验授权
│ RequestChannelCheckHandler.java 校验渠道
│ RequestParamCheckHandler.java 校验请求参数
│ RequestTokenCheckHandler.java 校验授权码
│
├─holder
│ ContextHolder.java 环境变量封装
│
└─processor
RequestCheckProcessor.java 一个请求入参的校验Processor实现
模型代码
BaseProcessor
/**
* @author: guanjian
* @date: 2020/07/10 13:49
* @description: 基础处理器
*/
@Scope("prototype")
@Component("baseProcessor")
public class BaseProcessor<T> implements IProcessor, Iterable {
private final static Logger LOGGER = LoggerFactory.getLogger(BaseProcessor.class);
/**
* 线程变量
*/
@Resource
protected ContextHolder contextHolder;
@Resource
protected ApplicationContext springContext;
private final static String TERMINATE = "terminate";
/**
* 当前处理器
*/
protected IProcessor processor;
/**
* 后继处理器
*/
protected IProcessor successor;
/**
* 持有的执行器集合
*/
protected LinkedList<T> handlers = Lists.newLinkedList();
public IProcessor getProcessor() {
LOGGER.debug("[BaseProcessor] processor is {}.", processor.getClass().getName());
return processor;
}
public void setProcessor(IProcessor processor) {
this.processor = processor;
}
public IProcessor getSuccessor() {
LOGGER.debug("[BaseProcessor] successor is {}.", successor.getClass().getName());
return successor;
}
public void setSuccessor(IProcessor successor) {
this.successor = successor;
}
public boolean hasSuccessor() {
return null != successor;
}
public void processSuccessor() {
if (isTerminated()) {
LOGGER.debug("[BaseProcessor] terminate is stop status , it will stop all processors.");
return;
}
if (!hasSuccessor()) return;
getSuccessor().process();
}
public void terminate() {
LOGGER.debug("[BaseProcessor] terminate works , it will stop all processors.");
contextHolder.bindLocal(TERMINATE, Boolean.TRUE);
}
public T getHandler() {
Assert.notEmpty(handlers, "handlers cant not be null.");
LOGGER.debug("[BaseProcessor] handler is {}.", handlers.get(0).getClass().getName());
return handlers.get(0);
}
public LinkedList<T> getHandlers() {
return handlers;
}
public void setHandlers(LinkedList<T> handlers) {
this.handlers = handlers;
}
@Override
public void configurate() {
}
@Override
public void process() {
}
protected void attach(T handler) {
handlers.add(handler);
}
protected void append(T handler) {
handlers.addLast(handler);
}
@Override
public Iterator iterator() {
return new BaseProcessorIterator();
}
private class BaseProcessorIterator implements Iterator {
int index;
@Override
public boolean hasNext() {
if (index < handlers.size()) {
return true;
}
return false;
}
@Override
public Object next() {
if (hasNext()) {
LOGGER.debug("[BaseProcessor] handler is {}.", handlers.get(index).getClass().getName());
return handlers.get(index++);
}
return null;
}
}
protected boolean isTerminated() {
if (null == contextHolder.getLocal(TERMINATE)) return false;
return (boolean) contextHolder.getLocal(TERMINATE);
}
}
ContextHolder
/**
* @author: guanjian
* @date: 2020/07/08 9:31
* @description: 环境变量
*/
@Component("contextHolder")
public class ContextHolder<T, R> {
private final static Logger LOGGER = LoggerFactory.getLogger(ContextHolder.class);
/**
* 入参对象
*/
public final static String REQUEST_PARAM = "request_param";
/**
* 出参对象
*/
public final static String RESPONSE_PARAM = "response_param";
/**
* 传值对象
*/
public final static String TRANSMIT_PARAM = "transmit_param";
/**
* 线程变量
*/
private final static ThreadLocal<Map<Object, Object>> localVariable = ThreadLocal.withInitial(() -> Maps.newHashMap());
public void bindLocal(Object key, Object value) {
Objects.requireNonNull(key, "key can not be null");
Map holder = localVariable.get();
holder.put(key, value);
localVariable.set(holder);
LOGGER.debug("[ContextHolder] key={},value={} binded.", key, JSON.toJSONString(value));
}
public Object getLocal(Object key) {
if (CollectionUtils.isEmpty(localVariable.get())) return null;
Object value = localVariable.get().get(key);
LOGGER.debug("[ContextHolder] key={},value={} getted.", key, JSON.toJSONString(value));
return value;
}
public void bindRequest(T value) {
bindLocal(REQUEST_PARAM, value);
}
public T getRequest() {
return (T) localVariable.get().get(REQUEST_PARAM);
}
public void bindResponse(R value) {
bindLocal(RESPONSE_PARAM, value);
}
public R getResponse() {
return (R) localVariable.get().get(RESPONSE_PARAM);
}
public void bindTransmit(Object value) {
bindLocal(TRANSMIT_PARAM, value);
}
public Object getTransmit() {
return getLocal(TRANSMIT_PARAM);
}
public void clear() {
localVariable.remove();
}
}
Processor
这里列举其中一个Processor实现
/**
* @author: guanjian
* @date: 2020/07/10 13:57
* @description: 校验处理器
*/
@Scope("prototype")
@Component("requestCheckProcessor")
public class RequestCheckProcessor extends BaseProcessor {
@PostConstruct
@Override
public void configurate() {
configurateHandlers();
}
@Override
public void process() {
Iterator iterator = iterator();
while (iterator.hasNext()) {
IRequestResponseHandler<Request, Result> handler = (IRequestResponseHandler<Request, Result>) iterator.next();
if (!parseHandler(handler)) {
//force terminate all processorRequestAuthCheckHandler
terminate();
break;
}
}
processSuccessor();
}
private boolean parseHandler(IRequestResponseHandler<Request, Result> handler) {
boolean isContinue = true;
Result result = null;
try {
result = handler.execute((Request) contextHolder.getRequest());
if (!Result.isSuccess(result)) {
contextHolder.bindResponse(
Response.build(
Result.build(result.getCode(), result.getInfo())
)
);
isContinue = false;
}
} catch (Exception e) {
e.printStackTrace();
contextHolder.bindResponse(Response.unknowError());
isContinue = false;
}
return isContinue;
}
/**
* +-----------------+ +-------------------+ +-----------------+ +---------------+
* |param check| ----> |channel check | ----> | token check | ----> | auth check |
* +-----------------+ +------------------+ +-----------------+ +---------------+
*/
private void configurateHandlers() {
append(springContext.getBean(RequestParamCheckHandler.class));
append(springContext.getBean(RequestChannelCheckHandler.class));
append(springContext.getBean(RequestTokenCheckHandler.class));
append(springContext.getBean(RequestAuthCheckHandler.class));
}
}
Handler
这里列举其中一个Handler实现
/**
* @author: guanjian
* @date: 2020/07/10 13:41
* @description: 校验渠道
*/
@Component("requestChannelCheckHandler")
public class RequestChannelCheckHandler implements IRequestResponseHandler<Request, Result> {
private final static String REGION = CacheKeyConstants.ChannelConfig.CHANNEL_TOKEN_KEY;
private final static LocalCache LOCAL_CACHE = CaffeineCache.getInstance(REGION);
@Resource
private RedisCache redisCache;
@Override
public Result execute(Request request) {
if (LOCAL_CACHE.hasKey(request.getSource())) {
return Result.success();
}
Map<String, String> channelToken = redisCache.hGetAll(REGION);
if (channelToken.containsKey(request.getSource())) {
return Result.success();
}
return Result.build(
ResponseEnum.ResponseCode.A001.getCode(),
ResponseEnum.ResponseCode.A001.getInfo()
);
}
}
模型总结
解决方案最终都要投产到实战中检验,通过实践反馈不断优化和完善功能形成可落地的脚手架。