跟着源码学IM(十二):基于Netty打造一款高性能的IM即时通讯程序

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
全局流量管理 GTM,标准版 1个月
简介: 关于Netty网络框架的内容,前面已经讲了两个章节,但总归来说难以真正掌握,毕竟只是对其中一个个组件进行讲解,很难让诸位将其串起来形成一条线,所以本章中则会结合实战案例,对Netty进行更深层次的学习与掌握,实战案例也并不难,一个非常朴素的IM聊天程序。原本打算做个多人斗地主练习程序,但那需要织入过多的业务逻辑,因此一方面会带来不必要的理解难度,让案例更为复杂化,另一方面代码量也会偏多,所以最终依旧选择实现基本的IM聊天程序,既简单,又能加深对Netty的理解。

本文由竹子爱熊猫分享,原题“(十一)Netty实战篇:基于Netty框架打造一款高性能的IM即时通讯程序”,本文有修订和改动。

1、引言

关于Netty网络框架的内容,前面已经讲了两个章节,但总归来说难以真正掌握,毕竟只是对其中一个个组件进行讲解,很难让诸位将其串起来形成一条线,所以本章中则会结合实战案例,对Netty进行更深层次的学习与掌握,实战案例也并不难,一个非常朴素的IM聊天程序。

原本打算做个多人斗地主练习程序,但那需要织入过多的业务逻辑,因此一方面会带来不必要的理解难度,让案例更为复杂化,另一方面代码量也会偏多,所以最终依旧选择实现基本的IM聊天程序,既简单,又能加深对Netty的理解。

技术交流:

- 移动端IM开发入门文章:《新手入门一篇就够:从零开发移动端IM

- 开源IM框架源码:https://github.com/JackJiang2011/MobileIMSDK备用地址点此

(本文已同步发布于:http://www.52im.net/thread-4530-1-1.html

2、配套源码

本文配套源码的开源托管地址是:

3、知识准备

关于 Netty 是什么,这里简单介绍下:

Netty 是一个 Java 开源框架。Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

也就是说,Netty 是一个基于 NIO 的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。

Netty 相当简化和流线化了网络应用的编程开发过程,例如,TCP 和 UDP 的 Socket 服务开发。

有关Netty的入门文章:

如果你连Java NIO都不知道,下面的文章建议优先读:

Netty源码和API 在线查阅地址:

4、基于Netty设计通信协议

协议,这玩意儿相信大家肯定不陌生了,简单回顾一下协议的概念:网络协议是指一种通信双方都必须遵守的约定,两个不同的端,按照一定的格式对数据进行“编码”,同时按照相同的规则进行“解码”,从而实现两者之间的数据传输与通信。

当自己想要打造一款IM通信程序时,对于消息的封装、拆分也同样需要设计一个协议,通信的两端都必须遵守该协议工作,这也是实现通信程序的前提。

但为什么需要通信协议呢?

因为TCP/IP中是基于流的方式传输消息,消息与消息之间没有边界,而协议的目的则在于约定消息的样式、边界等。

5、Redis通信的RESP协议参考学习

不知大家是否还记得之前我聊到的RESP客户端协议,这是Redis提供的一种客户端通信协议。如果想要操作Redis,就必须遵守该协议的格式发送数据。

这个协议特别简单,如下:

  • 1)首先要求所有命令,都以*开头,后面跟着具体的子命令数量,接着用换行符分割;
  • 2)接着需要先用$符号声明每个子命令的长度,然后再用换行符分割;
  • 3)最后再拼接上具体的子命令,同样用换行符分割。

这样描述有些令人难懂,那就直接看个案例,例如一条简单set命令。

如下:

客户端命令:

   setname ZhuZi

转变为RESP指令:

   *3

   $3

   set

   $4

   name

   $5

   ZhuZi

按照Redis的规定,但凡满足RESP协议的客户端,都可以直接连接并操作Redis服务端,这也就意味着咱们可以直接通过Netty来手写一个Redis客户端。

代码如下:

// 基于Netty、RESP协议实现的Redis客户端

publicclassRedisClient {

   // 换行符的ASCII码

   staticfinalbyte[] LINE = {13, 10};

 

   publicstaticvoidmain(String[] args) {

       EventLoopGroup worker = newNioEventLoopGroup();

       Bootstrap client = newBootstrap();

 

       try{

           client.group(worker);

           client.channel(NioSocketChannel.class);

           client.handler(newChannelInitializer<SocketChannel>() {

               @Override

               protectedvoidinitChannel(SocketChannel socketChannel)

                                                       throwsException {

                   ChannelPipeline pipeline = socketChannel.pipeline();

 

                   pipeline.addLast(newChannelInboundHandlerAdapter(){

 

                       // 通道建立成功后调用:向Redis发送一条set命令

                       @Override

                       publicvoidchannelActive(ChannelHandlerContext ctx)

                                                           throwsException {

                           String command = "set name ZhuZi";

                           ByteBuf buffer = respCommand(command);

                           ctx.channel().writeAndFlush(buffer);

                       }

 

                       // Redis响应数据时触发:打印Redis的响应结果

                       @Override

                       publicvoidchannelRead(ChannelHandlerContext ctx,

                                               Object msg) throwsException {

                           // 接受Redis服务端执行指令后的结果

                           ByteBuf buffer = (ByteBuf) msg;

                           System.out.println(buffer.toString(CharsetUtil.UTF_8));

                       }

                   });

               }

           });

 

           // 根据IP、端口连接Redis服务端

           client.connect("192.168.12.129", 6379).sync();

       } catch(Exception e){

           e.printStackTrace();

       }

   }

 

   privatestaticByteBuf respCommand(String command){

       // 先对传入的命令以空格进行分割

       String[] commands = command.split(" ");

       ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();

 

       // 遵循RESP协议:先写入指令的个数

       buffer.writeBytes(("*"+ commands.length).getBytes());

       buffer.writeBytes(LINE);

 

       // 接着分别写入每个指令的长度以及具体值

       for(String s : commands) {

           buffer.writeBytes(("$"+ s.length()).getBytes());

           buffer.writeBytes(LINE);

           buffer.writeBytes(s.getBytes());

           buffer.writeBytes(LINE);

       }

       // 把转换成RESP格式的命令返回

       returnbuffer;

   }

}

在上述这个案例中,也仅仅只是通过respCommand()这个方法,对用户输入的指令进行了转换。同时在上面通过Netty,与Redis的地址、端口建立了连接。在连接建立成功后,就会向Redis发送一条转换成RESP指令的set命令。接着等待Redis的响应结果并输出,如下:

+OK

因为这是一条写指令,所以当Redis收到执行完成后,最终就会返回一个OK,大家也可直接去Redis中查询,也依旧能够查询到刚刚写入的name这个键值。

6、HTTP超文本传输协议参考学习

前面咱们自己针对于Redis的RESP协议,对用户指令进行了封装,然后发往Redis执行。

但对于这些常用的协议,Netty早已提供好了现成的处理器,想要使用时无需从头开发,可以直接使用现成的处理器来实现。

比如现在咱们可以基于Netty提供的处理器,实现一个简单的HTTP服务器。

代码如下:

// 基于Netty提供的处理器实现HTTP服务器

