引言
关于Netty
网络框架的内容,前面已经讲了两个章节,但总归来说难以真正掌握,毕竟只是对其中一个个组件进行讲解,很难让诸位将其串起来形成一条线,所以本章中则会结合实战案例,对Netty
进行更深层次的学习与掌握,实战案例也并不难,一个非常朴素的IM
聊天程序。
原本打算做个多人斗地主,但斗地主需要织入过多的业务逻辑,因此一方面会带来不必要的理解难度,让案例更为复杂化,另一方面代码量也会偏多,所以最终依旧选择实现基本的聊天程序,既简单,又能加深对
Netty
的理解。
一、基于Netty设计通信协议
协议,这玩意儿相信大家肯定不陌生了,毕竟在《网络编程》系列的前两章,都在围绕着网络协议展开叙述,再来简单回顾一下协议的概念:网络协议是指一种通信双方都必须遵守的约定,两个不同的端,按照一定的格式对数据进行“编码”,同时按照相同的规则进行“解码”,从而实现两者之间的数据传输与通信。
当自己想要打造一款IM
通信程序时,对于消息的封装、拆分也同样需要设计一个协议,通信的两端都必须遵守该协议工作,这也是实现通信程序的前提,但为什么需要通信协议呢?因为TCP/IP
中是基于流的方式传输消息,消息与消息之间没有边界,而协议的目的则在于约定消息的样式、边界等。
1.1、Redis通信的RESP协议
不知大家是否还记得之前在《Redis综述篇》中聊到的RESP
客户端协议,这是Redis
提供的一种客户端通信协议,如果想要操作Redis
,就必须遵守该协议的格式发送数据,但这个协议特别简单,如下:
- 首先要求所有命令,都以
*
开头,后面跟着具体的子命令数量,接着用换行符分割。 - 接着需要先用
$
符号声明每个子命令的长度,然后再用换行符分割。 - 最后再拼接上具体的子命令,同样用换行符分割。
这样描述有些令人难懂,那就直接看个案例,例如一条简单set
命令,如下:
客户端命令:
set name ZhuZi
转变为RESP指令:
*3
$3
set
$4
name
$5
ZhuZi
按照Redis
的规定,但凡满足RESP
协议的客户端,都可以直接连接并操作Redis
服务端,这也就意味着咱们可以直接通过Netty
来手写一个Redis
客户端,代码如下:
// 基于Netty、RESP协议实现的Redis客户端
public class RedisClient {
// 换行符的ASCII码
static final byte[] LINE = {
13, 10};
public static void main(String[] args) {
EventLoopGroup worker = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client.group(worker);
client.channel(NioSocketChannel.class);
client.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel)
throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new ChannelInboundHandlerAdapter(){
// 通道建立成功后调用:向Redis发送一条set命令
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
String command = "set name ZhuZi";
ByteBuf buffer = respCommand(command);
ctx.channel().writeAndFlush(buffer);
}
// Redis响应数据时触发:打印Redis的响应结果
@Override
public void channelRead(ChannelHandlerContext ctx,
Object msg) throws Exception {
// 接受Redis服务端执行指令后的结果
ByteBuf buffer = (ByteBuf) msg;
System.out.println(buffer.toString(CharsetUtil.UTF_8));
}
});
}
});
// 根据IP、端口连接Redis服务端
client.connect("192.168.12.129", 6379).sync();
} catch (Exception e){
e.printStackTrace();
}
}
private static ByteBuf respCommand(String command){
// 先对传入的命令以空格进行分割
String[] commands = command.split(" ");
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// 遵循RESP协议:先写入指令的个数
buffer.writeBytes(("*" + commands.length).getBytes());
buffer.writeBytes(LINE);
// 接着分别写入每个指令的长度以及具体值
for (String s : commands) {
buffer.writeBytes(("$" + s.length()).getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes(s.getBytes());
buffer.writeBytes(LINE);
}
// 把转换成RESP格式的命令返回
return buffer;
}
}
在上述这个案例中,也仅仅只是通过respCommand()
这个方法,对用户输入的指令进行了转换,同时在上面通过Netty
,与Redis
的地址、端口建立了连接,在连接建立成功后,就会向Redis
发送一条转换成RESP
指令的set
命令,接着等待Redis
的响应结果并输出,如下:
+OK
因为这是一条写指令,所以当Redis
收到执行完成后,最终就会返回一个OK
,大家也可直接去Redis
中查询,也依旧能够查询到刚刚写入的name
这个键值。
1.2、HTTP超文本传输协议
前面咱们自己针对于Redis
的RESP
协议,对用户指令进行了封装,然后发往Redis
执行,但对于这些常用的协议,Netty
早已提供好了现成的处理器,想要使用时无需从头开发,可以直接使用现成的处理器来实现,比如现在咱们可以基于Netty
提供的处理器,实现一个简单的HTTP
服务器,代码如下:
// 基于Netty提供的处理器实现HTTP服务器
public class HttpServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
server
.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 添加一个Netty提供的HTTP处理器
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx,
Object msg) throws Exception {
// 在这里输出一下消息的类型
System.out.println("消息类型:" + msg.getClass());
super.channelRead(ctx, msg);
}
});
pipeline.addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx,
HttpRequest msg) throws Exception {
System.out.println("客户端的请求路径:" + msg.uri());
// 创建一个响应对象,版本号与客户端保持一致,状态码为OK/200
DefaultFullHttpResponse response =
new DefaultFullHttpResponse(
msg.protocolVersion(),
HttpResponseStatus.OK);
// 构造响应内容
byte[] content = "<h1>Hi, ZhuZi!</h1>".getBytes();
// 设置响应头:告诉客户端本次响应的数据长度
response.headers().setInt(
HttpHeaderNames.CONTENT_LENGTH,content.length);
// 设置响应主体
response.content().writeBytes(content);
// 向客户端写入响应数据
ctx.writeAndFlush(response);
}
});
}
})
.bind("127.0.0.1",8888)
.sync();
}
}
在该案例中,咱们就未曾手动对HTTP
的数据包进行拆包处理了,而是在服务端的pipeline
上添加了一个HttpServerCodec
处理器,这个处理器是Netty
官方提供的,其类继承关系如下:
public final class HttpServerCodec
extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>
implements SourceCodec {
// ......
}
观察会发现,该类继承自CombinedChannelDuplexHandler
这个组合类,它组合了编码器、解码器,这也就意味着HttpServerCodec
即可以对客户端的数据做解码,也可以对服务端响应的数据做编码,同时除开添加了这个处理器外,在第二个处理器中打印了一下客户端的消息类型,最后一个处理器中,对客户端的请求做出了响应,其实也就是返回了一句话而已。
此时在浏览器输入http://127.0.0.1:8888/index.html
,结果如下:
消息类型:class io.netty.handler.codec.http.DefaultHttpRequest
消息类型:class io.netty.handler.codec.http.LastHttpContent$1
客户端的请求路径:/index.html
此时来看结果,客户端的请求会被解析成两个部分,第一个是请求信息,第二个是主体信息,但按理来说浏览器发出的请求,属于GET
类型的请求,GET
请求是没有请求体信息的,但Netty
依旧会解析成两部分~,只不过GET
请求的第二部分是空的。
在第三个处理器中,咱们直接向客户端返回了一个h1
标签,同时也要记得在响应头里面,加上响应内容的长度信息,否则浏览器的加载圈,会一直不同的转动,毕竟浏览器也不知道内容有多长,就会一直反复加载,尝试等待更多的数据。
1.3、自定义消息传输协议
Netty
除开提供了HTTP
协议的处理器外,还提供了DNS、HaProxy、MemCache、MQTT、Protobuf、Redis、SCTP、RTSP.....
一系列协议的实现,具体定义位于io.netty.handler.codec
这个包下,当然,咱们也可以自己实现自定义协议,按照自己的逻辑对数据进行编解码处理。
很多基于Netty
开发的中间件/组件,其内部基本上都开发了专属的通信协议,以此来作为不同节点间通信的基础,所以解下来咱们基于Netty
也来自己设计一款通信协议,这也会作为后续实现聊天程序时的基础。
但所谓的协议设计,其实仅仅只需要按照一定约束,实现编码器与解码器即可,发送方在发出数据之前,会经过编码器对数据进行处理,而接收方在收到数据之前,则会由解码器对数据进行处理。
1.3.1、自定义协议的要素
在自定义传输协议时,咱们必然需要考虑几个因素,如下:
- 魔数:用来第一时间判断是否为自己需要的数据包。
- 版本号:提高协议的拓展性,方便后续对协议进行升级。
- 序列化算法:消息正文具体该使用哪种方式进行序列化传输,例如
Json、ProtoBuf、JDK...
。 - 消息类型:第一时间判断出当前消息的类型。
- 消息序号:为了实现双工通信,客户端和服务端之间收/发消息不会相互阻塞。
- 正文长度:提供给
LTC
解码器使用,防止解码时出现粘包、半包的现象。 - 消息正文:本次消息要传输的具体数据。
在设计协议时,一个完整的协议应该涵盖上述所说的几方面,这样才能提供双方通信时的基础,基于上述几个字段,能够在第一时间内判断出:消息是否可用、当前协议版本、消息的具体类型、消息的长度等各类信息,从而给后续处理器使用(自定义的协议规则本身就是一个编解码处理器而已)。
1.3.2、自定义协议实战
前面简单聊到过,所谓的自定义协议就是自己规定消息格式,以及自己实现编/解码器对消息实现封装/拆解,所以这里想要自定义一个消息协议,就只需要满足前面两个条件即可,因此实现如下:
@ChannelHandler.Sharable
public class ChatMessageCodec extends MessageToMessageCodec<ByteBuf, Message> {
// 消息出站时会经过的编码方法(将原生消息对象封装成自定义协议的消息格式)
@Override
protected void encode(ChannelHandlerContext ctx, Message msg,
List<Object> list) throws Exception {
ByteBuf outMsg = ctx.alloc().buffer();
// 前五个字节作为魔数
byte[] magicNumber = new byte[]{
'Z','h','u','Z','i'};
outMsg.writeBytes(magicNumber);
// 一个字节作为版本号
outMsg.writeByte(1);
// 一个字节表示序列化方式 0:JDK、1:Json、2:ProtoBuf.....
outMsg.writeByte(0);
// 一个字节用于表示消息类型
outMsg.writeByte(msg.getMessageType());
// 四个字节表示消息序号
outMsg.writeInt(msg.getSequenceId());
// 使用Java-Serializable的方式对消息对象进行序列化
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(msg);
byte[] msgBytes = bos.toByteArray();
// 使用四个字节描述消息正文的长度
outMsg.writeInt(msgBytes.length);
// 将序列化后的消息对象作为消息正文
outMsg.writeBytes(msgBytes);
// 将封装好的数据传递给下一个处理器
list.add(outMsg);
}
// 消息入站时会经过的解码方法(将自定义格式的消息转变为具体的消息对象)
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf inMsg, List<Object> list) throws Exception {
// 读取前五个字节得到魔数
byte[] magicNumber = new byte[5];
inMsg.readBytes(magicNumber,0,5);
// 再读取一个字节得到版本号
byte version = inMsg.readByte();
// 再读取一个字节得到序列化方式
byte serializableType = inMsg.readByte();
// 再读取一个字节得到消息类型
byte messageType = inMsg.readByte();
// 再读取四个字节得到消息序号
int sequenceId = inMsg.readInt();
// 再读取四个字节得到消息正文长度
int messageLength = inMsg.readInt();
// 再根据正文长度读取序列化后的字节正文数据
byte[] msgBytes = new byte[messageLength];
inMsg.readBytes(msgBytes,0,messageLength);
// 对于读取到的消息正文进行反序列化,最终得到具体的消息对象
ByteArrayInputStream bis = new ByteArrayInputStream(msgBytes);
ObjectInputStream ois = new ObjectInputStream(bis);
Message message = (Message) ois.readObject();
// 最终把反序列化得到的消息对象传递给后续的处理器
list.add(message);
}
}
上面自定义的处理器中,继承了MessageToMessageCodec
类,主要负责将数据在原生ByteBuf
与Message
之间进行相互转换,而Message
对象是自定义的消息对象,这里暂且无需过多关心。其中主要实现了两个方法:
encode()
:出站时会经过的编码方法,会将原生消息对象按自定义的协议封装成对应的字节数据。decode()
:入站时会经过的解码方法,会将协议格式的字节数据,转变为具体的消息对象。
上述自定义的协议,也就是一定规则的字节数据,每条消息数据的组成如下:
- 魔数:使用第
1~5
个字节来描述,这个魔数值可以按自己的想法自定义。 - 版本号:使用第
6
个字节来描述,不同数字表示不同版本。 - 序列化算法:使用第
7
个字节来描述,不同数字表示不同序列化方式。 - 消息类型:使用第
8
个字节来描述,不同的消息类型使用不同数字表示。 - 消息序号:使用第
9~12
个字节来描述,其实就是一个四字节的整数。 - 正文长度:使用第
13~16
个字节来描述,也是一个四字节的整数。 - 消息正文:长度不固定,根据每次具体发送的数据来决定。
在其中,为了实现简单,这里的序列化方式,则采用的是JDK
默认的Serializable
接口方式,但这种方式生成的对象字节较大,实际情况中最好还是选择谷歌的ProtoBuf
方式,这种算法属于序列化算法中,性能最佳的一种落地实现。
当然,这个自定义的协议是提供给后续的聊天业务使用的,但这种实战型的内容分享,基本上代码量较高,所以大家看起来会有些枯燥,而本文所使用的聊天室案例,是基于《B站-黑马Netty视频教程》二次改良的,因此如若感觉文字描述较为枯燥,可直接点击前面给出的链接,观看
P101~P121
视频进行学习。
最后来观察一下,大家会发现,在咱们定义的这个协议编解码处理器上,存在着一个@ChannelHandler.Sharable
注解,这个注解的作用是干吗的呢?其实很简单,用来标识当前处理器是否可在多线程环境下使用,如果带有该注解的处理器,则表示可以在多个通道间共用,因此只需要创建一个即可,反之同理,如果不带有该注解的处理器,则每个通道需要单独创建使用。
二、基于Netty打造IM聊天程序
前面简单过了一下自定义协议后,接着来基于Netty
框架上手一个真正的实战项目,那也就是基于Netty
打造一款IM
即时通讯的聊天程序,这里在实现过程中,仅仅只会给出核心实现,但最后会提供完整代码的Github
链接,因此大家重点理解核心即可。
2.1、IM程序的用户模块
聊天、聊天,自然是得先有人,然后才能进行聊天沟通,与QQ、微信类似,如果你想要使用某款聊天程序时,前提都得是先具备一个对应的账户才行,因此在咱们设计IM
系统之处,那也需要对应的用户功能实现,但这里为了简单,同样不再结合数据库实现完整的用户模块了,而是基于内存实现用户的管理,如下:
public interface UserService {
boolean login(String username, String password);
}
这是用户模块的顶层接口,仅仅只提供了一个登录接口,关于注册、鉴权、等级.....等一系列功能,大家感兴趣的可在后续进行拓展实现,接着来看看该接口的实现类,如下:
public class UserServiceMemoryImpl implements UserService {
private Map<String, String> allUserMap = new ConcurrentHashMap<>();
{
// 在代码块中对用户列表进行初始化,向其中添加了两个用户信息
allUserMap.put("ZhuZi", "123");
allUserMap.put("XiongMao", "123");
}
@Override
public boolean login(String username, String password) {
String pass = allUserMap.get(username);
if (pass == null) {
return false;
}
return pass.equals(password);
}
}
这个实现类并未结合数据库来实现,而是仅仅在程序启动时,通过代码块的方式,加载了ZhuZi、XiongMao
两个用户信息并放入内存的Map
容器中,这里有兴趣的小伙伴,可自行将Map
容器换成数据库的表即可。
其中实现的login()
登录接口尤为简单,仅仅只是判断了一下有没有对应用户,如果有的话则看看密码是否正确,正确返回true
,密码错误则返回false
,是的,我所写的登录功能就是这么简单,走个简单的过场,哈哈哈~
2.1.1、服务端、客户端的基础架构
基本的用户模块有了,但这里还未曾套入具体实现,因此先简单的搭建出服务端、客户端的架构,然后再基于构建好的架构实现基础的用户登录功能,服务端的基础搭建如下:
public class ChatServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
ChatMessageCodec MESSAGE_CODEC = new ChatMessageCodec();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(MESSAGE_CODEC);
}
});
Channel channel = serverBootstrap.bind(8888).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("服务端出现错误:" + e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
服务端的代码目前很简单,仅仅只是装载了一个自己的协议编/解码处理器,然后就是一些老步骤,不再过多的重复赘述,接着再来搭建一个简单的客户端,代码实现如下:
public class ChatClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
ChatMessageCodec MESSAGE_CODEC = new ChatMessageCodec();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(MESSAGE_CODEC);
}
});
Channel channel = bootstrap.connect("localhost", 8888).sync().channel();
channel.closeFuture().sync();
} catch (Exception e) {
System.out.println("客户端出现错误:" + e);
} finally {
group.shutdownGracefully();
}
}
}
目前仅仅只是与服务端建立了连接,然后装载了一个自定义的编解码器,到这里就搭建了最基本的服务端、客户端的基础架构,接着来基于它实现简单的登录功能。
2.1.2、用户登录功能的实现
对于登录功能,由于需要在服务端与客户端之间传输数据,因此咱们可以设计一个消息对象,但由于后续单聊、群聊都需要发送不同的消息格式,因此先设计出一个父类,如下:
public abstract class Message implements Serializable {
private int sequenceId;
private int messageType;
@Override
public String toString() {
return "Message{" +
"sequenceId=" + sequenceId +
", messageType=" + messageType +
'}';
}
public int getSequenceId() {
return sequenceId;
}
public void setSequenceId(int sequenceId) {
this.sequenceId = sequenceId;
}
public void setMessageType(int messageType) {
this.messageType = messageType;
}
public abstract int getMessageType();
public static final int LoginRequestMessage = 0;
public static final int LoginResponseMessage = 1;
public static final int ChatRequestMessage = 2;
public static final int ChatResponseMessage = 3;
public static final int GroupCreateRequestMessage = 4;
public static final int GroupCreateResponseMessage = 5;
public static final int GroupJoinRequestMessage = 6;
public static final int GroupJoinResponseMessage = 7;
public static final int GroupQuitRequestMessage = 8;
public static final int GroupQuitResponseMessage = 9;
public static final int GroupChatRequestMessage = 10;
public static final int GroupChatResponseMessage = 11;
public static final int GroupMembersRequestMessage = 12;
public static final int GroupMembersResponseMessage = 13;
public static final int PingMessage = 14;
public static final int PongMessage = 15;
}
在这个消息父类中,定义了多种消息类型的状态码,不同的消息类型对应不同数字,同时其中还设计了一个抽象方法,即getMessageType()
,该方法交给具体的子类实现,每个子类返回各自的消息类型,为了方便后续拓展,这里又创建了一个抽象类作为中间类,如下:
public abstract class AbstractResponseMessage extends Message {
private boolean success;
private String reason;
public AbstractResponseMessage() {
}
public AbstractResponseMessage(boolean success, String reason) {
this.success = success;
this.reason = reason;
}
@Override
public String toString() {
return "AbstractResponseMessage{" +
"success=" + success +
", reason='" + reason + '\'' +
'}';
}
public boolean isSuccess() {
return success;
}
public void setSuccess(boolean success) {
this.success = success;
}
public String getReason() {
return reason;
}
public void setReason(String reason) {
this.reason = reason;
}
}
这个类主要是提供给响应时使用的,其中包含了响应状态以及响应信息,接着再设计两个登录时会用到的消息对象,如下:
public class LoginRequestMessage extends Message {
private String username;
private String password;
public LoginRequestMessage() {
}
@Override
public String toString() {
return "LoginRequestMessage{" +
"username='" + username + '\'' +
", password='" + password + '\'' +
'}';
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public LoginRequestMessage(String username, String password) {
this.username = username;
this.password = password;
}
@Override
public int getMessageType() {
return LoginRequestMessage;
}
}
上述这个消息类,主要是提供给客户端登录时使用,本质上也就是一个涵盖用户名、用户密码的对象而已,同时还有一个用来给服务端响应时的响应类,如下:
public class LoginResponseMessage extends AbstractResponseMessage {
public LoginResponseMessage(boolean success, String reason) {
super(success, reason);
}
@Override
public int getMessageType() {
return LoginResponseMessage;
}
}
登录响应类的实现十分简单,由登录状态和登录消息组成,OK,接着来看看登录的具体实现。
首先在客户端中,再通过pipeline
添加一个处理器,如下:
CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
AtomicBoolean LOGIN = new AtomicBoolean(false);
AtomicBoolean EXIT = new AtomicBoolean(false);
Scanner scanner = new Scanner(System.in);
ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 负责接收用户在控制台的输入,负责向服务器发送各种消息
new Thread(() -> {
System.out.println("请输入用户名:");
String username = scanner.nextLine();
if(EXIT.get()){
return;
}
System.out.println("请输入密码:");
String password = scanner.nextLine();
if(EXIT.get()){
return;
}
// 构造消息对象
LoginRequestMessage message = new LoginRequestMessage(username, password);
System.out.println(message);
// 发送消息
ctx.writeAndFlush(message);
System.out.println("等待后续操作...");
try {
WAIT_FOR_LOGIN.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 如果登录失败
if (!LOGIN.get()) {
ctx.channel().close();
return;
}
}).start();
}
在与服务端建立连接成功之后,就提示用户需要登录,接着接收用户输入的用户名、密码,然后构建出一个LoginRequestMessage
消息对象,接着将其发送给服务端,由于前面装载了自定义的协议编解码器,所以消息在出站时,这个Message
对象会被序列化成字节码,接着再服务端入站时,又会被反序列化成消息对象,接着来看看服务端的实现,如下:
@ChannelHandler.Sharable
public class LoginRequestMessageHandler
extends SimpleChannelInboundHandler<LoginRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx,
LoginRequestMessage msg) throws Exception {
String username = msg.getUsername();
String password = msg.getPassword();
boolean login = UserServiceFactory.getUserService().login(username, password);
LoginResponseMessage message;
if (login) {
SessionFactory.getSession().bind(ctx.channel(), username);
message = new LoginResponseMessage(true, "登录成功");
} else {
message = new LoginResponseMessage(false, "用户名或密码不正确");
}
ctx.writeAndFlush(message);
}
}
在服务端中,新增了一个处理器类,继承自SimpleChannelInboundHandler
这个处理器,其中指定的泛型为LoginRequestMessage
,这表示当前处理器只关注这个类型的消息,当出现登录类型的消息时,会进入该处理器并触发内部的channelRead0()
方法。
在该方法中,获取了登录消息中的用户名、密码,接着对其做了基本的登录效验,如果用户名存在并且密码正确,就会返回登录成功,否则会返回登录失败,最终登录后的状态会被封装成一个LoginResponseMessage
对象,然后写回客户端的通道中。
当然,为了该处理器能够成功生效,这里需要将其装载到服务端的pipeline
上,如下:
LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler();
ch.pipeline().addLast(LOGIN_HANDLER);
装载好登录处理器后,接着分别启动服务端、客户端,测试结果如下:
从图中的效果来看,这里实现了最基本的登录功能,估计有些小伙伴看到这里就有些晕了,但其实非常简单,仅仅只是通过Netty
在做数据交互而已,客户端则提供输入用户名、密码的功能,然后将用户输入的名称、密码发送给服务端,服务端提供登录判断的功能,最终根据判断结果再向客户端返回数据罢了。
2.2、基于Netty实现点对点单聊
有了基本的用户登录功能后,接着来看看如何实现点对点的单聊功能呢?首先我定义了一个会话接口,如下:
public interface Session {
void bind(Channel channel, String username);
void unbind(Channel channel);
Channel getChannel(String username);
}
这个接口中依旧只有三个方法,释义如下:
bind()
:传入一个用户名和Socket
通道,让两者之间的产生绑定关系。unbind()
:取消一个用户与某个Socket
通道的绑定关系。getChannel()
:根据一个用户名,获取与其存在绑定关系的通道。
该接口的实现类如下:
public class SessionMemoryImpl implements Session {
private final Map<String, Channel> usernameChannelMap = new ConcurrentHashMap<>();
private final Map<Channel, String> channelUsernameMap = new ConcurrentHashMap<>();
@Override
public void bind(Channel channel, String username) {
usernameChannelMap.put(username, channel);
channelUsernameMap.put(channel, username);
channelAttributesMap.put(channel, new ConcurrentHashMap<>());
}
@Override
public void unbind(Channel channel) {
String username = channelUsernameMap.remove(channel);
usernameChannelMap.remove(username);
channelAttributesMap.remove(channel);
}
@Override
public Channel getChannel(String username) {
return usernameChannelMap.get(username);
}
@Override
public String toString() {
return usernameChannelMap.toString();
}
}
该实现类最关键的是其中的两个Map
容器,usernameChannelMap
用来存储所有用户名与Socket
通道的绑定关系,而channelUsernameMap
则是反过来的顺序,这主要是为了方便,即可以通过用户名获得对应通道,也可以通过通道判断出用户名,实际上一个Map
也能搞定,但还是那句话,主要为了简单嘛~
有了上述这个最简单的会话管理功能后,就要着手实现具体的功能了,其实在前面实现登录功能的时候,就用过这其中的bind()
方法,也就是当登录成功之后,就会将当前发送登录消息的通道,与正在登录的用户名产生绑定关系,这样就方便后续实现单聊、群聊的功能。
2.2.1、定义单聊的消息对象
与登录时相同,由于需要在服务端和客户端之间实现数据的转发,因此这里也需要两个消息对象,用来作为数据交互的消息格式,如下:
public class ChatRequestMessage extends Message {
private String content;
private String to;
private String from;
public ChatRequestMessage() {
}
public ChatRequestMessage(String from, String to, String content) {
this.from = from;
this.to = to;
this.content = content;
}
// 省略Get/Setting、toString()方法.....
}
上述这个类,是提供给客户端用来发送消息数据的,其中主要包含了三个值,聊天的消息内容、发送人与接收人,因为这里是需要实现一个IM
聊天程序,所以并不是客户端与服务端进行数据交互,而是客户端与客户端之间进行数据交互,服务端仅仅只提供消息转发的功能,接着再构建一个消息类,如下:
public class ChatResponseMessage extends AbstractResponseMessage {
private String from;
private String content;
@Override
public String toString() {
return "ChatResponseMessage{" +
"from='" + from + '\'' +
", content='" + content + '\'' +
'}';
}
public ChatResponseMessage(boolean success, String reason) {
super(success, reason);
}
public ChatResponseMessage(String from, String content) {
this.from = from;
this.content = content;
}
@Override
public int getMessageType() {
return ChatResponseMessage;
}
// 省略Get/Setting、toString()方法.....
}
这个类是提供给服务端用来转发的,当服务端收到一个聊天消息后,因为聊天消息中包含了接收人,所以可以先根据接收人的用户名,找到对应的客户端通道,然后再封装成一个响应消息,转发给对应的客户端即可,下面来做具体实现。
2.2.2、实现点对点单聊功能
由于聊天功能是提供给客户端使用的,所以当一个客户端登录成功之后,应该暴露给用户一个操作菜单,所以直接在原本客户端的channelActive()
方法中,登录成功之后继续加代码即可,代码如下:
while (true) {
System.out.println("==================================");
System.out.println("\t1、发送单聊消息");
System.out.println("\t2、发送群聊消息");
System.out.println("\t3、创建一个群聊");
System.out.println("\t4、获取群聊成员");
System.out.println("\t5、加入一个群聊");
System.out.println("\t6、退出一个群聊");
System.out.println("\t7、退出聊天系统");
System.out.println("==================================");
String command = scanner.nextLine();
}
首先会开启一个死循环,然后不断接收用户的操作,接着使用switch
语法来对具体的菜单功能进行实现,先实现单聊功能,如下:
switch (command){
case "1":
System.out.print("请选择你要发送消息给谁:");
String toUserName = scanner.nextLine();
System.out.print("请输入你要发送的消息内容:");
String content = scanner.nextLine();
ctx.writeAndFlush(new ChatRequestMessage(username, toUserName, content));
break;
}
如果用户选择了单聊,接着会提示用户选择要发送消息给谁,这里也就是让用户输入对方的用户名,实际上如果有界面的话,这一步是并不需要用户自己输入的,而是提供窗口让用户点击,比如QQ、微信一样,想要给某个人发送消息时,只需要点击“他”的头像私聊即可。
等用户选择了聊天目标,并且输入了消息内容后,接着会构建一个ChatRequestMessage
消息对象,然后会发送给服务端,但这里先不看服务端的实现,客户端这边还需要重写一个方法,如下:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("收到消息:" + msg);
if ((msg instanceof LoginResponseMessage)) {
LoginResponseMessage response = (LoginResponseMessage) msg;
if (response.isSuccess()) {
// 如果登录成功
LOGIN.set(true);
}
// 唤醒 system in 线程
WAIT_FOR_LOGIN.countDown();
}
}
前面的逻辑是在channelActive()
方法中完成的,也就是连接建立成功后,就会让用户登录,接着登录成功之后会给用户一个菜单栏,提供给用户进行操作,但前面的逻辑中一直没有对服务端响应的消息进行处理,因此channelRead()
方法中会对服务端响应的数据进行处理。
channelRead()
方法会在有数据可读时被触发,所以当服务端响应数据时,首先会判断一下:目前服务端响应的是不是登录消息,如果是的话,则需要根据登录的结果来唤醒前面channelActive()
方法中的线程。如果目前服务端响应的不是登录消息,这也就意味着客户端前面已经登录成功了,所以接着会直接打印一下收到的数据。
OK,有了上述客户端的代码实现后,接着再来服务端多创建一个处理器,如下:
@ChannelHandler.Sharable
public class ChatRequestMessageHandler
extends SimpleChannelInboundHandler<ChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx,
ChatRequestMessage msg) throws Exception {
String to = msg.getTo();
Channel channel = SessionFactory.getSession().getChannel(to);
// 在线
if (channel != null) {
channel.writeAndFlush(new ChatResponseMessage(
msg.getFrom(), msg.getContent()));
}
// 不在线
else {
ctx.writeAndFlush(new ChatResponseMessage(
false, "对方用户不存在或者不在线"));
}
}
}
这里依旧通过继承SimpleChannelInboundHandler
类的形式,来特别关注ChatRequestMessage
单聊类型的消息,如果目前服务端收到的是单聊消息,则会进入触发该处理器的channelRead0()
方法,该处理器内部的逻辑也并不复杂,首先根据单聊消息的接收人,去找一下与之对应的通道:
- 如果根据用户名查到了通道,表示接收人目前是登录在线状态。
- 反之,如果无法根据用户名找到通道,表示对应的用户不存在或者没有登录。
接着会根据上面的查询结果,进行对应的结果返回:
- 如果在线:把要发送的单聊消息,直接写入至找到的通道中。
- 如果不在线:向发送单聊消息的客户端,返回用户不存在或用户不在线。
有了这个处理器之后,接着还需要把该处理器装载到服务端上,如下:
ChatRequestMessageHandler CHAT_HANDLER = new ChatRequestMessageHandler();
ch.pipeline().addLast(CHAT_HANDLER);
装载好单聊处理器后,接着分别启动一个服务端、两个客户端,测试结果如下:
从测试结果中可以明显看出效果,其中的单聊功能的确已经实现,可以实现A→B
用户之间的单聊功能,两者之间借助服务器转发,可以实现两人私聊的功能。
2.3、基于Netty打造多人聊天室
前面实现了两个用户之间的私聊功能,接着再来实现一个多人聊天室的功能,毕竟像QQ、微信、钉钉....等任何通讯软件,都支持多人建立群聊的功能,但多人聊天室的功能,实现之前还需要先完成建群的功能,毕竟如果群都没建立,自然无法向某个群内发送数据。
实现拉群也好,群聊也罢,其实现步骤依旧和前面相同,如下:
- ①先定义对应的消息对象。
- ②实现客户端发送对应消息数据的功能。
- ③再写一个服务端的群聊处理器,然后装载到服务端上。
2.3.1、定义拉群的消息体
首先来定义两个拉群时用的消息体,如下:
public class GroupCreateRequestMessage extends Message {
private String groupName;
private Set<String> members;
public GroupCreateRequestMessage(String groupName, Set<String> members) {
this.groupName = groupName;
this.members = members;
}
@Override
public int getMessageType() {
return GroupCreateRequestMessage;
}
// 省略其他Get/Settings、toString()方法.....
}
上述这个消息体是提供给客户端使用的,其中主要存在两个成员,也就是群名称与群成员列表,存放所有群成员的容器选用了Set
集合,因为Set
集合具备不可重复性,因此可以有效的避免同一用户多次进群,接着再来看看服务端响应时用的消息体,如下:
public class GroupCreateResponseMessage extends AbstractResponseMessage {
public GroupCreateResponseMessage(boolean success, String reason) {
super(success, reason);
}
@Override
public int getMessageType() {
return GroupCreateResponseMessage;
}
}
这个消息体的实现尤为简单,仅仅只是给客户端返回了拉群状态以及拉群的附加信息。
2.3.2、定义群聊会话管理
前面单聊有单聊的会话管理机制,而实现多人群聊时,依旧需要有群聊的会话管理机制,首先封装了一个群聊实体类,如下:
public class Group {
// 聊天室名称
private String name;
// 聊天室成员
private Set<String> members;
public static final Group EMPTY_GROUP = new Group("empty", Collections.emptySet());
public Group(String name, Set<String> members) {
this.name = name;
this.members = members;
}
// 省略其他Get/Settings、toString()方法.....
}
接着定义了一个群聊会话的顶级接口,如下:
public interface GroupSession {
// 创建一个群聊
Group createGroup(String name, Set<String> members);
// 加入某个群聊
Group joinMember(String name, String member);
// 移除群聊中的某个成员
Group removeMember(String name, String member);
// 解散一个群聊
Group removeGroup(String name);
// 获取一个群聊的成员列表
Set<String> getMembers(String name);
// 获取一个群聊所有在线用户的Channel通道
List<Channel> getMembersChannel(String name);
}
上述接口中,提供了几个接口方法,其实也主要是群聊系统中的一些日常操作,如创群、加群、踢人、解散群、查看群成员....等功能,接着来看看该接口的实现者,如下:
public class GroupSessionMemoryImpl implements GroupSession {
private final Map<String, Group> groupMap = new ConcurrentHashMap<>();
@Override
public Group createGroup(String name, Set<String> members) {
Group group = new Group(name, members);
return groupMap.putIfAbsent(name, group);
}
@Override
public Group joinMember(String name, String member) {
return groupMap.computeIfPresent(name, (key, value) -> {
value.getMembers().add(member);
return value;
});
}
@Override
public Group removeMember(String name, String member) {
return groupMap.computeIfPresent(name, (key, value) -> {
value.getMembers().remove(member);
return value;
});
}
@Override
public Group removeGroup(String name) {
return groupMap.remove(name);
}
@Override
public Set<String> getMembers(String name) {
return groupMap.getOrDefault(name, Group.EMPTY_GROUP).getMembers();
}
@Override
public List<Channel> getMembersChannel(String name) {
return getMembers(name).stream()
.map(member -> SessionFactory.getSession().getChannel(member))
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
}
这个实现类没啥好说的,重点记住里面有个Map
容器即可,这个容器主要负责存储所有群名称与Group
群聊对象的关系,后续可以通过群聊名称,在这个容器中找到一个对应群聊对象。同时为了方便后续调用这些接口,还提供了一个工具类,如下:
public abstract class GroupSessionFactory {
private static GroupSession session = new GroupSessionMemoryImpl();
public static GroupSession getGroupSession() {
return session;
}
}
很简单,仅仅只实例化了一个群聊会话管理的实现类,因为这里没有结合Spring
来实现,所以并不能依靠IOC
技术来自动管理Bean
,因此咱们需要手动创建出一个实例,以供于后续使用。
2.3.3、实现拉群功能
前面客户端的功能菜单中,3
对应着拉群功能,所以咱们需要对3
做具体的功能实现,逻辑如下:
case "3":
System.out.print("请输入你要创建的群聊昵称:");
String newGroupName = scanner.nextLine();
System.out.print("请选择你要邀请的群成员(不同成员用、分割):");
String members = scanner.nextLine();
Set<String> memberSet = new HashSet<>(Arrays.asList(members.split("、")));
memberSet.add(username); // 加入自己
ctx.writeAndFlush(new GroupCreateRequestMessage(newGroupName, memberSet));
break;
在该分支实现中,首先会要求用户输入一个群聊昵称,接着需要输入需要拉入群聊的用户名称,多个用户之间使用、
分割,接着会把用户输入的群成员以及自己,全部放入到一个Set
集合中,最终组装成一个拉群消息体,发送给服务端处理,服务端的处理器如下:
@ChannelHandler.Sharable
public class GroupCreateRequestMessageHandler
extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx,
GroupCreateRequestMessage msg) throws Exception {
String groupName = msg.getGroupName();
Set<String> members = msg.getMembers();
// 群管理器
GroupSession groupSession = GroupSessionFactory.getGroupSession();
Group group = groupSession.createGroup(groupName, members);
if (group == null) {
// 发生成功消息
ctx.writeAndFlush(new GroupCreateResponseMessage(true,
groupName + "创建成功"));
// 发送拉群消息
List<Channel> channels = groupSession.getMembersChannel(groupName);
for (Channel channel : channels) {
channel.writeAndFlush(new GroupCreateResponseMessage(
true, "您已被拉入" + groupName));
}
} else {
ctx.writeAndFlush(new GroupCreateResponseMessage(
false, groupName + "已经存在"));
}
}
}
这里依旧继承了SimpleChannelInboundHandler
类,只关心拉群的消息,当客户端出现拉群消息时,首先会获取用户输入的群昵称和群成员,接着通过前面提供的创群接口,尝试创建一个群聊,如果群聊已经存在,则会创建失败,反之则会创建成功,在创建群聊成功的情况下,会给所有的群成员发送一条“你已被拉入[XXX]”的消息。
最后,同样需要将该处理器装载到服务端上,如下:
GroupCreateRequestMessageHandler GROUP_CREATE_HANDLER =
new GroupCreateRequestMessageHandler();
ch.pipeline().addLast(GROUP_CREATE_HANDLER);
最后分别启动一个服务端、两个客户端进行效果测试,如下:
从上图的测试结果来看,的确实现了咱们的拉群效果,一个用户拉群之后,被邀请的成员都会收到来自于服务端的拉群提醒,这也就为后续群聊功能奠定了基础。
2.3.4、定义群聊的消息体
这里就不重复赘述了,还是之前的套路,定义一个客户端用的消息体,如下:
public class GroupChatRequestMessage extends Message {
private String content;
private String groupName;
private String from;
public GroupChatRequestMessage(String from, String groupName, String content) {
this.content = content;
this.groupName = groupName;
this.from = from;
}
@Override
public int getMessageType() {
return GroupChatRequestMessage;
}
// 省略其他Get/Settings、toString()方法.....
}
这个是客户端用来发送群聊消息的消息体,其中存在三个成员,发送人、群聊昵称、消息内容,通过这三个成员,可以描述清楚任何一条群聊记录,接着来看看服务端响应时用的消息体,如下:
public class GroupChatResponseMessage extends AbstractResponseMessage {
private String from;
private String content;
public GroupChatResponseMessage(boolean success, String reason) {
super(success, reason);
}
public GroupChatResponseMessage(String from, String content) {
this.from = from;
this.content = content;
}
@Override
public int getMessageType() {
return GroupChatResponseMessage;
}
// 省略其他Get/Settings、toString()方法.....
}
在这个消息体中,就省去了群聊昵称这个成员,因为这个消息体的用处,主要是给服务端转发给客户端时使用的,因此不需要群聊昵称,当然,要也可以,我这里就直接省去了。
2.3.5、实现群聊功能
依旧先来做客户端的实现,实现了客户端之后再去完成服务端的实现,客户端实现如下:
case "2":
System.out.print("请选择你要发送消息的群聊:");
String groupName = scanner.nextLine();
System.out.print("请输入你要发送的消息内容:");
String groupContent = scanner.nextLine();
ctx.writeAndFlush(new GroupChatRequestMessage(username, groupName, groupContent));
break;
因为发送群聊消息对应着之前菜单中的2
,所以这里对该分支进行实现,当用户选择发送群聊消息时,首先会让用户自己先选择一个群聊,接着输入要发送的消息内容,接着组装成一个群聊消息对象,发送给服务端处理,服务端的实现如下:
@ChannelHandler.Sharable
public class GroupChatRequestMessageHandler
extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx,
GroupChatRequestMessage msg) throws Exception {
List<Channel> channels = GroupSessionFactory.getGroupSession()
.getMembersChannel(msg.getGroupName());
for (Channel channel : channels) {
channel.writeAndFlush(new GroupChatResponseMessage(
msg.getFrom(), msg.getContent()));
}
}
}
这里依旧定义了一个处理器,关于原因就不再重复啰嗦了,服务端对于群聊消息的实现额外简单,也就是先根据用户选择的群昵称,找到该群所有的群成员,然后依次遍历成员列表,获取对应的Socket
通道,转发消息即可。
接着将该处理器装载到服务端pipeline
上,然后分别启动一个服务端、两个客户端,进行效果测试,如下:
效果如上图的注释,基于上述的代码测试,效果确实达到了咱们需要的群聊效果~
2.3.6、聊天室的其他功能实现
到这里为止,实现了最基本的建群、群聊的功能,但对于踢人、加群、解散群....等一系列群聊功能还未曾实现,但我这里就不继续重复了,毕竟还是那个套路:
- ①定义对应功能的消息体。
- ②客户端向服务端发送对应格式的消息。
- ③服务端编写处理器,对特定的消息进行处理。
所以大家感兴趣的情况下,可以根据上述步骤继续进行实现,实现的过程没有任何难度,重点就是时间问题罢了。
三、Netty实战篇总结
看到这里,其实Netty
实战篇的内容也就大致结束了,个人对于实战篇的内容并不怎么满意,因为与最初设想的实现存在很大偏差,这是由于近期工作、生活状态不对,所以内容输出也没那么夯实,对于这篇中的完整代码实现,也包括前面两篇中的一些代码实现,这里给出完整的GitHub
链接:>>>戳我访问<<<,大家感兴趣可以自行Down
下去玩玩。
在我所撰写的案例中,自定义协议可以继续优化,选择性能更强的序列化方式,而聊天室也可以进一步拓展,比如将用户信息、群聊信息、联系人信息都结合数据库实现,进一步实现离线消息功能,但由于该案例的设计之初就有问题,所以是存在性能问题的,想要打造一款真正高性能的IM
程序,那诸位可参考《计算机网络综述-腾讯QQ原理》其中的内容。