Bio多线程消费者模式撑起Redis后台任务的半壁江上,这篇我干了,大家随意!

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
日志服务 SLS,月写入数据量 50GB 1个月
简介: Bio多线程消费者模式撑起Redis后台任务的半壁江上,这篇我干了,大家随意!

640.png

Bio如何撑起Redis后台半壁江山



干货:

  1. Bio设计模式-----生产者消费者模式
  2. Aof异步刷盘-----Bio关键先生
  3. Bio引入Lazy free-----大Key删除


Redis引入Bio这套设计可以说支撑起了异步后台任务处理的半壁江山,为什么这么说呢?我们先来看下redis阻塞点:

  1. 集合全量查询和聚合操作;
  2. bigkey 删除;
  3. 清空数据库;
  4. AOF 日志同步写;
  5. 从库加载 RDB 文件


适合后台能处理的就有三个,2,3,4都适合后台子线程去处理,而且随着redis的发展,可能需要后台处理的越来越多,如何设计优秀的架构模式来支持现有和将来的需要异步处理的任务呢?答案就是:Bio系统 基于锁和共享变量来实现多线程生产者消费者模型


1. BIO系统设计与实现


1.1 流程图

640.png


1.2 流程介绍


Redis 主线程启动后,会使用操作系统提供的 pthread_create 函数创建 3 个子线程,分别 由它们负责 AOF 日志写操作、键值对删除以及文件关闭的异步执行。主线程通过一个链表形式的任务队列和子线程进行交互。当收到键值对删除和清空数据库 的操作时,主线程会把这个操作封装成一个任务,放入到任务队列中,然后给客户端返回 一个完成信息,表明删除已经完成。但实际上,这个时候删除还没有执行,等到后台子线程从任务队列中读取任务后,才开始 实际删除键值对,并释放相应的内存空间。因此,我们把这种异步删除也称为惰性删除 (lazy free)。此时,删除或清空操作不会阻塞主线程,这就避免了对主线程的性能影 响。和惰性删除类似,当 AOF 日志配置成 everysec 选项后,主线程会把 AOF 写日志操作封 装成一个任务,也放到任务队列中。后台子线程读取任务后,开始自行写入 AOF 日志,这 样主线程就不用一直等待 AOF 日志写完了。


这里有个地方需要你注意一下,异步的键值对删除和数据库清空操作是 Redis 4.0 后提供 的功能,Redis 也提供了新的命令来执行这两个操作


  • 键值对删除:当你的集合类型中有大量元素(例如有百万级别或千万级别元素)需要删除时,我建议你使用 UNLINK 命令
  • 清空数据库:可以在 FLUSHDB 和 FLUSHALL 命令后加上 ASYNC 选项,这样就可以让后台子线程异步地清空数据库,如下所示:
FLUSHDB ASYNC
FLUSHALL AYSNC


2. 源码分析更清楚


因为涉及到的源码还算可以,所以想黏贴出来让大家直接看,加油哇。


1. 任务的创建和初始化


对于一个任务,比如aof持久化任务,首先要初始化一个队列,在redis里面使用了redis本身的链表结构创建这个队列。这个队列须要满足如下特点:


  • 生产者发送任务到队列中。
  • 若是队列不为空,消费者从队列中取任务;不然消费者进入等待状态。


这里的消费者就是后台线程,而为了完成队列为空则等待的功能,redis使用了条件变量机制,其初始化代码以下:


//后台线程数组 大小是BIO_NUM_OPS 在redis的3.2.3版本中是常量2, 现在版本中是BIO_NUM_OPS=3
//#define BIO_NUM_OPS       3
//表示支持三种任务。对于每种任务,对应一个list用于存储任务
static pthread_t bio_threads[BIO_NUM_OPS]; 
//bio_mutex和bio_condvar用于控制并发
static pthread_mutex_t bio_mutex[BIO_NUM_OPS];
static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS];
static pthread_cond_t bio_step_cond[BIO_NUM_OPS];
//存储任务
static list *bio_jobs[BIO_NUM_OPS];
//存储pending状态的任务数量
static unsigned long long bio_pending[BIO_NUM_OPS];


创建好变量之后就开始初始化bioInit


初始化函数bioInit


