一、前言
1.什么是netty?
高性能,事件驱动,异步非堵塞
基于nio的客户端,服务端编程框架(nio的框架)
稳定性和伸缩性
2.netty的使用场景。
高性能领域
多线程并发领域
异步通信领域
3.学习目录
io通信
netty入门
websocket入门
netty实现websocket通信案例
二.java io通信
客户端个数:bio(1:1) 伪异步io(m:n) nio(m:1) aio(m:0)
io类型:bio(阻塞同步io) 伪异步io(阻塞同步io) nio(非阻塞同步io) aio(非阻塞异步io)
api使用难度:bio(简单) 伪异步io(简单) nio(复杂) aio(复杂,但比nio简单)
调试难度:同上
可靠性:bio(差) 伪异步io(差) nio(好) aio(好)
吞吐量:同上
三.netty入门
1.原生nio的缺陷。类库和api复杂;入门门槛高;工作量和难度大;jdk nio存在bug
2.netty的优势。api简单;入门门槛低;性能高;成熟、稳定。
四.websocket入门
1.什么是websocket?
h5协议规范;握手机制;解决客户端与服务端实时通信而产生的技术。
2.websocket的优点?
节省通信开销;服务器主动传送数据给客户端;实时通信;
3.websocket建立连接。
客户端发起握手请求;服务端响应请求;连接建立。
4.websocket生命周期。
打开事件;消息事件;错误事件;关闭事件。
5.websocket关闭连接。
服务端关闭底层tcp连接;客户端发起tcp close。
五.netty实现websocket通信案例。
1.功能介绍
netty开发服务端;html实现客户端;实现服务端和客户端的实时交互。
2.代码实现
2.1存储工厂的全局配置
package com.websocket.netty;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
/** * @author lilinshen * @title 存储工厂的全局配置 * @description 请填写相关描述 * @date 2018/5/23 10:32 */ public class NettyConfig {
/** * 储存每一个客户端进来时的 channel 对象 */ public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
2.2处理/接收/响应客户端websocket请求的核心业务处理类
package com.websocket.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import java.util.Date;
/** * @author lilinshen * @title 处理 / 接收 / 响应客户端 websocket 请求的核心业务处理类 * @description 请填写相关描述 * @date 2018/5/23 10:36 */ public class MyWebSocketHandler extends SimpleChannelInboundHandler<Object> {
private WebSocketServerHandshaker handshaker;
private static final String WEB_SOCKET_URL = "ws://localhost:8888/websocket" ;
/** * 客户端与服务端连接的时候调用 * * @param ctx * @throws Exception */ @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
NettyConfig.channelGroup.add(ctx.channel());
System.out.println( " 客户端与服务端连接开启 ..." );
}
/** * 客户端与服务端断开连接的时候调用 * * @param ctx * @throws Exception */ @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
NettyConfig.channelGroup.remove(ctx.channel());
System.out.println( " 客户端与服务端连接关闭 ..." );
}
/** * 服务端接收客户端发送过来的数据结束之后调用 * * @param ctx * @throws Exception */ @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
/** * 工程出现异常的时候调用 * * @param ctx * @param cause * @throws Exception */ @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
/** * 服务端处理客户端 websocket 请求的核心方法 * * @param channelHandlerContext * @param o * @throws Exception */ @Override
protected void messageReceived(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
if (o instanceof FullHttpRequest) {
// 处理客户端向服务端发起 http 握手请求的业务 handHttpRequest(channelHandlerContext, (FullHttpRequest) o);
} else if (o instanceof WebSocketFrame) {
// 处理 websocket 连接业务 handWebsocketFrame(channelHandlerContext, (WebSocketFrame) o);
}
}
/** * 处理客户端向服务端发起 http 握手请求的业务 * * @param ctx * @param request */ private void handHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
if (!request.getDecoderResult().isSuccess() || !( "websocket" ).equals(request.headers().get( "Upgrade" ))) {
sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(WEB_SOCKET_URL, null, false);
handshaker = wsFactory.newHandshaker(request);
if (null == handshaker) {
WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
} else {
handshaker.handshake(ctx.channel(), request);
}
}
/** * 服务端向客户端响应消息 * * @param ctx * @param request * @param response */ private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, DefaultFullHttpResponse response) {
if (response.getStatus().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(response.getStatus().toString(), CharsetUtil.UTF_8);
response.content().writeBytes(buf);
buf.release();
}
// 服务端向客户端发送数据 ChannelFuture channelFuture = ctx.channel().writeAndFlush(response);
if (response.getStatus().code() != 200) {
channelFuture.addListener(ChannelFutureListener.CLOSE);
}
}
/** * 处理客户端与服务端之间的 websocket 业务 * * @param ctx * @param frame */ private void handWebsocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
// 判断是否是关闭 websocket 的指令 if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), ((CloseWebSocketFrame) frame).retain());
}
// 判断是否是 ping 的消息 if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
return;
}
// 判断是否是二进制消息,如果是二进制消息,抛出异常 if (!(frame instanceof TextWebSocketFrame)) {
System.out.println( " 目前我们不支持二进制消息 ..." );
throw new RuntimeException( " 【 " + this.getClass().getName() + " 】不支持消息 ..." );
}
// 返回应答消息 // 获取客户端向服务端发送的消息 String request = ((TextWebSocketFrame) frame).text();
System.out.println( "===>>>" + request);
TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + "===>>>" + request);
// 群发,服务端向每个连接上来的客户端发消息 NettyConfig.channelGroup.writeAndFlush(tws);
}
}
3.初始化连接时的各个组件
package com.websocket.netty;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
/** * @author lilinshen * @title 初始化连接时的各个组件 * @description 请填写相关描述 * @date 2018/5/23 11:12 */ public class MyWebSocketChannelHander extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast( "http-codec" , new HttpServerCodec());
socketChannel.pipeline().addLast( "aggregator" , new HttpObjectAggregator(65536));
socketChannel.pipeline().addLast( "http-chunked" , new ChunkedWriteHandler());
socketChannel.pipeline().addLast( "handler" , new MyWebSocketHandler());
}
}
4.程序的入口,负责启动应用
package com.websocket.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/** * @author lilinshen * @title 程序的入口,负责启动应用 * @description 请填写相关描述 * @date 2018/5/23 11:17 */ public class Main {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(new MyWebSocketChannelHander());
System.out.println( " 服务端开启等待客户端连接 ..." );
Channel channel = bootstrap.bind(8888).sync().channel();
channel.closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 优雅的退出程序 bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
5.websocket.html客户端代码。
<html>
<head>
<meta http-equiv= "Content-Type" content= "text/html;charset=utf-8" />
<title>websocket客户端</title>
<script type= "text/javascript" >
var socket;
if (!window.WebSocket) {
window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
socket = new WebSocket( "ws://localhost:8888/websocket" );
socket.onmessage = function (ev) {
var ta = document.getElementById( "responseContent" );
ta.value += ev.data + " \r\n " ;
}
socket.onopen = function (ev) {
var ta = document.getElementById( "responseContent" );
ta.value += " 您当前的浏览器支持 websocket ,请进行后续操作 \r\n " ;
}
socket.onclose = function (ev) {
var ta = document.getElementById( "responseContent" );
ta.value = "" ;
ta.value = "websocket 连接已经关闭 \r\n " ;
}
} else {
alert( " 您的浏览器不支持 websocket" );
}
function send(message) {
if (!window.WebSocket) {
return;
}
if (socket.readyState == WebSocket.OPEN) {
socket.send(message);
} else {
alert( "websocket 连接没有建立成功 " );
}
}
</script>
</head>
<body>
<form onsubmit= " return false; " >
<input type= "text" name= "message" value= "" />
<br/><br/>
<input onclick= " send(this.form.message.value) " type= "button" value= " 发送 websocket 请求消息 " />
<hr color= "red" />
<h2>客户端接收到服务端返回的应答消息</h2>
<textarea id= "responseContent" style= " width:1024px;height:300px; " ></textarea>
</form>
</body>
</html>
6.启动。
1.main.java类是程序的入口,负责启动应用。\
2.将websocket.html在浏览器中打开,就可以建立一个websocket连接。