Mina框架IoHandler与IoProcessor详解

简介:

我们已经知道,IoHandler是开发网络应用程序的时候,与实际业务逻辑相关的组件,即属于Mina核心框架之外的应用层组件。从Mina 官方文档上,我们几乎没有看到对IoProcessor的说明,实际上IoProcessor对实际使用Mina框架的开发人员透明,无需你去了解它的实现逻辑,它在Mina中用来处理实际的I/O操作。
我们分析的思路是,先分别对IoHandler与IoProcessor进行单独分析,然后再阐述它们之间的不同以及联系。

  • IoHandler

当我们通过IoSession执行相关操作的时候,如写数据,这些事件会触发Mina框架抽象的IoService实例,从而调用Mina框架底层的相关组件进行处理。这时,配置的IoHandler就被用来处理Mina所触发的相关事件,处理这些事件的操作被抽象出来。
实际上,IoHandler的继承层次非常简单,也说明了基于Mina框架开发实际网络应用程序,对业务逻辑的处理也还是相对比较容易的。看一下 IoHandler的继承层次,如图所示:

e83cf2f896d8c86744cefd2061c29e929e491352


IoHandler接口所定义的操作,一共定义了7个处理事件的操作,如下所示:

1 public interface IoHandler {
2 void sessionCreated(IoSession session) throws Exception;
3 void sessionClosed(IoSession session) throws Exception;
4 void sessionIdle(IoSession session, IdleStatus status) throws Exception;
5 void exceptionCaught(IoSession session, Throwable cause) throws Exception;
6 void messageReceived(IoSession session, Object message) throws Exception;
7 void messageSent(IoSession session, Object message) throws Exception;
8 }

因为IoHandler是一个接口,所以如果使用该接口我们就必须实现所有的方法,MIna通过使用IoHandlerAdapter来默认实现 IoHandler接口,并在IoHandlerAdapter中全部给出空实现,如果我们要开发自己的IoHandler,可以继承自IoHandlerAdapter,根据需要选择重写指定的处理Mina事件的方法,而对于你不感兴趣的方法就默认不给予实现(默认使用 IoHandlerAdapter的空实现)。
那么,Mina调用IoHandler的时机是什么呢?又是如何调用的呢?
其实,根据Mina的架构,我们知道,在客户端主动发起I/O操作请求以后,会等待Mina触发相应的事件,在经过一组IoFilter之后,在 IoFilter链的最后一个IoFilter被调用将要结束的时候,会调用我们注册的IoHandler实现,经过处理来满足实际业务逻辑需要。我们可以在DefaultIoFilterChain中看到一个内部IoFilter实现类TailFilter,在该类里调用了 IoHandler封装的逻辑,代码如下所示:

01 private static class TailFilter extends IoFilterAdapter {
02 @Override
03 public void sessionCreated(NextFilter nextFilter, IoSession session) throwsException {
04 try {
05 session.getHandler().sessionCreated(session);
06 } finally {
07 // Notify the related future.
08 ConnectFuture future = (ConnectFuture) session.removeAttribute(SESSION_CREATED_FUTURE);
09 if (future != null) {
10 future.setSession(session);
11 }
12 }
13 }
14
15 @Override
16 public void sessionOpened(NextFilter nextFilter, IoSession session) throwsException {
17 session.getHandler().sessionOpened(session);
18 }
19
20 @Override
21 public void sessionClosed(NextFilter nextFilter, IoSession session) throwsException {
22 AbstractIoSession s = (AbstractIoSession) session;
23 try {
24 s.getHandler().sessionClosed(session);
25 } finally {
26 try {
27 s.getWriteRequestQueue().dispose(session);
28 } finally {
29 try {
30 s.getAttributeMap().dispose(session);
31 } finally {
32 try {
33 // Remove all filters.
34 session.getFilterChain().clear();
35 } finally {
36 if (s.getConfig().isUseReadOperation()) {
37 s.offerClosedReadFuture();
38 }
39 }
40 }
41 }
42 }
43 }
44
45 @Override
46 public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) throws Exception {
47 session.getHandler().sessionIdle(session, status);
48 }
49
50 @Override
51 public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) throws Exception {
52 AbstractIoSession s = (AbstractIoSession) session;
53 try {
54 s.getHandler().exceptionCaught(s, cause);
55 } finally {
56 if (s.getConfig().isUseReadOperation()) {
57 s.offerFailedReadFuture(cause);
58 }
59 }
60 }
61
62 @Override
63 public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
64 AbstractIoSession s = (AbstractIoSession) session;
65 if (!(message instanceof IoBuffer)) {
66 s.increaseReadMessages(System.currentTimeMillis());
67 } else if (!((IoBuffer) message).hasRemaining()) {
68 s.increaseReadMessages(System.currentTimeMillis());
69 }
70
71 try {
72 session.getHandler().messageReceived(s, message);
73 } finally {
74 if (s.getConfig().isUseReadOperation()) {
75 s.offerReadFuture(message);
76 }
77 }
78 }
79
80 @Override
81 public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
82 session.getHandler().messageSent(session, writeRequest.getMessage());
83 }
84
85 @Override
86 public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
87 nextFilter.filterWrite(session, writeRequest);
88 }
89
90 @Override
91 public void filterClose(NextFilter nextFilter, IoSession session) throws Exception {
92 nextFilter.filterClose(session);
93 }
94 }

在上面的代码中,正好调用了IoHandler接口定义的7个处理事件的方法。如果你还想知道IoFilterChain实例是在何时被调用的, 可以跟踪Mina的源码。
* IoProcessor

