Mina2.0框架源码剖析(六)

简介:
上文的内容还有一些没有结尾,这篇补上。在ExpiringMap类中,使用了一个私有内部类ExpiringObject来表示待检查超时的对象,它包括三个域,键,值,上次访问时间,以及用于上次访问时间这个域的读写锁:

        private K key;
        private V value;
        private long lastAccessTime;
        private final ReadWriteLock lastAccessTimeLock = new ReentrantReadWriteLock();
而ExpiringMap中包括了下述几个变量:
  private final ConcurrentHashMap<K, ExpiringObject> delegate;//超时代理集合,保存待检查对象
    private final CopyOnWriteArrayList<ExpirationListener<V>> expirationListeners;//超时监听者
    private final Expirer expirer;//超时检查线程
现在再来看看IoSession的一个抽象实现类AbstractIoSession。这是它的几个重要的成员变量:

    private IoSessionAttributeMap attributes;//会话属性映射图
    private WriteRequestQueue writeRequestQueue;//写请求队列
    private WriteRequest currentWriteRequest;//当前写请求
     当要结束当前会话时,会发送一个一个写请求CLOSE_REQUEST。而closeFuture这个CloseFuture会在连接关闭时状态被设置为”closed”,它的监听器是SCHEDULED_COUNTER_RESETTER。

close和closeOnFlush都是异步的关闭操作,区别是前者立即关闭连接,而后者是在写请求队列中放入一个CLOSE_REQUEST,并将其即时刷新出去,若要真正等待关闭完成,需要调用方在返回的CloseFuture等待

复制代码
public final CloseFuture close() {
        synchronized (lock) {
            if (isClosing()) {
                return closeFuture;
            } else {
                closing = true;
            }
        }
        getFilterChain().fireFilterClose();//fire出关闭事件
        return closeFuture;
    }

    public final CloseFuture closeOnFlush() {
        getWriteRequestQueue().offer(this, CLOSE_REQUEST);
        getProcessor().flush(this);
        return closeFuture;
    }
复制代码
     下面来看看读数据的过程:

复制代码
public final CloseFuture close() {
        synchronized (lock) {
            if (isClosing()) {
                return closeFuture;
            } else {
                closing = true;
            }
        }
        getFilterChain().fireFilterClose();//fire出关闭事件
        return closeFuture;
    }

    public final CloseFuture closeOnFlush() {
        getWriteRequestQueue().offer(this, CLOSE_REQUEST);
        getProcessor().flush(this);
        return closeFuture;
    }


    private Queue<ReadFuture> getReadyReadFutures() {//返回可被读数据队列
        Queue<ReadFuture> readyReadFutures =
            (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);//从会话映射表中取出可被读数据队列
        if (readyReadFutures == null) {//第一次读数据
            readyReadFutures = new CircularQueue<ReadFuture>();//构造一个新读数据队列
            Queue<ReadFuture> oldReadyReadFutures =
                (Queue<ReadFuture>) setAttributeIfAbsent(
                        READY_READ_FUTURES_KEY, readyReadFutures);
            if (oldReadyReadFutures != null) {
                readyReadFutures = oldReadyReadFutures;
            }
        }
        return readyReadFutures;
    }

    public final ReadFuture read() {//读数据
        if (!getConfig().isUseReadOperation()) {//会话配置不允许读数据(这是默认情况)
            throw new IllegalStateException("useReadOperation is not enabled.");
        }
        Queue<ReadFuture> readyReadFutures = getReadyReadFutures();//获取已经可被读数据队列
        ReadFuture future;
        synchronized (readyReadFutures) {//锁住读数据队列
            future = readyReadFutures.poll();//取队头数据
            if (future != null) {
                if (future.isClosed()) {//关联的会话已经关闭了,让读者知道此情况
                    readyReadFutures.offer(future);
                }
            } else {
                future = new DefaultReadFuture(this);
                getWaitingReadFutures().offer(future); //将此数据插入等待被读取数据的队列,这个代码和上面的getReadyReadFutures类似,只是键值不同而已

            }
        }
        return future;
    }
复制代码
     再来看写数据到指定远端地址的过程,可以写三种类型数据:IoBuffer,整个文件或文件的部分区域,这会通过传递写请求给过滤器链条来完成数据向目的远端的传输。

复制代码
    public final WriteFuture write(Object message, SocketAddress remoteAddress) {
        FileChannel openedFileChannel = null;
        try 
        {
            if (message instanceof IoBuffer&& !((IoBuffer) message).hasRemaining()) 
            {// 空消息
                throw new IllegalArgumentException(
                "message is empty. Forgot to call flip()?");
            } 
            else if (message instanceof FileChannel) 
            {//要发送的是文件的某一区域
                FileChannel fileChannel = (FileChannel) message;
                message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
            }
            else if (message instanceof File) 
            {//要发送的是文件,打开文件通道
                File file = (File) message;
                openedFileChannel = new FileInputStream(file).getChannel();
                message = new DefaultFileRegion(openedFileChannel, 0, openedFileChannel.size());
            }
        } 
        catch (IOException e) 
        {
            ExceptionMonitor.getInstance().exceptionCaught(e);
            return DefaultWriteFuture.newNotWrittenFuture(this, e);
        }
        WriteFuture future = new DefaultWriteFuture(this); 
        getFilterChain().fireFilterWrite(
                new DefaultWriteRequest(message, future, remoteAddress)); //构造写请求,通过过滤器链发送出去,写请求中指明了要发送的消息,目的地址,以及返回的结果
 
//如果打开了一个文件通道(发送的文件的部分区域或全部),就必须在写请求完成时关闭文件通道
        if (openedFileChannel != null) {
            final FileChannel finalChannel = openedFileChannel;
            future.addListener(new IoFutureListener<WriteFuture>() {
                public void operationComplete(WriteFuture future) {
                    try {
                        finalChannel.close();//关闭文件通道
                    } catch (IOException e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
                    }
                }
            });
        }
        return future;//写请求成功完成
    }
