Netty异步NIO框架(二)websocket 前端后端聊天 私聊及群聊

简介: Netty异步NIO框架(二)websocket 前端后端聊天 私聊及群聊

1. 引入Netty依赖

<!--后端采用springboot项目,netty只需引入这一个依赖 -->
<!--netty依赖 -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
</dependency>

2. 创建netty服务器

package com.cnpc.modules.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
/**
 * @author wuzhenyong
 * ClassName:NettyWebSocketServer.java
 * date:2022-05-05 8:48
 * Description: Netty服务器
 */
@Component
public class NettyWebSocketServer implements ApplicationListener<ContextRefreshedEvent> {
    @Autowired
    private WebSocketChannelInit webSocketChannelInit;
    /**
     * 容器初始化完成后调用
     *
     * @param contextRefreshedEvent
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        // 启动netty服务器
        this.init();
    }
    private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    private EventLoopGroup workerGroup = new NioEventLoopGroup();
    public void init() {
        try {
            //1.创建服务端启动助手
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //2.设置线程组
            serverBootstrap.group(bossGroup, workerGroup);
            //3.设置参数
            serverBootstrap.channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .childHandler(webSocketChannelInit);
            //4.启动  绑定端口不能和服务端口一致
            ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
            System.out.println("--Netty服务端启动成功---");
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

3. 创建通道初始化对象

package com.cnpc.modules.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * @author wuzhenyong
 * ClassName:WebSocketChannelInit.java
 * date:2022-05-05 8:53
 * Description: 通道初始化对象
 */
@Component
public class WebSocketChannelInit extends ChannelInitializer {
    @Autowired
    private WebSocketHandler webSocketHandler;
    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        //对http协议的支持.
        pipeline.addLast(new HttpServerCodec());
        // 对大数据流的支持
        pipeline.addLast(new ChunkedWriteHandler());
        //post请求分三部分. request line / request header / message body
        // HttpObjectAggregator将多个信息转化成单一的request或者response对象
        pipeline.addLast(new HttpObjectAggregator(8000));
        // 将http协议升级为ws协议. websocket的支持
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536, false, true));
        // 自定义处理handler
        pipeline.addLast(webSocketHandler);
    }
}

4. 创建自定义处理类

package com.cnpc.modules.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.springframework.stereotype.Component;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * @author wuzhenyong
 * ClassName:WebSocketHandler.java
 * date:2022-05-05 8:54
 * Description: 自定义处理类
 */
@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
  /**
     * 管理channel的组,可以理解为channel的池 —— 客服使用
     */
    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    /**
     * 用户通道管理
     */
    public static ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>(16);
    /**
     * 通道绑定管理
     */
    public static ConcurrentHashMap<String, String> bindMap = new ConcurrentHashMap<>(16);
    public static List<Channel> channelList = new ArrayList<>();
    /**
     * 通道就绪事件
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // channel.read();
        //当有新的客户端连接的时候, 将通道放入集合
        SocketAddress socketAddress = ctx.channel().remoteAddress();
        // 放入通道组
        channels.add(channel);
        System.out.println("有新的连接." + socketAddress);
    }
    /**
     * 用户事件触发 token校验
     *
     * @param ctx ctx
     * @param evt evt
     * @throws Exception 异常
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("触发事件");
        if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
            HttpHeaders httpHeaders = complete.requestHeaders();
            // 自行处理鉴权问题
            System.out.println("uri: " + uri);
            System.out.println("握手成功");
            channelMap.put(paramValue, ctx.channel());
        }
        super.userEventTriggered(ctx, evt);
    }
    /**
     * 收到消息事件
     *
     * @param ctx 通道处理程序上下文
     * @param textWebSocketFrame    文本框架网络套接字
     * @throws Exception 异常
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
        // 按照自己公司的逻辑进行处理消息
        System.out.println("通道: " + ctx.channel().remoteAddress() + "发送了消息");
        String msg = textWebSocketFrame.text();
        System.out.println("msg:" + msg);
        String[] params = msg.split(":");
        if (params[1].contains("yangmingquan")) {
            // 私发
            channelMap.get("yangmingquan").writeAndFlush(new TextWebSocketFrame(msg));
        }
        if (params[1].contains("wuzhenyong")) {
            // 私发
            channelMap.get("wuzhenyong").writeAndFlush(new TextWebSocketFrame(msg));
        }
        if (params[1].contains("all")) {
            // 群发
            channels.writeAndFlush(new TextWebSocketFrame(msg));
        }
    }
    /**
     * 通道未就绪--channel下线
     *
     * @param ctx ctx
     * @throws Exception 异常
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        SocketAddress socketAddress = ctx.channel().remoteAddress();
        System.out.println("通道:" + socketAddress + "已下线");
        //当有客户端断开连接的时候,就移除对应的通道
        channelList.remove(channel);
        ctx.close();
    }
    /**
     * 异常处理事件
     *
     * @param ctx   ctx
     * @param cause 导致
     * @throws Exception 异常
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        Channel channel = ctx.channel();
        //移除集合
        channelList.remove(channel);
        ctx.close();
    }
}

5. 创建常量类

package com.cnpc.modules.netty;
/**
 * @author wuzhenyong
 * date:2022-05-05 8:35
 * Description:
 */
public final class WebSocketConstant {
    /**
     * websocket head参数key
     */
    public static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol";
}

6. 前端js