publicclassHttpServer {

   publicstaticvoidmain(String[] args) throwsInterruptedException {

       EventLoopGroup boss = newNioEventLoopGroup();

       EventLoopGroup worker = newNioEventLoopGroup();

       ServerBootstrap server = newServerBootstrap();

       server

           .group(boss,worker)

           .channel(NioServerSocketChannel.class)

           .childHandler(newChannelInitializer<NioSocketChannel>() {

               @Override

               protectedvoidinitChannel(NioSocketChannel ch) {

                   ChannelPipeline pipeline = ch.pipeline();

 

                   // 添加一个Netty提供的HTTP处理器

                   pipeline.addLast(newHttpServerCodec());

                   pipeline.addLast(newChannelInboundHandlerAdapter() {

                       @Override

                       publicvoidchannelRead(ChannelHandlerContext ctx,

                                               Object msg) throwsException {

                           // 在这里输出一下消息的类型

                           System.out.println("消息类型:"+ msg.getClass());

                           super.channelRead(ctx, msg);

                       }

                   });

                   pipeline.addLast(newSimpleChannelInboundHandler<HttpRequest>() {

                       @Override

                       protectedvoidchannelRead0(ChannelHandlerContext ctx,

                                                   HttpRequest msg) throwsException {

                           System.out.println("客户端的请求路径:"+ msg.uri());

 

                           // 创建一个响应对象,版本号与客户端保持一致,状态码为OK/200

                           DefaultFullHttpResponse response =

                                   newDefaultFullHttpResponse(

                                           msg.protocolVersion(),

                                           HttpResponseStatus.OK);

 

                           // 构造响应内容

                           byte[] content = "<h1>Hi, ZhuZi!</h1>".getBytes();

 

                           // 设置响应头:告诉客户端本次响应的数据长度

                           response.headers().setInt(

                               HttpHeaderNames.CONTENT_LENGTH,content.length);

                           // 设置响应主体

                           response.content().writeBytes(content);

 

                           // 向客户端写入响应数据

                           ctx.writeAndFlush(response);

                       }

                   });

               }

           })

           .bind("127.0.0.1",8888)

           .sync();

   }

}

在该案例中,咱们就未曾手动对HTTP的数据包进行拆包处理了,而是在服务端的pipeline上添加了一个HttpServerCodec处理器,这个处理器是Netty官方提供的。

其类继承关系如下:

publicfinalclassHttpServerCodec

   extendsCombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>

   implementsSourceCodec {

   // ......

}

观察会发现,该类继承自CombinedChannelDuplexHandler这个组合类,它组合了编码器、解码器。

这也就意味着HttpServerCodec即可以对客户端的数据做解码,也可以对服务端响应的数据做编码。

同时除开添加了这个处理器外,在第二个处理器中打印了一下客户端的消息类型,最后一个处理器中,对客户端的请求做出了响应,其实也就是返回了一句话而已。

此时在浏览器输入http://127.0.0.1:8888/index.html,结果如下:

消息类型:classio.netty.handler.codec.http.DefaultHttpRequest

消息类型:classio.netty.handler.codec.http.LastHttpContent$1

客户端的请求路径:/index.html

此时来看结果,客户端的请求会被解析成两个部分:

  • 1)第一个是请求信息;
  • 2)第二个是主体信息。

但按理来说浏览器发出的请求,属于GET类型的请求,GET请求是没有请求体信息的,但Netty依旧会解析成两部分~,只不过GET请求的第二部分是空的。

在第三个处理器中,咱们直接向客户端返回了一个h1标签,同时也要记得在响应头里面,加上响应内容的长度信息,否则浏览器的加载圈,会一直不同的转动,毕竟浏览器也不知道内容有多长,就会一直反复加载,尝试等待更多的数据。

7、自定义消息传输协议

7.1概述

Netty除开提供了HTTP协议的处理器外,还提供了DNS、HaProxy、MemCache、MQTT、Protobuf、Redis、SCTP、RTSP.....一系列协议的实现,具体定义位于io.netty.handler.codec这个包下,当然,咱们也可以自己实现自定义协议,按照自己的逻辑对数据进行编解码处理。

很多基于Netty开发的中间件/组件,其内部基本上都开发了专属的通信协议,以此来作为不同节点间通信的基础,所以解下来咱们基于Netty也来自己设计一款通信协议,这也会作为后续实现聊天程序时的基础。

所谓的协议设计,其实仅仅只需要按照一定约束,实现编码器与解码器即可,发送方在发出数据之前,会经过编码器对数据进行处理,而接收方在收到数据之前,则会由解码器对数据进行处理。

7.2自定义协议的要素

在自定义传输协议时,咱们必然需要考虑几个因素,如下:

  • 1)魔数:用来第一时间判断是否为自己需要的数据包;
  • 2)版本号:提高协议的拓展性,方便后续对协议进行升级;
  • 3)序列化算法:消息正文具体该使用哪种方式进行序列化传输,例如Json、ProtoBuf、JDK...;
  • 4)消息类型:第一时间判断出当前消息的类型;
  • 5)消息序号:为了实现双工通信,客户端和服务端之间收/发消息不会相互阻塞;
  • 6)正文长度:提供给LTC解码器使用,防止解码时出现粘包、半包的现象;
  • 7)消息正文:本次消息要传输的具体数据。

在设计协议时,一个完整的协议应该涵盖上述所说的几方面,这样才能提供双方通信时的基础。

基于上述几个字段,能够在第一时间内判断出:

  • 1)消息是否可用;
  • 2)当前协议版本;
  • 3)消息的具体类型;
  • 4)消息的长度等各类信息。

从而给后续处理器使用(自定义的协议规则本身就是一个编解码处理器而已)。

7.3自定义协议实战

前面简单聊到过,所谓的自定义协议就是自己规定消息格式,以及自己实现编/解码器对消息实现封装/拆解,所以这里想要自定义一个消息协议,就只需要满足前面两个条件即可。

因此实现如下:

@ChannelHandler.Sharable

publicclassChatMessageCodec extendsMessageToMessageCodec<ByteBuf, Message> {

 

   // 消息出站时会经过的编码方法(将原生消息对象封装成自定义协议的消息格式)

   @Override

   protectedvoidencode(ChannelHandlerContext ctx, Message msg,

                         List<Object> list) throwsException {

       ByteBuf outMsg = ctx.alloc().buffer();

       // 前五个字节作为魔数

       byte[] magicNumber = newbyte[]{'Z','h','u','Z','i'};

       outMsg.writeBytes(magicNumber);

       // 一个字节作为版本号

       outMsg.writeByte(1);

       // 一个字节表示序列化方式  0:JDK、1:Json、2:ProtoBuf.....

       outMsg.writeByte(0);

       // 一个字节用于表示消息类型

       outMsg.writeByte(msg.getMessageType());

       // 四个字节表示消息序号

       outMsg.writeInt(msg.getSequenceId());

 

       // 使用Java-Serializable的方式对消息对象进行序列化

       ByteArrayOutputStream bos = newByteArrayOutputStream();

       ObjectOutputStream oos = newObjectOutputStream(bos);

       oos.writeObject(msg);

       byte[] msgBytes = bos.toByteArray();

 

       // 使用四个字节描述消息正文的长度

       outMsg.writeInt(msgBytes.length);

       // 将序列化后的消息对象作为消息正文

       outMsg.writeBytes(msgBytes);

 

       // 将封装好的数据传递给下一个处理器

       list.add(outMsg);

   }

 

   // 消息入站时会经过的解码方法(将自定义格式的消息转变为具体的消息对象)

   @Override

   protectedvoiddecode(ChannelHandlerContext ctx,

                         ByteBuf inMsg, List<Object> list) throwsException {

       // 读取前五个字节得到魔数

       byte[] magicNumber = newbyte[5];

       inMsg.readBytes(magicNumber,0,5);

       // 再读取一个字节得到版本号

       byteversion = inMsg.readByte();

       // 再读取一个字节得到序列化方式

       byteserializableType = inMsg.readByte();

       // 再读取一个字节得到消息类型

       bytemessageType = inMsg.readByte();

       // 再读取四个字节得到消息序号

       intsequenceId = inMsg.readInt();

       // 再读取四个字节得到消息正文长度

       intmessageLength = inMsg.readInt();

 

       // 再根据正文长度读取序列化后的字节正文数据

       byte[] msgBytes = newbyte[messageLength];

       inMsg.readBytes(msgBytes,0,messageLength);

 

       // 对于读取到的消息正文进行反序列化,最终得到具体的消息对象

       ByteArrayInputStream bis = newByteArrayInputStream(msgBytes);

       ObjectInputStream ois = newObjectInputStream(bis);

       Message message = (Message) ois.readObject();

 

       // 最终把反序列化得到的消息对象传递给后续的处理器

       list.add(message);

   }

}