复制代码
     最后,来看看一个WriteRequestQueue的实现,唯一加入的一个功能就是若在队头发现是请求关闭,则会去关闭会话。

复制代码
 private class CloseRequestAwareWriteRequestQueue implements WriteRequestQueue {
        private final WriteRequestQueue q;//内部实际的写请求队列
        public CloseRequestAwareWriteRequestQueue(WriteRequestQueue q) {
            this.q = q;
        }
        public synchronized WriteRequest poll(IoSession session) {
            WriteRequest answer = q.poll(session);
            if (answer == CLOSE_REQUEST) {
                AbstractIoSession.this.close();
                dispose(session);
                answer = null;
            }
            return answer;
        }
        public void offer(IoSession session, WriteRequest e) {
            q.offer(session, e);
        }
        public boolean isEmpty(IoSession session) {
            return q.isEmpty(session);
        }
        public void clear(IoSession session) {
            q.clear(session);
        }
        public void dispose(IoSession session) {
            q.dispose(session);
        }
    }
复制代码



本文转自Phinecos(洞庭散人)博客园博客,原文链接:http://www.cnblogs.com/phinecos/archive/2008/12/07/1349723.html,如需转载请自行联系原作者
目录
相关文章
|
6月前
|
Java Unix Linux
【Netty技术专题】「原理分析系列」Netty强大特性之Native transports扩展开发实战
当涉及到网络通信和高性能的Java应用程序时,Netty是一个强大的框架。它提供了许多功能和组件,其中之一是JNI传输。JNI传输是Netty的一个特性,它为特定平台提供了高效的网络传输。 在本文中,我们将深入探讨Netty提供的特定平台的JNI传输功能,分析其优势和适用场景。我们将介绍每个特定平台的JNI传输,并讨论其性能、可靠性和可扩展性。通过了解这些特定平台的JNI传输,您将能够更好地选择和配置适合您应用程序需求的网络传输方式,以实现最佳的性能和可靠性。
143 7
【Netty技术专题】「原理分析系列」Netty强大特性之Native transports扩展开发实战
|
负载均衡 算法 Java
Netty源码分析系列之五:Netty多线程模型
本文主要介绍了Netty的多线程模型,它采用的是Reactor模型。处理连接请求与处理IO操作的线程隔离。基于事件轮询监听,不断获取处于就绪状态的通道。其中Boss线程池的线程负责处理连接请求,接收到accept事件之后,将对应的socket进行封装生成NioSocketChannel对象,并将其提交到workBoss线程池中,处理IO的read以及write事件。
Netty源码分析系列之五:Netty多线程模型
|
设计模式 监控 前端开发
第 10 章 Netty 核心源码剖析
第 10 章 Netty 核心源码剖析
129 0
|
存储 缓存 编解码
Netty源码剖析之核心组件
NioEventLoop有以下核心功能。 - 开启Selector并初始化。 - 把ServerSocketChannel注册到Selector上。 - 处理各种I/O事件,如OP_ACCEPT、OP_CONNECT、OP_READ、 OP_WRITE事件。 - 执行定时调度任务。 - 解决JDK空轮询bug。
119 0
|
网络协议 Java Linux
Netty实战与源码剖析(一)——浅谈NIO编程
Netty实战与源码剖析(一)——浅谈NIO编程
362 0
Netty实战与源码剖析(一)——浅谈NIO编程
探秘Netty5:基于Netty自己动手实现RPC框架
大厨小鲜——基于Netty自己动手实现RPC框架 钱文品 Good news everyone! ​关注他 71 人赞了该文章 今天我们要来做一道小菜,这道菜就是RPC通讯框架。
|
缓存 编解码 网络协议
Netty框架入门(一)
Netty框架入门(一)
238 0
Netty框架入门(一)
|
分布式计算 网络协议 Java
Netty实现原理分析
Netty实现原理分析
257 0
Netty实现原理分析
|
消息中间件 编解码 分布式计算
Netty源码分析系列之二:为什么选择Netty
本文主要介绍了使用Netty的好处与原因,它支持的协议越来越多,紧随JDK更新。从下文开始我们要对Netty的使用以及源码进行介绍。
|
Web App开发 Java
探秘Netty6:基于Netty自己动手实现Web框架
大厨小鲜——基于Netty自己动手实现Web框架 钱文品 Good news everyone! ​关注他 17 人赞了该文章 上节课我们自己动手制作了一个RPC框架,本节课我们挑战一个稍有难度的一点的任务,手动制作一个Web框架。