WebSocket+Net二ty构建web聊天程序(二)

简介: WebSocket+Net二ty构建web聊天程序(二)

Netty对WebSocket的支持#


首先每一个Netty服务端的程序都是形似的,想创建不同的服务端,就得给Netty装配的pipeline不同的Handler


针对聊天程序,处理String类型的Json信息,我们选取SimpleChannelInboundHandler, 他是个典型的入站处理器,并且如果我们没有出来数据,她会帮我们回收 重写它里面未实现抽象方法,这些抽象方法同样是回调方法, 当一个新的Channel进来, 它注册进Selector上的过程中,会回调不同的抽象方法


方法名 回调时机
handlerAdded(ChannelHandlerContext ctx) Pepiline中的Handler添加完成回调
channelRegistered(ChannelHandlerContext ctx) channel注册进Selector后回调
channelActive(ChannelHandlerContext ctx) channel处于活动状态回调
channelReadComplete(ChannelHandlerContext ctx) channel, read结束后回调
userEventTriggered(ChannelHandlerContext ctx, Object evt) 当出现用户事件时回调,如 读/写
channelInactive(ChannelHandlerContext ctx) 客户端断开连接时回调
channelUnregistered(ChannelHandlerContext ctx) 客户端断开连接后,取消channel的注册时回调
handlerRemoved(ChannelHandlerContext ctx) 取消channel的注册后,将channel移除ChannelGroup后回调
exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 出现异常时回调


handler的设计编码#


要做到点对点的聊天,前提是服务端拥有全部的channel因为所有数据的读写都依赖于它,而 netty为我们提供了ChannelGroup 用来保存所有新添加进来的channel, 此外点对点的聊天,我们需要将用户信息和它所属的channel进行一对一的绑定,才可以精准的匹配出两个channel进而数据交互, 因此添加UserChannel映射类


public class UserChanelRelationship {
    private static HashMap<String, Channel> manager = new HashMap<>();
    public static  void put(String sendId,Channel channel){
        manager.put(sendId,channel);
    }
    public static Channel get(String sendId){
        return  manager.get(sendId);
    }
    public static void outPut(){
        for (HashMap.Entry<String,Channel> entry:manager.entrySet()){
            System.out.println("UserId: "+entry.getKey() + "channelId: "+entry.getValue().id().asLongText());
        }
    }
}


我们把User和Channel之间的关系以键值对的形式存放进Map中,服务端启动后,程序就会维护这个map, 那么问题来了? 什么时候添加两者之间的映射关系呢? 看上handler的回调函数,我们选择 channelRead0() 当我们判断出 客户端发送过来的信息是 CONNECT类型时,添加映射关系


下面是handler的处理编码


public class MyHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
// 用于管理整个客户端的 组
public static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame frame) throws Exception {
Channel currentChanenl = channelHandlerContext.channel();
// 1. 获取客户端发送的消息
String content = frame.text();
System.out.println("  content:  "+content);
// 2. 判断不同的消息的类型, 根据不同的类型进行不同的处理
    // 当建立连接时, 第一次open , 初始化channel,将channel和数据库中的用户做一个唯一的关联
DataContent dataContent = JsonUtils.jsonToPojo(content,DataContent.class);
Integer action = dataContent.getAction();
if (action == MsgActionEnum.CHAT.type) {
    // 3. 把聊天记录保存到数据库
    // 4. 同时标记消息的签收状态 [未签收]
    // 5. 从我们的映射中获取接受方的chanel  发送消息
    // 6. 从 chanelGroup中查找 当前的channel是否存在于 group, 只有存在,我们才进行下一步发送
    //  6.1 如果没有接受者用户channel就不writeAndFlush, 等着用户上线后,通过js发起请求拉取未接受的信息
    //  6.2 如果没有接受者用户channel就不writeAndFlush, 可以选择推送
}else if (action == MsgActionEnum.CONNECT.type){
    // 当建立连接时, 第一次open , 初始化channel,将channel和数据库中的用户做一个唯一的关联
    String sendId = dataContent.getChatMsg().getSenderId();
    UserChanelRelationship.put(sendId,currentChanenl);
}else if(action == MsgActionEnum.SINGNED.type){
    // 7. 当用户没有上线时,发送消息的人把要发送的消息持久化在数据库,但是却没有把信息写回到接受者的channel, 把这种消息称为未签收的消息
    // 8. 签收消息, 就是修改数据库中消息的签收状态, 我们和前端约定,前端如何签收消息在上面有提到
    String extend = dataContent.getExtand();
    // 扩展字段在 signed类型代表 需要被签收的消息的id, 用逗号分隔
    String[] msgIdList = extend.split(",");
    List<String> msgIds = new ArrayList<>();
    Arrays.asList(msgIdList).forEach(s->{
        if (null!=s){
            msgIds.add(s);
        }
    });
    if (!msgIds.isEmpty()&&null!=msgIds&&msgIds.size()>0){
        // 批量签收
    }
}else if (action == MsgActionEnum.KEEPALIVE.type){
    // 6. 心跳类型
    System.out.println("收到来自channel 为" +currentChanenl+" 的心跳包... ");
}
}
// handler 添加完成后回调
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// 获取链接, 并且若想要群发的话,就得往每一个channel中写数据, 因此我们得在创建连接时, 把channel保存起来
System.err.println("handlerAdded");
users .add(ctx.channel());
}
// 用户关闭了浏览器回调
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// 断开连接后, channel会自动移除group
// 我们主动的关闭进行, channel会被移除, 但是我们如果是开启的飞行模式,不会被移除
System.err.println("客户端channel被移出: "+ctx.channel().id().asShortText());
users.remove(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 发生异常关闭channel, 并从ChannelGroup中移除Channel
ctx.channel().close();
users.remove(ctx.channel());
}
... 其他方法


