SpringBoot+Netty开发IM即时通讯系列(一)

简介: 简单来讲,Netty是一个提供了易于使用的API的客户端/服务端框架。Netty并发非常高,一个非阻塞的IO,Netty传输速度也非常快,因为他是0拷贝,什么是零拷贝?NIO中的特性之一就是零拷贝,在Java中,内存分为堆和栈以及字符串常量值等等,如果有一些数据从IO中读取并且放到堆里面,中间会经过一些缓冲区。

SpringBoot+Netty开发IM即时通讯系列(一)


前言


最近项目的需求有IM通讯这个模块,经过与老大商量决定使用SpringBoot+Netty的方式构建。于是,在这个系列中记录下过程中的学习历程以及撸码上线,以供日后参考。


如果文中有不当或错误请指出,虚心接受批评。


Netty


先来看下官方定义:


Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.


'Quick and easy' doesn't mean that a resulting application will suffer from a maintainability or a performance issue. Netty has been designed carefully with the experiences earned from the implementation of a lot of protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has succeeded to find a way to achieve ease of development, performance, stability, and flexibility without a compromise.


什么意思?


简单来讲,Netty是一个提供了易于使用的API的客户端/服务端框架。Netty并发非常高,一个非阻塞的IO,Netty传输速度也非常快,因为他是0拷贝,什么是零拷贝?NIO中的特性之一就是零拷贝,在Java中,内存分为堆和栈以及字符串常量值等等,如果有一些数据从IO中读取并且放到堆里面,中间会经过一些缓冲区。


具体来讲,如果要从IO中读取数据,分为两个步骤:


(1)从IO流中读取出来放到缓冲区,程序从缓冲区中读取,再放到堆中,此时数据就会被拷贝两次才能到达堆或者堆内存中。如果数据量很大,那么就会造成资源的浪费

(2)Netty其实就是利用NIO中的零拷贝特性,当Netty需要接收数据或者传输数据的时候,就会新开辟一块堆内存,然后数据就直接通过IO读取到了新开辟的堆内存中,这样也就加快了数据传输的速度。


阻塞与非阻塞


线程在访问某一个资源的时候,该资源是否准备就绪的一种处理方式,如果说该资源当前没准备就绪,这个时候就会有两种处理方式:阻塞与非阻塞


   (1)阻塞:这个线程会一直持续等待这个资源就绪并处理完毕,直到响应返回一个结果,这个时候线程是一直阻塞状态,不可以去做任何事情


   (2)非阻塞:这个线程直接返回结果,不会持续等待这个资源处理完毕才响应,它会去请求别的资源。


20181214154115242.png


图摘自:https://blog.csdn.net/zk3326312/article/details/79400805


同步与异步


这里的 “同步与异步” 指的是访问数据的一种机制,类似于Ajax。


   (1)同步:主动请求,并且会等待IO操作完成之后,IO会有一个通知


   (2)异步:当一个线程主动请求数据之后,可以继续处理其他任务,发起其他请求,多个请求完成之后再逐一的通过异步形式通知


20181214154435966.png

图摘自:https://blog.csdn.net/Iamthedoctor123/article/details/79336789?utm_source=blogxgwz2


BIO


(1)同步式阻塞IO:Block IO,IO在进行读取的时候,这个线程是会被阻塞的,无法去做其他操作,传统简单,通信方式也是如此。并发处理能力非常低,线程之间访问资源通信的时候,耗时也非常久,所以会比较依赖于网速与带宽,JDK1.4之前均是如此。

(2)服务器会有一个专门的线程称之为Acceptor,专门用于负责监听来自客户端之间的请求,只要客户端与服务端有建立请求,此时客户端与服务端之间都会创建一个新的线程进行处理,这是一种典型的  一应一答的模式。随着客户端逐渐增多,两者之间会频繁的创建和频繁销毁线程,此时服务器端会有很大的压力,甚至宕机。

(3)改进之后则通过线程池来处理,这种方式也可以称之为伪异步IO。


NIO


