SpringBoot+Netty开发IM即时通讯系列(一)

简介: 简单来讲,Netty是一个提供了易于使用的API的客户端/服务端框架。Netty并发非常高,一个非阻塞的IO,Netty传输速度也非常快,因为他是0拷贝,什么是零拷贝?NIO中的特性之一就是零拷贝,在Java中,内存分为堆和栈以及字符串常量值等等,如果有一些数据从IO中读取并且放到堆里面,中间会经过一些缓冲区。

SpringBoot+Netty开发IM即时通讯系列(一)


前言


最近项目的需求有IM通讯这个模块,经过与老大商量决定使用SpringBoot+Netty的方式构建。于是,在这个系列中记录下过程中的学习历程以及撸码上线,以供日后参考。


如果文中有不当或错误请指出,虚心接受批评。


Netty


先来看下官方定义:


Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.


'Quick and easy' doesn't mean that a resulting application will suffer from a maintainability or a performance issue. Netty has been designed carefully with the experiences earned from the implementation of a lot of protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has succeeded to find a way to achieve ease of development, performance, stability, and flexibility without a compromise.


什么意思?


简单来讲,Netty是一个提供了易于使用的API的客户端/服务端框架。Netty并发非常高,一个非阻塞的IO,Netty传输速度也非常快,因为他是0拷贝,什么是零拷贝?NIO中的特性之一就是零拷贝,在Java中,内存分为堆和栈以及字符串常量值等等,如果有一些数据从IO中读取并且放到堆里面,中间会经过一些缓冲区。


具体来讲,如果要从IO中读取数据,分为两个步骤:


(1)从IO流中读取出来放到缓冲区,程序从缓冲区中读取,再放到堆中,此时数据就会被拷贝两次才能到达堆或者堆内存中。如果数据量很大,那么就会造成资源的浪费

(2)Netty其实就是利用NIO中的零拷贝特性,当Netty需要接收数据或者传输数据的时候,就会新开辟一块堆内存,然后数据就直接通过IO读取到了新开辟的堆内存中,这样也就加快了数据传输的速度。


阻塞与非阻塞


线程在访问某一个资源的时候,该资源是否准备就绪的一种处理方式,如果说该资源当前没准备就绪,这个时候就会有两种处理方式:阻塞与非阻塞


   (1)阻塞:这个线程会一直持续等待这个资源就绪并处理完毕,直到响应返回一个结果,这个时候线程是一直阻塞状态,不可以去做任何事情


   (2)非阻塞:这个线程直接返回结果,不会持续等待这个资源处理完毕才响应,它会去请求别的资源。


20181214154115242.png


图摘自:https://blog.csdn.net/zk3326312/article/details/79400805


同步与异步


这里的 “同步与异步” 指的是访问数据的一种机制,类似于Ajax。


   (1)同步:主动请求,并且会等待IO操作完成之后,IO会有一个通知


   (2)异步:当一个线程主动请求数据之后,可以继续处理其他任务,发起其他请求,多个请求完成之后再逐一的通过异步形式通知


20181214154435966.png

图摘自:https://blog.csdn.net/Iamthedoctor123/article/details/79336789?utm_source=blogxgwz2


BIO


(1)同步式阻塞IO:Block IO,IO在进行读取的时候,这个线程是会被阻塞的,无法去做其他操作,传统简单,通信方式也是如此。并发处理能力非常低,线程之间访问资源通信的时候,耗时也非常久,所以会比较依赖于网速与带宽,JDK1.4之前均是如此。

(2)服务器会有一个专门的线程称之为Acceptor,专门用于负责监听来自客户端之间的请求,只要客户端与服务端有建立请求,此时客户端与服务端之间都会创建一个新的线程进行处理,这是一种典型的  一应一答的模式。随着客户端逐渐增多,两者之间会频繁的创建和频繁销毁线程,此时服务器端会有很大的压力,甚至宕机。

(3)改进之后则通过线程池来处理,这种方式也可以称之为伪异步IO。


NIO


(1)同步非阻塞IO:New IO(Non-Block IO):JDK1.4之后。selector(其实就是一个线程,并且会主动轮询),也称之为多路复用器Buffer,它是一个缓冲区;Chanel,是一个双向通道。


(2)客户端在与服务端进行通信的时候就会使用到selector,两者要建立链接的时候,客户端到selector进行一个注册,注册完毕之后就会有一个chanel01,每一个客户端和selector建立连接之后都会有一个chanel,chanel是一个双向通道,可以进行一些相应数据的读写,这些数据的读写都会到Buffer缓冲区中


