用Netty实现WebSocket网络聊天室

简介: 用Netty实现WebSocket网络聊天室

  最近学习Netty,学习到WebSocket一章节,将Netty中WebSocket的样例代码做了一些简单的改造,实现了一个简易的WebSocket网络聊天室,源码并非完全自己实现,只是将一些别人的代码做了下整合改造,今分享至此,希望对大家学习Netty有所帮助。

  首先介绍下什么是WebSocket,这就不得不先提到HTTP协议了。众所周知,在HTTP/2发布前,所有HTTP请求都是 请求-应答的 模式,这就意味着客户端只能向服务器要数据,然后服务端被动应答,而服务器无法主动将数据推送给客户端。这就导致一些高时效性的场景用HTTP就会有些问题,就拿实时聊天举例吧,客户端想知道近期有没有人说过话,就只能不断问服务器 有没有人发了消息? 有的话服务器就返回,没有就不返回,这种行为被称为轮询。 轮询的问题在于如果询问的时间间隔太长,消息的及时性无法得到保证,但如果时间太短,对服务器的压力就会大幅提升(因为不断要请求响应)。 有没有可能服务器有消息的时候,主动推送给客户端?

  WebSocket因此而诞生,它允许客户端和服务端之间在HTTP之上建立一个全双工的TCP长连接,这里的关键点在于全双工,意味着服务端也能通过这个连接给客户端发送即时消息,从而解决了轮询的性能和时效性矛盾的问题。了解过Socket编程的同学应该很容易理解了,WebSocket其实本质上就是Socket,只不过WebSocket是建立在HTTP协议之上的。

  回到我们的正题,如何用Netty+WebSocket写一个网络聊天室? 其实Netty里已经封装好了HTTP和WebSocket的实现,我们只需要实现部分聊天室的功能即可,接下来看下我实现的完整代码:

首先是ServerBootstrap的部分,这里是Netty的启动入口。

复制

@Service
public class WebSocketServer {
    static final String WEBSOCKET_PATH = "/ws";
    private ChannelFuture f;
    @Resource
    private WebSocketFrameHandler webSocketFrameHandler;
    @PostConstruct
    private void init() {
        bind(Constant.SOCKET_PORT);
    }
    public static voud bind(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new HttpServerCodec());  // netty中http协议的编解码
                            pipeline.addLast(new HttpObjectAggregator(65536));  
                            pipeline.addLast(new WebSocketServerCompressionHandler());
                            pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, null, true));
                            pipeline.addLast(new WebSocketIndexPageHandler(WEBSOCKET_PATH));  // demo页面的handler,这个是非必须的,可以换成其他第三方的WebSocket客户端工具  
                            pipeline.addLast(webSocketFrameHandler); // 聊天室的主要逻辑
                        }
                    });
            Channel f = b.bind(port).sync().channel();
            f.closeFuture().sync();
        } catch (Exception e) {
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

  因为HttpServerCodec HttpObjectAggregator WebSocketServerCompressionHandler WebSocketServerProtocolHandler是Netty组件中提供的组件,其作用就是完成Http和WebSocket协议数据到Java对象的相互转换,这里就不再展开了,我们直接看下剩下的两个Handler。

  首先是WebSocketIndexPageHandler,这个也是我直接从Netty样例中Copy出来的,它的作用就是构建一个Http首页,这个首页实现了一个简单的WebSocket网页客户端,如果你不需要这个网页客户端,你也可直接删掉。

复制

public class WebSocketIndexPageHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    private final String websocketPath;
    public WebSocketIndexPageHandler(String websocketPath) {
        this.websocketPath = websocketPath;
    }
    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
        // Generate an error page if response getStatus code is not OK (200).
        HttpResponseStatus responseStatus = res.status();
        if (responseStatus.code() != 200) {
            ByteBufUtil.writeUtf8(res.content(), responseStatus.toString());
            HttpUtil.setContentLength(res, res.content().readableBytes());
        }
        // Send the response and close the connection if necessary.
        boolean keepAlive = HttpUtil.isKeepAlive(req) && responseStatus.code() == 200;
        HttpUtil.setKeepAlive(res, keepAlive);
        ChannelFuture future = ctx.writeAndFlush(res);
        if (!keepAlive) {
            future.addListener(ChannelFutureListener.CLOSE);
        }
    }
    private static String getWebSocketLocation(ChannelPipeline cp, HttpRequest req, String path) {
        String protocol = "ws";
        if (cp.get(SslHandler.class) != null) {
            // SSL in use so use Secure WebSockets
            protocol = "wss";
        }
        return protocol + "://" + req.headers().get(HttpHeaderNames.HOST) + path;
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
        // Handle a bad request.
        if (!req.decoderResult().isSuccess()) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(req.protocolVersion(), BAD_REQUEST,
                                                                   ctx.alloc().buffer(0)));
            return;
        }
        // Allow only GET methods.
        if (!GET.equals(req.method())) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(req.protocolVersion(), FORBIDDEN,
                                                                   ctx.alloc().buffer(0)));
            return;
        }
        // Send the index page
        if ("/".equals(req.uri()) || "/index.html".equals(req.uri())) {
            String webSocketLocation = getWebSocketLocation(ctx.pipeline(), req, websocketPath);
            ByteBuf content = WebSocketServerIndexPage.getContent(webSocketLocation);
            FullHttpResponse res = new DefaultFullHttpResponse(req.protocolVersion(), OK, content);
            res.headers().set(CONTENT_TYPE, "text/html; charset=UTF-8");
            HttpUtil.setContentLength(res, content.readableBytes());
            sendHttpResponse(ctx, req, res);
        } else {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(req.protocolVersion(), NOT_FOUND,
                                                                   ctx.alloc().buffer(0)));
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

  上面这部分代码还依赖于一个静态的页面,我对这个页面做了简单的改造,方便大家愉快地一起聊天。

复制

public final class WebSocketServerIndexPage {
    public static ByteBuf getContent(String webSocketLocation) {
        return Unpooled.copiedBuffer("\n"
                                     + "<html><head><title>Web Socket Test</title></head>\n"
                                     + "<body>\n"
                                     + "<script type=\"text/javascript\">\n"
                                     + "var socket;\n"
                                     + "if (!window.WebSocket) {\n"
                                     + "  window.WebSocket = window.MozWebSocket;\n"
                                     + "}\n"
                                     + "if (window.WebSocket) {\n"
                                     + "  socket = new WebSocket(\"" + webSocketLocation + "\");\n"
                                     + "  socket.onmessage = function(event) {\n"
                                     + "    var ta = document.getElementById('responseText');\n"
                                     + "    ta.value = ta.value + '\\n' + event.data\n"
                                     + "    ta.scrollTop = ta.scrollHeight\n"
                                     + "  };\n"
                                     + "  socket.onopen = function(event) {\n"
                                     + "    var ta = document.getElementById('responseText');\n"
                                     + "    ta.value = \"Web Socket opened!\";\n"
                                     + "    ta.scrollTop = ta.scrollHeight\n"
                                     + "  };\n"
                                     + "  socket.onclose = function(event) {\n"
                                     + "    var ta = document.getElementById('responseText');\n"
                                     + "    ta.value = ta.value + \"Web Socket closed!\";\n"
                                     + "    ta.scrollTop = ta.scrollHeight\n"
                                     + "  };\n"
                                     + "} else {\n"
                                     + "  alert(\"Your browser does not support Web Socket.\");\n"
                                     + "}\n"
                                     + "\n"
                                     + "function send(message) {\n"
                                     + "  if (!window.WebSocket) { return; }\n"
                                     + "  if (socket.readyState == WebSocket.OPEN) {\n"
                                     + "    socket.send(message);\n"
                                     + "    document.getElementById('msgForm').value = ''\n"
                                     + "  } else {\n"
                                     + "    alert(\"The socket is not open.\");\n"
                                     + "  }\n"
                                     + "}\n"
                                     + "</script>\n"
                                     + "<textarea id=\"responseText\" style=\"width:500px;height:300px;\"></textarea>\n"
                                     + "<form onsubmit=\"return false;\">\n"
                                     + "<input type=\"text\" name=\"message\" id=\"msgForm\"/><input type=\"button\" value=\"Send\"\n"
                                     + "       onclick=\"send(this.form.message.value)\" />\n"
                                     + "</form>\n"
                                     + "</body>\n"
                                     + "</html>\n", CharsetUtil.US_ASCII);
    }
}

  改造后的页面效果长这样,虽然有些简陋,但还是可以收发消息。

  最核心的就是WebSocketFrameHandler这个类了,所有的逻辑都是在这里面的,其实也不复杂,就是在连接建立后,给这个连接分配一个随机名字,将某个人发的消息转发到其他已有的连接上,另外及时清理掉断开的连接,防止资源泄露,代码很简单,相信你一看就懂。

复制

