SpringBoot+Netty开发IM即时通讯系列(一)
前言
最近项目的需求有IM通讯这个模块,经过与老大商量决定使用SpringBoot+Netty的方式构建。于是,在这个系列中记录下过程中的学习历程以及撸码上线,以供日后参考。
如果文中有不当或错误请指出,虚心接受批评。
Netty
先来看下官方定义:
Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.
'Quick and easy' doesn't mean that a resulting application will suffer from a maintainability or a performance issue. Netty has been designed carefully with the experiences earned from the implementation of a lot of protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has succeeded to find a way to achieve ease of development, performance, stability, and flexibility without a compromise.
什么意思?
简单来讲,Netty是一个提供了易于使用的API的客户端/服务端框架。Netty并发非常高,一个非阻塞的IO,Netty传输速度也非常快,因为他是0拷贝,什么是零拷贝?NIO中的特性之一就是零拷贝,在Java中,内存分为堆和栈以及字符串常量值等等,如果有一些数据从IO中读取并且放到堆里面,中间会经过一些缓冲区。
具体来讲,如果要从IO中读取数据,分为两个步骤:
(1)从IO流中读取出来放到缓冲区,程序从缓冲区中读取,再放到堆中,此时数据就会被拷贝两次才能到达堆或者堆内存中。如果数据量很大,那么就会造成资源的浪费
(2)Netty其实就是利用NIO中的零拷贝特性,当Netty需要接收数据或者传输数据的时候,就会新开辟一块堆内存,然后数据就直接通过IO读取到了新开辟的堆内存中,这样也就加快了数据传输的速度。
阻塞与非阻塞
线程在访问某一个资源的时候,该资源是否准备就绪的一种处理方式,如果说该资源当前没准备就绪,这个时候就会有两种处理方式:阻塞与非阻塞
(1)阻塞:这个线程会一直持续等待这个资源就绪并处理完毕,直到响应返回一个结果,这个时候线程是一直阻塞状态,不可以去做任何事情
(2)非阻塞:这个线程直接返回结果,不会持续等待这个资源处理完毕才响应,它会去请求别的资源。
图摘自:https://blog.csdn.net/zk3326312/article/details/79400805
同步与异步
这里的 “同步与异步” 指的是访问数据的一种机制,类似于Ajax。
(1)同步:主动请求,并且会等待IO操作完成之后,IO会有一个通知
(2)异步:当一个线程主动请求数据之后,可以继续处理其他任务,发起其他请求,多个请求完成之后再逐一的通过异步形式通知
图摘自:https://blog.csdn.net/Iamthedoctor123/article/details/79336789?utm_source=blogxgwz2
BIO
(1)同步式阻塞IO:Block IO,IO在进行读取的时候,这个线程是会被阻塞的,无法去做其他操作,传统简单,通信方式也是如此。并发处理能力非常低,线程之间访问资源通信的时候,耗时也非常久,所以会比较依赖于网速与带宽,JDK1.4之前均是如此。
(2)服务器会有一个专门的线程称之为Acceptor,专门用于负责监听来自客户端之间的请求,只要客户端与服务端有建立请求,此时客户端与服务端之间都会创建一个新的线程进行处理,这是一种典型的 一应一答的模式。随着客户端逐渐增多,两者之间会频繁的创建和频繁销毁线程,此时服务器端会有很大的压力,甚至宕机。
(3)改进之后则通过线程池来处理,这种方式也可以称之为伪异步IO。
NIO
(1)同步非阻塞IO:New IO(Non-Block IO):JDK1.4之后。selector(其实就是一个线程,并且会主动轮询),也称之为多路复用器Buffer,它是一个缓冲区;Chanel,是一个双向通道。
(2)客户端在与服务端进行通信的时候就会使用到selector,两者要建立链接的时候,客户端到selector进行一个注册,注册完毕之后就会有一个chanel01,每一个客户端和selector建立连接之后都会有一个chanel,chanel是一个双向通道,可以进行一些相应数据的读写,这些数据的读写都会到Buffer缓冲区中
(3)通过selector注册并建立chanel通道就可以实现两者之间的通信,chanel的数据是一种非阻塞的读取,如果没有数据会直接跳过,不会同步等待数据。selector其实一个单线程,整体来讲,线程开销会非常小,光是一个selector就可以处理成千上万个客户端,客户端的增多不会影响它的性能,这也是与BIO的区别所在。
(4)chanel相当于是一个读取的工具,每一个客户端都可以理解为一个单独的chanel,每一个服务端会有一个selector,Buffer的数据会进行读取,数据被读完之后还是会存在Buffer中,不会因为数据被读取之后会被消息,String中的数据读完之后就没有了。
AIO
(1)异步非阻塞IO:AIO其实也是NIO2.0,它是一种异步非阻塞的通信方式,在NIO没有的基础上引入了一个异步概念:在读写的时候所有返回的类型其实就是一个feature对象,这个对象模型其实就是异步的在这个过程中会有一些事件监听
(2)异步阻塞IO:几乎用不到
图摘自:imooc
Netty的三种线程模型
(1)Reactor线程模型:
1)单线程模型:所有的IO操作都由同一个NIO线程处理,仅限于一些小型应用场景。但在高负载、高并发等情况下使用单线程肯定就不太合理,主要是因为NIO的一个线程同时要去处理成千上万的请求 的时候,在性能上会支撑不了,即便CPU负载100%,对于海量消息的处理,编码解码以及读取、发送消息等情况,依然满足不了。
2)当NIO的线程负载过重之后,整体服务性能处理就会变慢,结果就是导致客户端在向服务端发起请求、链接就会超时,由于客户端一般都会有一种超时机制,反复地向服务端再次发起请求,此时就相当于陷入了死循环,更加加重了服务器负载。
(2)多线程模型:由一组NIO线程处理IO操作
(3)主从线程模型:一组线程池接受请求,一组线程池处理IO
HelloWorldNetty
一个最简单的Netty服务端包含了五个步骤:
(1)构建一对主从线程组
(2)定义服务器启动类
(3)为服务器设置Channel
(4)设置处理从线程池的助手类初始化器
(5)监听启动和关闭的服务器
1、引入相关依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.pubing</groupId> <artifactId>helloNetty</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.25.Final</version> </dependency> </dependencies> </project>
建立HelloNettyServer
package com.phubing.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; //实现客户端发送请求,服务器端会返回Hello Netty public class HelloNettyServer { public static void main(String[] args) throws InterruptedException { /** * 定义一对线程组(两个线程池) * */ //主线程组,用于接收客户端的链接,但不做任何处理 EventLoopGroup bossGroup = new NioEventLoopGroup(); //定义从线程组,主线程组会把任务转给从线程组进行处理 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { /** * 服务启动类,任务分配自动处理 * */ ServerBootstrap serverBootstrap = new ServerBootstrap(); //需要去针对一个之前的线程模型(上面定义的是主从线程) serverBootstrap.group(bossGroup, workerGroup) //设置NIO的双向通道 .channel(NioServerSocketChannel.class) //子处理器,用于处理workerGroup /** * 设置chanel初始化器 * 每一个chanel由多个handler共同组成管道(pipeline) */ .childHandler(new HelloNettyServerInitializer()); /** * 启动 * */ //绑定端口,并设置为同步方式,是一个异步的chanel ChannelFuture future = serverBootstrap.bind(8888).sync(); /** * 关闭 */ //获取某个客户端所对应的chanel,关闭并设置同步方式 future.channel().closeFuture().sync(); }catch (Exception e) { e.printStackTrace(); }finally { //使用一种优雅的方式进行关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
建立初始化器HelloNettyServerInitializer
package com.phubing.netty; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpServerCodec; /** * 这是一个初始化器,chanel注册后会执行里面相应的初始化方法(也就是将handler逐个进行添加) * * @author phubing * */ public class HelloNettyServerInitializer extends ChannelInitializer<SocketChannel>{ //对chanel进行初始化 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //通过socketChannel去获得对应的管道 ChannelPipeline channelPipeline = socketChannel.pipeline(); /** * pipeline中会有很多handler类(也称之拦截器类) * 获得pipeline之后,可以直接.add,添加不管是自己开发的handler还是netty提供的handler * */ //一般来讲添加到last即可,添加了一个handler并且取名为HttpServerCodec //当请求到达服务端,要做解码,响应到客户端做编码 channelPipeline.addLast("HttpServerCodec", new HttpServerCodec()); //添加自定义的CustomHandler这个handler,返回Hello Netty channelPipeline.addLast("customHandler", new CustomHandler()); } }
建立自定义处理器CustomHandler
package com.phubing.netty; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpObject; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.util.CharsetUtil; /** * 创建自定义助手类 * @author phubing * */ //处理的请求是:客户端向服务端发起送数据,先把数据放在缓冲区,服务器端再从缓冲区读取,类似于[ 入栈, 入境 ] public class CustomHandler extends SimpleChannelInboundHandler<HttpObject>{//Http请求,所以使用HttpObject @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channel注册"); super.channelRegistered(ctx); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channel注册"); super.channelUnregistered(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channel活跃状态"); super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端与服务端断开连接之后"); super.channelInactive(ctx); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("channel读取数据完毕"); super.channelReadComplete(ctx); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { System.out.println("用户事件触发"); super.userEventTriggered(ctx, evt); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { System.out.println("channel可写事件更改"); super.channelWritabilityChanged(ctx); } @Override //channel发生异常,若不关闭,随着异常channel的逐渐增多,性能也就随之下降 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("捕获channel异常"); super.exceptionCaught(ctx, cause); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("助手类添加"); super.handlerAdded(ctx); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("助手类移除"); super.handlerRemoved(ctx); } /** * ChannelHandlerContext:上下文对象 * * */ @Override protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { //获取当前channel Channel currentChannel = ctx.channel(); //判断msg是否为一个HttpRequest的请求类型 if(msg instanceof HttpRequest) { //客户端远程地址 System.out.println(currentChannel.remoteAddress()); /** * * 未加判断类型,控制台打印的远端地址如下: * /0:0:0:0:0:0:0:1:5501 /0:0:0:0:0:0:0:1:5501 /0:0:0:0:0:0:0:1:5502 /0:0:0:0:0:0:0:1:5502 /0:0:0:0:0:0:0:1:5503 /0:0:0:0:0:0:0:1:5503 * * 原因是接收的MSG没有进行类型判断 * * * 增加了判断,为何还会打印两次? * /0:0:0:0:0:0:0:1:5605 /0:0:0:0:0:0:0:1:5605 * * 打开浏览器的network会发现,客户端对服务端进行了两次请求: * 1、第一次是所需的 * 2、第二次是一个icon * 因为没有加路由(相当于Springmvc中的requestMapping),只要发起请求,就都会到handler中去 * */ /** * 在Linux中也可以通过CURL 本机Ip:端口号 发送请求(只打印一次,干净的请求) */ //定义发送的消息(不是直接发送,而是要把数据拷贝到缓冲区,通过缓冲区) //Unpooed:是一个专门用于拷贝Buffer的深拷贝,可以有一个或多个 //CharsetUtil.UTF_8:Netty提供 ByteBuf content = Unpooled.copiedBuffer("Hello Netty", CharsetUtil.UTF_8); //构建一个HttpResponse,响应客户端 FullHttpResponse response = /** * params1:针对Http的版本号 * params2:状态(响应成功或失败) * params3:内容 */ //HttpVersion.HTTP_1_1:默认开启keep-alive new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content); //设置当前内容长度、类型等 response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain"); //readableBytes:可读长度 response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); //通过长下文对象,把响应刷到客户端 ctx.writeAndFlush(response); } } }
Channel生命周期
对于每一个Channel(即客户端)每一次连接到服务端之后都会有一个上下文对象ChannelHandlerContext,对于每一个Channel来讲,都是有生命周期的。
@Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channel注册"); super.channelRegistered(ctx); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channel注册"); super.channelUnregistered(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channel活跃状态"); super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端与服务端断开连接之后"); super.channelInactive(ctx); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("channel读取数据完毕"); super.channelReadComplete(ctx); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { System.out.println("用户事件触发"); super.userEventTriggered(ctx, evt); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { System.out.println("channel可写事件更改"); super.channelWritabilityChanged(ctx); } @Override //channel发生异常,若不关闭,随着异常channel的逐渐增多,性能也就随之下降 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("捕获channel异常"); super.exceptionCaught(ctx, cause); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("助手类添加"); super.handlerAdded(ctx); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("助手类移除"); super.handlerRemoved(ctx); }
打印结果
助手类添加
助手类添加
channel注册
channel注册
channel活跃状态
channel活跃状态
/0:0:0:0:0:0:0:1:6184
channel读取数据完毕
/0:0:0:0:0:0:0:1:6184
channel读取数据完毕
注:Linux中使用CURL的形式调试不会保持kepp-alive