责任链模式
1. 什么是责任链模式?
我们来看下百度百科 责任链模式 的定义:
责任链模式是一种设计模式。在责任链模式里,很多对象由每一个对象对其下家的引用而连接起来形成一条链。请求在这个链上传递,,直到链上的某一个对象决定处理此请求。发出这个请求的客户端并不知道链上的哪一个对象最终处理这个请求,这使得系统可以在不影响客户端的情况下动态地重新组织和分配责任。
通俗的理解,当我们的业务需要一连串的校验,比如下面的商品上架流程:
正常情况下,对于这种链式的校验,我们一般采用 **责任链 ** 的方式去进行处理。
2. 责任链实现 demo
首先,我们肯定要有一个上下文,这个上下文用来进行链表的执行:
/** * handler上下文,维护链表和负责链表的执行 */ public class HandlerChainContext { HandlerChainContext next; // 持有下一个节点:单链表 AbstractHandler handler; public HandlerChainContext(AbstractHandler handler) { this.handler = handler; } // 真正执行 handler void handler(Object arg0) { this.handler.doHandler(this, arg0); } // 继续执行下一个 void runNext(Object arg0) { if (this.next != null) { this.next.handler(arg0); } } }
处理器的抽象类:
abstract public class AbstractHandler { abstract void doHandler(HandlerChainContext context, Object arg0); }
具体的实现类:
public class Handler1 extends AbstractHandler { @Override void doHandler(HandlerChainContext handlerChainContext, Object arg0) { System.out.println("--首次购买打9折!"); arg0 = new DecimalFormat("0.00").format(Double.valueOf(arg0.toString()) * 0.9); System.out.println("折扣后金额:" + arg0); // 继续执行下一个 handlerChainContext.runNext(arg0); } }
Pipeline 类:
public class Pipeline { // 持有上下文(可以获得需要的数据,属性) public HandlerChainContext context = new HandlerChainContext(new AbstractHandler() { @Override void doHandler(HandlerChainContext context, Object arg0) { System.out.println("折扣前" + arg0); context.runNext(arg0); } }); // 添加责任链 public void addLast(AbstractHandler handler) { HandlerChainContext next = context; while (next.next != null) { next = next.next; } next.next = new HandlerChainContext(handler); } // 开始调用 public void requestProcess(Object arg0) { context.handler(arg0); } public static void main(String[] args) { Pipeline p = new Pipeline(); p.addLast(new Handler1()); p.addLast(new Handler2()); p.addLast(new Handler3()); p.requestProcess("150"); } }
3. Netty的责任链
我们回顾下 Netty 的责任链使用方法:
.childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { /*添加到该子channel的pipeline的尾部*/ ch.pipeline().addLast(serverHandler); ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); ch.pipeline().addLast(new ChannelOutboundHandlerAdapter()); } });
从这个地方我们可以看到,Netty
为每一个客户端的连接 SocketChannel
都分配了一个独立的 pipeline
ChannelPipeline支持运行态动态的添加或者删除 ChannelHandler。
当业务高峰期需要对系统做拥塞保护时,就可以根据当前的系统时间进行判断,如果处于业务高峰期,则动态地将系统拥塞保护ChannelHandler添加到当前的ChannelPipeline中,当高峰期过去之后,就可以动态删除拥塞保护 ChannelHandler了。
ChannelPipeline是线程安全的,但是 ChannelHandler 却不是线程安全的。
我们的 ChannelPipeline 执行是单线程的,因此是安全的,但 ChannelHandler
自己实现的可以利用多线程来跑,但是这种是不安全的。
至于 ChannelPipeline 是单线程怎么知道的,后面我们会解答
3.1 Netty 的责任链设计
3.1.1 责任处理器的接口
pipeline 中的所有的 handler 的顶级抽象接口,它规定了所有的 handler 统一要有添加,移除,异常捕获的行为。
public interface ChannelHandler { // 添加 void handlerAdded(ChannelHandlerContext ctx) throws Exception; // 删除 void handlerRemoved(ChannelHandlerContext ctx) throws Exception; }
当然,Netty
对 ChannelHandler
做了进一步的划分,分别为 ChannelInboundHandlerAdapter(入栈)
和 ChannelOutboundHandlerAdapter(出栈)
3.1.2 添加删除责任处理器的接口
ChannelPipeline
中有各种各样的处理 Handler
的接口:
public interface ChannelPipeline { // 头部 ChannelPipeline addFirst(String name, ChannelHandler handler); // 尾部 ChannelPipeline addLast(String name, ChannelHandler handler); }
默认的实现是 DefaultChannelPipeline
,具体的实现见代码(核心就是双向链表的添加和删除)
3.1.3 上下文
pipeline中的handler被封装进了上下文中。
如下, 通过上下文,可以轻松拿到当前节点所属的channel,以及它的线程执行器
3.1.4 责任终止机制
- 在pipeline中的任意一个节点,只要我们不手动的往下传播下去,这个事件就会终止传播在当前节点
- 传播使用:
super.channelRead(ctx, msg);
直接调用ChannelHandlerContext
上下文里面的下一个执行Handler
。如下所示:
3.1.5 线程安全与否
在我们执行过程中,每一个连接服务端的客户端,都会有一个 SocketChannel
,而当客户端接入时,我们的 ServerBootstrap
会为每一个 SocketChannel
分配其相应的 pipeline
。
当有新的链接进来时,会启动如下:
// 这个方法会在服务端检查当前有连接事件时,调用 unsafe.read() 方法,创建 SocketChannel @Override protected int doReadMessages(List<Object> buf) throws Exception { //接受新连接创建SocketChannel SocketChannel ch = SocketUtils.accept(javaChannel()); if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } return 0; } /** * Creates a new instance. */ protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); // 新的 Pipeline pipeline = newChannelPipeline(); }
我们之前聊过,每一个 SocketChannel
,我们都会为其分配一个 EventLoop
来进行执行。
所以,我们每一个 SocketChannel
中的 Pipeline
在运行时,可以理解成是一个单线程的(线程安全)。
但我们可以在每个 ChannelHandler
里面进行一个多线程的操作
比如,客户端传进来的请求:客户端发送1000个正则表达式,需要服务端尽快的返回命中结果
如果我们单单的使用 EventLoop
来使用,那么整个 Pipeline
中是一个单线程的处理模式,会极大的影响 RT
于是,我们新起一个 Handler
,在 Handler
里面新起一个线程池,来执行 1000
个 Task
,如下:
public class Test { public static ThreadPoolExecutor service = new ThreadPoolExecutor( 10, 10, 3, TimeUnit.MINUTES, new LinkedBlockingQueue(100)); public static void main(String[] args) throws ExecutionException, InterruptedException { Random random = new Random(); List<Future<Boolean>> list = new ArrayList<>(); for (int i = 0; i < 100; i++) { Future<Boolean> path = service.submit(new Task(random.nextInt(10))); list.add(path); } for (Future<Boolean> future : list) { System.out.println(future.get()); } } } /** * 计算当前的正则匹配 * 这里使用数字代替 */ class Task implements Callable { int value; public Task(int value) { this.value = value; } @Override public Object call() throws Exception { if (value <= 5) { return true; } else { return false; } } }
这种我们整体线程流转如下:
注意:这种虽然可以提高效率,但必须要保证多线程情况下 ChannelHandlerContext
上下文的线程安全,不然,有可能出现线程不安全的情况发生。
4. 总结
总体而言,Netty
的 Pipline
设计主要使用了 责任链
的设计模式,通过 head
和 tail
两个指针来对 Handler
进行操作。
其中,个人感觉比较重要的应该是我们的 服务端(NioServerSocketChannel)
会为每一个 客户端(NIOSocketChannel)
分配一个独一无二的 Pipline
,当然,每个 Pipline
里面的 Handler
还是固定的。
另外,在 Pipline
运行过程中,我们执行完全由 EventLoop
来进行执行,也就是我们通常所说的 EventLoopGroup
线程组里面的线程。
同样,我们之前也讲过关于 Netty
为每一个通道(NIOSocketChannel)分配 EventLoop
的策略(2次幂等 | 轮询)。
最后,注意一下 Handler
是固定的,所以我们一般不会在 Hanler
里面做一些线程并发的情况。
这次的文章就到这里了,后面会更新下 Netty
接受客户端连接的整个过程,再更新一波 Netty
整个的链路图,Netty
的章节应该就可以结束了。
下一个模块的话,准备更新完之前落下的 Kafka
的源码模块。