前后端的心跳维持#


双方建立起WebSocket连接后,服务端需要明确的知道,自己维护的诸多channel中,谁已经挂掉了, 为了提高性能,需要及早把废弃的channel移除ChanelGroup

客户端杀掉了进程,或者开启了飞行模式, 这时服务端是感知不到它维护的channel中已经有一个不能使用了,首先来说,维护一个不能使用的channel会影响性能,而且当这个channel的好友给他发送消息时,服务端认为用户在线,于是向一个不存在的channel写入刷新数据,会带来额外的麻烦

这时我们就需要添加心跳机制,客户端设置定时任务,每个一段时间就往服务端发送心跳包,心跳包的内容是什么不是重点,它的作用就是告诉服务端自己还active, N多个客户端都要向服务端发送心跳,这并不会增加服务端的请求,因为这个请求是通过WebSocket的send方法发送过去的,只不过dataContent的类型是 KEEPALIVE , 同样这是我们提前约定好的(此外,服务端向客户端发送心跳看起来是没有必要的)


于是对于后端来说,我们发送的心跳包,会使得当前客户端对应的channel的channelRead0()方法回调, netty为我们提供了心跳相关的handler, 每一次的chanelRead0()的回调,都是read/write事件, 下面是netty对心跳的支持的实现


/**
 * @Author: Changwu
 * @Date: 2019/7/2 9:33
 * 我们的心跳handler不需要实现handler0方法,我们选择,直接继承SimpleInboundHandler的父类
*/
public class HeartHandler extends ChannelInboundHandlerAdapter {
// 我们重写  EventTrigger 方法
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 当出现read/write  读写写空闲时触发
if(evt instanceof IdleStateEvent){
    IdleStateEvent event = (IdleStateEvent) evt;
    if (event.state()== IdleState.READER_IDLE){ // 读空闲
        System.out.println(ctx.channel().id().asShortText()+" 读空闲... ");
    }else if (event.state()==IdleState.WRITER_IDLE){
        System.out.println(ctx.channel().id().asShortText()+" 写空闲... ");
    }else if (event.state()==IdleState.ALL_IDLE){
        System.out.println("channel 读写空闲, 准备关闭当前channel  , 当前UsersChanel的数量: "+MyHandler.users.size());
        Channel channel = ctx.channel();
        channel.close();
        System.out.println("channel 关闭后, UsersChanel的数量: "+MyHandler.users.size());
    }
}
}


Handler我们不再使用SimpleChannelInboundHandler了,因为它当中的方法都是抽象方法,而我们需要回调的函数时机是,每次当有用户事件时回调, 比如read,write事件, 这些事件可以证明channel还活着,对应的方法是userEventTriggered()


此外, ChannelInboundHandlerAdapter是netty中,适配器模式的体现, 它实现了全都抽象方法,然后他的实现方法中并不是在干活,而是把这个事件往下传播下去了,现在我们重写userEventTriggered() 执行的就是我们的逻辑

另外,我们需要在pipeline中添加handler


... 
/ 添加netty为我们提供的 检测空闲的处理器,  每 20 40 60 秒, 会触发userEventTriggered事件的回调
pipeline.addLast(new IdleStateHandler(10,20,30));
// todo 添加心跳的支持
pipeline.addLast("heartHandler",new HeartHandler());


服务端主动向客户端推送数据#