(3)通过selector注册并建立chanel通道就可以实现两者之间的通信,chanel的数据是一种非阻塞的读取,如果没有数据会直接跳过,不会同步等待数据。selector其实一个单线程,整体来讲,线程开销会非常小,光是一个selector就可以处理成千上万个客户端,客户端的增多不会影响它的性能,这也是与BIO的区别所在。


(4)chanel相当于是一个读取的工具,每一个客户端都可以理解为一个单独的chanel,每一个服务端会有一个selector,Buffer的数据会进行读取,数据被读完之后还是会存在Buffer中,不会因为数据被读取之后会被消息,String中的数据读完之后就没有了。        


AIO


(1)异步非阻塞IO:AIO其实也是NIO2.0,它是一种异步非阻塞的通信方式,在NIO没有的基础上引入了一个异步概念:在读写的时候所有返回的类型其实就是一个feature对象,这个对象模型其实就是异步的在这个过程中会有一些事件监听


(2)异步阻塞IO:几乎用不到


20181214155029811.png


图摘自:imooc


Netty的三种线程模型


(1)Reactor线程模型:


       1)单线程模型:所有的IO操作都由同一个NIO线程处理,仅限于一些小型应用场景。但在高负载、高并发等情况下使用单线程肯定就不太合理,主要是因为NIO的一个线程同时要去处理成千上万的请求 的时候,在性能上会支撑不了,即便CPU负载100%,对于海量消息的处理,编码解码以及读取、发送消息等情况,依然满足不了。


       2)当NIO的线程负载过重之后,整体服务性能处理就会变慢,结果就是导致客户端在向服务端发起请求、链接就会超时,由于客户端一般都会有一种超时机制,反复地向服务端再次发起请求,此时就相当于陷入了死循环,更加加重了服务器负载。


(2)多线程模型:由一组NIO线程处理IO操作


(3)主从线程模型:一组线程池接受请求,一组线程池处理IO


20181214160022316.png


HelloWorldNetty


一个最简单的Netty服务端包含了五个步骤:


(1)构建一对主从线程组

(2)定义服务器启动类

(3)为服务器设置Channel

(4)设置处理从线程池的助手类初始化器

(5)监听启动和关闭的服务器


1、引入相关依赖


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.pubing</groupId>
  <artifactId>helloNetty</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.25.Final</version>
    </dependency>
  </dependencies>
</project>


建立HelloNettyServer


package com.phubing.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
//实现客户端发送请求,服务器端会返回Hello Netty
public class HelloNettyServer {
  public static void main(String[] args) throws InterruptedException {
    /**
     * 定义一对线程组(两个线程池)
     * 
     */
    //主线程组,用于接收客户端的链接,但不做任何处理
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    //定义从线程组,主线程组会把任务转给从线程组进行处理
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
      /**
       * 服务启动类,任务分配自动处理
       * 
       */
      ServerBootstrap serverBootstrap = new ServerBootstrap();
      //需要去针对一个之前的线程模型(上面定义的是主从线程)
      serverBootstrap.group(bossGroup, workerGroup)
        //设置NIO的双向通道
        .channel(NioServerSocketChannel.class)
        //子处理器,用于处理workerGroup
        /**
         * 设置chanel初始化器
         * 每一个chanel由多个handler共同组成管道(pipeline)
         */
        .childHandler(new HelloNettyServerInitializer()); 
      /**
       * 启动
       * 
       */
      //绑定端口,并设置为同步方式,是一个异步的chanel
      ChannelFuture future = serverBootstrap.bind(8888).sync();
      /**
       * 关闭
       */
      //获取某个客户端所对应的chanel,关闭并设置同步方式
      future.channel().closeFuture().sync();
    }catch (Exception e) {
      e.printStackTrace();
    }finally {
      //使用一种优雅的方式进行关闭
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }
}


建立初始化器HelloNettyServerInitializer


package com.phubing.netty;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
/**
 * 这是一个初始化器,chanel注册后会执行里面相应的初始化方法(也就是将handler逐个进行添加)
 * 
 * @author phubing
 *
 */
public class HelloNettyServerInitializer extends ChannelInitializer<SocketChannel>{
  //对chanel进行初始化
  @Override
  protected void initChannel(SocketChannel socketChannel) throws Exception {
    //通过socketChannel去获得对应的管道
    ChannelPipeline channelPipeline = socketChannel.pipeline();
    /**
     * pipeline中会有很多handler类(也称之拦截器类)
     * 获得pipeline之后,可以直接.add,添加不管是自己开发的handler还是netty提供的handler
     * 
     */
    //一般来讲添加到last即可,添加了一个handler并且取名为HttpServerCodec
    //当请求到达服务端,要做解码,响应到客户端做编码
    channelPipeline.addLast("HttpServerCodec", new HttpServerCodec());
    //添加自定义的CustomHandler这个handler,返回Hello Netty
    channelPipeline.addLast("customHandler", new CustomHandler());
  }
}