基于网络的端到端的通信,Mina通过一个IoSession对象(任何获取到一个IoSession实例的持有者都可以)来间接执行 I/O操作,如发送数据、读取数据等。在Mina内部,当一个IoSession调用对应的方法,则直接触发IoProcessor来对指定的事件进行处理,它基于Ractor模式来简化网络传输的实现(事实上,Java NIO就是基于Reactor模式实现,属于同步非阻塞IO模式)。
我们看一下IoProcessor相关类的继承关系,如图所示:

0707099462bcd195b23f3d35db9943133fa04910

看到上面AbstractPollingConnectionlessIoAcceptor,我们知道,它同时也是IoService的实现,用于网络通信中的服务端,处理接收请求。可见,对于基于UDP/IP的传输,IoAcceptor和IoProcessor的处理是实现在一起的,可能实际处理的逻辑本身比较简单,放到一起能够更好地表达它们之间的紧密联系。
下面我们看一下IoProcessor接口的定义,如下所示:

01 public interface IoProcessor<S extends IoSession> {
02 boolean isDisposing();
03 boolean isDisposed();
04 void dispose();
05 void add(S session);
06 void flush(S session);
07 void write(S session, WriteRequest writeRequest);
08 void updateTrafficControl(S session);
09 void remove(S session);
10 }

我们根据上面接口总结一下,一个IoProcessor实际处理了如下内容:

  1. 添加IoSession实例,主要是使用IoProcessor内部的一个IoSession队列newSessions来存放。
  2. Flush指定IoSession实例到IoProcessor内部的flushingSessions队列。
  3. Write一个IoSession实例对应的WriteRequest,主要是将一个WriteRequest加入到 IoSession实例所持有的WriteRequestQueue writeRequestQueue队列。至于加入到队列的请求何时处理,其实我们可以参考IoProcessor的实现AbstractPollingIoProcessor 类,其内部有一个org.apache.mina.core.polling.AbstractPollingIoProcessor.Processor 线程类,这个线程会在调用一个IoProcessor的方法public final void add(S session)的时候被启动(实际,在调用dispose()、add(S session)、remove(S session)这三个方法的时候,都会尝试着去启动一个Processor线程,如果没有启动则会启动这个线程),然后反复循环检测并处理队列中的写请求。
  4. 当IoProcessor关闭与一个IoSession实例实例相关的连接,则会将这个IoSession实例从 removingSessions队列中移除。
  5. 控制处理事件的通信量,主要是控制读写:如果没有注册读操作(SelectionKey.OP_READ),则注册一个,否则当一个 读操作已经就绪,则进行读数据的处理;如果没有注册写操作(SelectionKey.OP_WRITE),则注册一个,否则当一个写操 作已经就绪,则进行写数据的处理。
  6. 释放所有与IoProcessor有关的资源。

总结

经过上面的对比,我们已经能够知道IoHandler与IoProcessor的本质区别。
IoHandler是在IoFilterChain执行中最后一个IoFilter中被调用,比如,经过IoFilterChain进行 codec、logging等等操作之后,已经将通信层的数据转化成我们需要的业务对象数据,这时就可以调用我们定义的IoHandler实 现来进行处理。
IoProcessor是与实际的Socket或Channel相关的操作紧密相关的,也就是说,它是支撑Mina进行处理底层实际I/O请 求的处理器。

目录
相关文章
|
3天前
|
负载均衡 Java 调度
【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(Dispatcher和EventListener)(下)
经过阅读《【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(附通信协议和代码)(上)》,相信您已经对网络通信框架的网络通信层的实现原理和协议模型有了一定的认识和理解。
43 0
【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(Dispatcher和EventListener)(下)
|
3天前
|
网络协议 Java 容器
《跟闪电侠学Netty》阅读笔记 - ChannelHandler 生命周期
《跟闪电侠学Netty》阅读笔记 - ChannelHandler 生命周期
46 0
《跟闪电侠学Netty》阅读笔记 - ChannelHandler 生命周期
|
12月前
|
存储 缓存 负载均衡
计网 - 怎样实现 RPC 框架
计网 - 怎样实现 RPC 框架
79 0
|
JSON Dubbo 网络协议
|
编解码 移动开发 网络协议
Mina框架剖析
最近使用Mina开发一个Java的NIO服务端程序,因此也特意学习了Apache的这个Mina框架。 首先,Mina是个什么东西?看下官方网站(http://mina.apache.org/)对它的解释: Apache的Mina(Multipurpose Infrastructure Networked Applications)是一个网络应用框架,可以帮助用户开发高性能和高扩展性的网络应用程序;它提供了一个抽象的、事件驱动的异步API,使Java NIO在各种传输协议(如TCP/IP,UDP/IP协议等)下快速高效开发。
154 0
|
消息中间件 编解码 网络协议
《干翻Netty》, 写一个底层通信框架需要考虑哪些?
《干翻Netty》, 写一个底层通信框架需要考虑哪些?
《干翻Netty》, 写一个底层通信框架需要考虑哪些?
|
前端开发
netty系列之:一口多用,使用同一端口运行不同协议
netty系列之:一口多用,使用同一端口运行不同协议
|
存储 编解码 网络协议
Netty常用招式——ChannelHandler与编解码(一)
Netty常用招式——ChannelHandler与编解码(一)
183 0
Netty常用招式——ChannelHandler与编解码(一)
|
消息中间件 编解码 移动开发
Netty常用招式——ChannelHandler与编解码(二)
Netty常用招式——ChannelHandler与编解码(二)
148 0
Netty常用招式——ChannelHandler与编解码(二)