SpringBoot+Netty开发IM即时通讯系列(一)

简介: 简单来讲,Netty是一个提供了易于使用的API的客户端/服务端框架。Netty并发非常高,一个非阻塞的IO,Netty传输速度也非常快,因为他是0拷贝,什么是零拷贝?NIO中的特性之一就是零拷贝,在Java中,内存分为堆和栈以及字符串常量值等等,如果有一些数据从IO中读取并且放到堆里面,中间会经过一些缓冲区。

SpringBoot+Netty开发IM即时通讯系列(一)


前言


最近项目的需求有IM通讯这个模块,经过与老大商量决定使用SpringBoot+Netty的方式构建。于是,在这个系列中记录下过程中的学习历程以及撸码上线,以供日后参考。


如果文中有不当或错误请指出,虚心接受批评。


Netty


先来看下官方定义:


Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.


'Quick and easy' doesn't mean that a resulting application will suffer from a maintainability or a performance issue. Netty has been designed carefully with the experiences earned from the implementation of a lot of protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has succeeded to find a way to achieve ease of development, performance, stability, and flexibility without a compromise.


什么意思?


简单来讲,Netty是一个提供了易于使用的API的客户端/服务端框架。Netty并发非常高,一个非阻塞的IO,Netty传输速度也非常快,因为他是0拷贝,什么是零拷贝?NIO中的特性之一就是零拷贝,在Java中,内存分为堆和栈以及字符串常量值等等,如果有一些数据从IO中读取并且放到堆里面,中间会经过一些缓冲区。


具体来讲,如果要从IO中读取数据,分为两个步骤:


(1)从IO流中读取出来放到缓冲区,程序从缓冲区中读取,再放到堆中,此时数据就会被拷贝两次才能到达堆或者堆内存中。如果数据量很大,那么就会造成资源的浪费

(2)Netty其实就是利用NIO中的零拷贝特性,当Netty需要接收数据或者传输数据的时候,就会新开辟一块堆内存,然后数据就直接通过IO读取到了新开辟的堆内存中,这样也就加快了数据传输的速度。


阻塞与非阻塞


线程在访问某一个资源的时候,该资源是否准备就绪的一种处理方式,如果说该资源当前没准备就绪,这个时候就会有两种处理方式:阻塞与非阻塞


   (1)阻塞:这个线程会一直持续等待这个资源就绪并处理完毕,直到响应返回一个结果,这个时候线程是一直阻塞状态,不可以去做任何事情


   (2)非阻塞:这个线程直接返回结果,不会持续等待这个资源处理完毕才响应,它会去请求别的资源。


20181214154115242.png


图摘自:https://blog.csdn.net/zk3326312/article/details/79400805


同步与异步


这里的 “同步与异步” 指的是访问数据的一种机制,类似于Ajax。


   (1)同步:主动请求,并且会等待IO操作完成之后,IO会有一个通知


   (2)异步:当一个线程主动请求数据之后,可以继续处理其他任务,发起其他请求,多个请求完成之后再逐一的通过异步形式通知


20181214154435966.png

图摘自:https://blog.csdn.net/Iamthedoctor123/article/details/79336789?utm_source=blogxgwz2


BIO


(1)同步式阻塞IO:Block IO,IO在进行读取的时候,这个线程是会被阻塞的,无法去做其他操作,传统简单,通信方式也是如此。并发处理能力非常低,线程之间访问资源通信的时候,耗时也非常久,所以会比较依赖于网速与带宽,JDK1.4之前均是如此。

(2)服务器会有一个专门的线程称之为Acceptor,专门用于负责监听来自客户端之间的请求,只要客户端与服务端有建立请求,此时客户端与服务端之间都会创建一个新的线程进行处理,这是一种典型的  一应一答的模式。随着客户端逐渐增多,两者之间会频繁的创建和频繁销毁线程,此时服务器端会有很大的压力,甚至宕机。

(3)改进之后则通过线程池来处理,这种方式也可以称之为伪异步IO。


NIO