建立自定义处理器CustomHandler


package com.phubing.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;
/**
 * 创建自定义助手类
 * @author phubing
 *
 */
//处理的请求是:客户端向服务端发起送数据,先把数据放在缓冲区,服务器端再从缓冲区读取,类似于[ 入栈, 入境 ]
public class CustomHandler extends SimpleChannelInboundHandler<HttpObject>{//Http请求,所以使用HttpObject
  @Override
  public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel注册");
    super.channelRegistered(ctx);
  }
  @Override
  public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel注册");
    super.channelUnregistered(ctx);
  }
  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel活跃状态");
    super.channelActive(ctx);
  }
  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    System.out.println("客户端与服务端断开连接之后");
    super.channelInactive(ctx);
  }
  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel读取数据完毕");
    super.channelReadComplete(ctx);
  }
  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    System.out.println("用户事件触发");
    super.userEventTriggered(ctx, evt);
  }
  @Override
  public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel可写事件更改");
    super.channelWritabilityChanged(ctx);
  }
  @Override
  //channel发生异常,若不关闭,随着异常channel的逐渐增多,性能也就随之下降
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    System.out.println("捕获channel异常");
    super.exceptionCaught(ctx, cause);
  }
  @Override
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    System.out.println("助手类添加");
    super.handlerAdded(ctx);
  }
  @Override
  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    System.out.println("助手类移除");
    super.handlerRemoved(ctx);
  }
  /**
   * ChannelHandlerContext:上下文对象
   * 
   * 
   */
  @Override
  protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
    //获取当前channel
    Channel currentChannel = ctx.channel();
    //判断msg是否为一个HttpRequest的请求类型
    if(msg instanceof HttpRequest) {
      //客户端远程地址
      System.out.println(currentChannel.remoteAddress());
      /**
       * 
       * 未加判断类型,控制台打印的远端地址如下:
       * 
          /0:0:0:0:0:0:0:1:5501
        /0:0:0:0:0:0:0:1:5501
        /0:0:0:0:0:0:0:1:5502
        /0:0:0:0:0:0:0:1:5502
        /0:0:0:0:0:0:0:1:5503
        /0:0:0:0:0:0:0:1:5503
       * 
       * 原因是接收的MSG没有进行类型判断
       * 
       * 
       * 增加了判断,为何还会打印两次?
       * 
        /0:0:0:0:0:0:0:1:5605
        /0:0:0:0:0:0:0:1:5605
       * 
       * 打开浏览器的network会发现,客户端对服务端进行了两次请求:
       *  1、第一次是所需的
       *  2、第二次是一个icon
       *  因为没有加路由(相当于Springmvc中的requestMapping),只要发起请求,就都会到handler中去
       * 
       */
      /**
       * 在Linux中也可以通过CURL 本机Ip:端口号 发送请求(只打印一次,干净的请求)      
       */
      //定义发送的消息(不是直接发送,而是要把数据拷贝到缓冲区,通过缓冲区)
      //Unpooed:是一个专门用于拷贝Buffer的深拷贝,可以有一个或多个
      //CharsetUtil.UTF_8:Netty提供
      ByteBuf content = Unpooled.copiedBuffer("Hello Netty", CharsetUtil.UTF_8);
      //构建一个HttpResponse,响应客户端
      FullHttpResponse response = 
          /**
           * params1:针对Http的版本号
           * params2:状态(响应成功或失败)
           * params3:内容
           */
          //HttpVersion.HTTP_1_1:默认开启keep-alive
          new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
      //设置当前内容长度、类型等
      response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
      //readableBytes:可读长度
      response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
      //通过长下文对象,把响应刷到客户端
      ctx.writeAndFlush(response);
    }
  }
}


Channel生命周期


对于每一个Channel(即客户端)每一次连接到服务端之后都会有一个上下文对象ChannelHandlerContext,对于每一个Channel来讲,都是有生命周期的。


@Override
  public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel注册");
    super.channelRegistered(ctx);
  }
  @Override
  public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel注册");
    super.channelUnregistered(ctx);
  }
  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel活跃状态");
    super.channelActive(ctx);
  }
  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    System.out.println("客户端与服务端断开连接之后");
    super.channelInactive(ctx);
  }
  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel读取数据完毕");
    super.channelReadComplete(ctx);
  }
  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    System.out.println("用户事件触发");
    super.userEventTriggered(ctx, evt);
  }
  @Override
  public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channel可写事件更改");
    super.channelWritabilityChanged(ctx);
  }
  @Override
  //channel发生异常,若不关闭,随着异常channel的逐渐增多,性能也就随之下降
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    System.out.println("捕获channel异常");
    super.exceptionCaught(ctx, cause);
  }
  @Override
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    System.out.println("助手类添加");
    super.handlerAdded(ctx);
  }
  @Override
  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    System.out.println("助手类移除");
    super.handlerRemoved(ctx);
  }