(1)同步非阻塞IO:New IO(Non-Block IO):JDK1.4之后。selector(其实就是一个线程,并且会主动轮询),也称之为多路复用器Buffer,它是一个缓冲区;Chanel,是一个双向通道。


(2)客户端在与服务端进行通信的时候就会使用到selector,两者要建立链接的时候,客户端到selector进行一个注册,注册完毕之后就会有一个chanel01,每一个客户端和selector建立连接之后都会有一个chanel,chanel是一个双向通道,可以进行一些相应数据的读写,这些数据的读写都会到Buffer缓冲区中


(3)通过selector注册并建立chanel通道就可以实现两者之间的通信,chanel的数据是一种非阻塞的读取,如果没有数据会直接跳过,不会同步等待数据。selector其实一个单线程,整体来讲,线程开销会非常小,光是一个selector就可以处理成千上万个客户端,客户端的增多不会影响它的性能,这也是与BIO的区别所在。


(4)chanel相当于是一个读取的工具,每一个客户端都可以理解为一个单独的chanel,每一个服务端会有一个selector,Buffer的数据会进行读取,数据被读完之后还是会存在Buffer中,不会因为数据被读取之后会被消息,String中的数据读完之后就没有了。        


AIO


(1)异步非阻塞IO:AIO其实也是NIO2.0,它是一种异步非阻塞的通信方式,在NIO没有的基础上引入了一个异步概念:在读写的时候所有返回的类型其实就是一个feature对象,这个对象模型其实就是异步的在这个过程中会有一些事件监听


(2)异步阻塞IO:几乎用不到


20181214155029811.png


图摘自:imooc


Netty的三种线程模型


(1)Reactor线程模型:


       1)单线程模型:所有的IO操作都由同一个NIO线程处理,仅限于一些小型应用场景。但在高负载、高并发等情况下使用单线程肯定就不太合理,主要是因为NIO的一个线程同时要去处理成千上万的请求 的时候,在性能上会支撑不了,即便CPU负载100%,对于海量消息的处理,编码解码以及读取、发送消息等情况,依然满足不了。


       2)当NIO的线程负载过重之后,整体服务性能处理就会变慢,结果就是导致客户端在向服务端发起请求、链接就会超时,由于客户端一般都会有一种超时机制,反复地向服务端再次发起请求,此时就相当于陷入了死循环,更加加重了服务器负载。


(2)多线程模型:由一组NIO线程处理IO操作


(3)主从线程模型:一组线程池接受请求,一组线程池处理IO


20181214160022316.png


HelloWorldNetty


一个最简单的Netty服务端包含了五个步骤:


(1)构建一对主从线程组

(2)定义服务器启动类

(3)为服务器设置Channel

(4)设置处理从线程池的助手类初始化器

(5)监听启动和关闭的服务器


1、引入相关依赖


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.pubing</groupId>
  <artifactId>helloNetty</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.25.Final</version>
    </dependency>
  </dependencies>
</project>


建立HelloNettyServer


package com.phubing.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
//实现客户端发送请求,服务器端会返回Hello Netty
public class HelloNettyServer {
  public static void main(String[] args) throws InterruptedException {
    /**
     * 定义一对线程组(两个线程池)
     * 
     */
    //主线程组,用于接收客户端的链接,但不做任何处理
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    //定义从线程组,主线程组会把任务转给从线程组进行处理
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
      /**
       * 服务启动类,任务分配自动处理
       * 
       */
      ServerBootstrap serverBootstrap = new ServerBootstrap();
      //需要去针对一个之前的线程模型(上面定义的是主从线程)
      serverBootstrap.group(bossGroup, workerGroup)
        //设置NIO的双向通道
        .channel(NioServerSocketChannel.class)
        //子处理器,用于处理workerGroup
        /**
         * 设置chanel初始化器
         * 每一个chanel由多个handler共同组成管道(pipeline)
         */
        .childHandler(new HelloNettyServerInitializer()); 
      /**
       * 启动
       * 
       */
      //绑定端口,并设置为同步方式,是一个异步的chanel
      ChannelFuture future = serverBootstrap.bind(8888).sync();
      /**
       * 关闭
       */
      //获取某个客户端所对应的chanel,关闭并设置同步方式
      future.channel().closeFuture().sync();
    }catch (Exception e) {
      e.printStackTrace();
    }finally {
      //使用一种优雅的方式进行关闭
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }
}


