netty是业界最流行的NIO框架之一,它的健壮型,功能,性能,可定制性和可扩展性都是首屈一指的,Hadoop的RPC框架Avro就使用了netty作为底层的通信框架,此外netty在互联网,大数据,网络游戏,企业应用,电信软件等众多行业都得到了成功的商业应用。正因为以上的一些特性,使得netty已经成为java NIO编程的首选框架。
其实使用netty很简单,直接将其jar包引入到工程中即可使用。 去 http://netty.io/网站上下载最新版本的jar包(由于官网上netty5已经被废弃,但是这里仍然使用netty5进行开发, 可以考虑从csnd下载),我这里下载的为:netty-5.0.0.Alpha1.tar.bz2。这其实是一个压缩文件,解压这个文件,取里面的所有类集合到一起的那个jar包netty-all-5.0.0.Alpha1.jar即可。另外还需要注意的是,我这里使用的jdk版本是1.8。

1 package com.rampage.netty.time; 2 3 import io.netty.bootstrap.ServerBootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioServerSocketChannel; 11 12 /** 13 * 时钟程序的服务器端 14 * @author zyq 15 * 16 */ 17 public class TimeServer { 18 19 public static void main(String[] args) throws Exception { 20 new TimeServer().bind(8080); 21 } 22 23 public void bind(int port) throws Exception { 24 // 配置服务器的NIO线程组 25 EventLoopGroup bossGroup = new NioEventLoopGroup(); 26 EventLoopGroup workerGroup = new NioEventLoopGroup(); 27 28 try { 29 ServerBootstrap bootStrap = new ServerBootstrap(); 30 31 // 进行链式调用(每一次调用的返回结果都是ServerBootstrap) 32 // group带两个参数第一个表示给父(acceptor)用的EventExecutorGroup(其实就是线程池) 33 // 第二个参数表示子(client)线程池 34 // channel方法可以带一个ServerChannel类来创建进行IO操作的通道。 35 // option方法给Channel定制对应的选项 36 // childHandler方法用来处理Channel中的请求 37 bootStrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) 38 .option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChildChannelHandler()); 39 40 // 绑定端口,等待同步成功 41 // bind方法返回一个ChannelFuture类,就是相当于绑定端口并且创建一个新的channel 42 // sync方法会等待ChannelFuture的处理结束 43 ChannelFuture future = bootStrap.bind(port).sync(); 44 45 // 等待服务器监听端口关闭 46 future.channel().closeFuture().sync(); 47 } finally { 48 // 优雅地退出,释放线程资源 49 bossGroup.shutdownGracefully(); 50 workerGroup.shutdownGracefully(); 51 } 52 } 53 54 private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { 55 56 @Override 57 protected void initChannel(SocketChannel arg0) throws Exception { 58 arg0.pipeline().addLast(new TimeServerHandler()); 59 } 60 61 } 62 }

1 package com.rampage.netty.time; 2 3 import java.util.Date; 4 5 import io.netty.buffer.ByteBuf; 6 import io.netty.buffer.Unpooled; 7 import io.netty.channel.ChannelHandlerAdapter; 8 import io.netty.channel.ChannelHandlerContext; 9 10 /** 11 * 时间服务器的处理类,只有netty5中的ChannelHandlerAdapter中才有ChannelRead和ChannelReadComplete方法。 12 * @author zyq 13 * 14 */ 15 public class TimeServerHandler extends ChannelHandlerAdapter { 16 @Override 17 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 18 // netty中的ByteBuf类相当于jdk中的ByteBuffer类,但是功能更加强大 19 ByteBuf buf = (ByteBuf) msg; 20 21 // readableBytes返回缓冲区可读的字节数 22 byte[] req = new byte[buf.readableBytes()]; 23 24 // 将缓冲区的字节数复制到新的字节数组中去 25 buf.readBytes(req); 26 27 // 根据客户端传来的信息得到应答信息 28 String body = new String(req, "UTF-8"); 29 System.out.println("The time server receive order:" + body); 30 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; 31 32 // 给客户端的回应 33 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); 34 35 // 为了防止频繁唤醒Selector进行消息发送,Netty的write方法并不直接将消息写入到SocketChannel中,而是把消息放入到缓冲数组 36 ctx.write(resp); 37 } 38 39 @Override 40 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 41 // 将放到缓冲数组中的消息写入到SocketChannel中去 42 ctx.flush(); 43 } 44 45 @Override 46 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 47 ctx.close(); 48 } 49 50 51 }

1 package com.rampage.netty.time; 2 3 import io.netty.bootstrap.Bootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioSocketChannel; 11 12 /** 13 * 时钟程序的客户端 14 * @author zyq 15 * 16 */ 17 public class TimeClient { 18 19 public static void main(String[] args) throws Exception { 20 new TimeClient().connect("", 8080); 21 } 22 23 public void connect(String host, int port) throws Exception { 24 // 配置客户端NIO线程池 25 EventLoopGroup group = new NioEventLoopGroup(); 26 27 Bootstrap strap = new Bootstrap(); 28 try { 29 // 这里用了匿名内部类,各个函数的含义同Server端 30 strap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true) 31 .handler(new ChannelInitializer<SocketChannel>() { 32 33 @Override 34 protected void initChannel(SocketChannel arg0) throws Exception { 35 arg0.pipeline().addLast(new TimeClientHandler()); 36 } 37 38 }); 39 40 // 发起异步连接操作 41 ChannelFuture future = strap.connect(host, port).sync(); 42 43 // 等待客户端关闭(注意调用的是closeFuture如果直接调用close会立马关闭) 44 future.channel().closeFuture().sync(); 45 } finally { 46 // 优雅的关闭 47 group.shutdownGracefully(); 48 } 49 } 50 }

1 package com.rampage.netty.time; 2 3 import java.util.logging.Logger; 4 5 import io.netty.buffer.ByteBuf; 6 import io.netty.buffer.Unpooled; 7 import io.netty.channel.ChannelHandlerAdapter; 8 import io.netty.channel.ChannelHandlerContext; 9 10 public class TimeClientHandler extends ChannelHandlerAdapter { 11 12 private static final Logger LOGGER = Logger.getLogger(TimeClientHandler.class.getName()); 13 14 private final ByteBuf firstMsg; 15 16 public TimeClientHandler() { 17 byte[] req = "QUERY TIME ORDER".getBytes(); 18 firstMsg = Unpooled.buffer(req.length); 19 firstMsg.writeBytes(req); 20 } 21 22 /** 23 * channel连通之后的处理 24 */ 25 @Override 26 public void channelActive(ChannelHandlerContext ctx) throws Exception { 27 ctx.writeAndFlush(firstMsg); 28 } 29 30 @Override 31 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 32 ByteBuf buf = (ByteBuf) msg; 33 byte[] resp = new byte[buf.readableBytes()]; 34 buf.readBytes(resp); 35 36 String body = new String(resp, "UTF-8"); 37 System.out.println("Now is:" + body); 38 39 // 两秒钟后继续向服务器端发送消息 40 Thread.sleep(2000); 41 byte[] req = "QUERY TIME ORDER".getBytes(); 42 ByteBuf sendMsg = Unpooled.buffer(req.length); 43 sendMsg.writeBytes(req); 44 ctx.writeAndFlush(sendMsg); 45 } 46 47 @Override 48 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 49 LOGGER.warning("Unexpected exception from downstream:" + cause.getMessage()); 50 ctx.close(); 51 } 52 53 54 }
The time server receive order:QUERY TIME ORDER
Now is:Wed Aug 03 05:55:30 PDT 2016
Now is:Wed Aug 03 05:55:33 PDT 2016
Now is:Wed Aug 03 05:55:35 PDT 2016
Now is:Wed Aug 03 05:55:37 PDT 2016