WebSocket+Net二ty构建web聊天程序(二)

简介: WebSocket+Net二ty构建web聊天程序(二)

Netty对WebSocket的支持#


首先每一个Netty服务端的程序都是形似的,想创建不同的服务端,就得给Netty装配的pipeline不同的Handler


针对聊天程序,处理String类型的Json信息,我们选取SimpleChannelInboundHandler, 他是个典型的入站处理器,并且如果我们没有出来数据,她会帮我们回收 重写它里面未实现抽象方法,这些抽象方法同样是回调方法, 当一个新的Channel进来, 它注册进Selector上的过程中,会回调不同的抽象方法


方法名 回调时机
handlerAdded(ChannelHandlerContext ctx) Pepiline中的Handler添加完成回调
channelRegistered(ChannelHandlerContext ctx) channel注册进Selector后回调
channelActive(ChannelHandlerContext ctx) channel处于活动状态回调
channelReadComplete(ChannelHandlerContext ctx) channel, read结束后回调
userEventTriggered(ChannelHandlerContext ctx, Object evt) 当出现用户事件时回调,如 读/写
channelInactive(ChannelHandlerContext ctx) 客户端断开连接时回调
channelUnregistered(ChannelHandlerContext ctx) 客户端断开连接后,取消channel的注册时回调
handlerRemoved(ChannelHandlerContext ctx) 取消channel的注册后,将channel移除ChannelGroup后回调
exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 出现异常时回调


handler的设计编码#


要做到点对点的聊天,前提是服务端拥有全部的channel因为所有数据的读写都依赖于它,而 netty为我们提供了ChannelGroup 用来保存所有新添加进来的channel, 此外点对点的聊天,我们需要将用户信息和它所属的channel进行一对一的绑定,才可以精准的匹配出两个channel进而数据交互, 因此添加UserChannel映射类


public class UserChanelRelationship {
    private static HashMap<String, Channel> manager = new HashMap<>();
    public static  void put(String sendId,Channel channel){
        manager.put(sendId,channel);
    }
    public static Channel get(String sendId){
        return  manager.get(sendId);
    }
    public static void outPut(){
        for (HashMap.Entry<String,Channel> entry:manager.entrySet()){
            System.out.println("UserId: "+entry.getKey() + "channelId: "+entry.getValue().id().asLongText());
        }
    }
}


我们把User和Channel之间的关系以键值对的形式存放进Map中,服务端启动后,程序就会维护这个map, 那么问题来了? 什么时候添加两者之间的映射关系呢? 看上handler的回调函数,我们选择 channelRead0() 当我们判断出 客户端发送过来的信息是 CONNECT类型时,添加映射关系


下面是handler的处理编码


public class MyHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
// 用于管理整个客户端的 组
public static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame frame) throws Exception {
Channel currentChanenl = channelHandlerContext.channel();
// 1. 获取客户端发送的消息
String content = frame.text();
System.out.println("  content:  "+content);
// 2. 判断不同的消息的类型, 根据不同的类型进行不同的处理
    // 当建立连接时, 第一次open , 初始化channel,将channel和数据库中的用户做一个唯一的关联
DataContent dataContent = JsonUtils.jsonToPojo(content,DataContent.class);
Integer action = dataContent.getAction();
if (action == MsgActionEnum.CHAT.type) {
    // 3. 把聊天记录保存到数据库
    // 4. 同时标记消息的签收状态 [未签收]
    // 5. 从我们的映射中获取接受方的chanel  发送消息
    // 6. 从 chanelGroup中查找 当前的channel是否存在于 group, 只有存在,我们才进行下一步发送
    //  6.1 如果没有接受者用户channel就不writeAndFlush, 等着用户上线后,通过js发起请求拉取未接受的信息
    //  6.2 如果没有接受者用户channel就不writeAndFlush, 可以选择推送
}else if (action == MsgActionEnum.CONNECT.type){
    // 当建立连接时, 第一次open , 初始化channel,将channel和数据库中的用户做一个唯一的关联
    String sendId = dataContent.getChatMsg().getSenderId();
    UserChanelRelationship.put(sendId,currentChanenl);
}else if(action == MsgActionEnum.SINGNED.type){
    // 7. 当用户没有上线时,发送消息的人把要发送的消息持久化在数据库,但是却没有把信息写回到接受者的channel, 把这种消息称为未签收的消息
    // 8. 签收消息, 就是修改数据库中消息的签收状态, 我们和前端约定,前端如何签收消息在上面有提到
    String extend = dataContent.getExtand();
    // 扩展字段在 signed类型代表 需要被签收的消息的id, 用逗号分隔
    String[] msgIdList = extend.split(",");
    List<String> msgIds = new ArrayList<>();
    Arrays.asList(msgIdList).forEach(s->{
        if (null!=s){
            msgIds.add(s);
        }
    });
    if (!msgIds.isEmpty()&&null!=msgIds&&msgIds.size()>0){
        // 批量签收
    }
}else if (action == MsgActionEnum.KEEPALIVE.type){
    // 6. 心跳类型
    System.out.println("收到来自channel 为" +currentChanenl+" 的心跳包... ");
}
}
// handler 添加完成后回调
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// 获取链接, 并且若想要群发的话,就得往每一个channel中写数据, 因此我们得在创建连接时, 把channel保存起来
System.err.println("handlerAdded");
users .add(ctx.channel());
}
// 用户关闭了浏览器回调
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// 断开连接后, channel会自动移除group
// 我们主动的关闭进行, channel会被移除, 但是我们如果是开启的飞行模式,不会被移除
System.err.println("客户端channel被移出: "+ctx.channel().id().asShortText());
users.remove(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 发生异常关闭channel, 并从ChannelGroup中移除Channel
ctx.channel().close();
users.remove(ctx.channel());
}
... 其他方法