上面自定义的处理器中,继承了MessageToMessageCodec类,主要负责将数据在原生ByteBuf与Message之间进行相互转换,而Message对象是自定义的消息对象,这里暂且无需过多关心。

其中主要实现了两个方法:

  • 1)encode():出站时会经过的编码方法,会将原生消息对象按自定义的协议封装成对应的字节数据;
  • 2)decode():入站时会经过的解码方法,会将协议格式的字节数据,转变为具体的消息对象。

上述自定义的协议,也就是一定规则的字节数据,每条消息数据的组成如下:

  • 1)魔数:使用第1~5个字节来描述,这个魔数值可以按自己的想法自定义;
  • 2)版本号:使用第6个字节来描述,不同数字表示不同版本;
  • 3)序列化算法:使用第7个字节来描述,不同数字表示不同序列化方式;
  • 4)消息类型:使用第8个字节来描述,不同的消息类型使用不同数字表示;
  • 5)消息序号:使用第9~12个字节来描述,其实就是一个四字节的整数;
  • 6)正文长度:使用第13~16个字节来描述,也是一个四字节的整数;
  • 7)消息正文:长度不固定,根据每次具体发送的数据来决定。

在其中,为了实现简单,这里的序列化方式,则采用的是JDK默认的Serializable接口方式,但这种方式生成的对象字节较大,实际情况中最好还是选择谷歌的ProtoBuf方式,这种算法属于序列化算法中,性能最佳的一种落地实现。

当然,这个自定义的协议是提供给后续的聊天业务使用的,但这种实战型的内容分享,基本上代码量较高,所以大家看起来会有些枯燥,而本文所使用的聊天室案例,是基于《B站-黑马Netty视频教程》二次改良的,因此如若感觉文字描述较为枯燥,可直接点击前面给出的链接,观看P101~P121视频进行学习。

最后来观察一下,大家会发现,在咱们定义的这个协议编解码处理器上,存在着一个@ChannelHandler.Sharable注解,这个注解的作用是干吗的呢?其实很简单,用来标识当前处理器是否可在多线程环境下使用,如果带有该注解的处理器,则表示可以在多个通道间共用,因此只需要创建一个即可,反之同理,如果不带有该注解的处理器,则每个通道需要单独创建使用。

PS:如果你想系统学习Protobuf,可以从以下文章入手:

如何选择即时通讯应用的数据传输格式

强列建议将Protobuf作为你的即时通讯应用数据传输格式

IM通讯协议专题学习(一):Protobuf从入门到精通,一篇就够!

IM通讯协议专题学习(二):快速理解Protobuf的背景、原理、使用、优缺点

IM通讯协议专题学习(三):由浅入深,从根上理解Protobuf的编解码原理

IM通讯协议专题学习(四):从Base64到Protobuf,详解Protobuf的数据编码原理

IM通讯协议专题学习(八):金蝶随手记团队的Protobuf应用实践(原理篇)

8、实战要点1:IM程序的用户模块

8.1概述

聊天、聊天,自然是得先有人,然后才能进行聊天沟通。与QQ、微信类似,如果你想要使用某款聊天程序时,前提都得是先具备一个对应的账户才行。

因此在咱们设计IM系统之处,那也需要对应的用户功能实现。但这里为了简单,同样不再结合数据库实现完整的用户模块了,而是基于内存实现用户的管理。

如下:

publicinterfaceUserService {

   booleanlogin(String username, String password);

}

这是用户模块的顶层接口,仅仅只提供了一个登录接口,关于注册、鉴权、等级.....等一系列功能,大家感兴趣的可在后续进行拓展实现,接着来看看该接口的实现类。

如下:

publicclassUserServiceMemoryImpl implementsUserService {

   privateMap<String, String> allUserMap = newConcurrentHashMap<>();

 

   {

       // 在代码块中对用户列表进行初始化,向其中添加了两个用户信息

       allUserMap.put("ZhuZi", "123");

       allUserMap.put("XiongMao", "123");

   }

 

   @Override

   publicbooleanlogin(String username, String password) {

       String pass = allUserMap.get(username);

       if(pass == null) {

           returnfalse;

       }

       returnpass.equals(password);

   }

}

这个实现类并未结合数据库来实现,而是仅仅在程序启动时,通过代码块的方式,加载了ZhuZi、XiongMao两个用户信息并放入内存的Map容器中,这里有兴趣的小伙伴,可自行将Map容器换成数据库的表即可。

其中实现的login()登录接口尤为简单,仅仅只是判断了一下有没有对应用户,如果有的话则看看密码是否正确,正确返回true,密码错误则返回false。是的,我所写的登录功能就是这么简单,走个简单的过场,哈哈哈~

8.2服务端、客户端的基础架构

基本的用户模块有了,但这里还未曾套入具体实现,因此先简单的搭建出服务端、客户端的架构,然后再基于构建好的架构实现基础的用户登录功能。

服务端的基础搭建如下:

publicclassChatServer {

   publicstaticvoidmain(String[] args) {

       NioEventLoopGroup boss = newNioEventLoopGroup();

       NioEventLoopGroup worker = newNioEventLoopGroup();

 

       ChatMessageCodec MESSAGE_CODEC = newChatMessageCodec();

 

       try{

           ServerBootstrap serverBootstrap = newServerBootstrap();

           serverBootstrap.channel(NioServerSocketChannel.class);

           serverBootstrap.group(boss, worker);

           serverBootstrap.childHandler(newChannelInitializer<SocketChannel>() {

               @Override

               protectedvoidinitChannel(SocketChannel ch) throwsException {

                   ch.pipeline().addLast(MESSAGE_CODEC);

               }

           });

 

           Channel channel = serverBootstrap.bind(8888).sync().channel();

           channel.closeFuture().sync();

       } catch(InterruptedException e) {

           System.out.println("服务端出现错误:"+ e);

       } finally{

           boss.shutdownGracefully();

           worker.shutdownGracefully();

       }

   }

}

服务端的代码目前很简单,仅仅只是装载了一个自己的协议编/解码处理器,然后就是一些老步骤,不再过多的重复赘述,接着再来搭建一个简单的客户端。

代码实现如下:

