Netty异步NIO框架(二)websocket 前端后端聊天 私聊及群聊

简介: Netty异步NIO框架(二)websocket 前端后端聊天 私聊及群聊

1. 引入Netty依赖

<!--后端采用springboot项目,netty只需引入这一个依赖 -->
<!--netty依赖 -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
</dependency>

2. 创建netty服务器

package com.cnpc.modules.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
/**
 * @author wuzhenyong
 * ClassName:NettyWebSocketServer.java
 * date:2022-05-05 8:48
 * Description: Netty服务器
 */
@Component
public class NettyWebSocketServer implements ApplicationListener<ContextRefreshedEvent> {
    @Autowired
    private WebSocketChannelInit webSocketChannelInit;
    /**
     * 容器初始化完成后调用
     *
     * @param contextRefreshedEvent
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        // 启动netty服务器
        this.init();
    }
    private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    private EventLoopGroup workerGroup = new NioEventLoopGroup();
    public void init() {
        try {
            //1.创建服务端启动助手
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //2.设置线程组
            serverBootstrap.group(bossGroup, workerGroup);
            //3.设置参数
            serverBootstrap.channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .childHandler(webSocketChannelInit);
            //4.启动  绑定端口不能和服务端口一致
            ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
            System.out.println("--Netty服务端启动成功---");
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

3. 创建通道初始化对象

package com.cnpc.modules.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * @author wuzhenyong
 * ClassName:WebSocketChannelInit.java
 * date:2022-05-05 8:53
 * Description: 通道初始化对象
 */
@Component
public class WebSocketChannelInit extends ChannelInitializer {
    @Autowired
    private WebSocketHandler webSocketHandler;
    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        //对http协议的支持.
        pipeline.addLast(new HttpServerCodec());
        // 对大数据流的支持
        pipeline.addLast(new ChunkedWriteHandler());
        //post请求分三部分. request line / request header / message body
        // HttpObjectAggregator将多个信息转化成单一的request或者response对象
        pipeline.addLast(new HttpObjectAggregator(8000));
        // 将http协议升级为ws协议. websocket的支持
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536, false, true));
        // 自定义处理handler
        pipeline.addLast(webSocketHandler);
    }
}

4. 创建自定义处理类

package com.cnpc.modules.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.springframework.stereotype.Component;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * @author wuzhenyong
 * ClassName:WebSocketHandler.java
 * date:2022-05-05 8:54
 * Description: 自定义处理类
 */
@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
  /**
     * 管理channel的组,可以理解为channel的池 —— 客服使用
     */
    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    /**
     * 用户通道管理
     */
    public static ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>(16);
    /**
     * 通道绑定管理
     */
    public static ConcurrentHashMap<String, String> bindMap = new ConcurrentHashMap<>(16);
    public static List<Channel> channelList = new ArrayList<>();
    /**
     * 通道就绪事件
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // channel.read();
        //当有新的客户端连接的时候, 将通道放入集合
        SocketAddress socketAddress = ctx.channel().remoteAddress();
        // 放入通道组
        channels.add(channel);
        System.out.println("有新的连接." + socketAddress);
    }
    /**
     * 用户事件触发 token校验
     *
     * @param ctx ctx
     * @param evt evt
     * @throws Exception 异常
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("触发事件");
        if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
            HttpHeaders httpHeaders = complete.requestHeaders();
            // 自行处理鉴权问题
            System.out.println("uri: " + uri);
            System.out.println("握手成功");
            channelMap.put(paramValue, ctx.channel());
        }
        super.userEventTriggered(ctx, evt);
    }
    /**
     * 收到消息事件
     *
     * @param ctx 通道处理程序上下文
     * @param textWebSocketFrame    文本框架网络套接字
     * @throws Exception 异常
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
        // 按照自己公司的逻辑进行处理消息
        System.out.println("通道: " + ctx.channel().remoteAddress() + "发送了消息");
        String msg = textWebSocketFrame.text();
        System.out.println("msg:" + msg);
        String[] params = msg.split(":");
        if (params[1].contains("yangmingquan")) {
            // 私发
            channelMap.get("yangmingquan").writeAndFlush(new TextWebSocketFrame(msg));
        }
        if (params[1].contains("wuzhenyong")) {
            // 私发
            channelMap.get("wuzhenyong").writeAndFlush(new TextWebSocketFrame(msg));
        }
        if (params[1].contains("all")) {
            // 群发
            channels.writeAndFlush(new TextWebSocketFrame(msg));
        }
    }
    /**
     * 通道未就绪--channel下线
     *
     * @param ctx ctx
     * @throws Exception 异常
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        SocketAddress socketAddress = ctx.channel().remoteAddress();
        System.out.println("通道:" + socketAddress + "已下线");
        //当有客户端断开连接的时候,就移除对应的通道
        channelList.remove(channel);
        ctx.close();
    }
    /**
     * 异常处理事件
     *
     * @param ctx   ctx
     * @param cause 导致
     * @throws Exception 异常
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        Channel channel = ctx.channel();
        //移除集合
        channelList.remove(channel);
        ctx.close();
    }
}

5. 创建常量类

package com.cnpc.modules.netty;
/**
 * @author wuzhenyong
 * date:2022-05-05 8:35
 * Description:
 */