前后端的心跳维持#


双方建立起WebSocket连接后,服务端需要明确的知道,自己维护的诸多channel中,谁已经挂掉了, 为了提高性能,需要及早把废弃的channel移除ChanelGroup

客户端杀掉了进程,或者开启了飞行模式, 这时服务端是感知不到它维护的channel中已经有一个不能使用了,首先来说,维护一个不能使用的channel会影响性能,而且当这个channel的好友给他发送消息时,服务端认为用户在线,于是向一个不存在的channel写入刷新数据,会带来额外的麻烦

这时我们就需要添加心跳机制,客户端设置定时任务,每个一段时间就往服务端发送心跳包,心跳包的内容是什么不是重点,它的作用就是告诉服务端自己还active, N多个客户端都要向服务端发送心跳,这并不会增加服务端的请求,因为这个请求是通过WebSocket的send方法发送过去的,只不过dataContent的类型是 KEEPALIVE , 同样这是我们提前约定好的(此外,服务端向客户端发送心跳看起来是没有必要的)


于是对于后端来说,我们发送的心跳包,会使得当前客户端对应的channel的channelRead0()方法回调, netty为我们提供了心跳相关的handler, 每一次的chanelRead0()的回调,都是read/write事件, 下面是netty对心跳的支持的实现


/**
 * @Author: Changwu
 * @Date: 2019/7/2 9:33
 * 我们的心跳handler不需要实现handler0方法,我们选择,直接继承SimpleInboundHandler的父类
*/
public class HeartHandler extends ChannelInboundHandlerAdapter {
// 我们重写  EventTrigger 方法
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 当出现read/write  读写写空闲时触发
if(evt instanceof IdleStateEvent){
    IdleStateEvent event = (IdleStateEvent) evt;
    if (event.state()== IdleState.READER_IDLE){ // 读空闲
        System.out.println(ctx.channel().id().asShortText()+" 读空闲... ");
    }else if (event.state()==IdleState.WRITER_IDLE){
        System.out.println(ctx.channel().id().asShortText()+" 写空闲... ");
    }else if (event.state()==IdleState.ALL_IDLE){
        System.out.println("channel 读写空闲, 准备关闭当前channel  , 当前UsersChanel的数量: "+MyHandler.users.size());
        Channel channel = ctx.channel();
        channel.close();
        System.out.println("channel 关闭后, UsersChanel的数量: "+MyHandler.users.size());
    }
}
}


Handler我们不再使用SimpleChannelInboundHandler了,因为它当中的方法都是抽象方法,而我们需要回调的函数时机是,每次当有用户事件时回调, 比如read,write事件, 这些事件可以证明channel还活着,对应的方法是userEventTriggered()


此外, ChannelInboundHandlerAdapter是netty中,适配器模式的体现, 它实现了全都抽象方法,然后他的实现方法中并不是在干活,而是把这个事件往下传播下去了,现在我们重写userEventTriggered() 执行的就是我们的逻辑

另外,我们需要在pipeline中添加handler


... 
/ 添加netty为我们提供的 检测空闲的处理器,  每 20 40 60 秒, 会触发userEventTriggered事件的回调
pipeline.addLast(new IdleStateHandler(10,20,30));
// todo 添加心跳的支持
pipeline.addLast("heartHandler",new HeartHandler());


服务端主动向客户端推送数据#


如, 添加好友的操作中, A向B发送添加好友请求的过程,会经过如下几步

  • A向服务端发送ajax请求,将自己的id, 目标朋友的id持久化到 数据库,请求friend_request表
  • 用户B上线,通过js,向后端拉取friend_request表中有没有关于自己的信息,于是服务端把A的请求给B推送过去
  • 在B的前端回显A的请求, B进一步处理这个信息, 此时两种情况
  • B拒绝了A的请求: 后端把friend_request表关于AB的信息清除
  • B同意了A的请求: 后端在firend_List表中,将AB双方的信息都持久化进去, 这时我们可以顺势在后端的方法中,给B推送最新的联系人信息, 但是这不属于主动推送,因为这次会话是客户端主动发起的