publicclassChatClient {

   publicstaticvoidmain(String[] args) {

       NioEventLoopGroup group = newNioEventLoopGroup();

 

       ChatMessageCodec MESSAGE_CODEC = newChatMessageCodec();

 

       try{

           Bootstrap bootstrap = newBootstrap();

           bootstrap.channel(NioSocketChannel.class);

           bootstrap.group(group);

           bootstrap.handler(newChannelInitializer<SocketChannel>() {

               @Override

               protectedvoidinitChannel(SocketChannel ch) throwsException {

                   ch.pipeline().addLast(MESSAGE_CODEC);

               }

           });

           Channel channel = bootstrap.connect("localhost", 8888).sync().channel();

           channel.closeFuture().sync();

       } catch(Exception e) {

           System.out.println("客户端出现错误:"+ e);

       } finally{

           group.shutdownGracefully();

       }

   }

}

目前仅仅只是与服务端建立了连接,然后装载了一个自定义的编解码器,到这里就搭建了最基本的服务端、客户端的基础架构,接着来基于它实现简单的登录功能。

8.3用户登录功能的实现

对于登录功能,由于需要在服务端与客户端之间传输数据,因此咱们可以设计一个消息对象,但由于后续单聊、群聊都需要发送不同的消息格式,因此先设计出一个父类。

如下:

publicabstractclassMessage implementsSerializable {

 

   privateintsequenceId;

   privateintmessageType;

 

 

   @Override

   publicString toString() {

       return"Message{"+

               "sequenceId="+ sequenceId +

               ", messageType="+ messageType +

               '}';

   }

 

   publicintgetSequenceId() {

       returnsequenceId;

   }

 

   publicvoidsetSequenceId(intsequenceId) {

       this.sequenceId = sequenceId;

   }

 

   publicvoidsetMessageType(intmessageType) {

       this.messageType = messageType;

   }

 

   publicabstractintgetMessageType();

 

   publicstaticfinalintLoginRequestMessage = 0;

   publicstaticfinalintLoginResponseMessage = 1;

   publicstaticfinalintChatRequestMessage = 2;

   publicstaticfinalintChatResponseMessage = 3;

   publicstaticfinalintGroupCreateRequestMessage = 4;

   publicstaticfinalintGroupCreateResponseMessage = 5;

   publicstaticfinalintGroupJoinRequestMessage = 6;

   publicstaticfinalintGroupJoinResponseMessage = 7;

   publicstaticfinalintGroupQuitRequestMessage = 8;

   publicstaticfinalintGroupQuitResponseMessage = 9;

   publicstaticfinalintGroupChatRequestMessage = 10;

   publicstaticfinalintGroupChatResponseMessage = 11;

   publicstaticfinalintGroupMembersRequestMessage = 12;

   publicstaticfinalintGroupMembersResponseMessage = 13;

   publicstaticfinalintPingMessage = 14;

   publicstaticfinalintPongMessage = 15;

}

在这个消息父类中,定义了多种消息类型的状态码,不同的消息类型对应不同数字,同时其中还设计了一个抽象方法,即getMessageType(),该方法交给具体的子类实现,每个子类返回各自的消息类型,为了方便后续拓展,这里又创建了一个抽象类作为中间类。

如下:

publicabstractclassAbstractResponseMessage extendsMessage {

   privatebooleansuccess;

   privateString reason;

 

   publicAbstractResponseMessage() {

   }

 

   publicAbstractResponseMessage(booleansuccess, String reason) {

       this.success = success;

       this.reason = reason;

   }

 

   @Override

   publicString toString() {

       return"AbstractResponseMessage{"+

               "success="+ success +

               ", reason='"+ reason + '\''+

               '}';

   }

 

   publicbooleanisSuccess() {

       returnsuccess;

   }

 

   publicvoidsetSuccess(booleansuccess) {

       this.success = success;

   }

 

   publicString getReason() {

       returnreason;

   }

 

   publicvoidsetReason(String reason) {

       this.reason = reason;

   }

}

这个类主要是提供给响应时使用的,其中包含了响应状态以及响应信息,接着再设计两个登录时会用到的消息对象。

如下:

publicclassLoginRequestMessage extendsMessage {

   privateString username;

   privateString password;

 

   publicLoginRequestMessage() {

   }

 

   @Override

   publicString toString() {

       return"LoginRequestMessage{"+

               "username='"+ username + '\''+

               ", password='"+ password + '\''+

               '}';

   }

 

   publicString getUsername() {

       returnusername;

   }

 

   publicvoidsetUsername(String username) {

       this.username = username;

   }

 

   publicString getPassword() {

       returnpassword;

   }

 

   publicvoidsetPassword(String password) {

       this.password = password;

   }

 

   publicLoginRequestMessage(String username, String password) {

       this.username = username;

       this.password = password;

   }

 

   @Override

   publicintgetMessageType() {

       returnLoginRequestMessage;

   }

}

上述这个消息类,主要是提供给客户端登录时使用,本质上也就是一个涵盖用户名、用户密码的对象而已,同时还有一个用来给服务端响应时的响应类。

如下:

publicclassLoginResponseMessage extendsAbstractResponseMessage {

   publicLoginResponseMessage(booleansuccess, String reason) {

       super(success, reason);

   }

 

   @Override

   publicintgetMessageType() {

       returnLoginResponseMessage;

   }

}

登录响应类的实现十分简单,由登录状态和登录消息组成,OK,接着来看看登录的具体实现。

首先在客户端中,再通过pipeline添加一个处理器,如下:

CountDownLatch WAIT_FOR_LOGIN = newCountDownLatch(1);

AtomicBoolean LOGIN = newAtomicBoolean(false);

AtomicBoolean EXIT = newAtomicBoolean(false);

Scanner scanner = newScanner(System.in);

 