建立初始化器HelloNettyServerInitializer


package com.phubing.netty;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
/**
 * 这是一个初始化器,chanel注册后会执行里面相应的初始化方法(也就是将handler逐个进行添加)
 * 
 * @author phubing
 *
 */
public class HelloNettyServerInitializer extends ChannelInitializer<SocketChannel>{
  //对chanel进行初始化
  @Override
  protected void initChannel(SocketChannel socketChannel) throws Exception {
    //通过socketChannel去获得对应的管道
    ChannelPipeline channelPipeline = socketChannel.pipeline();
    /**
     * pipeline中会有很多handler类(也称之拦截器类)
     * 获得pipeline之后,可以直接.add,添加不管是自己开发的handler还是netty提供的handler
     * 
     */
    //一般来讲添加到last即可,添加了一个handler并且取名为HttpServerCodec
    //当请求到达服务端,要做解码,响应到客户端做编码
    channelPipeline.addLast("HttpServerCodec", new HttpServerCodec());
    //添加自定义的CustomHandler这个handler,返回Hello Netty
    channelPipeline.addLast("customHandler", new CustomHandler());
  }
}


建立自定义处理器CustomHandler


package com.phubing.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;
/**
 * 创建自定义助手类
 * @author phubing
 *
 */
//处理的请求是:客户端向服务端发起送数据,先把数据放在缓冲区,服务器端再从缓冲区读取,类似于[ 入栈, 入境 ]
public class CustomHandler extends SimpleChannelInboundHandler<HttpObject>{//Http请求,所以使用HttpObject
  @Override
  public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel注册");
    super.channelRegistered(ctx);
  }
  @Override
  public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel注册");
    super.channelUnregistered(ctx);
  }
  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel活跃状态");
    super.channelActive(ctx);
  }
  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    System.out.println("客户端与服务端断开连接之后");
    super.channelInactive(ctx);
  }
  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel读取数据完毕");
    super.channelReadComplete(ctx);
  }
  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    System.out.println("用户事件触发");
    super.userEventTriggered(ctx, evt);
  }
  @Override
  public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel可写事件更改");
    super.channelWritabilityChanged(ctx);
  }
  @Override
  //channel发生异常,若不关闭,随着异常channel的逐渐增多,性能也就随之下降
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    System.out.println("捕获channel异常");
    super.exceptionCaught(ctx, cause);
  }
  @Override
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    System.out.println("助手类添加");
    super.handlerAdded(ctx);
  }
  @Override
  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    System.out.println("助手类移除");
    super.handlerRemoved(ctx);
  }
  /**
   * ChannelHandlerContext:上下文对象
   * 
   * 
   */
  @Override
  protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
    //获取当前channel
    Channel currentChannel = ctx.channel();
    //判断msg是否为一个HttpRequest的请求类型
    if(msg instanceof HttpRequest) {
      //客户端远程地址
      System.out.println(currentChannel.remoteAddress());
      /**
       * 
       * 未加判断类型,控制台打印的远端地址如下:
       * 
          /0:0:0:0:0:0:0:1:5501
        /0:0:0:0:0:0:0:1:5501
        /0:0:0:0:0:0:0:1:5502
        /0:0:0:0:0:0:0:1:5502
        /0:0:0:0:0:0:0:1:5503
        /0:0:0:0:0:0:0:1:5503
       * 
       * 原因是接收的MSG没有进行类型判断
       * 
       * 
       * 增加了判断,为何还会打印两次?
       * 
        /0:0:0:0:0:0:0:1:5605
        /0:0:0:0:0:0:0:1:5605
       * 
       * 打开浏览器的network会发现,客户端对服务端进行了两次请求:
       *  1、第一次是所需的
       *  2、第二次是一个icon
       *  因为没有加路由(相当于Springmvc中的requestMapping),只要发起请求,就都会到handler中去
       * 
       */
      /**
       * 在Linux中也可以通过CURL 本机Ip:端口号 发送请求(只打印一次,干净的请求)      
       */
      //定义发送的消息(不是直接发送,而是要把数据拷贝到缓冲区,通过缓冲区)
      //Unpooed:是一个专门用于拷贝Buffer的深拷贝,可以有一个或多个
      //CharsetUtil.UTF_8:Netty提供
      ByteBuf content = Unpooled.copiedBuffer("Hello Netty", CharsetUtil.UTF_8);
      //构建一个HttpResponse,响应客户端
      FullHttpResponse response = 
          /**
           * params1:针对Http的版本号
           * params2:状态(响应成功或失败)
           * params3:内容
           */
          //HttpVersion.HTTP_1_1:默认开启keep-alive
          new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
      //设置当前内容长度、类型等
      response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
      //readableBytes:可读长度
      response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
      //通过长下文对象,把响应刷到客户端
      ctx.writeAndFlush(response);
    }
  }
}