(1)同步非阻塞IO:New IO(Non-Block IO):JDK1.4之后。selector(其实就是一个线程,并且会主动轮询),也称之为多路复用器Buffer,它是一个缓冲区;Chanel,是一个双向通道。


(2)客户端在与服务端进行通信的时候就会使用到selector,两者要建立链接的时候,客户端到selector进行一个注册,注册完毕之后就会有一个chanel01,每一个客户端和selector建立连接之后都会有一个chanel,chanel是一个双向通道,可以进行一些相应数据的读写,这些数据的读写都会到Buffer缓冲区中


(3)通过selector注册并建立chanel通道就可以实现两者之间的通信,chanel的数据是一种非阻塞的读取,如果没有数据会直接跳过,不会同步等待数据。selector其实一个单线程,整体来讲,线程开销会非常小,光是一个selector就可以处理成千上万个客户端,客户端的增多不会影响它的性能,这也是与BIO的区别所在。


(4)chanel相当于是一个读取的工具,每一个客户端都可以理解为一个单独的chanel,每一个服务端会有一个selector,Buffer的数据会进行读取,数据被读完之后还是会存在Buffer中,不会因为数据被读取之后会被消息,String中的数据读完之后就没有了。        


AIO


(1)异步非阻塞IO:AIO其实也是NIO2.0,它是一种异步非阻塞的通信方式,在NIO没有的基础上引入了一个异步概念:在读写的时候所有返回的类型其实就是一个feature对象,这个对象模型其实就是异步的在这个过程中会有一些事件监听


(2)异步阻塞IO:几乎用不到


20181214155029811.png


图摘自:imooc


Netty的三种线程模型


(1)Reactor线程模型:


       1)单线程模型:所有的IO操作都由同一个NIO线程处理,仅限于一些小型应用场景。但在高负载、高并发等情况下使用单线程肯定就不太合理,主要是因为NIO的一个线程同时要去处理成千上万的请求 的时候,在性能上会支撑不了,即便CPU负载100%,对于海量消息的处理,编码解码以及读取、发送消息等情况,依然满足不了。


       2)当NIO的线程负载过重之后,整体服务性能处理就会变慢,结果就是导致客户端在向服务端发起请求、链接就会超时,由于客户端一般都会有一种超时机制,反复地向服务端再次发起请求,此时就相当于陷入了死循环,更加加重了服务器负载。


(2)多线程模型:由一组NIO线程处理IO操作


(3)主从线程模型:一组线程池接受请求,一组线程池处理IO


20181214160022316.png


HelloWorldNetty


一个最简单的Netty服务端包含了五个步骤:


(1)构建一对主从线程组

(2)定义服务器启动类

(3)为服务器设置Channel

(4)设置处理从线程池的助手类初始化器

(5)监听启动和关闭的服务器


1、引入相关依赖


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.pubing</groupId>
  <artifactId>helloNetty</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.25.Final</version>
    </dependency>
  </dependencies>
</project>


建立HelloNettyServer


package com.phubing.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
//实现客户端发送请求,服务器端会返回Hello Netty
public class HelloNettyServer {
  public static void main(String[] args) throws InterruptedException {
    /**
     * 定义一对线程组(两个线程池)
     * 
     */
    //主线程组,用于接收客户端的链接,但不做任何处理
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    //定义从线程组,主线程组会把任务转给从线程组进行处理
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
      /**
       * 服务启动类,任务分配自动处理
       * 
       */
      ServerBootstrap serverBootstrap = new ServerBootstrap();
      //需要去针对一个之前的线程模型(上面定义的是主从线程)
      serverBootstrap.group(bossGroup, workerGroup)
        //设置NIO的双向通道
        .channel(NioServerSocketChannel.class)
        //子处理器,用于处理workerGroup
        /**
         * 设置chanel初始化器
         * 每一个chanel由多个handler共同组成管道(pipeline)
         */
        .childHandler(new HelloNettyServerInitializer()); 
      /**
       * 启动
       * 
       */
      //绑定端口,并设置为同步方式,是一个异步的chanel
      ChannelFuture future = serverBootstrap.bind(8888).sync();
      /**
       * 关闭
       */
      //获取某个客户端所对应的chanel,关闭并设置同步方式
      future.channel().closeFuture().sync();
    }catch (Exception e) {
      e.printStackTrace();
    }finally {
      //使用一种优雅的方式进行关闭
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }
}