ch.pipeline().addLast("client handler", newChannelInboundHandlerAdapter() {

   @Override

   publicvoidchannelActive(ChannelHandlerContext ctx) throwsException {

       // 负责接收用户在控制台的输入,负责向服务器发送各种消息

       newThread(() -> {

           System.out.println("请输入用户名:");

           String username = scanner.nextLine();

           if(EXIT.get()){

               return;

           }

           System.out.println("请输入密码:");

           String password = scanner.nextLine();

           if(EXIT.get()){

               return;

           }

           // 构造消息对象

           LoginRequestMessage message = newLoginRequestMessage(username, password);

           System.out.println(message);

           // 发送消息

           ctx.writeAndFlush(message);

           System.out.println("等待后续操作...");

           try{

               WAIT_FOR_LOGIN.await();

           } catch(InterruptedException e) {

               e.printStackTrace();

           }

           // 如果登录失败

           if(!LOGIN.get()) {

               ctx.channel().close();

               return;

           }

   }).start();

}

在与服务端建立连接成功之后,就提示用户需要登录,接着接收用户输入的用户名、密码,然后构建出一个LoginRequestMessage消息对象,接着将其发送给服务端,由于前面装载了自定义的协议编解码器,所以消息在出站时,这个Message对象会被序列化成字节码,接着再服务端入站时,又会被反序列化成消息对象,接着来看看服务端的实现。

如下:

@ChannelHandler.Sharable

publicclassLoginRequestMessageHandler

           extendsSimpleChannelInboundHandler<LoginRequestMessage> {

   @Override

   protectedvoidchannelRead0(ChannelHandlerContext ctx,

               LoginRequestMessage msg) throwsException {

       String username = msg.getUsername();

       String password = msg.getPassword();

       booleanlogin = UserServiceFactory.getUserService().login(username, password);

       LoginResponseMessage message;

       if(login) {

           SessionFactory.getSession().bind(ctx.channel(), username);

           message = newLoginResponseMessage(true, "登录成功");

       } else{

           message = newLoginResponseMessage(false, "用户名或密码不正确");

       }

       ctx.writeAndFlush(message);

   }

}

在服务端中,新增了一个处理器类,继承自SimpleChannelInboundHandler这个处理器,其中指定的泛型为LoginRequestMessage,这表示当前处理器只关注这个类型的消息,当出现登录类型的消息时,会进入该处理器并触发内部的channelRead0()方法。

在该方法中,获取了登录消息中的用户名、密码,接着对其做了基本的登录效验,如果用户名存在并且密码正确,就会返回登录成功,否则会返回登录失败,最终登录后的状态会被封装成一个LoginResponseMessage对象,然后写回客户端的通道中。

当然,为了该处理器能够成功生效,这里需要将其装载到服务端的pipeline上。

如下:

LoginRequestMessageHandler LOGIN_HANDLER = newLoginRequestMessageHandler();

ch.pipeline().addLast(LOGIN_HANDLER);

装载好登录处理器后,接着分别启动服务端、客户端,测试结果如下:

从图中的效果来看,这里实现了最基本的登录功能,估计有些小伙伴看到这里就有些晕了,但其实非常简单,仅仅只是通过Netty在做数据交互而已,客户端则提供输入用户名、密码的功能,然后将用户输入的名称、密码发送给服务端,服务端提供登录判断的功能,最终根据判断结果再向客户端返回数据罢了。

9、实战要点2:实现点对点单聊

9.1概述

有了基本的用户登录功能后,接着来看看如何实现点对点的单聊功能呢?

首先我定义了一个会话接口,如下:

publicinterfaceSession {

   voidbind(Channel channel, String username);

   voidunbind(Channel channel);

   Channel getChannel(String username);

}

这个接口中依旧只有三个方法,释义如下:

  • 1)bind():传入一个用户名和Socket通道,让两者之间的产生绑定关系;
  • 2)unbind():取消一个用户与某个Socket通道的绑定关系;
  • 3)getChannel():根据一个用户名,获取与其存在绑定关系的通道。

该接口的实现类如下:

publicclassSessionMemoryImpl implementsSession {

 

   privatefinalMap<String, Channel> usernameChannelMap = newConcurrentHashMap<>();

   privatefinalMap<Channel, String> channelUsernameMap = newConcurrentHashMap<>();

 

   @Override

   publicvoidbind(Channel channel, String username) {

       usernameChannelMap.put(username, channel);

       channelUsernameMap.put(channel, username);

       channelAttributesMap.put(channel, newConcurrentHashMap<>());

   }

 

   @Override

   publicvoidunbind(Channel channel) {

       String username = channelUsernameMap.remove(channel);

       usernameChannelMap.remove(username);

       channelAttributesMap.remove(channel);

   }

 

   @Override

   publicChannel getChannel(String username) {

       returnusernameChannelMap.get(username);

   }

 

   @Override

   publicString toString() {

       returnusernameChannelMap.toString();

   }

}

该实现类最关键的是其中的两个Map容器,usernameChannelMap用来存储所有用户名与Socket通道的绑定关系,而channelUsernameMap则是反过来的顺序,这主要是为了方便,即可以通过用户名获得对应通道,也可以通过通道判断出用户名,实际上一个Map也能搞定,但还是那句话,主要为了简单嘛~

有了上述这个最简单的会话管理功能后,就要着手实现具体的功能了,其实在前面实现登录功能的时候,就用过这其中的bind()方法,也就是当登录成功之后,就会将当前发送登录消息的通道,与正在登录的用户名产生绑定关系,这样就方便后续实现单聊、群聊的功能。

9.2定义单聊的消息对象

与登录时相同,由于需要在服务端和客户端之间实现数据的转发,因此这里也需要两个消息对象,用来作为数据交互的消息格式。

如下:

publicclassChatRequestMessage extendsMessage {

   privateString content;

   privateString to;

   privateString from;

 

   publicChatRequestMessage() {

   }

 

   publicChatRequestMessage(String from, String to, String content) {

       this.from = from;

       this.to = to;

       this.content = content;

   }

   // 省略Get/Setting、toString()方法.....

}

上述这个类,是提供给客户端用来发送消息数据的,其中主要包含了三个值,聊天的消息内容、发送人与接收人。因为这里是需要实现一个IM聊天程序,所以并不是客户端与服务端进行数据交互,而是客户端与客户端之间进行数据交互,服务端仅仅只提供消息转发的功能,接着再构建一个消息类。

如下:

publicclassChatResponseMessage extendsAbstractResponseMessage {

 

   privateString from;

   privateString content;

 

   @Override

   publicString toString() {

       return"ChatResponseMessage{"+

               "from='"+ from + '\''+

               ", content='"+ content + '\''+

               '}';

   }

 

   publicChatResponseMessage(booleansuccess, String reason) {

       super(success, reason);

   }

 

   publicChatResponseMessage(String from, String content) {

       this.from = from;

       this.content = content;

   }

 

   @Override

   publicintgetMessageType() {

       returnChatResponseMessage;

   }

   // 省略Get/Setting、toString()方法.....

}

这个类是提供给服务端用来转发的,当服务端收到一个聊天消息后,因为聊天消息中包含了接收人,所以可以先根据接收人的用户名,找到对应的客户端通道,然后再封装成一个响应消息,转发给对应的客户端即可,下面来做具体实现。

9.3实现点对点单聊功能

由于聊天功能是提供给客户端使用的,所以当一个客户端登录成功之后,应该暴露给用户一个操作菜单,所以直接在原本客户端的channelActive()方法中,登录成功之后继续加代码即可。

代码如下:

while(true) {

   System.out.println("==================================");

   System.out.println("\t1、发送单聊消息");

   System.out.println("\t2、发送群聊消息");

   System.out.println("\t3、创建一个群聊");

   System.out.println("\t4、获取群聊成员");

   System.out.println("\t5、加入一个群聊");

   System.out.println("\t6、退出一个群聊");

   System.out.println("\t7、退出聊天系统");

   System.out.println("==================================");

   String command = scanner.nextLine();

}

首先会开启一个死循环,然后不断接收用户的操作,接着使用switch语法来对具体的菜单功能进行实现,先实现单聊功能。

如下:

switch(command){

   case"1":

       System.out.print("请选择你要发送消息给谁:");

       String toUserName = scanner.nextLine();

       System.out.print("请输入你要发送的消息内容:");

       String content = scanner.nextLine();

       ctx.writeAndFlush(newChatRequestMessage(username, toUserName, content));

       break;

}

如果用户选择了单聊,接着会提示用户选择要发送消息给谁,这里也就是让用户输入对方的用户名,实际上如果有界面的话,这一步是并不需要用户自己输入的,而是提供窗口让用户点击,比如QQ、微信一样,想要给某个人发送消息时,只需要点击“他”的头像私聊即可。

等用户选择了聊天目标,并且输入了消息内容后,接着会构建一个ChatRequestMessage消息对象,然后会发送给服务端,但这里先不看服务端的实现,客户端这边还需要重写一个方法。

如下:

@Override

