大话Redis系列--深入探讨多路复用(上)

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 大话Redis系列--深入探讨多路复用(上)

今天让我们一起来探讨一下多路复用在redis中的具体实现原理。


Redis客户端和服务端的连接访问介绍



之前的文章中我们有稍微提到过,Redis的客户端和服务端之间是通过resp协议进行链接通信的。这种通信的本质其实是借助了socket套接字进行网络连接。如下图所示:


网络异常,图片无法展示
|


当客户端的jedis和redis服务器进行连接的时候,首先需要在对应操作系统的内核中建立连接,接着才会将对应的链接分配到指定的redis服务端程序当中。假设redis采用了默认的6379端口,那么对应的链接请求就会发送到对应的进程上。


如何理解多路复用


当请求处理完毕之后,这一整条的链接链路不会立马断开,而为维持长链接的状态,方便后续继续传输数据使用。


为了方便大家对于多路复用能有个更好的认识,我们接下来通过一段Java程序来认识多路复用的实际应用案例。


Java模拟实现服务端链接


这是一段非常简单的BIO程序,大概思路便是接纳外界链接请求,然后打印数据到控制台。


package org.idea.iedis.framework.server.io.core;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
 * @Author linhao
 * @Date created in 11:16 上午 2021/8/15
 */
public class BioServer {
    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(6378);
        while (true) {
            System.out.println("wait conn --------");
            Socket socket = serverSocket.accept();
            System.out.println("conn ok ---------");
            byte[] bytes = new byte[1024];
            System.out.println("wait data ---------");
            socket.getInputStream().read(bytes);
            System.out.printf("data is {}" + new String(bytes));
        }
    }
}
复制代码


测试的方式我采用了nc命令进行模拟:


【idea @ Mac】>>>>>>nc localhost 6378
a
复制代码


当请求到达服务端的时候,也会在控制台有所打印输出:


网络异常,图片无法展示
|


上边代码的哪些点是堵塞的位置?


serverSocket.accept(); //堵塞部分
socket.getInputStream().read(bytes); //堵塞部分
复制代码


我们通过实操可以发现,在accept的位置会出现第一次堵塞的情况,当没有外界连接接入的时候,程序就会堵塞在该位置。


第二次堵塞的位置是在read函数调用的位置,当连接建立之后,服务端会一直等待客户端发送的数据请求,因此一直处于等待状况。


如果第一个连接建立之后迟迟不发送数据,后续又有第二个连接接入会出现什么问题?

为了模拟这个场景,我们来看下以下测试案例:


首先是服务端程序的部分改造,对于客户端的请求,我这里新增了一个handleData函数,用于模拟处理数据时候的堵塞情况(耗时大约为2秒左右)


package org.idea.iedis.framework.server.io.core;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
 * @Author linhao
 * @Date created in 11:16 上午 2021/8/15
 */
public class BioServer {
    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(6378);
        while (true) {
            System.out.println("wait conn --------");
            Socket socket = serverSocket.accept();
            System.out.println("conn ok --------- " + System.currentTimeMillis());
            byte[] bytes = new byte[1024];
            System.out.println("wait data ---------");
            socket.getInputStream().read(bytes);
            handleData();
            System.out.printf("data is {}" + new String(bytes));
        }
    }
    /**
     * 处理数据信息
     */
    public static void handleData(){
        System.out.println("handling data begin ----------");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("handling data end ----------");
    }
}
复制代码


接着便是客户端部分的模拟,模拟五个客户端的连接服务端,并且发送数据:


package org.idea.iedis.framework.server.io.core;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * @Author linhao
 * @Date created in 11:17 上午 2021/8/15
 */
public class BioClient {
    public static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    Socket socket = new Socket();
                    try {
                        socket.connect(new InetSocketAddress("localhost", 6378));
                        socket.getOutputStream().write("this is test".getBytes());
                        System.out.println("send data -----------" + System.currentTimeMillis());
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
            threadPoolExecutor.submit(t);
        }
        Thread.yield();
    }
}
复制代码