建立初始化器HelloNettyServerInitializer


package com.phubing.netty;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
/**
 * 这是一个初始化器,chanel注册后会执行里面相应的初始化方法(也就是将handler逐个进行添加)
 * 
 * @author phubing
 *
 */
public class HelloNettyServerInitializer extends ChannelInitializer<SocketChannel>{
  //对chanel进行初始化
  @Override
  protected void initChannel(SocketChannel socketChannel) throws Exception {
    //通过socketChannel去获得对应的管道
    ChannelPipeline channelPipeline = socketChannel.pipeline();
    /**
     * pipeline中会有很多handler类(也称之拦截器类)
     * 获得pipeline之后,可以直接.add,添加不管是自己开发的handler还是netty提供的handler
     * 
     */
    //一般来讲添加到last即可,添加了一个handler并且取名为HttpServerCodec
    //当请求到达服务端,要做解码,响应到客户端做编码
    channelPipeline.addLast("HttpServerCodec", new HttpServerCodec());
    //添加自定义的CustomHandler这个handler,返回Hello Netty
    channelPipeline.addLast("customHandler", new CustomHandler());
  }
}


建立自定义处理器CustomHandler


package com.phubing.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;
/**
 * 创建自定义助手类
 * @author phubing
 *
 */
//处理的请求是:客户端向服务端发起送数据,先把数据放在缓冲区,服务器端再从缓冲区读取,类似于[ 入栈, 入境 ]
public class CustomHandler extends SimpleChannelInboundHandler<HttpObject>{//Http请求,所以使用HttpObject
  @Override
  public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel注册");
    super.channelRegistered(ctx);
  }
  @Override
  public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel注册");
    super.channelUnregistered(ctx);
  }
  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel活跃状态");
    super.channelActive(ctx);
  }
  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    System.out.println("客户端与服务端断开连接之后");
    super.channelInactive(ctx);
  }
  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel读取数据完毕");
    super.channelReadComplete(ctx);
  }
  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    System.out.println("用户事件触发");
    super.userEventTriggered(ctx, evt);
  }
  @Override
  public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel可写事件更改");
    super.channelWritabilityChanged(ctx);
  }
  @Override
  //channel发生异常,若不关闭,随着异常channel的逐渐增多,性能也就随之下降
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    System.out.println("捕获channel异常");
    super.exceptionCaught(ctx, cause);
  }
  @Override
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    System.out.println("助手类添加");
    super.handlerAdded(ctx);
  }
  @Override
  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    System.out.println("助手类移除");
    super.handlerRemoved(ctx);
  }
  /**
   * ChannelHandlerContext:上下文对象
   * 
   * 
   */
  @Override
  protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
    //获取当前channel
    Channel currentChannel = ctx.channel();
    //判断msg是否为一个HttpRequest的请求类型
    if(msg instanceof HttpRequest) {
      //客户端远程地址
      System.out.println(currentChannel.remoteAddress());
      /**
       * 
       * 未加判断类型,控制台打印的远端地址如下:
       * 
          /0:0:0:0:0:0:0:1:5501
        /0:0:0:0:0:0:0:1:5501
        /0:0:0:0:0:0:0:1:5502
        /0:0:0:0:0:0:0:1:5502
        /0:0:0:0:0:0:0:1:5503
        /0:0:0:0:0:0:0:1:5503
       * 
       * 原因是接收的MSG没有进行类型判断
       * 
       * 
       * 增加了判断,为何还会打印两次?
       * 
        /0:0:0:0:0:0:0:1:5605
        /0:0:0:0:0:0:0:1:5605
       * 
       * 打开浏览器的network会发现,客户端对服务端进行了两次请求:
       *  1、第一次是所需的
       *  2、第二次是一个icon
       *  因为没有加路由(相当于Springmvc中的requestMapping),只要发起请求,就都会到handler中去
       * 
       */
      /**
       * 在Linux中也可以通过CURL 本机Ip:端口号 发送请求(只打印一次,干净的请求)      
       */
      //定义发送的消息(不是直接发送,而是要把数据拷贝到缓冲区,通过缓冲区)
      //Unpooed:是一个专门用于拷贝Buffer的深拷贝,可以有一个或多个
      //CharsetUtil.UTF_8:Netty提供
      ByteBuf content = Unpooled.copiedBuffer("Hello Netty", CharsetUtil.UTF_8);
      //构建一个HttpResponse,响应客户端
      FullHttpResponse response = 
          /**
           * params1:针对Http的版本号
           * params2:状态(响应成功或失败)
           * params3:内容
           */
          //HttpVersion.HTTP_1_1:默认开启keep-alive
          new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
      //设置当前内容长度、类型等
      response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
      //readableBytes:可读长度
      response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
      //通过长下文对象,把响应刷到客户端
      ctx.writeAndFlush(response);
    }
  }
}