publicvoidchannelRead(ChannelHandlerContext ctx, Object msg) throwsException {

   System.out.println("收到消息:"+ msg);

   if((msg instanceofLoginResponseMessage)) {

       LoginResponseMessage response = (LoginResponseMessage) msg;

       if(response.isSuccess()) {

           // 如果登录成功

           LOGIN.set(true);

       }

       // 唤醒 system in 线程

       WAIT_FOR_LOGIN.countDown();

   }

}

前面的逻辑是在channelActive()方法中完成的,也就是连接建立成功后,就会让用户登录,接着登录成功之后会给用户一个菜单栏,提供给用户进行操作,但前面的逻辑中一直没有对服务端响应的消息进行处理,因此channelRead()方法中会对服务端响应的数据进行处理。

channelRead()方法会在有数据可读时被触发,所以当服务端响应数据时,首先会判断一下:目前服务端响应的是不是登录消息,如果是的话,则需要根据登录的结果来唤醒前面channelActive()方法中的线程。如果目前服务端响应的不是登录消息,这也就意味着客户端前面已经登录成功了,所以接着会直接打印一下收到的数据。

OK,有了上述客户端的代码实现后,接着再来服务端多创建一个处理器。

如下:

@ChannelHandler.Sharable

publicclassChatRequestMessageHandler

           extendsSimpleChannelInboundHandler<ChatRequestMessage> {

   @Override

   protectedvoidchannelRead0(ChannelHandlerContext ctx,

                   ChatRequestMessage msg) throwsException {

       String to = msg.getTo();

       Channel channel = SessionFactory.getSession().getChannel(to);

       // 在线

       if(channel != null) {

           channel.writeAndFlush(newChatResponseMessage(

                       msg.getFrom(), msg.getContent()));

       }

       // 不在线

       else{

           ctx.writeAndFlush(newChatResponseMessage(

                       false, "对方用户不存在或者不在线"));

       }

   }

}

这里依旧通过继承SimpleChannelInboundHandler类的形式,来特别关注ChatRequestMessage单聊类型的消息,如果目前服务端收到的是单聊消息,则会进入触发该处理器的channelRead0()方法。

该处理器内部的逻辑也并不复杂,首先根据单聊消息的接收人,去找一下与之对应的通道:

  • 1)如果根据用户名查到了通道,表示接收人目前是登录在线状态;
  • 2)反之,如果无法根据用户名找到通道,表示对应的用户不存在或者没有登录。

接着会根据上面的查询结果,进行对应的结果返回:

  • 1)如果在线:把要发送的单聊消息,直接写入至找到的通道中;
  • 2)如果不在线:向发送单聊消息的客户端,返回用户不存在或用户不在线。

有了这个处理器之后,接着还需要把该处理器装载到服务端上,如下:

ChatRequestMessageHandler CHAT_HANDLER = newChatRequestMessageHandler();

ch.pipeline().addLast(CHAT_HANDLER);

装载好单聊处理器后,接着分别启动一个服务端、两个客户端,测试结果如下:

从测试结果中可以明显看出效果,其中的单聊功能的确已经实现,可以实现A→B用户之间的单聊功能,两者之间借助服务器转发,可以实现两人私聊的功能。

10、实战要点3:打造多人聊天室

10.1概述

前面实现了两个用户之间的私聊功能,接着再来实现一个多人聊天室的功能,毕竟像QQ、微信、钉钉....等任何通讯软件,都支持多人建立群聊的功能。

但多人聊天室的功能,实现之前还需要先完成建群的功能,毕竟如果群都没建立,自然无法向某个群内发送数据。

实现拉群也好,群聊也罢,其实现步骤依旧和前面相同,如下:

  • 1)先定义对应的消息对象;
  • 2)实现客户端发送对应消息数据的功能;
  • 3)再写一个服务端的群聊处理器,然后装载到服务端上。

10.2定义拉群的消息体

首先来定义两个拉群时用的消息体,如下:

publicclassGroupCreateRequestMessage extendsMessage {

   privateString groupName;

   privateSet<String> members;

 

   publicGroupCreateRequestMessage(String groupName, Set<String> members) {

       this.groupName = groupName;

       this.members = members;

   }

 

   @Override

   publicintgetMessageType() {

       returnGroupCreateRequestMessage;

   }

 

   // 省略其他Get/Settings、toString()方法.....

}

上述这个消息体是提供给客户端使用的,其中主要存在两个成员,也就是群名称与群成员列表,存放所有群成员的容器选用了Set集合,因为Set集合具备不可重复性,因此可以有效的避免同一用户多次进群,接着再来看看服务端响应时用的消息体。

如下:

publicclassGroupCreateResponseMessage extendsAbstractResponseMessage {

   publicGroupCreateResponseMessage(booleansuccess, String reason) {

       super(success, reason);

   }

 

   @Override

   publicintgetMessageType() {

       returnGroupCreateResponseMessage;

   }

}

这个消息体的实现尤为简单,仅仅只是给客户端返回了拉群状态以及拉群的附加信息。

10.3定义群聊会话管理

前面单聊有单聊的会话管理机制,而实现多人群聊时,依旧需要有群聊的会话管理机制,首先封装了一个群聊实体类。

如下:

publicclassGroup {

   // 聊天室名称

   privateString name;

   // 聊天室成员

   privateSet<String> members;

 

   publicstaticfinalGroup EMPTY_GROUP = newGroup("empty", Collections.emptySet());

 

   publicGroup(String name, Set<String> members) {

       this.name = name;

       this.members = members;

   }

 

   // 省略其他Get/Settings、toString()方法.....

}

接着定义了一个群聊会话的顶级接口,如下:

publicinterfaceGroupSession {

   // 创建一个群聊

   Group createGroup(String name, Set<String> members);

   // 加入某个群聊

   Group joinMember(String name, String member);

   // 移除群聊中的某个成员

   Group removeMember(String name, String member);

   // 解散一个群聊

   Group removeGroup(String name);

   // 获取一个群聊的成员列表

   Set<String> getMembers(String name);

   // 获取一个群聊所有在线用户的Channel通道

   List<Channel> getMembersChannel(String name);

}

上述接口中,提供了几个接口方法,其实也主要是群聊系统中的一些日常操作,如创群、加群、踢人、解散群、查看群成员....等功能,接着来看看该接口的实现者。

如下:

publicclassGroupSessionMemoryImpl implementsGroupSession {

   privatefinalMap<String, Group> groupMap = newConcurrentHashMap<>();

 

   @Override

   publicGroup createGroup(String name, Set<String> members) {

       Group group = newGroup(name, members);

       returngroupMap.putIfAbsent(name, group);

   }

 

   @Override

   publicGroup joinMember(String name, String member) {

       returngroupMap.computeIfPresent(name, (key, value) -> {

           value.getMembers().add(member);

           returnvalue;

       });

   }

 

   @Override

   publicGroup removeMember(String name, String member) {

       returngroupMap.computeIfPresent(name, (key, value) -> {

           value.getMembers().remove(member);

           returnvalue;

       });

   }

 

   @Override

   publicGroup removeGroup(String name) {

       returngroupMap.remove(name);

   }

 

   @Override

   publicSet<String> getMembers(String name) {

       returngroupMap.getOrDefault(name, Group.EMPTY_GROUP).getMembers();

   }

 

   @Override

   publicList<Channel> getMembersChannel(String name) {

       returngetMembers(name).stream()

               .map(member -> SessionFactory.getSession().getChannel(member))

               .filter(Objects::nonNull)

               .collect(Collectors.toList());

   }

}