打印结果


   助手类添加

   助手类添加

   channel注册

   channel注册

   channel活跃状态

   channel活跃状态

   /0:0:0:0:0:0:0:1:6184

   channel读取数据完毕

   /0:0:0:0:0:0:0:1:6184

   channel读取数据完毕


注:Linux中使用CURL的形式调试不会保持kepp-alive


推荐博文:https://blog.csdn.net/anxpp/article/details/51512200

目录
相关文章
|
2月前
|
安全 前端开发 关系型数据库
IM即时通讯系统开发技术规则
IM即时通讯系统开发涵盖客户端与服务器端,涉及前端、后端、网络通信及多媒体处理等技术领域,支持文字、语音、图片、视频等多种实时交流方式。开发流程包括需求分析、技术选型、系统设计、开发实现、测试优化及部署维护等阶段,需关注网络通信、多媒体处理、安全性及可扩展性等关键技术点,广泛应用于社交、客服、团队协作及游戏等领域。
|
29天前
|
开发框架 前端开发 网络协议
Spring Boot结合Netty和WebSocket,实现后台向前端实时推送信息
【10月更文挑战第18天】 在现代互联网应用中,实时通信变得越来越重要。WebSocket作为一种在单个TCP连接上进行全双工通信的协议,为客户端和服务器之间的实时数据传输提供了一种高效的解决方案。Netty作为一个高性能、事件驱动的NIO框架,它基于Java NIO实现了异步和事件驱动的网络应用程序。Spring Boot是一个基于Spring框架的微服务开发框架,它提供了许多开箱即用的功能和简化配置的机制。本文将详细介绍如何使用Spring Boot集成Netty和WebSocket,实现后台向前端推送信息的功能。
294 1
|
1月前
|
存储 网络协议 前端开发
基于开源IM即时通讯框架MobileIMSDK:RainbowChat v11.7版已发布
Android端主要更新内容: 1)[优化] 优化了首页“消息”列表中单聊类型未正确同步时的收发消息和点击后的处理逻辑; 2)[优化] 优化了首页“消息”列表中同一好友和陌生人会话不能自动合并的问题;
55 2
|
28天前
|
移动开发 网络协议 小程序
基于开源IM即时通讯框架MobileIMSDK:RainbowChat-iOS端v9.1版已发布
RainbowChat是一套基于开源IM聊天框架 MobileIMSDK 的产品级移动端IM系统。RainbowChat源于真实运营的产品,解决了大量的屏幕适配、细节优化、机器兼容问题
55 5
|
1月前
|
NoSQL Java Redis
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
这篇文章介绍了Redis的基本命令,并展示了如何使用Netty框架直接与Redis服务器进行通信,包括设置Netty客户端、编写处理程序以及初始化Channel的完整示例代码。
48 1
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
|
2月前
|
移动开发 前端开发 JavaScript
开源即时通讯IM框架MobileIMSDK的H5端技术概览
开源即时通讯IM框架MobileIMSDK的H5端技术概览
63 2
开源即时通讯IM框架MobileIMSDK的H5端技术概览
|
3月前
|
API 开发者
Netty运行原理问题之Netty实现低开发门槛的问题如何解决
Netty运行原理问题之Netty实现低开发门槛的问题如何解决
|
3月前
|
前端开发 Java Spring
springboot 整合 netty框架, 实现 心跳检测,自动重连
springboot 整合 netty框架, 实现 心跳检测,自动重连
|
3月前
|
编解码 NoSQL Redis
(十一)Netty实战篇:基于Netty框架打造一款高性能的IM即时通讯程序
关于Netty网络框架的内容,前面已经讲了两个章节,但总归来说难以真正掌握,毕竟只是对其中一个个组件进行讲解,很难让诸位将其串起来形成一条线,所以本章中则会结合实战案例,对Netty进行更深层次的学习与掌握,实战案例也并不难,一个非常朴素的IM聊天程序。
|
3月前
|
前端开发 网络协议
Netty实战巅峰:从零构建高性能IM即时通讯系统,解锁并发通信新境界
【8月更文挑战第3天】Netty是一款高性能、异步事件驱动的网络框架,适用于开发高并发网络应用,如即时通讯(IM)系统。本文将指导你利用Netty从零构建高性能IM程序,介绍Netty基础及服务器/客户端设计。服务器端使用`ServerBootstrap`启动,客户端通过`Bootstrap`连接服务器。示例展示了简单的服务器启动过程。通过深入学习,可进一步实现用户认证等功能,打造出更完善的IM系统。
163 1
下一篇
无影云桌面