Netty对WebSocket的支持#
首先每一个Netty服务端的程序都是形似的,想创建不同的服务端,就得给Netty装配的pipeline不同的Handler
针对聊天程序,处理String类型的Json信息,我们选取SimpleChannelInboundHandler
, 他是个典型的入站处理器,并且如果我们没有出来数据,她会帮我们回收 重写它里面未实现抽象方法,这些抽象方法同样是回调方法, 当一个新的Channel进来, 它注册进Selector上的过程中,会回调不同的抽象方法
方法名 | 回调时机 |
handlerAdded(ChannelHandlerContext ctx) | Pepiline中的Handler添加完成回调 |
channelRegistered(ChannelHandlerContext ctx) | channel注册进Selector后回调 |
channelActive(ChannelHandlerContext ctx) | channel处于活动状态回调 |
channelReadComplete(ChannelHandlerContext ctx) | channel, read结束后回调 |
userEventTriggered(ChannelHandlerContext ctx, Object evt) | 当出现用户事件时回调,如 读/写 |
channelInactive(ChannelHandlerContext ctx) | 客户端断开连接时回调 |
channelUnregistered(ChannelHandlerContext ctx) | 客户端断开连接后,取消channel的注册时回调 |
handlerRemoved(ChannelHandlerContext ctx) | 取消channel的注册后,将channel移除ChannelGroup后回调 |
exceptionCaught(ChannelHandlerContext ctx, Throwable cause) | 出现异常时回调 |
handler的设计编码#
要做到点对点的聊天,前提是服务端拥有全部的channel因为所有数据的读写都依赖于它,而 netty为我们提供了ChannelGroup
用来保存所有新添加进来的channel, 此外点对点的聊天,我们需要将用户信息和它所属的channel进行一对一的绑定,才可以精准的匹配出两个channel进而数据交互, 因此添加UserChannel映射类
public class UserChanelRelationship { private static HashMap<String, Channel> manager = new HashMap<>(); public static void put(String sendId,Channel channel){ manager.put(sendId,channel); } public static Channel get(String sendId){ return manager.get(sendId); } public static void outPut(){ for (HashMap.Entry<String,Channel> entry:manager.entrySet()){ System.out.println("UserId: "+entry.getKey() + "channelId: "+entry.getValue().id().asLongText()); } } }
我们把User和Channel之间的关系以键值对的形式存放进Map中,服务端启动后,程序就会维护这个map, 那么问题来了? 什么时候添加两者之间的映射关系呢? 看上handler的回调函数,我们选择 channelRead0()
当我们判断出 客户端发送过来的信息是 CONNECT
类型时,添加映射关系
下面是handler的处理编码
public class MyHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { // 用于管理整个客户端的 组 public static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame frame) throws Exception { Channel currentChanenl = channelHandlerContext.channel(); // 1. 获取客户端发送的消息 String content = frame.text(); System.out.println(" content: "+content); // 2. 判断不同的消息的类型, 根据不同的类型进行不同的处理 // 当建立连接时, 第一次open , 初始化channel,将channel和数据库中的用户做一个唯一的关联 DataContent dataContent = JsonUtils.jsonToPojo(content,DataContent.class); Integer action = dataContent.getAction(); if (action == MsgActionEnum.CHAT.type) { // 3. 把聊天记录保存到数据库 // 4. 同时标记消息的签收状态 [未签收] // 5. 从我们的映射中获取接受方的chanel 发送消息 // 6. 从 chanelGroup中查找 当前的channel是否存在于 group, 只有存在,我们才进行下一步发送 // 6.1 如果没有接受者用户channel就不writeAndFlush, 等着用户上线后,通过js发起请求拉取未接受的信息 // 6.2 如果没有接受者用户channel就不writeAndFlush, 可以选择推送 }else if (action == MsgActionEnum.CONNECT.type){ // 当建立连接时, 第一次open , 初始化channel,将channel和数据库中的用户做一个唯一的关联 String sendId = dataContent.getChatMsg().getSenderId(); UserChanelRelationship.put(sendId,currentChanenl); }else if(action == MsgActionEnum.SINGNED.type){ // 7. 当用户没有上线时,发送消息的人把要发送的消息持久化在数据库,但是却没有把信息写回到接受者的channel, 把这种消息称为未签收的消息 // 8. 签收消息, 就是修改数据库中消息的签收状态, 我们和前端约定,前端如何签收消息在上面有提到 String extend = dataContent.getExtand(); // 扩展字段在 signed类型代表 需要被签收的消息的id, 用逗号分隔 String[] msgIdList = extend.split(","); List<String> msgIds = new ArrayList<>(); Arrays.asList(msgIdList).forEach(s->{ if (null!=s){ msgIds.add(s); } }); if (!msgIds.isEmpty()&&null!=msgIds&&msgIds.size()>0){ // 批量签收 } }else if (action == MsgActionEnum.KEEPALIVE.type){ // 6. 心跳类型 System.out.println("收到来自channel 为" +currentChanenl+" 的心跳包... "); } } // handler 添加完成后回调 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // 获取链接, 并且若想要群发的话,就得往每一个channel中写数据, 因此我们得在创建连接时, 把channel保存起来 System.err.println("handlerAdded"); users .add(ctx.channel()); } // 用户关闭了浏览器回调 @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // 断开连接后, channel会自动移除group // 我们主动的关闭进行, channel会被移除, 但是我们如果是开启的飞行模式,不会被移除 System.err.println("客户端channel被移出: "+ctx.channel().id().asShortText()); users.remove(ctx.channel()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 发生异常关闭channel, 并从ChannelGroup中移除Channel ctx.channel().close(); users.remove(ctx.channel()); } ... 其他方法
前后端的心跳维持#
双方建立起WebSocket连接后,服务端需要明确的知道,自己维护的诸多channel中,谁已经挂掉了, 为了提高性能,需要及早把废弃的channel移除ChanelGroup
客户端杀掉了进程,或者开启了飞行模式, 这时服务端是感知不到它维护的channel中已经有一个不能使用了,首先来说,维护一个不能使用的channel会影响性能,而且当这个channel的好友给他发送消息时,服务端认为用户在线,于是向一个不存在的channel写入刷新数据,会带来额外的麻烦
这时我们就需要添加心跳机制,客户端设置定时任务,每个一段时间就往服务端发送心跳包,心跳包的内容是什么不是重点,它的作用就是告诉服务端自己还active, N多个客户端都要向服务端发送心跳,这并不会增加服务端的请求,因为这个请求是通过WebSocket的send方法发送过去的,只不过dataContent的类型是 KEEPALIVE , 同样这是我们提前约定好的(此外,服务端向客户端发送心跳看起来是没有必要的)
于是对于后端来说,我们发送的心跳包,会使得当前客户端对应的channel的channelRead0()方法回调, netty为我们提供了心跳相关的handler, 每一次的chanelRead0()的回调,都是read/write事件, 下面是netty对心跳的支持的实现
/** * @Author: Changwu * @Date: 2019/7/2 9:33 * 我们的心跳handler不需要实现handler0方法,我们选择,直接继承SimpleInboundHandler的父类 */ public class HeartHandler extends ChannelInboundHandlerAdapter { // 我们重写 EventTrigger 方法 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { // 当出现read/write 读写写空闲时触发 if(evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent) evt; if (event.state()== IdleState.READER_IDLE){ // 读空闲 System.out.println(ctx.channel().id().asShortText()+" 读空闲... "); }else if (event.state()==IdleState.WRITER_IDLE){ System.out.println(ctx.channel().id().asShortText()+" 写空闲... "); }else if (event.state()==IdleState.ALL_IDLE){ System.out.println("channel 读写空闲, 准备关闭当前channel , 当前UsersChanel的数量: "+MyHandler.users.size()); Channel channel = ctx.channel(); channel.close(); System.out.println("channel 关闭后, UsersChanel的数量: "+MyHandler.users.size()); } } }
Handler我们不再使用SimpleChannelInboundHandler了,因为它当中的方法都是抽象方法,而我们需要回调的函数时机是,每次当有用户事件时回调, 比如read,write事件, 这些事件可以证明channel还活着,对应的方法是userEventTriggered()
此外, ChannelInboundHandlerAdapter是netty中,适配器模式的体现, 它实现了全都抽象方法,然后他的实现方法中并不是在干活,而是把这个事件往下传播下去了,现在我们重写userEventTriggered()
执行的就是我们的逻辑
另外,我们需要在pipeline中添加handler
... / 添加netty为我们提供的 检测空闲的处理器, 每 20 40 60 秒, 会触发userEventTriggered事件的回调 pipeline.addLast(new IdleStateHandler(10,20,30)); // todo 添加心跳的支持 pipeline.addLast("heartHandler",new HeartHandler());
服务端主动向客户端推送数据#
如, 添加好友的操作中, A向B发送添加好友请求的过程,会经过如下几步
- A向服务端发送ajax请求,将自己的id, 目标朋友的id持久化到 数据库,请求friend_request表
- 用户B上线,通过js,向后端拉取friend_request表中有没有关于自己的信息,于是服务端把A的请求给B推送过去
- 在B的前端回显A的请求, B进一步处理这个信息, 此时两种情况
- B拒绝了A的请求: 后端把friend_request表关于AB的信息清除
- B同意了A的请求: 后端在firend_List表中,将AB双方的信息都持久化进去, 这时我们可以顺势在后端的方法中,给B推送最新的联系人信息, 但是这不属于主动推送,因为这次会话是客户端主动发起的
但是A却不知道,B已经同意了,于是需要给A主动的推送数据, 怎么推送呢? 我们需要在上面的UserChannel的关系中,拿出发送者的channel, 然后往回writeAndFlush
内容,这时A就得知B已经同意了,重新加载好友列表