【Netty 从成神到升仙系列 五】Netty 的责任链真有这么神奇吗?

简介: 【Netty 从成神到升仙系列 五】Netty 的责任链真有这么神奇吗?

责任链模式

1. 什么是责任链模式?

我们来看下百度百科 责任链模式 的定义:

责任链模式是一种设计模式。在责任链模式里,很多对象由每一个对象对其下家的引用而连接起来形成一条链。请求在这个链上传递,,直到链上的某一个对象决定处理此请求。发出这个请求的客户端并不知道链上的哪一个对象最终处理这个请求,这使得系统可以在不影响客户端的情况下动态地重新组织和分配责任。

通俗的理解,当我们的业务需要一连串的校验,比如下面的商品上架流程:

正常情况下,对于这种链式的校验,我们一般采用 **责任链 ** 的方式去进行处理。

2. 责任链实现 demo

首先,我们肯定要有一个上下文,这个上下文用来进行链表的执行:

/**
 * handler上下文,维护链表和负责链表的执行
 */
public class HandlerChainContext {
    HandlerChainContext next; // 持有下一个节点:单链表
    AbstractHandler handler;
    public HandlerChainContext(AbstractHandler handler) {
        this.handler = handler;
    }
    // 真正执行 handler
    void handler(Object arg0) {
        this.handler.doHandler(this, arg0);
    }
    // 继续执行下一个
    void runNext(Object arg0) {
        if (this.next != null) {
            this.next.handler(arg0);
        }
    }
}

处理器的抽象类:

abstract public class AbstractHandler {
    abstract void doHandler(HandlerChainContext context, Object arg0);
}

具体的实现类:

public class Handler1 extends AbstractHandler {
    @Override
    void doHandler(HandlerChainContext handlerChainContext, Object arg0) {
        System.out.println("--首次购买打9折!");
        arg0 = new DecimalFormat("0.00").format(Double.valueOf(arg0.toString()) * 0.9);
        System.out.println("折扣后金额:" + arg0);
        // 继续执行下一个
        handlerChainContext.runNext(arg0);
    }
}

Pipeline 类:

public class Pipeline {
    // 持有上下文(可以获得需要的数据,属性)
    public HandlerChainContext context = new HandlerChainContext(new AbstractHandler() {
        @Override
        void doHandler(HandlerChainContext context, Object arg0) {
            System.out.println("折扣前" + arg0);
            context.runNext(arg0);
        }
    });
    // 添加责任链
    public void addLast(AbstractHandler handler) {
        HandlerChainContext next = context;
        while (next.next != null) {
            next = next.next;
        }
        next.next = new HandlerChainContext(handler);
    }
    // 开始调用
    public void requestProcess(Object arg0) {
        context.handler(arg0);
    }
    public static void main(String[] args) {
        Pipeline p = new Pipeline();
        p.addLast(new Handler1());
        p.addLast(new Handler2());
        p.addLast(new Handler3());
        p.requestProcess("150");
    }
}

3. Netty的责任链

我们回顾下 Netty 的责任链使用方法:

.childHandler(new ChannelInitializer<SocketChannel>() {
    protected void initChannel(SocketChannel ch) throws Exception {
        /*添加到该子channel的pipeline的尾部*/
        ch.pipeline().addLast(serverHandler);
        ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
        ch.pipeline().addLast(new ChannelOutboundHandlerAdapter());
    }
});

从这个地方我们可以看到,Netty 为每一个客户端的连接 SocketChannel 都分配了一个独立的 pipeline

ChannelPipeline支持运行态动态的添加或者删除 ChannelHandler。

当业务高峰期需要对系统做拥塞保护时,就可以根据当前的系统时间进行判断,如果处于业务高峰期,则动态地将系统拥塞保护ChannelHandler添加到当前的ChannelPipeline中,当高峰期过去之后,就可以动态删除拥塞保护 ChannelHandler了。

ChannelPipeline是线程安全的,但是 ChannelHandler 却不是线程安全的。

