之前进行了一个netty的介绍和服务端的示例Netty入门指引,今天来看下客户端的编写方式和示例情况:
基于我们之前编写的服务,测试它的最简单方法是使用telnet命令。例如,可以在命令行中输入telnet localhost 8080并输入一些内容。
但是,由于之前编写的是discard服务,根本不会得到任何回应。为了证明它确实有效,可以修改服务器以打印其收到的内容。按之前提到的,每当接收到数据时都会调用channelRead()方法。可以将一些代码放入DiscardServerHandler的channelRead()方法中:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; try { while (in.isReadable()) { // (1) System.out.print((char) in.readByte()); System.out.flush(); } } finally { ReferenceCountUtil.release(msg); // (2) } }
再次运行telnet命令,可以看到服务器打印收到的内容。
编写一个Echo服务器
到目前为止,我们一直在使用数据而没有任何响应。但是,通常应假定服务器对请求作出响应。可以通过实现ECHO协议将响应消息写到客户端,在该协议中,任何接收到的数据都将被发回。
与前面实现的discard服务器的唯一区别在于,它会将接收到的数据发回,而不是将接收到的数据打印到控制台。 因此,再次修改channelRead()方法:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.write(msg); // (1) ctx.flush(); // (2) }
- ChannelHandlerContext对象提供了各种操作,能够触发各种I / O事件和操作。在这里,我们调用write(Object)以逐字记录接收到的消息。
- ctx.write(Object)不会将消息写出。在内部对其进行缓冲,然后通过ctx.flush()刷新。或者可以为简洁起见调用ctx.writeAndFlush(msg)。
如果再次运行telnet命令,将看到服务器将发送回的内容发送回去。
编写Time服务器
这个示例要实现的协议是TIME协议。它与前面的示例不同之处在于,它不包含任何请求就发送包含32位整数的消息,并在发送消息后关闭连接。在此示例中,将展示如何构造和发送消息以及如何在完成时关闭连接。
因为我们将忽略任何接收到的数据,在建立连接后立即发送消息,所以这次我们不能使用channelRead()方法。 相反,我们应该重写channelActive()方法。 以下是实现:
package io.netty.example.time; public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(final ChannelHandlerContext ctx) { // (1) final ByteBuf time = ctx.alloc().buffer(4); // (2) time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); final ChannelFuture f = ctx.writeAndFlush(time); // (3) f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { assert f == future; ctx.close(); } }); // (4) } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
- 当建立连接并准备生成流量时,将调用channelActive()方法。我们写一个代表该方法当前时间的32位整数。
- 要发送新消息,我们需要分配一个新缓冲区,其中将包含该消息。我们将要编写一个32位整数,因此我们需要一个容量至少为4个字节的ByteBuf。通过ChannelHandlerContext.alloc()获取当前的ByteBufAllocator并分配一个新的缓冲区。
- ChannelHandlerContext.write()(和writeAndFlush())方法返回ChannelFuture。ChannelFuture表示尚未发生的I / O操作。这意味着,可能尚未执行任何请求的操作,因为Netty中的所有操作都是异步的。例如,以下代码甚至可能在发送消息之前就关闭了连接:
Channel ch = ...; ch.writeAndFlush(message); ch.close();
因此,需要在ChannelFuture完成之后调用close()方法,该方法 由 write()方法返回,并在完成写操作后通知其侦听器。
- 当写请求完成时,为了接收到通知,创建了一个新的匿名ChannelFutureListener,当操作完成时它将关闭Channel。可以使用预定义的侦听器简化代码:
f.addListener(ChannelFutureListener.CLOSE);
编写Time客户端
Netty中服务器和客户端之间最大且唯一的区别是使用了不同的Bootstrap和Channel实现。
package io.netty.example.time; public class TimeClient { public static void main(String[] args) throws Exception { String host = args[0]; int port = Integer.parseInt(args[1]); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); // (1) b.group(workerGroup); // (2) b.channel(NioSocketChannel.class); // (3) b.option(ChannelOption.SO_KEEPALIVE, true); // (4) b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeClientHandler()); } }); // Start the client. ChannelFuture f = b.connect(host, port).sync(); // (5) // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } }
- Bootstrap与ServerBootstrap相似,除了它用于非服务器channel,例如客户端或无连接channel。
- 如果仅指定一个EventLoopGroup,则它将同时用作boss组和worker组。但是,boss和worker并不用于客户端。
- 代替NioServerSocketChannel,NioSocketChannel被用来创建客户端channel。
- 请注意,由于客户端SocketChannel没有父级,因此此处不像使用ServerBootstrap那样使用childOption()。
- 应该调用connect()方法而不是bind()方法。
最后,从服务器接收一个32位整数,将其转换为人类可读的格式,打印转换后的时间,然后关闭连接的过程如下:
package io.netty.example.time; import java.util.Date; public class TimeClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; // (1) try { long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } finally { m.release(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }