WebSocket+Net二ty构建web聊天程序(二)

简介: WebSocket+Net二ty构建web聊天程序(二)

Netty对WebSocket的支持#


首先每一个Netty服务端的程序都是形似的,想创建不同的服务端,就得给Netty装配的pipeline不同的Handler


针对聊天程序,处理String类型的Json信息,我们选取SimpleChannelInboundHandler, 他是个典型的入站处理器,并且如果我们没有出来数据,她会帮我们回收 重写它里面未实现抽象方法,这些抽象方法同样是回调方法, 当一个新的Channel进来, 它注册进Selector上的过程中,会回调不同的抽象方法


方法名 回调时机
handlerAdded(ChannelHandlerContext ctx) Pepiline中的Handler添加完成回调
channelRegistered(ChannelHandlerContext ctx) channel注册进Selector后回调
channelActive(ChannelHandlerContext ctx) channel处于活动状态回调
channelReadComplete(ChannelHandlerContext ctx) channel, read结束后回调
userEventTriggered(ChannelHandlerContext ctx, Object evt) 当出现用户事件时回调,如 读/写
channelInactive(ChannelHandlerContext ctx) 客户端断开连接时回调
channelUnregistered(ChannelHandlerContext ctx) 客户端断开连接后,取消channel的注册时回调
handlerRemoved(ChannelHandlerContext ctx) 取消channel的注册后,将channel移除ChannelGroup后回调
exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 出现异常时回调


handler的设计编码#


要做到点对点的聊天,前提是服务端拥有全部的channel因为所有数据的读写都依赖于它,而 netty为我们提供了ChannelGroup 用来保存所有新添加进来的channel, 此外点对点的聊天,我们需要将用户信息和它所属的channel进行一对一的绑定,才可以精准的匹配出两个channel进而数据交互, 因此添加UserChannel映射类


public class UserChanelRelationship {
    private static HashMap<String, Channel> manager = new HashMap<>();
    public static  void put(String sendId,Channel channel){
        manager.put(sendId,channel);
    }
    public static Channel get(String sendId){
        return  manager.get(sendId);
    }
    public static void outPut(){
        for (HashMap.Entry<String,Channel> entry:manager.entrySet()){
            System.out.println("UserId: "+entry.getKey() + "channelId: "+entry.getValue().id().asLongText());
        }
    }
}


我们把User和Channel之间的关系以键值对的形式存放进Map中,服务端启动后,程序就会维护这个map, 那么问题来了? 什么时候添加两者之间的映射关系呢? 看上handler的回调函数,我们选择 channelRead0() 当我们判断出 客户端发送过来的信息是 CONNECT类型时,添加映射关系


下面是handler的处理编码


public class MyHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
// 用于管理整个客户端的 组
public static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame frame) throws Exception {
Channel currentChanenl = channelHandlerContext.channel();
// 1. 获取客户端发送的消息
String content = frame.text();
System.out.println("  content:  "+content);
// 2. 判断不同的消息的类型, 根据不同的类型进行不同的处理
    // 当建立连接时, 第一次open , 初始化channel,将channel和数据库中的用户做一个唯一的关联
DataContent dataContent = JsonUtils.jsonToPojo(content,DataContent.class);
Integer action = dataContent.getAction();
if (action == MsgActionEnum.CHAT.type) {
    // 3. 把聊天记录保存到数据库
    // 4. 同时标记消息的签收状态 [未签收]
    // 5. 从我们的映射中获取接受方的chanel  发送消息
    // 6. 从 chanelGroup中查找 当前的channel是否存在于 group, 只有存在,我们才进行下一步发送
    //  6.1 如果没有接受者用户channel就不writeAndFlush, 等着用户上线后,通过js发起请求拉取未接受的信息
    //  6.2 如果没有接受者用户channel就不writeAndFlush, 可以选择推送
}else if (action == MsgActionEnum.CONNECT.type){
    // 当建立连接时, 第一次open , 初始化channel,将channel和数据库中的用户做一个唯一的关联
    String sendId = dataContent.getChatMsg().getSenderId();
    UserChanelRelationship.put(sendId,currentChanenl);
}else if(action == MsgActionEnum.SINGNED.type){
    // 7. 当用户没有上线时,发送消息的人把要发送的消息持久化在数据库,但是却没有把信息写回到接受者的channel, 把这种消息称为未签收的消息
    // 8. 签收消息, 就是修改数据库中消息的签收状态, 我们和前端约定,前端如何签收消息在上面有提到
    String extend = dataContent.getExtand();
    // 扩展字段在 signed类型代表 需要被签收的消息的id, 用逗号分隔
    String[] msgIdList = extend.split(",");
    List<String> msgIds = new ArrayList<>();
    Arrays.asList(msgIdList).forEach(s->{
        if (null!=s){
            msgIds.add(s);
        }
    });
    if (!msgIds.isEmpty()&&null!=msgIds&&msgIds.size()>0){
        // 批量签收
    }
}else if (action == MsgActionEnum.KEEPALIVE.type){
    // 6. 心跳类型
    System.out.println("收到来自channel 为" +currentChanenl+" 的心跳包... ");
}
}
// handler 添加完成后回调
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// 获取链接, 并且若想要群发的话,就得往每一个channel中写数据, 因此我们得在创建连接时, 把channel保存起来
System.err.println("handlerAdded");
users .add(ctx.channel());
}
// 用户关闭了浏览器回调
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// 断开连接后, channel会自动移除group
// 我们主动的关闭进行, channel会被移除, 但是我们如果是开启的飞行模式,不会被移除
System.err.println("客户端channel被移出: "+ctx.channel().id().asShortText());
users.remove(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 发生异常关闭channel, 并从ChannelGroup中移除Channel
ctx.channel().close();
users.remove(ctx.channel());
}
... 其他方法


前后端的心跳维持#


双方建立起WebSocket连接后,服务端需要明确的知道,自己维护的诸多channel中,谁已经挂掉了, 为了提高性能,需要及早把废弃的channel移除ChanelGroup

客户端杀掉了进程,或者开启了飞行模式, 这时服务端是感知不到它维护的channel中已经有一个不能使用了,首先来说,维护一个不能使用的channel会影响性能,而且当这个channel的好友给他发送消息时,服务端认为用户在线,于是向一个不存在的channel写入刷新数据,会带来额外的麻烦

这时我们就需要添加心跳机制,客户端设置定时任务,每个一段时间就往服务端发送心跳包,心跳包的内容是什么不是重点,它的作用就是告诉服务端自己还active, N多个客户端都要向服务端发送心跳,这并不会增加服务端的请求,因为这个请求是通过WebSocket的send方法发送过去的,只不过dataContent的类型是 KEEPALIVE , 同样这是我们提前约定好的(此外,服务端向客户端发送心跳看起来是没有必要的)


于是对于后端来说,我们发送的心跳包,会使得当前客户端对应的channel的channelRead0()方法回调, netty为我们提供了心跳相关的handler, 每一次的chanelRead0()的回调,都是read/write事件, 下面是netty对心跳的支持的实现


/**
 * @Author: Changwu
 * @Date: 2019/7/2 9:33
 * 我们的心跳handler不需要实现handler0方法,我们选择,直接继承SimpleInboundHandler的父类
*/
public class HeartHandler extends ChannelInboundHandlerAdapter {
// 我们重写  EventTrigger 方法
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 当出现read/write  读写写空闲时触发
if(evt instanceof IdleStateEvent){
    IdleStateEvent event = (IdleStateEvent) evt;
    if (event.state()== IdleState.READER_IDLE){ // 读空闲
        System.out.println(ctx.channel().id().asShortText()+" 读空闲... ");
    }else if (event.state()==IdleState.WRITER_IDLE){
        System.out.println(ctx.channel().id().asShortText()+" 写空闲... ");
    }else if (event.state()==IdleState.ALL_IDLE){
        System.out.println("channel 读写空闲, 准备关闭当前channel  , 当前UsersChanel的数量: "+MyHandler.users.size());
        Channel channel = ctx.channel();
        channel.close();
        System.out.println("channel 关闭后, UsersChanel的数量: "+MyHandler.users.size());
    }
}
}


Handler我们不再使用SimpleChannelInboundHandler了,因为它当中的方法都是抽象方法,而我们需要回调的函数时机是,每次当有用户事件时回调, 比如read,write事件, 这些事件可以证明channel还活着,对应的方法是userEventTriggered()


此外, ChannelInboundHandlerAdapter是netty中,适配器模式的体现, 它实现了全都抽象方法,然后他的实现方法中并不是在干活,而是把这个事件往下传播下去了,现在我们重写userEventTriggered() 执行的就是我们的逻辑

另外,我们需要在pipeline中添加handler


... 
/ 添加netty为我们提供的 检测空闲的处理器,  每 20 40 60 秒, 会触发userEventTriggered事件的回调
pipeline.addLast(new IdleStateHandler(10,20,30));
// todo 添加心跳的支持
pipeline.addLast("heartHandler",new HeartHandler());


服务端主动向客户端推送数据#


如, 添加好友的操作中, A向B发送添加好友请求的过程,会经过如下几步

  • A向服务端发送ajax请求,将自己的id, 目标朋友的id持久化到 数据库,请求friend_request表
  • 用户B上线,通过js,向后端拉取friend_request表中有没有关于自己的信息,于是服务端把A的请求给B推送过去
  • 在B的前端回显A的请求, B进一步处理这个信息, 此时两种情况
  • B拒绝了A的请求: 后端把friend_request表关于AB的信息清除
  • B同意了A的请求: 后端在firend_List表中,将AB双方的信息都持久化进去, 这时我们可以顺势在后端的方法中,给B推送最新的联系人信息, 但是这不属于主动推送,因为这次会话是客户端主动发起的


但是A却不知道,B已经同意了,于是需要给A主动的推送数据, 怎么推送呢? 我们需要在上面的UserChannel的关系中,拿出发送者的channel, 然后往回writeAndFlush内容,这时A就得知B已经同意了,重新加载好友列表

相关文章
|
28天前
|
存储 Shell Linux
快速上手基于 BaGet 的脚本自动化构建 .net 应用打包
本文介绍了如何使用脚本自动化构建 `.net` 应用的 `nuget` 包并推送到指定服务仓库。首先概述了 `BaGet`——一个开源、轻量级且高性能的 `NuGet` 服务器,支持多种存储后端及配置选项。接着详细描述了 `BaGet` 的安装、配置及使用方法,并提供了 `PowerShell` 和 `Bash` 脚本实例,用于自动化推送 `.nupkg` 文件。最后总结了 `BaGet` 的优势及其在实际部署中的便捷性。
60 10
|
8天前
|
存储 JavaScript 前端开发
webSocket+Node+Js实现在线聊天(包含所有代码)
文章介绍了如何使用WebSocket、Node.js和JavaScript实现在线聊天功能,包括完整的前端和后端代码示例。
35 0
|
5天前
|
网络协议 JavaScript API
深入浅出 WebSocket:实现实时 web 通信
在现代Web应用中,实时通信至关重要。WebSocket通过单个TCP连接实现全双工通信,允许服务器主动向客户端发送消息。本文介绍了WebSocket的核心概念、实现方法及其优势。WebSocket建立了持久连接,支持实时数据传输,减少服务器负载,并提供双向通信。通过JavaScript API可轻松建立连接、发送接收消息及处理异常。使用WebSocket,开发者能构建更动态的Web应用。
|
7天前
|
前端开发 JavaScript 安全
深入理解Python Web开发中的前后端分离与WebSocket实时通信技术
在现代Web开发中,前后端分离已成为主流架构,通过解耦前端(用户界面)与后端(服务逻辑),提升了开发效率和团队协作。前端使用Vue.js、React等框架与后端通过HTTP/HTTPS通信,而WebSocket则实现了低延迟的全双工实时通信。本文结合Python框架如Flask和Django,探讨了前后端分离与WebSocket的最佳实践,包括明确接口规范、安全性考虑、性能优化及错误处理等方面,助力构建高效、实时且安全的Web应用。
20 2
|
7天前
|
前端开发 Python
前后端分离的进化:Python Web项目中的WebSocket实时通信解决方案
在现代Web开发领域,前后端分离已成为一种主流架构模式,它促进了开发效率、提升了应用的可维护性和可扩展性。随着实时数据交互需求的日益增长,WebSocket作为一种在单个长连接上进行全双工通讯的协议,成为了实现前后端实时通信的理想选择。在Python Web项目中,结合Flask框架与Flask-SocketIO库,我们可以轻松实现WebSocket的实时通信功能。
19 2
|
8天前
|
JavaScript 前端开发 UED
WebSocket在Python Web开发中的革新应用:解锁实时通信的新可能
在快速发展的Web应用领域中,实时通信已成为许多现代应用不可或缺的功能。传统的HTTP请求/响应模式在处理实时数据时显得力不从心,而WebSocket技术的出现,为Python Web开发带来了革命性的变化,它允许服务器与客户端之间建立持久的连接,从而实现了数据的即时传输与交换。本文将通过问题解答的形式,深入探讨WebSocket在Python Web开发中的革新应用及其实现方法。
21 3
|
10天前
|
前端开发 API Python
WebSocket技术详解:如何在Python Web应用中实现无缝实时通信
在Web开发的广阔领域中,实时通信已成为许多应用的核心需求。传统的HTTP请求-响应模型在实时性方面存在明显不足,而WebSocket作为一种在单个长连接上进行全双工通信的协议,为Web应用的实时通信提供了强有力的支持。本文将深入探讨WebSocket技术,并通过一个Python Web应用的案例分析,展示如何在Python中利用WebSocket实现无缝实时通信。
16 2
|
28天前
|
Ubuntu 持续交付 API
如何使用 dotnet pack 打包 .NET 跨平台程序集?
`dotnet pack` 是 .NET Core 的 NuGet 包打包工具,用于将代码打包成 NuGet 包。通过命令 `dotnet pack` 可生成 `.nupkg` 文件。使用 `--include-symbols` 和 `--include-source` 选项可分别创建包含调试符号和源文件的包。默认情况下,`dotnet pack` 会先构建项目,可通过 `--no-build` 跳过构建。此外,还可以使用 `--output` 指定输出目录、`-c` 设置配置等。示例展示了创建类库项目并打包的过程。更多详情及命令选项,请参考官方文档。
86 11
|
27天前
|
存储 运维
.NET开发必备技巧:使用Visual Studio分析.NET Dump,快速查找程序内存泄漏问题!
.NET开发必备技巧:使用Visual Studio分析.NET Dump,快速查找程序内存泄漏问题!
|
27天前
|
自然语言处理 C# 图形学
使用dnSpyEx对.NET Core程序集进行反编译、编辑和调试
使用dnSpyEx对.NET Core程序集进行反编译、编辑和调试
下一篇
无影云桌面