通过请求的日志记录可以发现,服务端对于数据的处理时间点如下:


网络异常,图片无法展示
|


也就是说当上一个连接在处理请求的时候,下一个连接发送的数据会处于一个就绪状态,单线程依次执行。


多线程异步等待模型改造


将服务端处理线程请求的模块开启单独的线程进行处理。


服务端程序进行改造:


package org.idea.iedis.framework.server.io.core;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
 * @Author linhao
 * @Date created in 11:16 上午 2021/8/15
 */
public class BioServer {
    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(6378);
        while (true) {
            System.out.println("wait conn --------");
            Socket socket = serverSocket.accept();
            System.out.println("conn ok --------- " + System.currentTimeMillis());
            byte[] bytes = new byte[1024];
            System.out.println("wait data ---------");
            socket.getInputStream().read(bytes);
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    handleData();
                }
            });
            t.start();
            System.out.printf("data is {}" + new String(bytes));
        }
    }
    /**
     * 处理数据信息
     */
    public static void handleData(){
        System.out.println("handling data begin ----------");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("handling data end ----------");
    }
}
复制代码


此时控制台打印出来的结果如下所示:


网络异常,图片无法展示
|


看起来似乎是请求处理的效率变高了,但是实际上这种方案对于性能的开销是比较大的。


假设我们同时有1000个请求发送过来,那么服务端就得启动1000个线程处理。


假设建立了1000个连接,但是其中只有10条连接发送了数据,那么这个开销从利用率来说更加不划算。


其实在tomcat7之前,其服务端的程序一直都是采用bio的模式来进行处理请求数据,每次发送请求之后都需要额外创建一个线程处理数据,因此其并发处理能力并不友好。


改善思路

  • 当没有建立服务端连接的时候,accept函数调用的时候不要出现阻塞,而是直接跳过。
  • 在没有数据抵达服务端的时候,read函数调用采用非阻塞的方式执行。


有了这两个思路点之后,我们再来深入思考:


假设我们是jdk的开发者,需要对现有的代码做怎样的调整?


这里分享一下我自己的思路:

首先ServerSocket是一个已经存在于JDK内部的对象,而且不建议随意对已有的api进行调整,否则会对业界使用者造成一个不兼容的情况。


因此不妨可以通过一些setter方法去修改阻塞的设置项:


网络异常,图片无法展示
|


来看看实际JDK开发者是如何改良这部分设计的:


首先是使用nio搭建一个简单的服务端代码案例:


package org.idea.iedis.framework.server.io.core.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
/**
 * @Author linhao
 * @Date created in 8:41 上午 2021/8/16
 */
public class NioServer {
    static ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    static List<SocketChannel> channelList = new ArrayList<>();
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        SocketAddress socketAddress = new InetSocketAddress("localhost", 6666);
        serverSocketChannel.bind(socketAddress);
        serverSocketChannel.configureBlocking(false);
        //有点类似于poll模型
        while (true) {
            for (SocketChannel socketChannel : channelList) {
                int read = socketChannel.read(byteBuffer);
                if (read > 0) {
                    System.out.println("read ... " + read);
                    byteBuffer.flip();
                    byte[] bs = new byte[read];
                    byteBuffer.get(bs);
                    String content = new String(bs);
                    System.out.println(content);
                    byteBuffer.flip();
                }
            }
            SocketChannel accept = serverSocketChannel.accept();
            if (accept != null) {
                System.out.println("conn success -----");
                accept.configureBlocking(false);
                channelList.add(accept);
                System.out.println(channelList.size() + "---- list --- size");
            }
        }
    }
}
复制代码


接着是基于这套服务端代码去编写相关的客户端程序脚本:


package org.idea.iedis.framework.server.io.core.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
 * @Author linhao
 * @Date created in 8:40 上午 2021/8/16
 */