我们的 ChannelPipeline 执行是单线程的,因此是安全的,但 ChannelHandler 自己实现的可以利用多线程来跑,但是这种是不安全的。

至于 ChannelPipeline 是单线程怎么知道的,后面我们会解答

3.1 Netty 的责任链设计

3.1.1 责任处理器的接口

pipeline 中的所有的 handler 的顶级抽象接口,它规定了所有的 handler 统一要有添加,移除,异常捕获的行为。

public interface ChannelHandler {
    // 添加
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;
   // 删除
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
}

当然,NettyChannelHandler 做了进一步的划分,分别为 ChannelInboundHandlerAdapter(入栈)ChannelOutboundHandlerAdapter(出栈)


3.1.2 添加删除责任处理器的接口

ChannelPipeline 中有各种各样的处理 Handler 的接口:


public interface ChannelPipeline {
   // 头部
    ChannelPipeline addFirst(String name, ChannelHandler handler);
    // 尾部
    ChannelPipeline addLast(String name, ChannelHandler handler);
}

默认的实现是 DefaultChannelPipeline,具体的实现见代码(核心就是双向链表的添加和删除

3.1.3 上下文

pipeline中的handler被封装进了上下文中。

如下, 通过上下文,可以轻松拿到当前节点所属的channel,以及它的线程执行器


621f03c750e84a50af1a1633b4fff6eb.png

3.1.4 责任终止机制
  • 在pipeline中的任意一个节点,只要我们不手动的往下传播下去,这个事件就会终止传播在当前节点
  • 传播使用:super.channelRead(ctx, msg); 直接调用 ChannelHandlerContext 上下文里面的下一个执行 Handler。如下所示:


3.1.5 线程安全与否

在我们执行过程中,每一个连接服务端的客户端,都会有一个 SocketChannel,而当客户端接入时,我们的 ServerBootstrap 会为每一个 SocketChannel 分配其相应的 pipeline

当有新的链接进来时,会启动如下:

// 这个方法会在服务端检查当前有连接事件时,调用 unsafe.read() 方法,创建 SocketChannel
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    //接受新连接创建SocketChannel
    SocketChannel ch = SocketUtils.accept(javaChannel());
    if (ch != null) {
        buf.add(new NioSocketChannel(this, ch));
        return 1;
    }
    return 0;
}
/**
 * Creates a new instance.
 */
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    // 新的 Pipeline
    pipeline = newChannelPipeline();
}

我们之前聊过,每一个 SocketChannel,我们都会为其分配一个 EventLoop 来进行执行。

所以,我们每一个 SocketChannel 中的 Pipeline 在运行时,可以理解成是一个单线程的(线程安全)。

但我们可以在每个 ChannelHandler 里面进行一个多线程的操作

比如,客户端传进来的请求:客户端发送1000个正则表达式,需要服务端尽快的返回命中结果

如果我们单单的使用 EventLoop 来使用,那么整个 Pipeline 中是一个单线程的处理模式,会极大的影响 RT

于是,我们新起一个 Handler,在 Handler 里面新起一个线程池,来执行 1000Task,如下:

public class Test {
    public static ThreadPoolExecutor service = new ThreadPoolExecutor(
            10, 10, 3, TimeUnit.MINUTES, new LinkedBlockingQueue(100));
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Random random = new Random();
        List<Future<Boolean>> list = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            Future<Boolean> path = service.submit(new Task(random.nextInt(10)));
            list.add(path);
        }
        for (Future<Boolean> future : list) {
            System.out.println(future.get());
        }
    }
}
/**
 * 计算当前的正则匹配
 * 这里使用数字代替
 */
class Task implements Callable {
    int value;
    public Task(int value) {
        this.value = value;
    }
    @Override
    public Object call() throws Exception {
        if (value <= 5) {
            return true;
        } else {
            return false;
        }
    }
}

这种我们整体线程流转如下:

注意:这种虽然可以提高效率,但必须要保证多线程情况下 ChannelHandlerContext 上下文的线程安全,不然,有可能出现线程不安全的情况发生。

4. 总结

总体而言,NettyPipline 设计主要使用了 责任链 的设计模式,通过 headtail 两个指针来对 Handler 进行操作。

其中,个人感觉比较重要的应该是我们的 服务端(NioServerSocketChannel) 会为每一个 客户端(NIOSocketChannel) 分配一个独一无二的 Pipline,当然,每个 Pipline 里面的 Handler 还是固定的。

另外,在 Pipline 运行过程中,我们执行完全由 EventLoop 来进行执行,也就是我们通常所说的 EventLoopGroup 线程组里面的线程。

同样,我们之前也讲过关于 Netty 为每一个通道(NIOSocketChannel)分配 EventLoop 的策略(2次幂等 | 轮询)。

最后,注意一下 Handler 是固定的,所以我们一般不会在 Hanler 里面做一些线程并发的情况。

这次的文章就到这里了,后面会更新下 Netty 接受客户端连接的整个过程,再更新一波 Netty 整个的链路图,Netty 的章节应该就可以结束了。

下一个模块的话,准备更新完之前落下的 Kafka 的源码模块。








相关文章
|
消息中间件 缓存 网络协议
【Netty 从成神到升仙系列 大结局】全网一图流死磕解析 Netty 源码
【Netty 从成神到升仙系列 大结局】全网一图流死磕解析 Netty 源码
【Netty 从成神到升仙系列 大结局】全网一图流死磕解析 Netty 源码
|
消息中间件 缓存 安全
【Netty 从成神到升仙系列 大结局】全网一图流死磕解析 Netty 源码
【Netty 从成神到升仙系列 大结局】全网一图流死磕解析 Netty 源码
【Netty 从成神到升仙系列 大结局】全网一图流死磕解析 Netty 源码
|
XML 缓存 JSON
【Netty 从成神到升仙系列 四】让我们一起探索 Netty 中的零拷贝
【Netty 从成神到升仙系列 四】让我们一起探索 Netty 中的零拷贝
【Netty 从成神到升仙系列 四】让我们一起探索 Netty 中的零拷贝
|
缓存 JSON 网络协议
【Netty 从成神到升仙系列 三】Netty 凭什么成为国内最流行的网络通信框架?
【Netty 从成神到升仙系列 三】Netty 凭什么成为国内最流行的网络通信框架?
【Netty 从成神到升仙系列 三】Netty 凭什么成为国内最流行的网络通信框架?
|
监控 Java Linux
【Netty 从成神到升仙系列 二】你真的懂 NIOEventLoop 嘛?
【Netty 从成神到升仙系列 二】你真的懂 NIOEventLoop 嘛?
【Netty 从成神到升仙系列 二】你真的懂 NIOEventLoop 嘛?
|
存储 前端开发 Java
【Netty 从成神到升仙系列 一】Netty 服务端的启动源码剖析(一)
【Netty 从成神到升仙系列 一】Netty 服务端的启动源码剖析(一)
【Netty 从成神到升仙系列 一】Netty 服务端的启动源码剖析(一)
|
存储 缓存 NoSQL
跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)
本文将要分享的是如何从零实现一套基于Netty框架的分布式高可用IM系统,它将支持长连接网关管理、单聊、群聊、聊天记录查询、离线消息存储、消息推送、心跳、分布式唯一ID、红包、消息同步等功能,并且还支持集群部署。
13281 1
|
6天前
|
机器学习/深度学习 缓存 算法
netty源码解解析(4.0)-25 ByteBuf内存池:PoolArena-PoolChunk
netty源码解解析(4.0)-25 ByteBuf内存池:PoolArena-PoolChunk
|
1月前
|
消息中间件 Oracle Dubbo
Netty 源码共读(一)如何阅读JDK下sun包的源码
Netty 源码共读(一)如何阅读JDK下sun包的源码
54 1
|
1月前
|
编解码 前端开发 网络协议
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
65 0