public final class WebSocketConstant {
    /**
     * websocket head参数key
     */
    public static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol";
}

6. 前端js

//1.创建websocket客户端
    var wsServer = 'ws://ip/';
    var limitConnect = 3;  // 断线重连次数
    var timeConnect = 0;
    function websocket() {
        //这里需要注意的是,prompt有两个参数,前面是提示的话,后面是当对话框出来后,在对话框里的默认值
        var username = prompt("请输入您的名字", ""); //将输入的内容赋给变量 name ,
        //建立WebSocket通讯
        //注意:如果你要兼容ie8+,建议你采用 socket.io 的版本。下面是以原生WS为例
        wsServer = 'ws://127.0.0.1:9090/ws?username=' + username;
        var socket = new WebSocket(wsServer);
        //连接成功时触发
        socket.onopen = function() {
        };
        //收到的消息事件 按自己需求处理
        socket.onmessage = function(res) {
            //res为接受到的值,如 {"emit": "messageName", "data": {}}
            //emit即为发出的事件名,用于区分不同的消息
            console.log('接收到消息:', res)
        } ;
        socket.onclose = function() {
            reconnect();
        };
        // 另外还有onclose、onerror,分别是在链接关闭和出错时触发
    }
    // 重连
    function reconnect() {
        // lockReconnect加锁,防止onclose、onerror两次重连
        if (limitConnect > 0) {
            limitConnect--;
            timeConnect++;
            console.log("第" + timeConnect + "次重连");
            // 进行重连
            setTimeout(function() {
                websocket();
            },2000);
        } else {
            console.log("TCP连接已超时");
        }
    }

7. 以上就可以使用websocket的方式进行聊天了

8. 遇到的问题

  • websocket路径传参问题 通道初始化对象 WebSocketChannelInit类
// 将http协议升级为ws协议. websocket的支持
pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536, false, true));
// 最后一个参数设置为true则连接websocket可以进行路径传参,否则传参连接不成功

websocket路径参数获取问题 自定义处理器 WebSocketHandler类