Channel生命周期


对于每一个Channel(即客户端)每一次连接到服务端之后都会有一个上下文对象ChannelHandlerContext,对于每一个Channel来讲,都是有生命周期的。


@Override
  public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel注册");
    super.channelRegistered(ctx);
  }
  @Override
  public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel注册");
    super.channelUnregistered(ctx);
  }
  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel活跃状态");
    super.channelActive(ctx);
  }
  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    System.out.println("客户端与服务端断开连接之后");
    super.channelInactive(ctx);
  }
  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel读取数据完毕");
    super.channelReadComplete(ctx);
  }
  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    System.out.println("用户事件触发");
    super.userEventTriggered(ctx, evt);
  }
  @Override
  public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel可写事件更改");
    super.channelWritabilityChanged(ctx);
  }
  @Override
  //channel发生异常,若不关闭,随着异常channel的逐渐增多,性能也就随之下降
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    System.out.println("捕获channel异常");
    super.exceptionCaught(ctx, cause);
  }
  @Override
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    System.out.println("助手类添加");
    super.handlerAdded(ctx);
  }
  @Override
  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    System.out.println("助手类移除");
    super.handlerRemoved(ctx);
  }


打印结果


   助手类添加

   助手类添加

   channel注册

   channel注册

   channel活跃状态

   channel活跃状态

   /0:0:0:0:0:0:0:1:6184

   channel读取数据完毕

   /0:0:0:0:0:0:0:1:6184

   channel读取数据完毕


注:Linux中使用CURL的形式调试不会保持kepp-alive


推荐博文:https://blog.csdn.net/anxpp/article/details/51512200