这个实现类没啥好说的,重点记住里面有个Map容器即可,这个容器主要负责存储所有群名称与Group群聊对象的关系,后续可以通过群聊名称,在这个容器中找到一个对应群聊对象。同时为了方便后续调用这些接口,还提供了一个工具类。

如下:

publicabstractclassGroupSessionFactory {

   privatestaticGroupSession session = newGroupSessionMemoryImpl();

 

   publicstaticGroupSession getGroupSession() {

       returnsession;

   }

}

很简单,仅仅只实例化了一个群聊会话管理的实现类,因为这里没有结合Spring来实现,所以并不能依靠IOC技术来自动管理Bean,因此咱们需要手动创建出一个实例,以供于后续使用。

10.4实现拉群功能

前面客户端的功能菜单中,3对应着拉群功能,所以咱们需要对3做具体的功能实现。

逻辑如下:

case"3":

   System.out.print("请输入你要创建的群聊昵称:");

   String newGroupName = scanner.nextLine();

   System.out.print("请选择你要邀请的群成员(不同成员用、分割):");

   String members = scanner.nextLine();

   Set<String> memberSet = newHashSet<>(Arrays.asList(members.split("、")));

   memberSet.add(username); // 加入自己

   ctx.writeAndFlush(newGroupCreateRequestMessage(newGroupName, memberSet));

   break;

在该分支实现中,首先会要求用户输入一个群聊昵称,接着需要输入需要拉入群聊的用户名称,多个用户之间使用、分割,接着会把用户输入的群成员以及自己,全部放入到一个Set集合中,最终组装成一个拉群消息体,发送给服务端处理。

服务端的处理器如下:

@ChannelHandler.Sharable

publicclassGroupCreateRequestMessageHandler

       extendsSimpleChannelInboundHandler<GroupCreateRequestMessage> {

   @Override

   protectedvoidchannelRead0(ChannelHandlerContext ctx,

               GroupCreateRequestMessage msg) throwsException {

       String groupName = msg.getGroupName();

       Set<String> members = msg.getMembers();

       // 群管理器

       GroupSession groupSession = GroupSessionFactory.getGroupSession();

       Group group = groupSession.createGroup(groupName, members);

       if(group == null) {

           // 发生成功消息

           ctx.writeAndFlush(newGroupCreateResponseMessage(true,

                               groupName + "创建成功"));

           // 发送拉群消息

           List<Channel> channels = groupSession.getMembersChannel(groupName);

           for(Channel channel : channels) {

               channel.writeAndFlush(newGroupCreateResponseMessage(

                                   true, "您已被拉入"+ groupName));

           }

       } else{

           ctx.writeAndFlush(newGroupCreateResponseMessage(

                               false, groupName + "已经存在"));

       }

   }

}

这里依旧继承了SimpleChannelInboundHandler类,只关心拉群的消息,当客户端出现拉群消息时,首先会获取用户输入的群昵称和群成员,接着通过前面提供的创群接口,尝试创建一个群聊,如果群聊已经存在,则会创建失败,反之则会创建成功,在创建群聊成功的情况下,会给所有的群成员发送一条“你已被拉入[XXX]”的消息。

最后,同样需要将该处理器装载到服务端上,如下:

GroupCreateRequestMessageHandler GROUP_CREATE_HANDLER =

                   newGroupCreateRequestMessageHandler();

ch.pipeline().addLast(GROUP_CREATE_HANDLER);

最后分别启动一个服务端、两个客户端进行效果测试,如下:

从上图的测试结果来看,的确实现了咱们的拉群效果,一个用户拉群之后,被邀请的成员都会收到来自于服务端的拉群提醒,这也就为后续群聊功能奠定了基础。

10.5定义群聊的消息体

这里就不重复赘述了,还是之前的套路,定义一个客户端用的消息体,如下:

publicclassGroupChatRequestMessage extendsMessage {

   privateString content;

   privateString groupName;

   privateString from;

 

   publicGroupChatRequestMessage(String from, String groupName, String content) {

       this.content = content;

       this.groupName = groupName;

       this.from = from;

   }

 

   @Override

   publicintgetMessageType() {

       returnGroupChatRequestMessage;

   }

   // 省略其他Get/Settings、toString()方法.....

}

这个是客户端用来发送群聊消息的消息体,其中存在三个成员,发送人、群聊昵称、消息内容,通过这三个成员,可以描述清楚任何一条群聊记录,接着来看看服务端响应时用的消息体。

如下:

publicclassGroupChatResponseMessage extendsAbstractResponseMessage {

   privateString from;

   privateString content;

 

   publicGroupChatResponseMessage(booleansuccess, String reason) {

       super(success, reason);

   }

 

   publicGroupChatResponseMessage(String from, String content) {

       this.from = from;

       this.content = content;

   }

   @Override

   publicintgetMessageType() {

       returnGroupChatResponseMessage;

   }

   // 省略其他Get/Settings、toString()方法.....

}

在这个消息体中,就省去了群聊昵称这个成员,因为这个消息体的用处,主要是给服务端转发给客户端时使用的,因此不需要群聊昵称,当然,要也可以,我这里就直接省去了。

10.6实现群聊功能

依旧先来做客户端的实现,实现了客户端之后再去完成服务端的实现,客户端实现如下:

case"2":

   System.out.print("请选择你要发送消息的群聊:");

   String groupName = scanner.nextLine();

   System.out.print("请输入你要发送的消息内容:");

   String groupContent = scanner.nextLine();

   ctx.writeAndFlush(newGroupChatRequestMessage(username, groupName, groupContent));

   break;

因为发送群聊消息对应着之前菜单中的2,所以这里对该分支进行实现,当用户选择发送群聊消息时,首先会让用户自己先选择一个群聊,接着输入要发送的消息内容,接着组装成一个群聊消息对象,发送给服务端处理。

服务端的实现如下:

@ChannelHandler.Sharable

publicclassGroupChatRequestMessageHandler

       extendsSimpleChannelInboundHandler<GroupChatRequestMessage> {

   @Override

   protectedvoidchannelRead0(ChannelHandlerContext ctx,

               GroupChatRequestMessage msg) throwsException {

       List<Channel> channels = GroupSessionFactory.getGroupSession()

               .getMembersChannel(msg.getGroupName());

 

       for(Channel channel : channels) {

           channel.writeAndFlush(newGroupChatResponseMessage(

                           msg.getFrom(), msg.getContent()));

       }

   }

}

这里依旧定义了一个处理器,关于原因就不再重复啰嗦了,服务端对于群聊消息的实现额外简单,也就是先根据用户选择的群昵称,找到该群所有的群成员,然后依次遍历成员列表,获取对应的Socket通道,转发消息即可。

接着将该处理器装载到服务端pipeline上,然后分别启动一个服务端、两个客户端,进行效果测试,如下:

效果如上图的注释,基于上述的代码测试,效果确实达到了咱们需要的群聊效果~

10.7聊天室的其他功能实现

到这里为止,实现了最基本的建群、群聊的功能,但对于踢人、加群、解散群....等一系列群聊功能还未曾实现,但我这里就不继续重复了。

毕竟还是那个套路:

  • 1)定义对应功能的消息体;
  • 2)客户端向服务端发送对应格式的消息;
  • 3)服务端编写处理器,对特定的消息进行处理。