public class NioClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("localhost", 6666));
        socketChannel.configureBlocking(false);
        socketChannel.write(ByteBuffer.wrap("this is a test".getBytes()));
        System.out.println("-----");
    }
}
复制代码


上边的这段代码总体的执行思路其实可以抽像化为以下模式:(假设有三个链接请求)


select (3 sockets){
for(Socket socket: sockets) {
int read = socket.getInputStream();    
 if(read >0){  
     // 对应处理代码   
       } else {  
    // 对应处理代码  
       }  
  }
}
复制代码


但是这样的代码我们在进行实际落地实现的时候不可能采用Java程序去实现,通常都是在os底层来进行封装,然后供高级语言去调用。


思考:

假设构建了100w条链接,那么对于这一批的链接采用暴力的list遍历方式去询问是否有新数据的写入就会显得效率异常低下。


改善点:

假设当我们建立好了链接请求之后,每个链接都会订阅某个事件,假设是有数据写入的时候,主动去发布这个事件,这样就能避免轮训操作了。


NIO内部如何解决多路复用机制



从这个角度思考出发,我们一起来看下nio内部是如何实现多路复用机制的,

当有新的请求过来的时候,server会提前将accept事件委托给selector处理。

selector会根据连接的事件类型(例如read,write,accept)去通知到具体的socketserver。


来看看案例代码:


服务端新增一个selector代码


package org.idea.iedis.framework.server.io.core.nio.selector;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;
/**
 * @Author linhao
 * @Date created in 8:27 上午 2021/8/17
 */
public class NioSelectorServer {
    static ArrayList<SocketChannel> socketChannels = new ArrayList<>();
    static ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocket = ServerSocketChannel.open();
        SocketAddress socketAddress = new InetSocketAddress("localhost", 6555);
        serverSocket.configureBlocking(false);
        serverSocket.bind(socketAddress);
        Selector selector = Selector.open();
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("start select!");
        while (true) {
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                if (selectionKey.isAcceptable()) {
                    SocketChannel socketChannel = serverSocket.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector, SelectionKey.OP_READ);
                    System.out.println("client is connected!");
                } else if (selectionKey.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    int len = socketChannel.read(byteBuffer);
                    if (len > 0) {
                        System.out.println("receive data!" + new String(byteBuffer.array(),0,len));
                    } else if (len == -1) {
                        System.out.println("client is close");
                        socketChannel.close();
                    }
                }
                iterator.remove();
            }
        }
    }
}
复制代码


客户端向服务端发送数据的案例代码:


package org.idea.iedis.framework.server.io.core.nio.selector;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
 * @Author linhao
 * @Date created in 8:53 上午 2021/8/17
 */
public class NioSelectorClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("localhost",6555));
        socketChannel.configureBlocking(false);
        while (true) {
            System.out.println("client send data");
            socketChannel.write(ByteBuffer.wrap(("this is success " + Thread.currentThread().getName()).getBytes()));
            Thread.sleep(2500);
        }
    }
}
复制代码


从上边服务端的程序案例可以看出,首先获取到链接之后,会将一个接收事件(accept)给注册到一个叫做selector的组件上。接下来后续这个链接如果有发送请求到数据过来就会被识别是read事件,此时selector再会去通知具体到服务线程处理。大概到思路如下图所示:


网络异常,图片无法展示
|


思考


selector在进行select的时候,会发生什么情况?


如果外界没有额外请求进入的话,那么select函数会进入一个堵塞等待的状态。看起来效果有些类似于bio的accept函数。


selector调用open函数,它会发生了什么变化?


我们对selector的open函数源代码进行深入挖掘,可以发现具体如下所示:

网络异常,图片无法展示
|


在其源代码的底部调用了JDK内部的sun.nio.ch.DefaultSelectorProvider.create()函数。这个函数的代码底层实际上会调用到jdk内部的一些函数,再由jdk内部去调用一些native方法,最终会调用到os中的接口,假设该程序所运作的os环境是linux操作系统,那么最底层就会调用到epool相关的三个函数。