Channel生命周期


对于每一个Channel(即客户端)每一次连接到服务端之后都会有一个上下文对象ChannelHandlerContext,对于每一个Channel来讲,都是有生命周期的。


@Override
  public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel注册");
    super.channelRegistered(ctx);
  }
  @Override
  public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel注册");
    super.channelUnregistered(ctx);
  }
  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel活跃状态");
    super.channelActive(ctx);
  }
  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    System.out.println("客户端与服务端断开连接之后");
    super.channelInactive(ctx);
  }
  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel读取数据完毕");
    super.channelReadComplete(ctx);
  }
  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    System.out.println("用户事件触发");
    super.userEventTriggered(ctx, evt);
  }
  @Override
  public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel可写事件更改");
    super.channelWritabilityChanged(ctx);
  }
  @Override
  //channel发生异常,若不关闭,随着异常channel的逐渐增多,性能也就随之下降
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    System.out.println("捕获channel异常");
    super.exceptionCaught(ctx, cause);
  }
  @Override
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    System.out.println("助手类添加");
    super.handlerAdded(ctx);
  }
  @Override
  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    System.out.println("助手类移除");
    super.handlerRemoved(ctx);
  }


打印结果


   助手类添加

   助手类添加

   channel注册

   channel注册

   channel活跃状态

   channel活跃状态

   /0:0:0:0:0:0:0:1:6184

   channel读取数据完毕

   /0:0:0:0:0:0:0:1:6184

   channel读取数据完毕


注:Linux中使用CURL的形式调试不会保持kepp-alive


推荐博文:https://blog.csdn.net/anxpp/article/details/51512200