@Service
@Slf4j
@ChannelHandler.Sharable
public class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
    // 我直接取了天龙八部里的名字,给每个聊天的人随机分配一个 
    private final List<String> names =
            List.of("刀白凤", "丁春秋", "马夫人", "马五德", "小翠", "于光豪", "巴天石", "不平道人", "邓百川", "风波恶",
                    "甘宝宝", "公冶乾", "木婉清", "少林老僧", "太皇太后", "天狼子", "天山童姥", "王语嫣", "乌老大",
                    "无崖子", "云岛主", "云中鹤", "止清", "白世镜", "包不同", "本参", "本观", "本相", "本因", "出尘子",
                    "冯阿三", "兰剑", "古笃诚", "过彦之", "平婆婆", "石清露", "石嫂", "司空玄", "司马林", "玄慈",
                    "玄寂", "玄苦", "玄难", "玄生", "玄痛", "叶二娘", "竹剑", "左子穆", "华赫艮", "乔峰", "李春来",
                    "李傀儡", "李秋水", "刘竹庄", "朴者和尚", "祁六三", "全冠清", "阮星竹", "西夏宫女", "许卓诚",
                    "朱丹臣", "努儿海", "阿碧", "阿洪", "阿胜", "阿朱", "阿紫", "波罗星", "陈孤雁", "鸠摩智", "来福儿",
                    "孟师叔", "宋长老", "苏星河", "苏辙", "完颜阿古打", "耶律洪基", "耶律莫哥", "耶律涅鲁古",
                    "耶律重元", "吴长风", "吴光胜", "吴领军", "辛双清", "严妈妈", "余婆婆", "岳老三", "张全祥",
                    "单伯山", "单季山", "单叔山", "单小山", "单正", "段延庆", "段誉", "段正淳", "段正明", "范禹",
                    "范百龄", "范骅", "苟读", "和里布", "何望海", "易大彪", "郁光标", "卓不凡", "宗赞王子", "哈大霸",
                    "姜师叔", "枯荣长老", "梦姑", "姚伯当", "神山上人", "神音", "狮鼻子", "室里", "项长老", "幽草",
                    "赵钱孙", "赵洵", "哲罗星", "钟灵", "钟万仇", "高升泰", "龚光杰", "贾老者", "康广陵", "秦红棉",
                    "虚竹", "容子矩", "桑土公", "唐光雄", "奚长老", "徐长老", "诸保昆", "崔百泉", "崔绿华", "符敏仪",
                    "黄眉和尚", "菊剑", "聋哑婆婆", "梅剑", "萧远山", "游骥", "游驹", "游坦之", "程青霜", "傅思归",
                    "葛光佩", "缘根", "智光大师", "鲍千灵", "褚万里", "瑞婆婆", "端木元", "黎夫人", "薛慕华", "慕容博",
                    "慕容复", "谭公", "谭婆", "谭青", "摘星子", "慧方", "慧观", "慧净", "慧真", "穆贵妃", "赫连铁树");
    // 名字到连接的映射
    private final Map<String, ChannelHandlerContext> name2ctx = new ConcurrentHashMap<>();
    // 连接到名字的映射
    private final Map<ChannelHandlerContext, String> ctx2name = new ConcurrentHashMap<>();
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        // 先分配当前没有在使用中的名字
        Optional<String> nameOp = names.stream().filter(x -> !name2ctx.containsKey(x)).findFirst();
        if (!nameOp.isPresent()) {
            // 如果没分配到名字,直接断开连接,这么写的话,同时在线的人数取决于名字列表的大小
            ctx.writeAndFlush(new TextWebSocketFrame("当前连接人数过多,请稍后重试!"));
            log.info("当前连接人数过多,请稍后重试!");
            ctx.close();
            return;
        }
        String name = nameOp.get();
        name2ctx.put(name, ctx);
        ctx2name.put(ctx, name);
        broadcast(name + "加入了群聊!");
    }
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        String name = ctx2name.getOrDefault(ctx, "");
        name2ctx.remove(name);
        ctx2name.remove(ctx);
        broadcast(name + "离开了群聊!");
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
        // 收到消息后将消息群发给所有人
        if (frame instanceof TextWebSocketFrame) {
            String request = ((TextWebSocketFrame) frame).text();
            String name = ctx2name.getOrDefault(ctx, "");
            broadcast(name + ":" + request);
        } else {
            String message = "unsupported frame type: " + frame.getClass().getName();
            throw new UnsupportedOperationException(message);
        }
    }
    /**
     * 将消息群发给所有在线的人
     */
    private void broadcast(String msg) {
        log.info("msg:{}", msg);
        name2ctx.entrySet().parallelStream().forEach(e -> {
            String name = e.getKey();
            ChannelHandlerContext ctx = e.getValue();
            if (ctx.channel().isActive()) {
                ctx.writeAndFlush(new TextWebSocketFrame(msg));
            } else {
                // 广播时清理掉不活跃的连接
                ctx2name.remove(ctx);
                name2ctx.remove(name);
            }
        });
    }
}

  这里特别提醒下,想实现群聊,那WebSocketFrameHandler必须标记为Sharable,并且全局共享一个对象,所以需要注意下线程安全的问题,这里我都用了ConcurrentHashMap。

  以上就是完整的代码了,有兴趣可以自己跑一跑,另外这个网络聊天室我已经部署的我的服务器上了,也可以直接点开体验下 http://xindoo.xyz:8083/