epoll_create,epoll_ctl,epoll_wait。


Redis底层源代码中是如何实现多路复用的



这里我打开的是redis6.0的源代码进行阅读:

文件名称:ae_epoll.c


ps:由于自己对于C源代码并不是特别熟悉,所以这里只能简单说下

下边的这张截图中体现了使用epoll_create函数取创建一个链接。


网络异常,图片无法展示
|


这段代码则是使用了epollo_ctl函数来处理一些事件的注册处理。


网络异常,图片无法展示
|


下边的epoll_wait看起来则像是监听某些兴趣事件。


网络异常,图片无法展示
|


由于文章篇幅有限,后边会接着出一篇文章继续深入研究下多路复用的内容。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
5月前
|
存储 缓存 NoSQL
Redis常见面试题(二):redis分布式锁、redisson、主从一致性、Redlock红锁;Redis集群、主从复制,哨兵模式,分片集群;Redis为什么这么快,I/O多路复用模型
redis分布式锁、redisson、可重入、主从一致性、WatchDog、Redlock红锁、zookeeper;Redis集群、主从复制,全量同步、增量同步;哨兵,分片集群,Redis为什么这么快,I/O多路复用模型——用户空间和内核空间、阻塞IO、非阻塞IO、IO多路复用,Redis网络模型
Redis常见面试题(二):redis分布式锁、redisson、主从一致性、Redlock红锁;Redis集群、主从复制,哨兵模式,分片集群;Redis为什么这么快,I/O多路复用模型
|
2月前
|
设计模式 NoSQL 网络协议
大数据-48 Redis 通信协议原理RESP 事件处理机制原理 文件事件 时间事件 Reactor多路复用
大数据-48 Redis 通信协议原理RESP 事件处理机制原理 文件事件 时间事件 Reactor多路复用
44 2
|
7月前
|
存储 NoSQL Linux
Redis入门到通关之多路复用详解
Redis入门到通关之多路复用详解
71 1
|
7月前
|
NoSQL 架构师 网络协议
Redis系列-15.Redis的IO多路复用原理解析(上)
Redis系列-15.Redis的IO多路复用原理解析
313 1
|
7月前
|
监控 NoSQL Unix
谈谈Redis中的多路复用
谈谈Redis中的多路复用
|
7月前
|
NoSQL Linux 应用服务中间件
Redis系列-15.Redis的IO多路复用原理解析(下)
Redis系列-15.Redis的IO多路复用原理解析
138 0
|
存储 设计模式 监控
【Redis原理机制 一】Redis高性能原因、单线程模型及多路复用技术
【Redis原理机制 一】Redis高性能原因、单线程模型及多路复用技术
268 0
|
NoSQL Linux API
一文搞懂 Redis高性能之IO多路复用
相信大家在面试过程中经常会被问到:“单线程的Redis为啥这么快?” 哈哈,反正我在面试时候经常会问候选人这个问题,这个问题其实是对redis内部机制的一个考察,可以牵扯出好多涉及底层深入原理的一些列问题。
一文搞懂 Redis高性能之IO多路复用
|
NoSQL 安全 大数据
REDIS01_单线程的概述、多线程的引入、概述IO多路复用、如何开启多线程(三)
REDIS01_单线程的概述、多线程的引入、概述IO多路复用、如何开启多线程(三)
164 0
REDIS01_单线程的概述、多线程的引入、概述IO多路复用、如何开启多线程(三)
|
设计模式 NoSQL Unix
REDIS01_单线程的概述、多线程的引入、概述IO多路复用、如何开启多线程(二)
REDIS01_单线程的概述、多线程的引入、概述IO多路复用、如何开启多线程(二)
169 0
REDIS01_单线程的概述、多线程的引入、概述IO多路复用、如何开启多线程(二)
下一篇
DataWorks