目录
相关文章
|
3月前
|
安全 前端开发 关系型数据库
IM即时通讯系统开发技术规则
IM即时通讯系统开发涵盖客户端与服务器端,涉及前端、后端、网络通信及多媒体处理等技术领域,支持文字、语音、图片、视频等多种实时交流方式。开发流程包括需求分析、技术选型、系统设计、开发实现、测试优化及部署维护等阶段,需关注网络通信、多媒体处理、安全性及可扩展性等关键技术点,广泛应用于社交、客服、团队协作及游戏等领域。
|
2月前
|
存储 网络协议 前端开发
基于开源IM即时通讯框架MobileIMSDK:RainbowChat v11.7版已发布
Android端主要更新内容: 1)[优化] 优化了首页“消息”列表中单聊类型未正确同步时的收发消息和点击后的处理逻辑; 2)[优化] 优化了首页“消息”列表中同一好友和陌生人会话不能自动合并的问题;
71 2
|
27天前
|
存储 自然语言处理 关系型数据库
基于阿里云通义千问开发智能客服与问答系统
在企业的数字化转型过程中,智能客服系统已成为提高客户满意度和降低运营成本的重要手段。阿里云的通义千问作为一款强大的大语言模型,具有自然语言理解、对话生成、知识检索等能力,非常适合用来开发智能客服与问答系统。 通过本博客,我们将演示如何基于阿里云的通义千问模型,结合阿里云相关产品如函数计算(FC)、API网关、RDS等,搭建一个功能齐全的智能客服系统。
86 5
|
1月前
|
Rust 前端开发 JavaScript
Wasm在即时通讯IM场景下的Web端应用性能提升初探
简单的来说,Wasm就是使用C/C++/Rust等语言编写的代码,经过编译后得到汇编指令,再通过JavaScript相关API将文件加载到Web容器中(即运行在Web容器中的汇编代码)。Wasm是一种可移植、体积小、加载快速的二进制格式,可以将各种编程语言的代码编译成Wasm模块,这些模块可以在现代浏览器中直接运行。尤其在涉及到GPU或CPU计算时优势相对比较明显。
36 0
|
2月前
|
移动开发 网络协议 小程序
基于开源IM即时通讯框架MobileIMSDK:RainbowChat-iOS端v9.1版已发布
RainbowChat是一套基于开源IM聊天框架 MobileIMSDK 的产品级移动端IM系统。RainbowChat源于真实运营的产品,解决了大量的屏幕适配、细节优化、机器兼容问题
67 5
|
3月前
|
移动开发 前端开发 JavaScript
开源即时通讯IM框架MobileIMSDK的H5端技术概览
开源即时通讯IM框架MobileIMSDK的H5端技术概览
72 2
开源即时通讯IM框架MobileIMSDK的H5端技术概览
|
4月前
|
前端开发 网络协议
Netty实战巅峰:从零构建高性能IM即时通讯系统,解锁并发通信新境界
【8月更文挑战第3天】Netty是一款高性能、异步事件驱动的网络框架,适用于开发高并发网络应用,如即时通讯(IM)系统。本文将指导你利用Netty从零构建高性能IM程序,介绍Netty基础及服务器/客户端设计。服务器端使用`ServerBootstrap`启动,客户端通过`Bootstrap`连接服务器。示例展示了简单的服务器启动过程。通过深入学习,可进一步实现用户认证等功能,打造出更完善的IM系统。
200 1
|
15天前
|
机器学习/深度学习 自然语言处理 搜索推荐
深度分析 | 2024主流的智能客服系统有哪些?他们是怎么实现的?
本文深入探讨了智能客服系统的使用方法和相关技术实现逻辑,涵盖前端交互、服务接入、逻辑处理、数据存储四大层面,以及自然语言处理、机器学习、语音识别与合成、数据分析与挖掘、知识库管理和智能推荐系统等核心技术,帮助企业更好地理解和应用智能客服系统,提升服务效率和客户满意度。
85 1
|
2月前
|
存储 自然语言处理 机器人
实战揭秘:当RAG遇上企业客服系统——从案例出发剖析Retrieval-Augmented Generation技术的真实表现与应用局限,带你深入了解背后的技术细节与解决方案
【10月更文挑战第3天】随着自然语言处理技术的进步,结合检索与生成能力的RAG技术被广泛应用于多个领域,通过访问外部知识源提升生成内容的准确性和上下文一致性。本文通过具体案例探讨RAG技术的优势与局限,并提供实用建议。例如,一家初创公司利用LangChain框架搭建基于RAG的聊天机器人,以自动化FAQ系统减轻客服团队工作负担。尽管该系统在处理简单问题时表现出色,但在面对复杂或多步骤问题时存在局限。此外,RAG系统的性能高度依赖于训练数据的质量和范围。因此,企业在采用RAG技术时需综合评估需求和技术局限性,合理规划技术栈,并辅以必要的人工干预和监督机制。
168 3
|
25天前
|
存储 人工智能 运维
最新榜单 | 盘点2024年10大主流工单系统
随着互联网的发展,工单系统因其多样化功能和高效管理能力,成为企业运营的重要工具。本文介绍了10大主流工单系统,包括合力亿捷、阿里云服务中台、华为云ROMA ServiceCore等,它们各具特色,帮助企业提升服务质量和运营效率,实现数字化转型。
45 7