// 重写userEventTriggered方法  用户连接触发事件
/**
 * 用户事件触发 token校验
 *
 * @param ctx ctx
 * @param evt evt
 * @throws Exception 异常
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    System.out.println("触发事件");
    if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
        WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
        // 获取header信息
        HttpHeaders httpHeaders = complete.requestHeaders();
        // 自行处理鉴权问题
        System.out.println("uri: " + uri);
        System.out.println("握手成功");
        channelMap.put(paramValue, ctx.channel());
    }
    super.userEventTriggered(ctx, evt);
}

启动失败问题 Netty服务器 NettyWebSocketServer类

//init方法更换为一下代码,删除异常捕获关闭事件
public void init() throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    //1.创建服务端启动助手
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    //2.设置线程组
    serverBootstrap.group(bossGroup, workerGroup);
    //3.设置参数
    serverBootstrap.channel(NioServerSocketChannel.class)
        .handler(new LoggingHandler(LogLevel.DEBUG))
        .childHandler(webSocketChannelInit);
    //4.启动  绑定端口不能和服务端口一致
    ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
    System.out.println("--Netty服务端启动成功---");
}

js websocket传入header参数

var ws = new WebSocket("地址", ['header参数信息']);
相关文章
|
19天前
|
前端开发 JavaScript 关系型数据库
从前端到后端:构建现代化Web应用的技术探索
在当今互联网时代,Web应用的开发已成为了各行各业不可或缺的一部分。从前端到后端,这篇文章将带你深入探索如何构建现代化的Web应用。我们将介绍多种技术,包括前端开发、后端开发以及各种编程语言(如Java、Python、C、PHP、Go)和数据库,帮助你了解如何利用这些技术构建出高效、安全和可扩展的Web应用。
|
2天前
|
机器学习/深度学习 前端开发 JavaScript
探寻前端巨变:从HTML到现代框架的发展历程
探寻前端巨变:从HTML到现代框架的发展历程
11 2
|
2天前
|
前端开发 JavaScript Linux
relectron框架——打包前端vue3、react为pc端exe可执行程序
relectron框架——打包前端vue3、react为pc端exe可执行程序
8 1
|
3天前
|
前端开发 JavaScript Java
前端与后端:构建现代Web应用的双翼
前端与后端:构建现代Web应用的双翼
|
16天前
|
小程序 前端开发 JavaScript
小程序全栈开发:前端与后端的完美结合
【4月更文挑战第12天】本文介绍了小程序全栈开发,涵盖前端和后端的关键点。前端使用WXML和WXSS进行页面结构和样式设计,JavaScript处理逻辑及组件使用;后端采用Node.js等语言处理业务逻辑、数据库设计和API接口开发。前端与后端通过数据交互实现结合,采用前后端分离模式,支持跨平台运行。调试测试后,提交微信审核并上线运营。掌握前端后端结合是小程序成功的关键。
|
16天前
|
Web App开发 移动开发 运维
跨域解决方案[前端+后端]
跨域解决方案[前端+后端]
25 0
|
16天前
|
前端开发 JavaScript 搜索推荐
CSS框架是前端开发中不可或缺的工具
【4月更文挑战第12天】CSS框架是前端开发中不可或缺的工具
18 2
|
17天前
|
JavaScript 前端开发 API
游戏开发入门:Python后端与Vue前端的协同工作方式
【4月更文挑战第11天】使用Python后端(Flask或Django)和Vue.js前端开发游戏变得流行,能提高开发效率和可维护性。本文指导如何构建这样的项目,包括设置环境、创建虚拟环境、搭建后端API及前端Vue组件,强调前后端协作和API接口的重要性。这种架构促进团队合作,提升代码质量和游戏体验。
|
18天前
|
供应链 JavaScript 前端开发
使用Django和Vue实现电子商务网站的后端和前端
【4月更文挑战第10天】本文介绍了使用Django和Vue构建电子商务网站的后端与前端方法。Django作为Python的Web框架负责后端,其模型-视图-控制器设计简化了商品管理、购物车和订单处理。Vue.js用于前端,提供数据驱动和组件化的用户界面。通过定义Django模型和视图处理请求,结合Vue组件展示商品和管理购物车,开发者可构建交互性强的电商网站。虽然实际开发涉及更多细节,但本文为入门提供了基础指导。
|
8月前
|
Web App开发 前端开发 JavaScript
前端学习笔记202307学习笔记第五十七天-模拟面试笔记react-fiber解决了什么问题
前端学习笔记202307学习笔记第五十七天-模拟面试笔记react-fiber解决了什么问题
95 0