但是A却不知道,B已经同意了,于是需要给A主动的推送数据, 怎么推送呢? 我们需要在上面的UserChannel的关系中,拿出发送者的channel, 然后往回writeAndFlush内容,这时A就得知B已经同意了,重新加载好友列表

相关文章
|
1月前
|
开发框架 前端开发 JavaScript
ASP.NET Web Pages - 教程
ASP.NET Web Pages 是一种用于创建动态网页的开发模式,采用HTML、CSS、JavaScript 和服务器脚本。本教程聚焦于Web Pages,介绍如何使用Razor语法结合服务器端代码与前端技术,以及利用WebMatrix工具进行开发。适合初学者入门ASP.NET。
|
3月前
|
XML JSON API
ServiceStack:不仅仅是一个高性能Web API和微服务框架,更是一站式解决方案——深入解析其多协议支持及简便开发流程,带您体验前所未有的.NET开发效率革命
【10月更文挑战第9天】ServiceStack 是一个高性能的 Web API 和微服务框架,支持 JSON、XML、CSV 等多种数据格式。它简化了 .NET 应用的开发流程,提供了直观的 RESTful 服务构建方式。ServiceStack 支持高并发请求和复杂业务逻辑,安装简单,通过 NuGet 包管理器即可快速集成。示例代码展示了如何创建一个返回当前日期的简单服务,包括定义请求和响应 DTO、实现服务逻辑、配置路由和宿主。ServiceStack 还支持 WebSocket、SignalR 等实时通信协议,具备自动验证、自动过滤器等丰富功能,适合快速搭建高性能、可扩展的服务端应用。
191 3
|
11天前
|
运维 前端开发 C#
一套以用户体验出发的.NET8 Web开源框架
一套以用户体验出发的.NET8 Web开源框架
一套以用户体验出发的.NET8 Web开源框架
|
11天前
|
前端开发 C# 开发者
.NET使用Umbraco CMS快速构建一个属于自己的内容管理系统
.NET使用Umbraco CMS快速构建一个属于自己的内容管理系统
29 12
|
1月前
|
算法 Java 测试技术
Benchmark.NET:让 C# 测试程序性能变得既酷又简单
Benchmark.NET是一款专为 .NET 平台设计的性能基准测试框架,它可以帮助你测量代码的执行时间、内存使用情况等性能指标。它就像是你代码的 "健身教练",帮助你找到瓶颈,优化性能,让你的应用跑得更快、更稳!希望这个小教程能让你在追求高性能的路上越走越远,享受编程带来的无限乐趣!
104 13
|
29天前
|
弹性计算 开发框架 安全
基于云效 Windows 构建环境和 Nuget 制品仓库进行 .Net 应用开发
本文将基于云效 Flow 流水线 Windows 构建环境和云效 Packages Nuget 制品仓库手把手教你如何开发并部署一个 .NET 应用,从环境搭建到实战应用发布的详细教程,帮助你掌握 .NET 开发的核心技能。
|
1月前
|
网络协议 应用服务中间件 网络安全
odoo17在线聊天报错提示 Couldn‘t bind the websocket...
当 Odoo 17 报错 "Couldn't bind the websocket..." 时,通过检查和配置 WebSocket 端口、防火墙规则、代理服务器以及 Odoo 配置文件,可以有效解决此问题。确保每一步操作准确无误,最终重启相关服务,使配置生效。希望这些步骤能帮助您快速恢复 Odoo 的在线聊天功能。
48 1
|
1月前
|
开发框架 .NET PHP
ASP.NET Web Pages - 添加 Razor 代码
ASP.NET Web Pages 使用 Razor 标记添加服务器端代码,支持 C# 和 Visual Basic。Razor 语法简洁易学,类似于 ASP 和 PHP。例如,在网页中加入 `@DateTime.Now` 可以实时显示当前时间。
|
2月前
|
存储 缓存 前端开发
Web端IM聊天消息该不该用浏览器本地存储?一文即懂!
鉴于目前浏览器技术的进步(主要是HTML5的普及),在Web网页端IM聊天应用的技术选型阶段,很多开发者都会纠结到底该不该像原生移动端IM那样将聊天记录缓存在浏览器的本地,还是像传统Web端即时通讯那样继续存储在服务端?本文将为你简洁明了地讲清楚浏览器本地存储技术(Web Storage),然后你就知道到底该怎么选择了。
40 1
|
2月前
|
Kubernetes Cloud Native Ubuntu
庆祝 .NET 9 正式版发布与 Dapr 从 CNCF 毕业:构建高效云原生应用的最佳实践
2024年11月13日,.NET 9 正式版发布,Dapr 从 CNCF 毕业,标志着云原生技术的成熟。本文介绍如何使用 .NET 9 Aspire、Dapr 1.14.4、Kubernetes 1.31.0/Containerd 1.7.14、Ubuntu Server 24.04 LTS 和 Podman 5.3.0-rc3 构建高效、可靠的云原生应用。涵盖环境准备、应用开发、Dapr 集成、容器化和 Kubernetes 部署等内容。
79 5