Netty 是一个 Java NIO 客户端服务器框架,使用它可以快速简单地开发网络应用程序,比如服务器和客户端的协议。Netty 大大简化了网络程序的开发过程比如 TCP 和 UDP 的 socket 服务的开发。
本文旨在通过实例学习Netty的一些用法。
【1】Netty 实现聊天功能
①SimpleChatServerHandler 服务端处理器
handler 是由 Netty 生成用来处理 I/O 事件的,SimpleChatServerHandler实例如下:
package com.nett.chat.handler; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; /** * Created by Janus on 2018/11/20. */ public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String> { // (1) public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // (2) Channel incoming = ctx.channel(); for (Channel channel : channels) { channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入\n"); } channels.add(ctx.channel()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // (3) Channel incoming = ctx.channel(); for (Channel channel : channels) { channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 离开\n"); } channels.remove(ctx.channel()); } @Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { // (4) Channel incoming = ctx.channel(); for (Channel channel : channels) { if (channel != incoming){ channel.writeAndFlush("[" + incoming.remoteAddress() + "]" + s + "\n"); } else { channel.writeAndFlush("[you]" + s + "\n"); } } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5) Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"在线"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6) Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"掉线"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (7) Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"异常"); // 当出现异常就关闭连接 cause.printStackTrace(); ctx.close(); } }
① SimpleChatServerHandler 继承自SimpleChannelInboundHandler,这个类实现了ChannelInboundHandler接口,ChannelInboundHandler 提供了许多事件处理的接口方法,然后你可以覆盖这些方法。现在仅仅只需要继承 SimpleChannelInboundHandler 类而不是你自己去实现接口方法。
SimpleChannelInboundHandler继承示意图如下(ctrl+shift+alt+u IDEA下查看):
② 覆盖了 handlerAdded() 事件处理方法。每当从服务端收到新的客户端连接时,客户端的 Channel 存入ChannelGroup列表中,并通知列表中的其他客户端 Channel。
③ 覆盖了 handlerRemoved() 事件处理方法。每当从服务端收到客户端断开时,客户端的 Channel 移除 ChannelGroup 列表中,并通知列表中的其他客户端 Channel。
④ 覆盖了 channelRead0() 事件处理方法。每当从服务端读到客户端写入信息时,将信息转发给其他客户端的 Channel。其中如果你使用的是 Netty 5.x 版本时,需要把 channelRead0() 重命名为messageReceived()。
⑤ 覆盖了 channelActive() 事件处理方法。服务端监听到客户端活动。
⑥ 覆盖了 channelInactive() 事件处理方法。服务端监听到客户端不活动。
⑦ exceptionCaught() 事件处理方法是当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时。在大部分情况下,捕获的异常应该被记录下来并且把关联的 channel 给关闭掉。然而这个方法的处理方式会在遇到不同异常的情况下有不同的实现,比如你可能想在关闭连接之前发送一个错误码的响应消息。
② SimpleChatServerInitializer
SimpleChatServerInitializer 用来增加多个的处理类到 ChannelPipeline 上,包括编码、解码、SimpleChatServerHandler 等。
实例如下:
public class SimpleChatServerInitializer extends ChannelInitializer<SocketChannel> { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("handler", new SimpleChatServerHandler()); System.out.println("SimpleChatClient:"+ch.remoteAddress() +"连接上"); } }
③SimpleChatServer
编写一个 main() 方法来启动服务端。
SimpleChatServer实例如下:
import com.nett.chat.handler.SimpleChatServerInitializer; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * Created by Janus on 2018/11/20. */ public class SimpleChatServer { private int port; public SimpleChatServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) .childHandler(new SimpleChatServerInitializer()) //(4) .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) System.out.println("SimpleChatServer 启动了"); // 绑定端口,开始接收进来的连接 ChannelFuture f = b.bind(port).sync(); // (7) // 等待服务器 socket 关闭 。 // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。 f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); System.out.println("SimpleChatServer 关闭了"); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new SimpleChatServer(port).run(); } }
④ SimpleChatClientHandler客户端处理器
客户端的处理类比较简单,只需要将读到的信息打印出来即可。
public class SimpleChatClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { Channel channel = ctx.channel(); System.out.println(channel+" , "+s); } }
⑤ SimpleChatClientInitializer
与服务端类似,不再赘述。
实例如下:
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** * Created by Janus on 2018/11/20. */ public class SimpleChatClientInitializer extends ChannelInitializer<SocketChannel> { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("handler", new SimpleChatClientHandler()); } }
⑥ SimpleChatClient
编写一个 main() 方法来启动客户端。
实例如下:
import com.nett.chat.handler.SimpleChatClientInitializer; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import java.io.BufferedReader; import java.io.InputStreamReader; /** * Created by Janus on 2018/11/20. */ public class SimpleChatClient { public static void main(String[] args) throws Exception{ new SimpleChatClient("localhost", 8080).run(); } private final String host; private final int port; public SimpleChatClient(String host, int port){ this.host = host; this.port = port; } public void run() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new SimpleChatClientInitializer()); //这里使用connect instead of bind Channel channel = bootstrap.connect(host, port).sync().channel(); //从控制台读取数据 BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); //循环将读取的数据发送 while (true) { channel.writeAndFlush(in.readLine() + "\r\n"); } } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } }
⑦ 运行效果
先运行 SimpleChatServer,再可以运行多个 SimpleChatClient,控制台输入文本继续测试。
- 分别启动两个client,服务端显示如下:
- 当第二个客户端加入时,第一个客户端收到服务端发来消息如下:
当第二个客户端输入字符串时,第二个客户端接收到服务器发来的消息如下:
- 当第二个客户端输入字符串时,第一个客户端接收到服务器的消息如下:
当第二个客户端关闭时,第一个客户端收到消息如下:
- 当第二个客户端关闭时,服务器端打印如下:
【2】Netty 实现 WebSocket 聊天功能
上面我们用Netty快速实现了一个 Java 聊天程序。现在,我们
要做下修改,加入 WebSocket 的支持,使它可以在浏览器里进行文本聊天。
① WebSocket
WebSocket 通过“Upgrade handshake(升级握手)”从标准的 HTTP 或HTTPS 协议转为 WebSocket。因此,使用 WebSocket 的应用程序将始终以 HTTP/S 开始,然后进行升级。在什么时候发生这种情况取决于具体的应用。它可以是在启动时,或当一个特定的 URL 被请求时。
在我们的应用中,当 URL 请求以“/ws”结束时,我们才升级协议为WebSocket。否则,服务器将使用基本的HTTP/S。一旦升级连接将使用WebSocket 传输所有数据。
整个服务器逻辑如下:
1.客户端/用户连接到服务器并加入聊天
2.HTTP 请求页面或 WebSocket 升级握手
3.服务器处理所有客户端/用户
4.响应 URI “/”的请求,转到默认 html 页面
5.如果访问的是 URI“/ws” ,处理 WebSocket 升级握手
6.升级握手完成后 ,通过 WebSocket 发送聊天消息
② HttpRequestHandler
实例如下:
import io.netty.channel.*; import io.netty.handler.codec.http.*; import io.netty.handler.ssl.SslHandler; import io.netty.handler.stream.ChunkedNioFile; import java.io.File; import java.io.RandomAccessFile; import java.net.URISyntaxException; import java.net.URL; /** * Created by Janus on 2018/11/20. */ public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> { //1 private final String wsUri; private static final File INDEX; static { URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation(); try { String path = location.toURI() + "WebsocketChatClient.html"; path = !path.contains("file:") ? path : path.substring(5); INDEX = new File(path); } catch (URISyntaxException e) { throw new IllegalStateException("Unable to locate WebsocketChatClient.html", e); } } public HttpRequestHandler(String wsUri) { this.wsUri = wsUri; } @Override public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { if (wsUri.equalsIgnoreCase(request.getUri())) { ctx.fireChannelRead(request.retain()); //2 } else { if (HttpHeaders.is100ContinueExpected(request)) { send100Continue(ctx); //3 } RandomAccessFile file = new RandomAccessFile(INDEX, "r");//4 HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK); response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8"); boolean keepAlive = HttpHeaders.isKeepAlive(request); if (keepAlive) { //5 response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length()); response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); } ctx.write(response); //6 if (ctx.pipeline().get(SslHandler.class) == null) { //7 ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length())); } else { ctx.write(new ChunkedNioFile(file.getChannel())); } ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); //8 if (!keepAlive) { future.addListener(ChannelFutureListener.CLOSE); //9 } file.close(); } } private static void send100Continue(ChannelHandlerContext ctx) { FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE); ctx.writeAndFlush(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Channel incoming = ctx.channel(); System.out.println("Client:" + incoming.remoteAddress() + "异常"); // 当出现异常就关闭连接 cause.printStackTrace(); ctx.close(); } }
标注说明如下:
1.扩展 SimpleChannelInboundHandler 用于处理 FullHttpRequest信息
2.如果请求是 WebSocket 升级,递增引用计数器(保留)并且将它传递给在 ChannelPipeline 中的下个 ChannelInboundHandler
3.处理符合 HTTP 1.1的 “100 Continue” 请求
4.读取默认的 WebsocketChatClient.html 页面
5.判断 keepalive 是否在请求头里面
6.给客户端响应
7.写 index.html 到客户端,判断 SslHandler 是否在 ChannelPipeline 来决定是使用 DefaultFileRegion 还是ChunkedNioFile
8.写并刷新 LastHttpContent 到客户端,标记响应完成
9.如果 keepalive 为false,当写完成时,关闭 Channel
HttpRequestHandler 做了下面几件事,
如果该 HTTP 请求被发送到URI “/ws”,调用 FullHttpRequest 上的 retain(),并通过调用 fireChannelRead(msg) 转发到下一个 ChannelInboundHandler。retain() 是必要的,因为 channelRead() 完成后,它会调用 FullHttpRequest 上的 release() 来释放其资源。
如果客户端发送的 HTTP 1.1 头是“Expect: 100-continue” ,将发送“100 Continue”的响应。
在响应头被设置后,写一个 HttpResponse 返回给客户端。注意,这是不是 FullHttpResponse,唯一的反应的第一部分。此外,我们不使用writeAndFlush() 在这里—这个是在最后完成。
如果没有加密也不压缩,要达到最大的效率可以是通过存储 index.html 的内容在一个 DefaultFileRegion实现—这将利用零拷贝来执行传输。出于这个原因,我们检查,看看是否有一个 SslHandler 在 ChannelPipeline 中来判断是否使用 ChunkedNioFile。
写 LastHttpContent 来标记响应的结束,并终止它
如果不要求 keepalive ,添加 ChannelFutureListener 到 ChannelFuture 对象的最后写入,并关闭连接。注意,这里我们调用 writeAndFlush() 来刷新所有以前写的信息。
③ 处理 WebSocket frame
WebSockets 在“帧”里面来发送数据,其中每一个都代表了一个消息的一部分。一个完整的消息利用了多个帧。
WebSocket “Request for Comments” (RFC) 定义了六中不同的 frame。 Netty 给他们每个都提供了一个 POJO 实现 ,而我们的程序只需要使用下面4个帧类型:
CloseWebSocketFrame
PingWebSocketFrame
PongWebSocketFrame
TextWebSocketFrame
另外两种为BinaryWebSocketFrame和ContinuationWebSocketFrame。
在这里我们只需要显示处理 TextWebSocketFrame,其他的会由 WebSocketServerProtocolHandler 自动处理。
下面代码展示了 ChannelInboundHandler 处理 TextWebSocketFrame,同时也将跟踪在 ChannelGroup中所有活动的 WebSocket 连接。
TextWebSocketFrameHandler实例如下:
import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.concurrent.GlobalEventExecutor; /** * Created by Janus on 2018/11/20. */ public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { // (1) Channel incoming = ctx.channel(); for (Channel channel : channels) { if (channel != incoming) { channel.writeAndFlush(new TextWebSocketFrame("[" + incoming.remoteAddress() + "]" + msg.text())); } else { channel.writeAndFlush(new TextWebSocketFrame("[you]" + msg.text())); } } } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // (2) Channel incoming = ctx.channel(); for (Channel channel : channels) { channel.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 加入")); } channels.add(ctx.channel()); System.out.println("Client:" + incoming.remoteAddress() + "加入"); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // (3) Channel incoming = ctx.channel(); for (Channel channel : channels) { channel.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 离开")); } System.out.println("Client:" + incoming.remoteAddress() + "离开"); channels.remove(ctx.channel()); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5) Channel incoming = ctx.channel(); System.out.println("Client:" + incoming.remoteAddress() + "在线"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6) Channel incoming = ctx.channel(); System.out.println("Client:" + incoming.remoteAddress() + "掉线"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Channel incoming = ctx.channel(); System.out.println("Client:" + incoming.remoteAddress() + "异常"); // 当出现异常就关闭连接 cause.printStackTrace(); ctx.close(); } }
数字标注说明参考第【1】部分,这里同样不再赘述。
TextWebSocketFrameHandler 仅作了几件事:
当WebSocket 与新客户端已成功握手完成,通过写入信息到 ChannelGroup 中的 Channel 来通知所有连接的客户端,然后添加新 Channel 到 ChannelGroup
如果接收到 TextWebSocketFrame,调用 retain() ,并将其写、刷新到 ChannelGroup,使所有连接的WebSocket Channel 都能接收到它。和以前一样,retain() 是必需的,因为当 channelRead0()返回时,TextWebSocketFrame 的引用计数将递减。由于所有操作都是异步的,writeAndFlush() 可能会在以后完成,我们不希望它来访问无效的引用。
④ WebsocketChatServerInitializer
由于 Netty 处理了其余大部分功能,唯一剩下的我们现在要做的是初始化 ChannelPipeline 给每一个创建的新的Channel 。做到这一点,我们需要一个ChannelInitializer。
public class WebsocketChatServerInitializer extends ChannelInitializer<SocketChannel> { //1扩展 ChannelInitializer @Override public void initChannel(SocketChannel ch) throws Exception { //2添加 ChannelHandler 到 ChannelPipeline ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpObjectAggregator(64*1024)); pipeline.addLast(new ChunkedWriteHandler()); pipeline.addLast(new HttpRequestHandler("/ws")); pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); pipeline.addLast(new TextWebSocketFrameHandler()); } }
initChannel() 方法设置 ChannelPipeline 中所有新注册的 Channel,安装所有需要的ChannelHandler。
⑤ WebsocketChatServer
编写一个 main() 方法来启动服务端。
实例如下:
import com.netty.chat.handler.WebsocketChatServerInitializer; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * Created by Janus on 2018/11/20. */ public class WebsocketChatServer { private int port; public WebsocketChatServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) .childHandler(new WebsocketChatServerInitializer()) //(4) .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) System.out.println("WebsocketChatServer 启动了"); // 绑定端口,开始接收进来的连接 ChannelFuture f = b.bind(port).sync(); // (7) // 等待服务器 socket 关闭 。 // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。 f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); System.out.println("WebsocketChatServer 关闭了"); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new WebsocketChatServer(port).run(); } }
⑥ WebsocketChatClient.html
在程序的 resources/templates 目录下,我们创建一个 WebsocketChatClient.html 页面来作为客户端。
实例如下:
<!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <title>WebSocket Chat</title> </head> <body> <script type="text/javascript"> var socket; if (!window.WebSocket) { window.WebSocket = window.MozWebSocket; } if (window.WebSocket) { socket = new WebSocket("ws://localhost:8080/ws"); socket.onmessage = function(event) { var ta = document.getElementById('responseText'); ta.value = ta.value + '\n' + event.data }; socket.onopen = function(event) { var ta = document.getElementById('responseText'); ta.value = "连接开启!"; }; socket.onclose = function(event) { var ta = document.getElementById('responseText'); ta.value = ta.value + "连接被关闭"; }; } else { alert("你的浏览器不支持 WebSocket!"); } function send(message) { if (!window.WebSocket) { return; } if (socket.readyState == WebSocket.OPEN) { socket.send(message); } else { alert("连接没有开启."); } } </script> <form onsubmit="return false;"> <h3>WebSocket 聊天室:</h3> <textarea id="responseText" style="width: 500px; height: 300px;"></textarea> <br> <input type="text" name="message" style="width: 300px" value="Welcome !"> <input type="button" value="发送消息" onclick="send(this.form.message.value)"> <input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空聊天记录"> </form> <br> <br> </body> </html>
⑦ 运行效果
先运行 WebsocketChatServer,再打开多个浏览器页面实现多个 客户端访问 http://localhost:8080