集群通信组件tribes之通道拦截器

简介: 拦截器应该可以说是一个很经典的设计模式,它有点类似于过滤器,当某信息从一个地方流向目的地的过程中,可能需要统一对信息进行处理,如果考虑到系统的可扩展性和灵活性通常就会使用拦截器模式,它就像一个个关卡被设置在信息流动的通道中,并且可以按照实际需要添加和减少关卡。

拦截器应该可以说是一个很经典的设计模式,它有点类似于过滤器,当某信息从一个地方流向目的地的过程中,可能需要统一对信息进行处理,如果考虑到系统的可扩展性和灵活性通常就会使用拦截器模式,它就像一个个关卡被设置在信息流动的通道中,并且可以按照实际需要添加和减少关卡。Tribes为了在应用层提供对源消息统一处理的渠道引入通道拦截器,用户在应用层只需要根据自己需要添加拦截器即可,例如,压缩解压拦截器、消息输出输入统计拦截器、异步消息发送器等等。

拦截器的数据流向示意图可以参考前面的tribes简介章节,数据从IO层流向应用层,中间就会经过一个拦截器栈,应用层处理完就会返回一个ack给发送端表示已经接收并处理完毕(消息可靠级别为SYNC_ACK),下面尝试用最简单一些代码和伪代码说明tribes的拦截器实现,旨在领会拦截器如何设计而并非具体的实现。最终实现的功能如图所示,最底层的协调者ChannelCoordinator永远作为第一个加入拦截器栈的拦截器,往上则是按照添加顺序排列,且每个拦截器的previousnext分别指向前一个拦截器和下一个拦截器。

 

① 定义拦截器接口

public interface ChannelInterceptor{

    public void setNext(ChannelInterceptor next) ;

    public ChannelInterceptor getPrevious();

    public void sendMessage(ChannelMessage msg);

    public void messageReceived(ChannelMessage msg);

}

② 定义一个基础拦截器,提供一些公共的操作,由于拦截器执行完后要触发下个拦截器,所以把触发工作统一抽离到基础类里面完成,当然里面必须包含前一个和后一个拦截器的引用。

public class ChannelInterceptorBase implements ChannelInterceptor {

    private ChannelInterceptor next;

    private ChannelInterceptor previous;

    public ChannelInterceptorBase() {

    }

    public final void setNext(ChannelInterceptor next) {

        this.next = next;

    }

    public final ChannelInterceptor getNext() {

        return next;

    }

    public final void setPrevious(ChannelInterceptor previous) {

        this.previous = previous;

    }

    public final ChannelInterceptor getPrevious() {

        return previous;

    }

    public void sendMessage(ChannelMessage msg) {

        if (getNext() != null) getNext().sendMessage(msg,);

    }

    public void messageReceived(ChannelMessage msg) {

        if (getPrevious() != null) getPrevious().messageReceived(msg);

    }

}

③ 压缩解压拦截器,此拦截器负责按一定算法压缩和解压处理。

public class GzipInterceptor extends ChannelInterceptorBase {

    public void sendMessage(ChannelMessage msg){

            compress the msg;

            getNext().sendMessage(msg);

    }

    public void messageReceived(ChannelMessage msg) {

            decompress the msg;

            getPrevious().messageReceived(msg);

    }

}

④ 最底层的协调器,直接与网络IO做交互

public class ChannelCoordinator extends ChannelInterceptorBase{

    public ChannelCoordinator() {

    }

    public void sendMessage(ChannelMessage msg) throws ChannelException {

        Network IO Send

    }

public void messageReceived(ChannelMessage msg) {

    Network IO READ

        super.messageReceived(msg);

    }

}

⑤ 测试类

public class Test{

public void main(String[] args){

ChannelCoordinator coordinator = new ChannelCoordinator();

GzipInterceptor gzipInterceptor = new GzipInterceptor();

coordinator.setNext(null);

coordinator.setPrevious(gzipInterceptor);

gzipInterceptor.setPrevious(null);

gzipInterceptor .setNext(coordinator);

gzipInterceptor.sendMessage(msg);

coordinator.messageReceived(msg);

}

    Tribes的拦截器整体设计就如上面,整个拦截器的执行顺序如下,当执行写操作时,数据流向GzipInterceptor -> ChannelCoordinator -> Network IO;当执行读操作时,数据流向则为Network IO -> ChannelCoordinator -> GzipInterceptor。理解了整个设计原理后对于tribes的整体把握将会更加深入。


点击订购作者《Tomcat内核设计剖析》



目录
相关文章
|
5月前
|
存储 C++
gRPC 四模式之 双向流RPC模式
gRPC 四模式之 双向流RPC模式
175 0
|
6月前
组件间的通信
组件间的通信
|
自然语言处理 负载均衡 算法
Nacos架构与原理 - 通信通道
Nacos架构与原理 - 通信通道
125 1
|
负载均衡 网络协议 安全
【服务网格架构】Envoy 架构概览(1):术语,线程模型,监听器和网络(L3 / L4)过滤器和HTTP连接管理
【服务网格架构】Envoy 架构概览(1):术语,线程模型,监听器和网络(L3 / L4)过滤器和HTTP连接管理
|
负载均衡 网络协议 安全
Envoy 架构概览(1):术语,线程模型,监听器和网络(L3 / L4)过滤器和HTTP连接管理
Envoy 架构概览(1):术语,线程模型,监听器和网络(L3 / L4)过滤器和HTTP连接管理
|
JSON JavaScript 小程序
【小程序】组件通信
【小程序】组件通信
209 0
【小程序】组件通信
|
存储 算法 安全
设备通过mqtt通道的动态预注册
在物联网平台为产品开启动态注册功能后,直连设备可使用一型一密安全认证方式完成动态注册,通过MQTT通信协议连接物联网平台。设备先基于TLS建立与物联网平台的连接,获取MQTT连接所需的设备密钥,再断开连接,然后重新建立MQTT连接进行通信。
522 1
|
算法 物联网
设备通过mqtt通道的动态免预注册
一型一密认证方式下,同一产品下所有设备可以烧录相同的设备标志信息,即所有设备包含相同的产品证书(ProductKey和ProductSecret)。设备发送激活请求时,物联网平台会进行身份确认,认证通过后,下发设备接入所需信息。
442 0
|
网络协议 前端开发 Java
【剖析 | SOFARPC 框架】之SOFARPC 连接管理与心跳剖析
本文介绍连接管理的策略和SOFARPC中连接管理与心跳机制的实现,希望通过这篇文章,大家对此有一个了解,如果对其中有疑问的,也欢迎留言与我们讨论。
1817 0
|
网络安全 数据中心 网络架构
操作高速通道 配置健康检查,只需四步!
阿里云每两秒从健康检查IP地址向本地数据中心中的客户侧互联IP发送一个ping报文,如果某条物理专线上连续8个ping报文都无法得到回复,则将流量切换至另一条链路。
868 0