1. 引入Netty依赖
<!-- The backend is a Spring Boot project; this is the only dependency Netty needs. -->
<!-- Netty dependency. NOTE(review): no <version> is given here; presumably it is
     supplied by the project's dependency management - confirm in the parent POM. -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
</dependency>
2. 创建netty服务器
package com.cnpc.modules.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Netty WebSocket server that is started once the Spring context has been refreshed.
 *
 * <p>Startup is non-blocking: {@link #init()} returns as soon as the port is bound, so the
 * Spring thread that publishes the refresh event is not held forever. The event-loop
 * threads are released when the server channel closes or when this bean is destroyed.
 *
 * @author wuzhenyong
 * ClassName:NettyWebSocketServer.java
 * date:2022-05-05 8:48
 */
@Component
public class NettyWebSocketServer implements ApplicationListener<ContextRefreshedEvent>, DisposableBean {

    /** Listening port; must differ from the HTTP port of the Spring Boot application. */
    private static final int PORT = 9090;

    @Autowired
    private WebSocketChannelInit webSocketChannelInit;

    /** Accepts incoming connections. */
    private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);

    /** Handles I/O of accepted connections. */
    private final EventLoopGroup workerGroup = new NioEventLoopGroup();

    /** Ensures the server is bound at most once, even if the refresh event is published again. */
    private final AtomicBoolean started = new AtomicBoolean(false);

    /** The bound server channel, or {@code null} while the server is not running. */
    private volatile Channel serverChannel;

    /**
     * Called after the Spring container has finished initializing; starts the Netty server.
     *
     * @param contextRefreshedEvent the refresh event (not inspected)
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        this.init();
    }

    /**
     * Binds the WebSocket server to its port and returns immediately.
     *
     * <p>Calling this method more than once is a no-op. If binding fails, the failure is
     * printed and the event-loop groups are shut down.
     */
    public void init() {
        if (!started.compareAndSet(false, true)) {
            return;
        }
        boolean bound = false;
        try {
            // 1. Create the server bootstrap helper.
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 2. Configure the thread groups.
            serverBootstrap.group(bossGroup, workerGroup);
            // 3. Configure channel type and handlers.
            serverBootstrap.channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .childHandler(webSocketChannelInit);
            // 4. Bind. Only the bind itself is awaited; waiting on closeFuture() here
            //    would block the calling (Spring startup) thread indefinitely.
            ChannelFuture channelFuture = serverBootstrap.bind(PORT).sync();
            serverChannel = channelFuture.channel();
            bound = true;
            System.out.println("--Netty服务端启动成功---");
            // Release the event-loop threads once the server channel is closed.
            serverChannel.closeFuture()
                    .addListener((ChannelFutureListener) future -> shutdownEventLoops());
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (!bound) {
                shutdownEventLoops();
            }
        }
    }

    /**
     * Closes the server channel and releases the event-loop threads when the bean is destroyed.
     */
    @Override
    public void destroy() {
        Channel channel = serverChannel;
        if (channel != null) {
            channel.close();
        }
        shutdownEventLoops();
    }

    /** Requests a graceful shutdown of both event-loop groups; safe to call repeatedly. */
    private void shutdownEventLoops() {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
3. 创建通道初始化对象
package com.cnpc.modules.netty; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author wuzhenyong * ClassName:WebSocketChannelInit.java * date:2022-05-05 8:53 * Description: 通道初始化对象 */ @Component public class WebSocketChannelInit extends ChannelInitializer { @Autowired private WebSocketHandler webSocketHandler; @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); //对http协议的支持. pipeline.addLast(new HttpServerCodec()); // 对大数据流的支持 pipeline.addLast(new ChunkedWriteHandler()); //post请求分三部分. request line / request header / message body // HttpObjectAggregator将多个信息转化成单一的request或者response对象 pipeline.addLast(new HttpObjectAggregator(8000)); // 将http协议升级为ws协议. websocket的支持 pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536, false, true)); // 自定义处理handler pipeline.addLast(webSocketHandler); } }
4. 创建自定义处理类
package com.cnpc.modules.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.springframework.stereotype.Component;

import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * Shared business handler for text WebSocket frames: tracks connected channels, registers
 * a channel under the {@code username} query parameter of the handshake URI, and routes
 * incoming messages.
 *
 * @author wuzhenyong
 * ClassName:WebSocketHandler.java
 * date:2022-05-05 8:54
 */
@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    /** Name of the handshake query parameter that carries the user name. */
    private static final String USERNAME_PARAM = "username";

    /**
     * Group of all connected channels (a channel pool) - used by customer service
     * and for broadcasting.
     */
    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * User name to channel mapping.
     */
    public static ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>(16);

    /**
     * Channel binding management.
     */
    public static ConcurrentHashMap<String, String> bindMap = new ConcurrentHashMap<>(16);

    /**
     * Additional channel list. This handler only removes from it; a thread-safe list is
     * used because the handler is shared across event-loop threads.
     */
    public static List<Channel> channelList = new CopyOnWriteArrayList<>();

    /**
     * Channel ready event: adds the new connection to the channel group.
     *
     * @param ctx channel handler context
     * @throws Exception on failure
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        SocketAddress socketAddress = channel.remoteAddress();
        // Track the new client connection in the channel group.
        channels.add(channel);
        System.out.println("有新的连接." + socketAddress);
    }

    /**
     * User event: once the WebSocket handshake completes, registers the channel under the
     * {@code username} query parameter of the request URI. Token validation belongs here.
     *
     * @param ctx ctx
     * @param evt evt
     * @throws Exception on failure
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("触发事件");
        if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            WebSocketServerProtocolHandler.HandshakeComplete complete =
                    (WebSocketServerProtocolHandler.HandshakeComplete) evt;
            // Handshake headers, available for project-specific authentication.
            HttpHeaders httpHeaders = complete.requestHeaders();
            // TODO: implement authentication according to project requirements.
            String uri = complete.requestUri();
            System.out.println("uri: " + uri);
            System.out.println("握手成功");
            String paramValue = firstQueryParam(uri, USERNAME_PARAM);
            if (paramValue != null && !paramValue.isEmpty()) {
                channelMap.put(paramValue, ctx.channel());
            } else {
                // ConcurrentHashMap rejects null keys, so an anonymous channel is not
                // registered; it stays in the channel group only.
                System.out.println("握手缺少username参数, 未注册用户通道: " + ctx.channel().remoteAddress());
            }
        }
        super.userEventTriggered(ctx, evt);
    }

    /**
     * Message received event. Messages are expected as {@code <prefix>:<target>[:...]};
     * the second segment selects the recipient(s). Adapt to your own business rules.
     *
     * @param ctx                channel handler context
     * @param textWebSocketFrame received text frame
     * @throws Exception on failure
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
        System.out.println("通道: " + ctx.channel().remoteAddress() + "发送了消息");
        String msg = textWebSocketFrame.text();
        System.out.println("msg:" + msg);
        String[] params = msg.split(":");
        if (params.length < 2) {
            // No target segment: nothing to route.
            return;
        }
        String target = params[1];
        if (target.contains("yangmingquan")) {
            // Private message.
            sendTo("yangmingquan", msg);
        }
        if (target.contains("wuzhenyong")) {
            // Private message.
            sendTo("wuzhenyong", msg);
        }
        if (target.contains("all")) {
            // Broadcast.
            channels.writeAndFlush(new TextWebSocketFrame(msg));
        }
    }

    /**
     * Channel inactive event: the client went offline, so its registrations are removed.
     *
     * @param ctx ctx
     * @throws Exception on failure
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        SocketAddress socketAddress = channel.remoteAddress();
        System.out.println("通道:" + socketAddress + "已下线");
        unregister(channel);
        ctx.close();
    }

    /**
     * Exception event: prints the cause, removes the channel's registrations and closes it.
     *
     * @param ctx   ctx
     * @param cause the exception that was raised
     * @throws Exception on failure
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        unregister(ctx.channel());
        ctx.close();
    }

    /** Sends {@code msg} to the channel registered for {@code user}, if it is still connected. */
    private static void sendTo(String user, String msg) {
        Channel recipient = channelMap.get(user);
        if (recipient != null && recipient.isActive()) {
            recipient.writeAndFlush(new TextWebSocketFrame(msg));
        }
    }

    /** Removes {@code channel} from the list and from the user mapping. */
    private static void unregister(Channel channel) {
        channelList.remove(channel);
        // Removes only the mapping that still points at this channel, so a newer
        // connection registered under the same user name is left untouched.
        channelMap.values().remove(channel);
    }

    /** Returns the first value of query parameter {@code name} in {@code uri}, or {@code null}. */
    private static String firstQueryParam(String uri, String name) {
        if (uri == null) {
            return null;
        }
        Map<String, List<String>> parameters = new QueryStringDecoder(uri).parameters();
        List<String> values = parameters.get(name);
        return (values == null || values.isEmpty()) ? null : values.get(0);
    }
}
5. 创建常量类
package com.cnpc.modules.netty;

/**
 * Constants used by the WebSocket code in this package.
 *
 * @author wuzhenyong
 * date:2022-05-05 8:35
 */
public final class WebSocketConstant {

    /**
     * Key of the WebSocket handshake header parameter.
     */
    public static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol";

    /** Constants holder; not meant to be instantiated. */
    private WebSocketConstant() {
    }
}
6. 前端js
// 1. Create the WebSocket client.
var wsServer = 'ws://ip/';
var limitConnect = 3; // reconnect attempts still allowed
var timeConnect = 0;  // reconnect attempts made since the last successful connection
var maxReconnect = limitConnect; // budget restored after every successful connection
var wsUsername = null; // remembered so that reconnecting does not prompt again

/**
 * Opens the WebSocket connection, asking for the user name on the first call only.
 * Reconnects automatically (via reconnect()) when the connection closes.
 */
function websocket() {
    if (wsUsername === null) {
        // prompt(message, defaultValue) returns null when the dialog is cancelled.
        var entered = prompt("请输入您的名字", "");
        if (entered === null || entered === "") {
            console.log("未输入名字,取消连接");
            return;
        }
        wsUsername = entered;
    }
    // Establish the WebSocket connection.
    // Note: to support IE8+, consider socket.io; this example uses the native API.
    // The name is URL-encoded so that spaces, '&', '#', non-ASCII characters etc.
    // cannot corrupt the query string.
    wsServer = 'ws://127.0.0.1:9090/ws?username=' + encodeURIComponent(wsUsername);
    var socket = new WebSocket(wsServer);

    // Fired when the connection is established: restore the reconnect budget.
    socket.onopen = function() {
        limitConnect = maxReconnect;
        timeConnect = 0;
    };

    // Fired for every received message; handle it as required.
    socket.onmessage = function(res) {
        // res is the received event, e.g. data {"emit": "messageName", "data": {}}
        // where emit is the event name used to tell messages apart.
        console.log('接收到消息:', res)
    };

    // A close event also follows an error, so reconnecting is handled here only;
    // this avoids scheduling two reconnects for one failure.
    socket.onclose = function() {
        reconnect();
    };
}

/**
 * Schedules a reconnect after 2 seconds while attempts remain.
 */
function reconnect() {
    if (limitConnect > 0) {
        limitConnect--;
        timeConnect++;
        console.log("第" + timeConnect + "次重连");
        // Reconnect.
        setTimeout(function() {
            websocket();
        }, 2000);
    } else {
        console.log("TCP连接已超时");
    }
}
7. 以上就可以使用websocket的方式进行聊天了
8. 遇到的问题
- websocket路径传参问题 通道初始化对象 WebSocketChannelInit类
// 将http协议升级为ws协议. websocket的支持 pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536, false, true)); // 最后一个参数设置为true则连接websocket可以进行路径传参,否则传参连接不成功
- websocket路径参数获取问题 自定义处理器 WebSocketHandler类
// 重写userEventTriggered方法 用户连接触发事件 /** * 用户事件触发 token校验 * * @param ctx ctx * @param evt evt * @throws Exception 异常 */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { System.out.println("触发事件"); if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) { WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt; // 获取header信息 HttpHeaders httpHeaders = complete.requestHeaders(); // 自行处理鉴权问题 System.out.println("uri: " + uri); System.out.println("握手成功"); channelMap.put(paramValue, ctx.channel()); } super.userEventTriggered(ctx, evt); }
- 启动失败问题 Netty服务器 NettyWebSocketServer类
//init方法更换为一下代码,删除异常捕获关闭事件 public void init() throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); //1.创建服务端启动助手 ServerBootstrap serverBootstrap = new ServerBootstrap(); //2.设置线程组 serverBootstrap.group(bossGroup, workerGroup); //3.设置参数 serverBootstrap.channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.DEBUG)) .childHandler(webSocketChannelInit); //4.启动 绑定端口不能和服务端口一致 ChannelFuture channelFuture = serverBootstrap.bind(9090).sync(); System.out.println("--Netty服务端启动成功---"); }
- js websocket传入header参数(浏览器端只能通过第二个参数“子协议”传递,即请求头 Sec-WebSocket-Protocol)
// The second constructor argument is the list of sub-protocols, which the browser
// sends in the Sec-WebSocket-Protocol handshake header; the example uses it to pass
// the header parameter to the server.
var ws = new WebSocket("地址", ['header参数信息']);