如, 添加好友的操作中, A向B发送添加好友请求的过程,会经过如下几步

  • A向服务端发送ajax请求,将自己的id, 目标朋友的id持久化到 数据库,请求friend_request表
  • 用户B上线,通过js,向后端拉取friend_request表中有没有关于自己的信息,于是服务端把A的请求给B推送过去
  • 在B的前端回显A的请求, B进一步处理这个信息, 此时两种情况
  • B拒绝了A的请求: 后端把friend_request表关于AB的信息清除
  • B同意了A的请求: 后端在firend_List表中,将AB双方的信息都持久化进去, 这时我们可以顺势在后端的方法中,给B推送最新的联系人信息, 但是这不属于主动推送,因为这次会话是客户端主动发起的


但是A却不知道,B已经同意了,于是需要给A主动的推送数据, 怎么推送呢? 我们需要在上面的UserChannel的关系中,拿出发送者的channel, 然后往回writeAndFlush内容,这时A就得知B已经同意了,重新加载好友列表

相关文章
|
1天前
|
XML 开发框架 .NET
LabVIEW中加载.NET 2.0,3.0和3.5程序集
LabVIEW中加载.NET 2.0,3.0和3.5程序集
11 4
|
2天前
|
XML 开发框架 .NET
C#/ASP.NET应用程序配置文件app.config/web.config的增、删、改操作
C#/ASP.NET应用程序配置文件app.config/web.config的增、删、改操作
|
5天前
|
开发框架 JSON .NET
.Net4.0 Web.config 配置实践
.Net4.0 Web.config 配置实践
|
5天前
|
数据采集 存储 XML
如何利用Python构建高效的Web爬虫
本文将介绍如何使用Python语言以及相关的库和工具,构建一个高效的Web爬虫。通过深入讨论爬虫的基本原理、常用的爬虫框架以及优化技巧,读者将能够了解如何编写可靠、高效的爬虫程序,实现数据的快速获取和处理。
|
10天前
|
运维 前端开发 JavaScript
【专栏:HTML进阶篇】HTML与Web标准:构建可访问与可维护的网页
【4月更文挑战第30天】本文探讨了HTML与Web标准的关系,强调遵循标准对创建高质量、可访问、可维护网页的重要性。通过使用语义化标签、提供文本替代、合理使用表格和列表,可提升网页可访问性;通过结构化文档、添加注释、分离结构与表现,能增强网页可维护性。遵循Web标准,可确保网页在不同设备上的兼容性,并满足各类用户需求。
|
11天前
|
开发框架 Dart 前端开发
【Flutter前端技术开发专栏】Flutter中的Web支持:构建跨平台Web应用
【4月更文挑战第30天】Flutter,Google的开源跨平台框架,已延伸至Web领域,让开发者能用同一代码库构建移动和Web应用。Flutter Web通过将Dart代码编译成JavaScript和WASM运行在Web上。尽管性能可能不及原生Web应用,但适合交互性强、UI复杂的应用。开发者应关注性能优化、兼容性测试,并利用Flutter的声明式UI、热重载等优势。随着其发展,Flutter Web为跨平台开发带来更多潜力。
【Flutter前端技术开发专栏】Flutter中的Web支持:构建跨平台Web应用
|
11天前
|
缓存 监控 测试技术
【Go语言专栏】使用Go语言构建高性能Web服务
【4月更文挑战第30天】本文探讨了使用Go语言构建高性能Web服务的策略,包括Go语言在并发处理和内存管理上的优势、基本原则(如保持简单、缓存和并发控制)、标准库与第三方框架的选择、编写高效的HTTP处理器、数据库优化以及性能测试和监控。通过遵循最佳实践,开发者可以充分利用Go语言的特性,构建出高性能的Web服务。
|
12天前
|
网络协议 数据库 开发者
构建高效Python Web应用:异步编程与Tornado框架
【4月更文挑战第29天】在Web开发领域,响应时间和并发处理能力是衡量应用性能的关键指标。Python作为一种广泛使用的编程语言,其异步编程特性为创建高性能Web服务提供了可能。本文将深入探讨Python中的异步编程概念,并介绍Tornado框架如何利用这一机制来提升Web应用的性能。通过实例分析,我们将了解如何在实际应用中实现高效的请求处理和I/O操作,以及如何优化数据库查询,以支持更高的并发用户数和更快的响应时间。
|
网络协议 前端开发 安全
websocket和http的瓜葛以及websocket协议实现
websocket和http的瓜葛以及websocket协议实现
websocket和http的瓜葛以及websocket协议实现
|
JavaScript
js实现websocket实例
js实现websocket实例
197 0