//1.创建websocket客户端
    var wsServer = 'ws://ip/';
    var limitConnect = 3;  // 断线重连次数
    var timeConnect = 0;
    function websocket() {
        //这里需要注意的是,prompt有两个参数,前面是提示的话,后面是当对话框出来后,在对话框里的默认值
        var username = prompt("请输入您的名字", ""); //将输入的内容赋给变量 name ,
        //建立WebSocket通讯
        //注意:如果你要兼容ie8+,建议你采用 socket.io 的版本。下面是以原生WS为例
        wsServer = 'ws://127.0.0.1:9090/ws?username=' + username;
        var socket = new WebSocket(wsServer);
        //连接成功时触发
        socket.onopen = function() {
        };
        //收到的消息事件 按自己需求处理
        socket.onmessage = function(res) {
            //res为接受到的值,如 {"emit": "messageName", "data": {}}
            //emit即为发出的事件名,用于区分不同的消息
            console.log('接收到消息:', res)
        } ;
        socket.onclose = function() {
            reconnect();
        };
        // 另外还有onclose、onerror,分别是在链接关闭和出错时触发
    }
    // 重连
    function reconnect() {
        // lockReconnect加锁,防止onclose、onerror两次重连
        if (limitConnect > 0) {
            limitConnect--;
            timeConnect++;
            console.log("第" + timeConnect + "次重连");
            // 进行重连
            setTimeout(function() {
                websocket();
            },2000);
        } else {
            console.log("TCP连接已超时");
        }
    }

7. 以上就可以使用websocket的方式进行聊天了

8. 遇到的问题

  • websocket路径传参问题 通道初始化对象 WebSocketChannelInit类
// 将http协议升级为ws协议. websocket的支持
pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536, false, true));
// 最后一个参数设置为true则连接websocket可以进行路径传参,否则传参连接不成功

websocket路径参数获取问题 自定义处理器 WebSocketHandler类

// 重写userEventTriggered方法  用户连接触发事件
/**
 * 用户事件触发 token校验
 *
 * @param ctx ctx
 * @param evt evt
 * @throws Exception 异常
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    System.out.println("触发事件");
    if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
        WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
        // 获取header信息
        HttpHeaders httpHeaders = complete.requestHeaders();
        // 自行处理鉴权问题
        System.out.println("uri: " + uri);
        System.out.println("握手成功");
        channelMap.put(paramValue, ctx.channel());
    }
    super.userEventTriggered(ctx, evt);
}

启动失败问题 Netty服务器 NettyWebSocketServer类

//init方法更换为一下代码,删除异常捕获关闭事件
public void init() throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    //1.创建服务端启动助手
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    //2.设置线程组
    serverBootstrap.group(bossGroup, workerGroup);
    //3.设置参数
    serverBootstrap.channel(NioServerSocketChannel.class)
        .handler(new LoggingHandler(LogLevel.DEBUG))
        .childHandler(webSocketChannelInit);
    //4.启动  绑定端口不能和服务端口一致
    ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
    System.out.println("--Netty服务端启动成功---");
}

js websocket传入header参数

var ws = new WebSocket("地址", ['header参数信息']);
相关文章
|
4月前
|
NoSQL 前端开发 Java
Lettuce的特性和内部实现问题之Lettuce基于Netty框架实现的问题如何解决
Lettuce的特性和内部实现问题之Lettuce基于Netty框架实现的问题如何解决
|
3月前
|
编解码 分布式计算 网络协议
Netty高性能网络框架(一)
Netty高性能网络框架(一)
|
4月前
|
设计模式
Lettuce的特性和内部实现问题之Netty NIO的性能优于BIO的问题如何解决
Lettuce的特性和内部实现问题之Netty NIO的性能优于BIO的问题如何解决
|
29天前
|
前端开发 JavaScript API
前端:事件循环/异步
前端开发中的事件循环和异步处理是核心机制,用于管理任务执行、性能优化及响应用户操作,确保网页流畅运行。事件循环负责调度任务,而异步则通过回调、Promise等实现非阻塞操作。
|
2月前
|
JavaScript 前端开发 测试技术
前端全栈之路Deno篇(五):如何快速创建 WebSocket 服务端应用 + 客户端应用 - 可能是2025最佳的Websocket全栈实时应用框架
本文介绍了如何使用Deno 2.0快速构建WebSocket全栈应用,包括服务端和客户端的创建。通过一个简单的代码示例,展示了Deno在WebSocket实现中的便捷与强大,无需额外依赖,即可轻松搭建具备基本功能的WebSocket应用。Deno 2.0被认为是最佳的WebSocket全栈应用JS运行时,适合全栈开发者学习和使用。
139 7
|
2月前
|
NoSQL Java Redis
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
这篇文章介绍了Redis的基本命令,并展示了如何使用Netty框架直接与Redis服务器进行通信,包括设置Netty客户端、编写处理程序以及初始化Channel的完整示例代码。
72 1
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
|
2月前
|
设计模式 前端开发 JavaScript
前端编程的异步解决方案有哪些?
本文首发于微信公众号“前端徐徐”,介绍了异步编程的背景和几种常见方案,包括回调、事件监听、发布订阅、Promise、Generator、async/await和响应式编程。每种方案都有详细的例子和优缺点分析,帮助开发者根据具体需求选择最合适的异步编程方式。
92 1
|
3月前
|
Java
Netty BIO/NIO/AIO介绍
Netty BIO/NIO/AIO介绍
|
3月前
|
设计模式 缓存 算法
Netty框架的重要性
Netty框架的重要性
|
4月前
|
消息中间件 存储 监控
Django后端架构开发:Celery异步调优,任务队列和调度
Django后端架构开发:Celery异步调优,任务队列和调度
81 1