目录
相关文章
|
3月前
|
开发框架 前端开发 网络协议
Spring Boot结合Netty和WebSocket,实现后台向前端实时推送信息
【10月更文挑战第18天】 在现代互联网应用中,实时通信变得越来越重要。WebSocket作为一种在单个TCP连接上进行全双工通信的协议,为客户端和服务器之间的实时数据传输提供了一种高效的解决方案。Netty作为一个高性能、事件驱动的NIO框架,它基于Java NIO实现了异步和事件驱动的网络应用程序。Spring Boot是一个基于Spring框架的微服务开发框架,它提供了许多开箱即用的功能和简化配置的机制。本文将详细介绍如何使用Spring Boot集成Netty和WebSocket,实现后台向前端推送信息的功能。
699 1
|
4月前
|
编解码 分布式计算 网络协议
Netty高性能网络框架(一)
Netty高性能网络框架(一)
|
19天前
|
JSON 算法 Java
Nettyの网络聊天室&扩展序列化算法
通过本文的介绍,我们详细讲解了如何使用Netty构建一个简单的网络聊天室,并扩展序列化算法以提高数据传输效率。Netty的高性能和灵活性使其成为实现各种网络应用的理想选择。希望本文能帮助您更好地理解和使用Netty进行网络编程。
36 12
|
2月前
|
消息中间件 编解码 网络协议
Netty从入门到精通:高性能网络编程的进阶之路
【11月更文挑战第17天】Netty是一个基于Java NIO(Non-blocking I/O)的高性能、异步事件驱动的网络应用框架。使用Netty,开发者可以快速、高效地开发可扩展的网络服务器和客户端程序。本文将带您从Netty的背景、业务场景、功能点、解决问题的关键、底层原理实现,到编写一个详细的Java示例,全面了解Netty,帮助您从入门到精通。
203 0
|
3月前
|
前端开发 网络协议
netty整合websocket(完美教程)
本文是一篇完整的Netty整合WebSocket的教程,介绍了WebSocket的基本概念、使用Netty构建WebSocket服务器的步骤和代码示例,以及如何创建前端WebSocket客户端进行通信的示例。
339 2
netty整合websocket(完美教程)
|
3月前
|
XML Java Nacos
网络协议与Netty
网络协议与Netty
|
4月前
|
存储 机器人 Linux
Netty(二)-服务端网络编程常见网络IO模型讲解
Netty(二)-服务端网络编程常见网络IO模型讲解
|
5月前
|
网络协议 C# 开发者
WPF与Socket编程的完美邂逅:打造流畅网络通信体验——从客户端到服务器端,手把手教你实现基于Socket的实时数据交换
【8月更文挑战第31天】网络通信在现代应用中至关重要,Socket编程作为其实现基础,即便在主要用于桌面应用的Windows Presentation Foundation(WPF)中也发挥着重要作用。本文通过最佳实践,详细介绍如何在WPF应用中利用Socket实现网络通信,包括创建WPF项目、设计用户界面、实现Socket通信逻辑及搭建简单服务器端的全过程。具体步骤涵盖从UI设计到前后端交互的各个环节,并附有详尽示例代码,助力WPF开发者掌握这一关键技术,拓展应用程序的功能与实用性。
165 0
|
5月前
|
Java 应用服务中间件 Linux
(九)Java网络编程无冕之王-这回把大名鼎鼎的Netty框架一网打尽!
现如今的开发环境中,分布式/微服务架构大行其道,而分布式/微服务的根基在于网络编程,而Netty恰恰是Java网络编程领域的无冕之王。Netty这个框架相信大家定然听说过,其在Java网络编程中的地位,好比JavaEE中的Spring。
199 3
|
5月前
|
移动开发 网络协议 算法
(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战
在前面关于《Netty入门篇》的文章中,咱们已经初步对Netty这个著名的网络框架有了认知,本章的目的则是承接上文,再对Netty中的一些进阶知识进行阐述,毕竟前面的内容中,仅阐述了一些Netty的核心组件,想要真正掌握Netty框架,对于它我们应该具备更为全面的认知。
265 2

热门文章

最新文章