//初始化锁与条件变量
    for (j = 0; j < BIO_NUM_OPS; j++) {
        pthread_mutex_init(&bio_mutex[j],NULL);
        pthread_cond_init(&bio_newjob_cond[j],NULL);
        pthread_cond_init(&bio_step_cond[j],NULL);
        bio_jobs[j] = listCreate();
        bio_pending[j] = 0;
    }
    ......
    ......
    //初始化线程 生成制定BIO_NUM_OPS数量的线程 线程传入一个编号j,0表明关闭文件,1表明aof初始化,2就是删除大Key
    for (j = 0; j < BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread;
    }


在完成初始化任务之后,Redis就有了BIO_NUM_OPS个链表来表示任务队列,有BIO_NUM_OPS个线程调用bioProcessBackgroundJobs函数进行任务处理,参数是一个编号j,而且每一个队列都初始化了锁与条件变量作并发控制。


2. 任务入队列


任务入队列就是把一个任务放到链表的头部,而且把相应任务的pending值+1,表示这个队列里面未完成的任务多了一个。其中任务的结构如下:


struct bio_job {
    //job任务结构
    int fd; /* Fd for file based background jobs */
    lazy_free_fn *free_fn; /* Function that will free the provided arguments */
    void *free_args[]; /* List of arguments to be passed to the free function */
};
//提交Aof任务到队列
void bioCreateFsyncJob(int fd) {
    //申请任务job空间
    struct bio_job *job = zmalloc(sizeof(*job));
    job->fd = fd;
    //提交任务到队列
    bioSubmitJob(BIO_AOF_FSYNC, job);
}
//提交Lazy free任务到队列
void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) {
    va_list valist;
    /* Allocate memory for the job structure and all required
     * arguments */
    struct bio_job *job = zmalloc(sizeof(*job) + sizeof(void *) * (arg_count));
    job->free_fn = free_fn;
    va_start(valist, arg_count);
    for (int i = 0; i < arg_count; i++) {
        job->free_args[i] = va_arg(valist, void *);
    }
    va_end(valist);
    bioSubmitJob(BIO_LAZY_FREE, job);
}
//提交关闭文件任务到队列
void bioCreateCloseJob(int fd) {
    struct bio_job *job = zmalloc(sizeof(*job));
    job->fd = fd;
    bioSubmitJob(BIO_CLOSE_FILE, job);
}
//提交任务
void bioSubmitJob(int type, struct bio_job *job) {
    pthread_mutex_lock(&bio_mutex[type]);
    listAddNodeTail(bio_jobs[type],job);
    bio_pending[type]++;
    pthread_cond_signal(&bio_newjob_cond[type]);
    pthread_mutex_unlock(&bio_mutex[type]);
}
//返回指定类型的peding状态的作业的数量
unsigned long long bioPendingJobsOfType(int type) {
    unsigned long long val;
    pthread_mutex_lock(&bio_mutex[type]);
    val = bio_pending[type];
    pthread_mutex_unlock(&bio_mutex[type]);
    return val;
}


先拿Aof入队列作为事例讲解:先调用bioCreateFsyncJob为任务结构分配空间,而后调用bioSubmitJob函数使用listAddNodeTail函数把任务放到链表的头部。这里使用的是redis本身实现的链表。能够看到,进行链表操做的时候,要先加锁,这是由于这里的链表是共享资源。在任务成功加入队列之后,调用pthread_cond_signal函数,通知阻塞等待的线程继续执行。上面这个过程是共享变量使用的基本模式: 加锁、置条件为真(这里是任务入队列)、通知、解锁


3. 任务出队列


Redis已经作好了任务初始化的工做,而且能够在队列里面放置新的任务,那么当队列里面有任务的时候,第一步初始化的时候开启的后台线程就会调用bioProcessBackgroundJobs函数进行出队处理任务,其处理主要代码如下:


void *bioProcessBackgroundJobs(void *arg) {
    struct bio_job *job;
    unsigned long type = (unsigned long) arg;
    //一堆校验 省略
    。。。。。。
    //加锁
    pthread_mutex_lock(&bio_mutex[type]);
    while(1) {
        listNode *ln;
        /* The loop always starts with the lock hold. */
        if (listLength(bio_jobs[type]) == 0) {
            //条件不成立,等待
            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
            //被通知之后,中止阻塞,从新判断条件
            continue;
        }
        /* Pop the job from the queue. */
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        //取走值之后,解锁
        pthread_mutex_unlock(&bio_mutex[type]);
        //完成出队之后,根据类型调用close、redis_fsync或者free_fn函数。
        if (type == BIO_CLOSE_FILE) {
            close(job->fd);
        } else if (type == BIO_AOF_FSYNC) {
            redis_fsync(job->fd)
        } else if (type == BIO_LAZY_FREE) {
            job->free_fn(job->free_args);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        //释放任务空间
        zfree(job);
        // 加锁删除队列中这个任务
        pthread_mutex_lock(&bio_mutex[type]);
        listDelNode(bio_jobs[type],ln);
        bio_pending[type]--; //队列中pending的数量减一
        //如果bioWaitStepOfType()上有阻塞的线程,则解除阻塞。
        pthread_cond_broadcast(&bio_step_cond[type]);
    }
}
线程类型 三种 0 关闭文件 1 aof 2 lazy free
#define BIO_CLOSE_FILE    0 /* Deferred close(2) syscall. */
#define BIO_AOF_FSYNC     1 /* Deferred AOF fsync. */
#define BIO_LAZY_FREE     2 /* Deferred objects freeing. */
//代表有上面三种类型的线程
#define BIO_NUM_OPS       3


上面的代码主要流程是,先判断当前的队列是否是空的,若是空的,则等待。不然,从队列中取出一个job结构,而且根据线程的类型决定调用什么函数。这里的类型是在建立线程的时候传入的参数得到的,是0 或者 1或者2。得到类型之后,从job里面取出fd做为参数,调用close函数或者redis_fsync函数。fd是一个文件描述符,因此,在任务加入队列的时候,只是把一个文件描述符入队列而已,这也就是为何bio_job结构体会设计的如此简单。但是lasy free的参数是free_args,是一个数组,注意区分。


3. Aof引用Bio


Aof 持久化是redis的两大持久化方式之一,其会以字符串的形式把对redis的每个操做都先记录在内存的一个buffer中,而后写入文件,而且在适当的时间使用fsync将数据刷入磁盘,而调用fsync的其中一种方式就是使用上面介绍的bio系统其使用的方式遵循了上面说的三个步骤


首先,在server.c中的main函数里面,有一个InitServerLast函数,其内部调用了bioInit函数,完成了bio系统的初始化,这样,相关的队列结构被创建,后台线程也被建立了。在redis主循环被启动之后,会进入持久化的时机,调用flushAppendOnlyFile函数,完成aof持久化工做。这个函数会处理aof相关的配置以及优化等各种问题,在本文只关注对bio系统的使用,其相关代码如下:


void flushAppendOnlyFile(int force) {
    int sync_in_progress = 0;
    ...
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
       sync_in_progress = aofFsyncInProgress();
    ...
    /* Perform the fsync if needed. */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        ....
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        if (!sync_in_progress) {
            //后台执行fsync
            aof_background_fsync(server.aof_fd);
            server.aof_fsync_offset = server.aof_current_size;
        }
        server.aof_last_fsync = server.unixtime;
    }
    ...
}
//如果一个AOf fsync当前已经在Bio后台线程中执行,返回true
int aofFsyncInProgress(void) {
    return bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;
}
//启动后台线程 执行aof的fsync操作
void aof_background_fsync(int fd) {
    //这里就是调用Bio的创建异步任务的核心函数 发起入队操作
    bioCreateFsyncJob(fd);
}


能够看到,其经过bioPendingJobsOfType来检查当前队列处理的状况,如果返回false代表没有Bio线程在处理,那么会调用aof_background_fsync函数,而这个函数又会调用bioCreateFsyncJob来将aof任务加入队列。因为在前面已经完成了线程的建立,在队列中有任务的时候,线程就会启动,而且经过上面讲的redis_fsync函数完成持久化操做。


4. 大Key删除用Bio(Bio新增lazy free)


大Key删除或者flushdb清空数据库等操作性能非常低下,而且会阻塞主线程。目前有一种方案就是渐进式rehash,但是Redis作者并没有考虑渐进式rehash方案,而采用在Bio中新增一个成员lazyfree。为何没有采用渐进式rehash方案呢?作者说当我们删除一个集合的时候,可能删除集合中元素的速度尚未客户端向集合中添加元素的速度快,那我们删除的工做看起来是永远也没法完成了。


为了解决以上问题, redis 4.0 引入了lazyfree的机制,它能够将删除键或数据库的操做放在后台线程里执行, 从而尽量地避免服务器阻塞。


lazyfree的原理不难想象,就是在删除对象时只是进行逻辑删除,而后把对象丢给后台,让后台线程去执行真正的删除,避免因为对象体积过大而形成阻塞。


