netty 事件驱动(二)

简介: 上一篇文件浅析了Netty中的事件驱动过程,这篇主要写一下异步相关的东东。 首先,什么是异步了? 异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。 异步的好处是不会造成阻塞,在高并发情形下会更稳定和更高的吞吐量。   说到Netty中的异步,就不得不提ChannelFuture。Netty中

上一篇文件浅析了Netty中的事件驱动过程,这篇主要写一下异步相关的东东。

首先,什么是异步了?

异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。

异步的好处是不会造成阻塞,在高并发情形下会更稳定和更高的吞吐量。

 

说到Netty中的异步,就不得不提ChannelFuture。Netty中的IO操作是异步的,包括bind、write、connect等操作会简单的返回一个ChannelFuture,调用者并不能立刻获得结果。

当future对象刚刚创建时,处于非完成状态。可以通过isDone()方法来判断当前操作是否完成。通过isSuccess()判断已完成的当前操作是否成功,getCause()来获取已完成的当前操作失败的原因,isCancelled()来判断已完成的当前操作是否被取消。

调用者可以通过返回的ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作。

其实同步的阻塞和异步的非阻塞可以直接通过代码看出:

这是一段阻塞的代码:

复制代码
        printTime("开始connect: ");
        // Start the connection attempt.
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));

        // Wait until the connection is closed or the connection attempt fails.
        future.getChannel().getCloseFuture().awaitUninterruptibly();

        printTime("connect结束: ");
        // Shut down thread pools to exit.
        bootstrap.releaseExternalResources();
复制代码

这段代码的输出结果是:

开始connect: 2013-07-17 14:45:28

connect结束: 2013-07-17 14:45:29

很明显的可以看出,connect操作导致整段代码阻塞了大概1秒。

 

以下这段是异步非阻塞的代码:

复制代码
        printTime("开始connect: ");
        // Start the connection attempt.
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));

        future.addListener(new ChannelFutureListener()
        {
            public void operationComplete(final ChannelFuture future)
                throws Exception
            {
                printTime("connect结束: ");
            }
        });

        printTime("异步时间: ");

        // Shut down thread pools to exit.
        bootstrap.releaseExternalResources();
复制代码

输出结果是:

开始connect: 2013-07-17 14:50:09
异步时间: 2013-07-17 14:50:09
connect结束: 2013-07-17 14:50:09

可以明显的看出,在异步模式下,上面这段代码没有阻塞,在执行connect操作后直接执行到printTime("异步时间: "),随后connect完成,future的监听函数输出connect操作完成。

关于同步的阻塞和异步的非阻塞可以打一个很简单的比方,A向B打电话,通知B做一件事。

在同步模式下,A告诉B做什么什么事,然后A依然拿着电话,等待B做完,才可以做下一件事;

在异步模式下,A告诉B做什么什么事,A挂电话,做自己的事。B做完后,打电话通知A做完了。

 

如上面代码所显示的,ChannelFuture同时提供了阻塞和非阻塞方法,接下来就简单的分析一下各自是怎么实现的。

阻塞方法是await系列,这些方法要小心翼翼的使用,不可以在handler内调用这些方法,否则会造成死锁。

复制代码
public ChannelFuture awaitUninterruptibly() {
        boolean interrupted = false;
        synchronized (this) {
            //循环等待到完成
            while (!done) {
                checkDeadLock();
                waiters++;
                try {
                    wait();
                } catch (InterruptedException e) {
                    //不允许中断
                    interrupted = true;
                } finally {
                    waiters--;
                }
            }
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }

        return this;
    }
复制代码

一个标志位,一个while循环,代码简洁明了。

非阻塞则是添加监听类ChannelFutureListener,通过覆盖ChannelFutureListener的operationComplete执行业务逻辑。

复制代码
public void addListener(final ChannelFutureListener listener) {
        if (listener == null) {
            throw new NullPointerException("listener");
        }

        boolean notifyNow = false;
        synchronized (this) {
            if (done) {
                notifyNow = true;
            } else {
                if (firstListener == null) {
                    //listener链表头
                    firstListener = listener;
                } else {
                    if (otherListeners == null) {
                        otherListeners = new ArrayList<ChannelFutureListener>(1);
                    }
                    //添加到listener链表中,以便操作完成后遍历操作
                    otherListeners.add(listener);
                }

               ......

        if (notifyNow) {
            //通知listener进行处理
            notifyListener(listener);
        }
    }
复制代码

然后当操作完成后直接遍历listener链表,把每个listener取出来执行。以setSuccess为例,如下:

复制代码
public boolean setSuccess() {
        synchronized (this) {
            // Allow only once.
            if (done) {
                return false;
            }

            done = true;
            //唤醒所有等待
            if (waiters > 0) {
                notifyAll();
            }
        }

        //通知所有listener
        notifyListeners();
        return true;
    }
复制代码
复制代码
private void notifyListeners() {
        if (firstListener != null) {
            //执行listener表头
            notifyListener(firstListener);
            firstListener = null;

            //挨个执行其余的listener
            if (otherListeners != null) {
                for (ChannelFutureListener l: otherListeners) {
                    notifyListener(l);
                }
                otherListeners = null;
            }
        }
    }
复制代码

其实这部分代码的逻辑很简单,就是注册回调函数,当操作完成后自动调用回调函数,就达到了异步的效果。
目录
相关文章
|
前端开发
netty 事件驱动(一)
本篇文章着重于浅析一下Netty的事件处理流程,Netty版本为netty-3.6.6.Final。 Netty定义了非常丰富的事件类型,代表了网络交互的各个阶段。并且当各个阶段发生时,触发相应的事件交给pipeline中定义的handler处理。 举个例子,如下一段简单的代码: ChannelFactory factory = new NioServ
2241 0
|
存储 缓存 NoSQL
跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)
本文将要分享的是如何从零实现一套基于Netty框架的分布式高可用IM系统,它将支持长连接网关管理、单聊、群聊、聊天记录查询、离线消息存储、消息推送、心跳、分布式唯一ID、红包、消息同步等功能,并且还支持集群部署。
13497 1
|
6月前
|
消息中间件 Oracle Dubbo
Netty 源码共读(一)如何阅读JDK下sun包的源码
Netty 源码共读(一)如何阅读JDK下sun包的源码
127 1
|
11月前
|
NoSQL Java Redis
跟着源码学IM(十二):基于Netty打造一款高性能的IM即时通讯程序
关于Netty网络框架的内容,前面已经讲了两个章节,但总归来说难以真正掌握,毕竟只是对其中一个个组件进行讲解,很难让诸位将其串起来形成一条线,所以本章中则会结合实战案例,对Netty进行更深层次的学习与掌握,实战案例也并不难,一个非常朴素的IM聊天程序。 原本打算做个多人斗地主练习程序,但那需要织入过多的业务逻辑,因此一方面会带来不必要的理解难度,让案例更为复杂化,另一方面代码量也会偏多,所以最终依旧选择实现基本的IM聊天程序,既简单,又能加深对Netty的理解。
160 1
|
6月前
|
编解码 前端开发 网络协议
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
154 0
|
6月前
|
编解码 安全 前端开发
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
247 0
|
分布式计算 网络协议 前端开发
【Netty底层数据交互源码】
【Netty底层数据交互源码】
|
Java 容器
【深入研究NIO与Netty线程模型的源码】
【深入研究NIO与Netty线程模型的源码】
|
编解码 弹性计算 缓存
Netty源码和Reactor模型
Netty源码和Reactor模型
99 0
|
设计模式 监控 前端开发
第 10 章 Netty 核心源码剖析
第 10 章 Netty 核心源码剖析
130 0