《Netty 权威指南》—— NIO创建的TimeServer源码分析

简介:

声明:本文是《Netty 权威指南》的样章,感谢博文视点授权并发编程网站发布样章,禁止以任何形式转载此文。

我们将在TimeServer例程中给出完整的NIO创建的时间服务器源码:

public class TimeServer {

    /**
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
	int port = 8080;
	if (args != null && args.length > 0) {
	    try {
		port = Integer.valueOf(args[0]);
	    } catch (NumberFormatException e) {
		// 采用默认值
	    }
	}
	MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
	New Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
    }
}

 

我们对NIO创建的TimeServer进行下简单分析,8-15行跟之前的一样,设置监听端口。16-17行创建了一个被称为MultiplexerTimeServer的多路复用类,它是个一个独立的线程,负责轮询多路复用器Selctor,可以处理多个客户端的并发接入,现在我们继续看MultiplexerTimeServer的源码:

public class MultiplexerTimeServer implements Runnable {

    private Selector selector;

    private ServerSocketChannel servChannel;

    private volatile boolean stop;

    /**
     * 初始化多路复用器、绑定监听端口
     * 
     * @param port
     */
    public MultiplexerTimeServer(int port) {
	try {
	    selector = Selector.open();
	    servChannel = ServerSocketChannel.open();
	    servChannel.configureBlocking(false);
	    servChannel.socket().bind(new InetSocketAddress(port), 1024);
	    servChannel.register(selector, SelectionKey.OP_ACCEPT);
	    System.out.println("The time server is start in port : " + port);
	} catch (IOException e) {
	    e.printStackTrace();
	    System.exit(1);
	}
    }

    public void stop() {
	this.stop = true;
    }