所以大家感兴趣的情况下,可以根据上述步骤继续进行实现,实现的过程没有任何难度,重点就是时间问题罢了。

11、本文小结

看到这里,其实Netty实战篇的内容也就大致结束了,个人对于实战篇的内容并不怎么满意,因为与最初设想的实现存在很大偏差,这是由于近期工作、生活状态不对,所以内容输出也没那么夯实,对于这篇中的完整代码实现,也包括前面两篇中的一些代码实现(详见“2、配套源码”),大家感兴趣可以自行Down下去玩玩。

在我所撰写的案例中,自定义协议可以继续优化,选择性能更强的序列化方式,而聊天室也可以进一步拓展,比如将用户信息、群聊信息、联系人信息都结合数据库实现,进一步实现离线消息功能,但由于该案例的设计之初就有问题,所以是存在性能问题的,想要打造一款真正高性能的IM程序,那诸位可参考本系列前面的文章即可。

12、系列文章

跟着源码学IM(一):手把手教你用Netty实现心跳机制、断线重连机制

跟着源码学IM(二):自已开发IM很难?手把手教你撸一个Andriod版IM

跟着源码学IM(三):基于Netty,从零开发一个IM服务端

跟着源码学IM(四):拿起键盘就是干,教你徒手开发一套分布式IM系统

跟着源码学IM(五):正确理解IM长连接、心跳及重连机制,并动手实现

跟着源码学IM(六):手把手教你用Go快速搭建高性能、可扩展的IM系统

跟着源码学IM(七):手把手教你用WebSocket打造Web端IM聊天

跟着源码学IM(八):万字长文,手把手教你用Netty打造IM聊天

跟着源码学IM(九):基于Netty实现一套分布式IM系统

跟着源码学IM(十):基于Netty,搭建高性能IM集群(含技术思路+源码)

跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)

跟着源码学IM(十二):基于Netty打造一款高性能的IM即时通讯程序》(* 本文)

SpringBoot集成开源IM框架MobileIMSDK,实现即时通讯IM聊天功能

13、参考资料

[1] 浅谈IM系统的架构设计

[2] 简述移动端IM开发的那些坑:架构设计、通信协议和客户端

[3] 一套海量在线用户的移动端IM架构设计实践分享(含详细图文)

[4] 一套原创分布式即时通讯(IM)系统理论架构方案

[5] 一套亿级用户的IM架构技术干货(上篇):整体架构、服务拆分等

[6] 一套亿级用户的IM架构技术干货(下篇):可靠性、有序性、弱网优化等

[7] 史上最通俗Netty框架入门长文:基本介绍、环境搭建、动手实战

[8] 强列建议将Protobuf作为你的即时通讯应用数据传输格式

[9] IM通讯协议专题学习(一):Protobuf从入门到精通,一篇就够!

[10] 融云技术分享:全面揭秘亿级IM消息的可靠投递机制

[11] IM群聊消息如此复杂,如何保证不丢不重?

[12] 零基础IM开发入门(四):什么是IM系统的消息时序一致性?

[13] 如何保证IM实时消息的“时序性”与“一致性”?

[14] 微信的海量IM聊天消息序列号生成实践(算法原理篇)

[15] 网易云信技术分享:IM中的万人群聊技术方案实践总结

[16] 融云IM技术分享:万人群聊消息投递方案的思考和实践

[17] 为何基于TCP协议的移动端IM仍然需要心跳保活机制?

[18] 一文读懂即时通讯应用中的网络心跳包机制:作用、原理、实现思路等

[19] 微信团队原创分享:Android版微信后台保活实战分享(网络保活篇)

[20] 融云技术分享:融云安卓端IM产品的网络链路保活技术实践

[21] 彻底搞懂TCP协议层的KeepAlive保活机制

[22] 深度解密钉钉即时消息服务DTIM的技术设计

(本文已同步发布于:http://www.52im.net/thread-4530-1-1.html

目录
相关文章
|
2月前
|
安全 前端开发 关系型数据库
IM即时通讯系统开发技术规则
IM即时通讯系统开发涵盖客户端与服务器端,涉及前端、后端、网络通信及多媒体处理等技术领域,支持文字、语音、图片、视频等多种实时交流方式。开发流程包括需求分析、技术选型、系统设计、开发实现、测试优化及部署维护等阶段,需关注网络通信、多媒体处理、安全性及可扩展性等关键技术点,广泛应用于社交、客服、团队协作及游戏等领域。
|
1月前
|
存储 网络协议 前端开发
基于开源IM即时通讯框架MobileIMSDK:RainbowChat v11.7版已发布
Android端主要更新内容: 1)[优化] 优化了首页“消息”列表中单聊类型未正确同步时的收发消息和点击后的处理逻辑; 2)[优化] 优化了首页“消息”列表中同一好友和陌生人会话不能自动合并的问题;
59 2
|
2月前
|
编解码 分布式计算 网络协议
Netty高性能网络框架(一)
Netty高性能网络框架(一)
|
12天前
|
消息中间件 编解码 网络协议
Netty从入门到精通:高性能网络编程的进阶之路
【11月更文挑战第17天】Netty是一个基于Java NIO(Non-blocking I/O)的高性能、异步事件驱动的网络应用框架。使用Netty,开发者可以快速、高效地开发可扩展的网络服务器和客户端程序。本文将带您从Netty的背景、业务场景、功能点、解决问题的关键、底层原理实现,到编写一个详细的Java示例,全面了解Netty,帮助您从入门到精通。
48 0
|
1月前
|
移动开发 网络协议 小程序
基于开源IM即时通讯框架MobileIMSDK:RainbowChat-iOS端v9.1版已发布
RainbowChat是一套基于开源IM聊天框架 MobileIMSDK 的产品级移动端IM系统。RainbowChat源于真实运营的产品,解决了大量的屏幕适配、细节优化、机器兼容问题
57 5
|
2月前
|
移动开发 前端开发 JavaScript
开源即时通讯IM框架MobileIMSDK的H5端技术概览
开源即时通讯IM框架MobileIMSDK的H5端技术概览
64 2
开源即时通讯IM框架MobileIMSDK的H5端技术概览
|
3月前
|
开发者
Netty运行原理问题之Netty高性能实现的问题如何解决
Netty运行原理问题之Netty高性能实现的问题如何解决
|
3月前
|
前端开发 网络协议
Netty实战巅峰:从零构建高性能IM即时通讯系统,解锁并发通信新境界
【8月更文挑战第3天】Netty是一款高性能、异步事件驱动的网络框架,适用于开发高并发网络应用,如即时通讯(IM)系统。本文将指导你利用Netty从零构建高性能IM程序,介绍Netty基础及服务器/客户端设计。服务器端使用`ServerBootstrap`启动,客户端通过`Bootstrap`连接服务器。示例展示了简单的服务器启动过程。通过深入学习,可进一步实现用户认证等功能,打造出更完善的IM系统。
169 1
|
存储 缓存 NoSQL
跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)
本文将要分享的是如何从零实现一套基于Netty框架的分布式高可用IM系统,它将支持长连接网关管理、单聊、群聊、聊天记录查询、离线消息存储、消息推送、心跳、分布式唯一ID、红包、消息同步等功能,并且还支持集群部署。
13503 1
|
6月前
|
消息中间件 Oracle Dubbo
Netty 源码共读(一)如何阅读JDK下sun包的源码
Netty 源码共读(一)如何阅读JDK下sun包的源码
132 1
下一篇
无影云桌面