目录
相关文章
|
3月前
|
前端开发 JavaScript 网络安全
Web网页端即时通讯源码/IM聊天源码RainbowChat-Web
RainbowChat-Web是一套基于MobileIMSDK-Web的网页端IM系统。不同于市面上某些开源练手或淘宝售卖的demo级代码,RainbowChat-Web的产品级代码演化自真正运营过的商业产品,其所依赖的通信层核心SDK已在数年内经过大量客户及其辐射的最终用户的使用和验证。RainbowChat-Web同时也是移动端IM应用RainbowChat的姊妹产品。
124 0
|
移动开发 网络协议 小程序
基于开源IM即时通讯框架MobileIMSDK:RainbowChat-iOS端v10.0版已发布
RainbowChat是一套基于开源IM即时通讯聊天框架 MobileIMSDK 的产品级移动端IM系统。RainbowChat源于真实运营的产品,解决了大量的屏幕适配、细节优化、机器兼容问题。RainbowChat可能是市面上提供im即时通讯聊天源码的,唯一一款同时支持TCP、UDP两种通信协议的IM产品。与姊妹产品RainbowTalk和RainbowChat-Web 技术同源,历经考验。
21 0
基于开源IM即时通讯框架MobileIMSDK:RainbowChat-iOS端v10.0版已发布
|
1月前
|
数据安全/隐私保护 容器 Go
开源IM即时通讯系统调研
Lumen IM 是一款企业级开源即时通讯工具,前端采用 Vue3 + Naive UI,后端基于 Go 语言,使用 WebSocket 协议。支持 Docker + Nginx 快速部署,适合私有化环境。功能包括文本、图片、文件消息,内置笔记、群聊及消息历史记录。界面美观、功能完善,适用于企业沟通、团队协作及开发者学习。提供前后端源码,便于快速搭建 IM 系统。
开源IM即时通讯系统调研
|
1月前
|
移动开发 网络协议 小程序
鸿蒙NEXT即时通讯/IM系统RinbowTalk v2.4版发布,基于MobileIMSDK框架、ArkTS编写
RainbowTalk是一套基于开源即时通讯讯IM框架 MobileIMSDK 的产品级鸿蒙NEXT端IM系统。纯ArkTS编写、全新开发,没有套壳、也没走捷径,每一行代码都够“纯血”。与姊妹产品RainbowChat和RainbowChat-Web 技术同源,历经考验。
82 1
|
2月前
|
缓存 移动开发 网络协议
纯血鸿蒙NEXT即时通讯/IM系统:RinbowTalk正式发布,全源码、纯ArkTS编写
RainbowTalk是一套基于MobileIMSDK的产品级鸿蒙NEXT端IM系统,目前已正式发布。纯ArkTS、从零编写,无套壳、没走捷径,每一行代码都够“纯”(详见:《RainbowTalk详细介绍》)。 MobileIMSDK是一整套开源IM即时通讯框架,历经10年,超轻量级、高度提炼,一套API优雅支持 UDP 、TCP 、WebSocket 三种协议,支持 iOS、Android、H5、标准Java、小程序、Uniapp、鸿蒙NEXT,服务端基于Netty编写。
200 1
|
2月前
|
测试技术 开发工具 git
基于WebSocket即时通讯im源码| uniapp即时通讯源码| 私有化部署SDK视频安装教程
本项目是基于 ThinkPHP7 和 Swoole 构建的即时通讯 IM 源码,打造了一个简洁美观、移动优先的渐进式 Web 应用。支持从源码构建,并提供详细的安装、配置与使用说明。仓库地址:im.jstxym.top。
|
11天前
|
前端开发 安全 Java
基于springboot+vue开发的会议预约管理系统
一个完整的会议预约管理系统,包含前端用户界面、管理后台和后端API服务。 ### 后端 - **框架**: Spring Boot 2.7.18 - **数据库**: MySQL 5.6+ - **ORM**: MyBatis Plus 3.5.3.1 - **安全**: Spring Security + JWT - **Java版本**: Java 11 ### 前端 - **框架**: Vue 3.3.4 - **UI组件**: Element Plus 2.3.8 - **构建工具**: Vite 4.4.5 - **状态管理**: Pinia 2.1.6 - **HTTP客户端
97 4
基于springboot+vue开发的会议预约管理系统
|
4月前
|
JavaScript 前端开发 Java
制造业ERP源码,工厂ERP管理系统,前端框架:Vue,后端框架:SpringBoot
这是一套基于SpringBoot+Vue技术栈开发的ERP企业管理系统,采用Java语言与vscode工具。系统涵盖采购/销售、出入库、生产、品质管理等功能,整合客户与供应商数据,支持在线协同和业务全流程管控。同时提供主数据管理、权限控制、工作流审批、报表自定义及打印、在线报表开发和自定义表单功能,助力企业实现高效自动化管理,并通过UniAPP实现移动端支持,满足多场景应用需求。
446 1
|
5月前
|
前端开发 Java 关系型数据库
基于Java+Springboot+Vue开发的鲜花商城管理系统源码+运行
基于Java+Springboot+Vue开发的鲜花商城管理系统(前后端分离),这是一项为大学生课程设计作业而开发的项目。该系统旨在帮助大学生学习并掌握Java编程技能,同时锻炼他们的项目设计与开发能力。通过学习基于Java的鲜花商城管理系统项目,大学生可以在实践中学习和提升自己的能力,为以后的职业发展打下坚实基础。技术学习共同进步
421 7