    /*
     * (non-Javadoc)
     * 
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
	while (!stop) {
	    try {
		selector.select(1000);
		Set<SelectionKey> selectedKeys = selector.selectedKeys();
		Iterator<SelectionKey> it = selectedKeys.iterator();
		SelectionKey key = null;
		while (it.hasNext()) {
		    key = it.next();
		    it.remove();
		    try {
			handleInput(key);
		    } catch (Exception e) {
			if (key != null) {
			    key.cancel();
			    if (key.channel() != null)
				key.channel().close();
			}
		    }
		}
	    } catch (Throwable t) {
		t.printStackTrace();
	    }
	}

	// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
	if (selector != null)
	    try {
		selector.close();
	    } catch (IOException e) {
		e.printStackTrace();
	    }
    }

    private void handleInput(SelectionKey key) throws IOException {

	if (key.isValid()) {
	    // 处理新接入的请求消息
	    if (key.isAcceptable()) {
		// Accept the new connection
		ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
		SocketChannel sc = ssc.accept();
		sc.configureBlocking(false);
		// Add the new connection to the selector
		sc.register(selector, SelectionKey.OP_READ);
	    }
	    if (key.isReadable()) {
		// Read the data
		SocketChannel sc = (SocketChannel) key.channel();
		ByteBuffer readBuffer = ByteBuffer.allocate(1024);
		int readBytes = sc.read(readBuffer);
		if (readBytes > 0) {
		    readBuffer.flip();
		    byte[] bytes = new byte[readBuffer.remaining()];
		    readBuffer.get(bytes);
		    String body = new String(bytes, "UTF-8");
		    System.out.println("The time server receive order : "
			    + body);
		    String currentTime = "QUERY TIME ORDER"
			    .equalsIgnoreCase(body) ? new java.util.Date(
			    System.currentTimeMillis()).toString()
			    : "BAD ORDER";
		    doWrite(sc, currentTime);
		} else if (readBytes < 0) {
		    // 对端链路关闭
		    key.cancel();
		    sc.close();
		} else
		    ; // 读到0字节,忽略
	    }
	}
    }

    private void doWrite(SocketChannel channel, String response)
	    throws IOException {
	if (response != null && response.trim().length() > 0) {
	    byte[] bytes = response.getBytes();
	    ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
	    writeBuffer.put(bytes);
	    writeBuffer.flip();
	    channel.write(writeBuffer);
	}
    }
}

由于这个类相比于传统的Socket编程稍微复杂一些,在此我们进行详细分析,我们从如下几个关键步骤讲解多路复用处理类:

14-26行为构造方法,在构造方法中进行资源初始化,创建多路复用器Selector、ServerSocketChannel,对Channel和TCP参数进行配置,例如将ServerSocketChannel设置为异步非阻塞模式,它的backlog设置为1024。系统资源初始化成功后将ServerSocketChannel注册到Selector,监听SelectionKey.OP_ACCEPT操作位;如果资源初始化失败,例如端口被占用则退出

39-61行在线程的run方法的while循环体中循环遍历selector,它的休眠时间为1S,无论是否有读写等事件发生,selector每隔1S都被唤醒一次,selector也提供了一个无参的select方法。当有处于就绪状态的Channel时,selector将返回就绪状态的Channel的SelectionKey集合,我们通过对就绪状态的Channel集合进行迭代,就可以进行网络的异步读写操作

76-83行处理新接入的客户端请求消息,根据SelectionKey的操作位进行判断即可获知网络事件的类型,通过ServerSocketChannel的accept接收客户端的连接请求并创建SocketChannel实例,完成上述操作后,相当于完成了TCP的三次握手,TCP物理链路正式建立。注意,我们需要将新创建的SocketChannel设置为异步非阻塞,同时也可以对其TCP参数进行设置,例如TCP接收和发送缓冲区的大小等,作为入门的例子,例程没有进行额外的参数设置

84-109行用于读取客户端的请求消息,首先创建一个ByteBuffer,由于我们事先无法得知客户端发送的码流大小,作为例程,我们开辟一个1M的缓冲区。然后调用SocketChannel的read方法读取请求码流,注意,由于我们已经将SocketChannel设置为异步非阻塞模式,因此它的read是非阻塞的。使用返回值进行判断,看读取到的字节数,返回值有三种可能的结果:

1)      返回值大于0:读到了字节,对字节进行编解码;

2)      返回值等于0:没有读取到字节,属于正常场景,忽略;

3)      返回值为-1:链路已经关闭,需要关闭SocketChannel,释放资源。

当读取到码流以后,我们进行解码,首先对readBuffer进行flip操作,它的作用是将缓冲区当前的limit设置为position,position设置为0,用于后续对缓冲区的读取操作。然后根据缓冲区可读的字节个数创建字节数组,调用ByteBuffer的get操作将缓冲区可读的字节数组拷贝到新创建的字节数组中,最后调用字符串的构造函数创建请求消息体并打印。如果请求指令是”QUERY TIME ORDER”则把服务器的当前时间编码后返回给客户端,下面我们看看如果异步发送应答消息给客户端。

111-119行将应答消息异步发送给客户端,我们看下关键代码,首先将字符串编码成字节数组,根据字节数组的容量创建ByteBuffer,调用ByteBuffer的put操作将字节数组拷贝到缓冲区中,然后对缓冲区进行flip操作,最后调用SocketChannel的write方法将缓冲区中的字节数组发送出去。需要指出的是,由于SocketChannel是异步非阻塞的,它并不保证一次能够把需要发送的字节数组发送完,此时会出现“写半包”问题,我们需要注册写操作,不断轮询Selector将没有发送完的ByteBuffer发送完毕,可以通过ByteBuffer的hasRemain()方法判断消息是否发送完成。此处仅仅是个简单的入门级例程,没有演示如何处理“写半包”场景,后续的章节会有详细说明。

使用NIO创建TimeServer服务器完成之后,我们继续学习如何创建NIO客户端。首先还是通过时序图了解关键步骤和过程,然后结合代码进行详细分析。

文章转自 并发编程网-ifeve.com

目录
相关文章
|
30天前
|
设计模式
Lettuce的特性和内部实现问题之Netty NIO的性能优于BIO的问题如何解决
Lettuce的特性和内部实现问题之Netty NIO的性能优于BIO的问题如何解决
|
14天前
|
网络协议 C# 开发者
WPF与Socket编程的完美邂逅:打造流畅网络通信体验——从客户端到服务器端,手把手教你实现基于Socket的实时数据交换
【8月更文挑战第31天】网络通信在现代应用中至关重要,Socket编程作为其实现基础,即便在主要用于桌面应用的Windows Presentation Foundation(WPF)中也发挥着重要作用。本文通过最佳实践,详细介绍如何在WPF应用中利用Socket实现网络通信,包括创建WPF项目、设计用户界面、实现Socket通信逻辑及搭建简单服务器端的全过程。具体步骤涵盖从UI设计到前后端交互的各个环节,并附有详尽示例代码,助力WPF开发者掌握这一关键技术,拓展应用程序的功能与实用性。
36 0
|
1月前
|
传感器 物联网 微服务
Netty的源码分析和业务场景
【8月更文挑战第2天】Netty 是一款高性能的异步事件驱动网络框架,其源码深邃且复杂。通过采用Reactor模式与主从多线程设计,Netty能高效处理网络事件。例如,`NioEventLoop`负责I/O事件及任务执行,内置线程循环机制。内存管理方面,Netty提供高效内存池与`ByteBuf`类来减少开销并优化内存操作。在业务场景上,Netty广泛应用于分布式系统、微服务架构中的高效通信,以及实时通信场景如在线游戏和直播中的大量并发连接处理,同时也在物联网领域发挥重要作用,确保设备与服务器间稳定快速的数据传输。
|
1月前
|
网络协议 大数据 Linux
Netty的源码分析和业务场景
通过深入分析 Netty 的源码和理解其在不同业务场景下的应用,开发者可以更好地利用这一强大的网络编程框架,构建高效、稳定且可扩展的网络应用。
204 1
|
21天前
|
存储 网络协议 Java
【Netty 神奇之旅】Java NIO 基础全解析:从零开始玩转高效网络编程!
【8月更文挑战第24天】本文介绍了Java NIO,一种非阻塞I/O模型,极大提升了Java应用程序在网络通信中的性能。核心组件包括Buffer、Channel、Selector和SocketChannel。通过示例代码展示了如何使用Java NIO进行服务器与客户端通信。此外,还介绍了基于Java NIO的高性能网络框架Netty,以及如何用Netty构建TCP服务器和客户端。熟悉这些技术和概念对于开发高并发网络应用至关重要。
40 0
|
4月前
|
Java 应用服务中间件 API
从零手写实现 tomcat-06-servlet bio/thread/nio/netty 池化处理
该文介绍了逐步改进的网络服务器实现,从最初的 BIO 基础版到使用线程池的 BIO+Thread,再到 NIO 版本和 NIO+Thread,最后展示了一个使用 Netty 框架的简洁实现。文章旨在说明如何解决阻塞问题,并对比不同模型的优劣,最终推荐使用 Netty 以简化 NIO 编程。
|
4月前
|
编解码 网络协议 Java
用Java的BIO和NIO、Netty实现HTTP服务器(一) BIO与绪论
用Java的BIO和NIO、Netty实现HTTP服务器(一) BIO与绪论
|
4月前
|
移动开发 编解码 网络协议
用Java的BIO和NIO、Netty来实现HTTP服务器(三) 用Netty实现
用Java的BIO和NIO、Netty来实现HTTP服务器(三) 用Netty实现
|
4月前
|
设计模式 网络协议 Java
Java NIO 网络编程 | Netty前期知识(二)
Java NIO 网络编程 | Netty前期知识(二)
108 0
|
4月前
|
监控 网络协议 调度
Netty Review - 深入探讨Netty的心跳检测机制:原理、实战、IdleStateHandler源码分析
Netty Review - 深入探讨Netty的心跳检测机制:原理、实战、IdleStateHandler源码分析
315 0