源码:

源码太多 影响到阅读了 这里只是把流程串起来哈
使用unlink删除大key 逻辑是:
void unlinkCommand(client *c) {
    delGenericCommand(c, 1);
}
delGenericCommand->dbAsyncDelete->freeObjAsync->bioCreateLazyFreeJob
看到了吧 最后调用的就是Bio系统中的bioCreateLazyFreeJob去创建任务并且入队列


5. 小结


能看到这里我相信您对这篇文章很感兴趣了,写这篇文章也是鼓足了很大勇气,因为要分析很多源码并且贴出来,第一得花大量时间去研究,第二贴出来害怕占用空间影响阅读,这俩对于我来说是挑战,但是对于你们读者来说只要能接受并且好好消化掉也是挑战。但是我相信只要付出,总有柳暗花明的那一天。加油。


Bio设计模式-基于锁和共享变量实现的多线程生产者消费者模型大家应该记住,这些也都是咱们在系统设计和编码中值得学习和借鉴的地方。最后,再次感谢坚持到最后的读者!


参考:

https://github.com/redis/redis/blob/c1718f9d862267bc44b2a326cdc8cb1ca5b81a39/src/lazyfree.c

https://github.com/redis/redis/blob/64f6159646337b4a3b56a400522ad4d028d55dac/src/db.c

https://github.com/redis/redis/blob/7ff7536e2c55a8a624eb52ffc35c08441425e683/src/bio.c

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
3月前
|
监控 NoSQL 安全
如何在 Redis 中正确使用多线程?
【10月更文挑战第16天】正确使用 Redis 多线程需要综合考虑多个因素,并且需要在实践中不断摸索和总结经验。通过合理的配置和运用,多线程可以为 Redis 带来性能上的提升,同时也要注意避免可能出现的问题,以保障系统的稳定和可靠运行。
77 2
|
3月前
|
存储 NoSQL Redis
Redis 新版本引入多线程的利弊分析
【10月更文挑战第16天】Redis 新版本引入多线程是一个具有挑战性和机遇的改变。虽然多线程带来了一些潜在的问题和挑战,但也为 Redis 提供了进一步提升性能和扩展能力的可能性。在实际应用中,我们需要根据具体的需求和场景,综合评估多线程的利弊,谨慎地选择和使用 Redis 的新版本。同时,Redis 开发者也需要不断努力,优化和完善多线程机制,以提供更加稳定、高效和可靠的 Redis 服务。
84 1
|
19天前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
21天前
|
NoSQL Redis
单线程传奇Redis,为何引入多线程?
Redis 4.0 引入多线程支持,主要用于后台对象删除、处理阻塞命令和网络 I/O 等操作,以提高并发性和性能。尽管如此,Redis 仍保留单线程执行模型处理客户端请求,确保高效性和简单性。多线程仅用于优化后台任务,如异步删除过期对象和分担读写操作,从而提升整体性能。
51 1
|
19天前
|
缓存 安全 Java
【JavaEE】——单例模式引起的多线程安全问题:“饿汉/懒汉”模式,及解决思路和方法(面试高频)
单例模式下,“饿汉模式”,“懒汉模式”,单例模式下引起的线程安全问题,解锁思路和解决方法
|
3月前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
28 1
|
2月前
|
消息中间件 NoSQL Redis
【赵渝强老师】Redis消息的生产者消费者模式
消息队列在Redis中可通过List数据结构实现,支持发布者订阅者和生产者消费者两种模式。生产者通过`lpush`向List添加消息,消费者通过`rpop`或`brpop`消费消息,后者支持阻塞等待。示例代码展示了如何使用Redis的生产者消费者模式。
|
3月前
|
存储 运维 NoSQL
Redis为什么最开始被设计成单线程而不是多线程
总之,Redis采用单线程设计是基于对系统特性的深刻洞察和权衡的结果。这种设计不仅保持了Redis的高性能,还确保了其代码的简洁性、可维护性以及部署的便捷性,使之成为众多应用场景下的首选数据存储解决方案。
48 1
|
3月前
|
存储 缓存 NoSQL
大数据-46 Redis 持久化 RDB AOF 配置参数 混合模式 具体原理 触发方式 优点与缺点
大数据-46 Redis 持久化 RDB AOF 配置参数 混合模式 具体原理 触发方式 优点与缺点
81 1
|
3月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
68 1