redis6.0.5之BIO阅读笔记-后台IO操作

#ifndef __BIO_H
#define __BIO_H

/* Exported API */  供调用的API
void bioInit(void); 后台IO初始化
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3); 创建后台IO任务
unsigned long long bioPendingJobsOfType(int type); 根据类型挂起后台任务
unsigned long long bioWaitStepOfType(int type); 根据类型等待任务
time_t bioOlderJobOfType(int type);
void bioKillThreads(void);停止IO线程

/* Background job opcodes */  后台作业操作码
#define BIO_CLOSE_FILE    0 /* Deferred close(2) syscall. */ 延迟进行关闭的系统调用
#define BIO_AOF_FSYNC     1 /* Deferred AOF fsync. */ 延迟将内存中的已修改的数据保存到存储设备
#define BIO_LAZY_FREE     2 /* Deferred objects freeing. */ 延迟对象释放
#define BIO_NUM_OPS       3

#endif
******************************************************************************************

/* Background I/O service for Redis. Redis的后台I/O服务
 *
 * This file implements operations that we need to perform in the background.
 * Currently there is only a single operation, that is a background close(2)
 * system call. This is needed as when the process is the last owner of a
 * reference to a file closing it means unlinking it, and the deletion of the
 * file is slow, blocking the server.
这个文件实现了那些我们需要在后台执行的操作。当前只实现了一个操作,就是后台关闭的系统调用。
当进程是最后一个该文件的拥有者,当关闭该进程时意味着失去该文件的联系,直接删除该文件比较慢,会阻塞服务,
这行情况下就需要这个后台关闭系统调用。
 * In the future we'll either continue implementing new things we need or
 * we'll switch to libeio. However there are probably long term uses for this
 * file as we may want to put here Redis specific background tasks (for instance
 * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL
 * implementation).
将来我们或者会继续实现我们需要的新接口,或者转而使用libeio库。然而很长一个时期我们将使用这个文件
因为我们可能希望在这里放置特定于Redis的后台任务(例如,我们不可能需要非阻塞FLUSHDB/FLUSHALL实现)

 * DESIGN 设计
 * ------
 *
 * The design is trivial, we have a structure representing a job to perform
 * and a different thread and job queue for every job type.
这个设计很平凡,我们有一个表示要执行的作业的结构,每个作业类型有一个不同的线程和作业队列。
 * Every thread waits for new jobs in its queue, and process every job
 * sequentially.
每个线程在其队列中等待新作业,并按顺序处理每个作业
 * Jobs of the same type are guaranteed to be processed from the least
 * recently inserted to the most recently inserted (older jobs processed
 * first).
同一类型的作业保证先进先出(先处理较旧的作业)
 * Currently there is no way for the creator of the job to be notified about
 * the completion of the operation, this will only be added when/if needed.
目前无法通知任务创建者操作已完成,如果有有需要可以添加
 
#include "server.h"
#include "bio.h"

static pthread_t bio_threads[BIO_NUM_OPS];  io线程数组
static pthread_mutex_t bio_mutex[BIO_NUM_OPS]; 线程互斥锁数组
static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS]; 新任务条件变量数组
static pthread_cond_t bio_step_cond[BIO_NUM_OPS]; 步骤的条件变量数组
static list *bio_jobs[BIO_NUM_OPS]; 任务列表数组

/* The following array is used to hold the number of pending jobs for every
 * OP type. This allows us to export the bioPendingJobsOfType() API that is
 * useful when the main thread wants to perform some operation that may involve
 * objects shared with the background thread. The main thread will just wait
 * that there are no longer jobs of this type to be executed before performing
 * the sensible operation. This data is also useful for reporting. */
下面这个数组是用来保持每种不同类型等待任务的数量。
这允许我们使用API bioPendingJobsOfType,当主线程想要执行一些可能涉及与后台线程共享的对象的操作时,该API非常有用。
static unsigned long long bio_pending[BIO_NUM_OPS]; 保存每种类型等待任务数量的数组

/* This structure represents a background Job. It is only used locally to this
 * file as the API does not expose the internals at all. */
这个结构表示一个后台任务。它只是用在这个文件api中,不对外暴露
struct bio_job {
    time_t time; /* Time at which the job was created. */ 任务创建的时间
    /* Job specific arguments pointers. If we need to pass more than three
     * arguments we can just pass a pointer to a structure or alike. */
任务特定的参数指针。如果我们需要传入超过三个参数,我们可以传入一个指向结构或者类似结构的指针
    void *arg1, *arg2, *arg3;
};

void *bioProcessBackgroundJobs(void *arg);
void lazyfreeFreeObjectFromBioThread(robj *o);
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2);
void lazyfreeFreeSlotsMapFromBioThread(zskiplist *sl);

/* Make sure we have enough stack to perform all the things we do in the
 * main thread. */
确保我们有足够的堆栈来执行我们在主线程中所做的所有事情。
#define REDIS_THREAD_STACK_SIZE (1024*1024*4)

/* Initialize the background system, spawning the thread. */
初始化后台系统,生成线程
void bioInit(void) {
    pthread_attr_t attr;
    pthread_t thread;
    size_t stacksize;
    int j;

    /* Initialization of state vars and objects */ 初始化状态变量和对象
    for (j = 0; j < BIO_NUM_OPS; j++) {
        pthread_mutex_init(&bio_mutex[j],NULL); 初始化锁
        pthread_cond_init(&bio_newjob_cond[j],NULL); 新任务条件变量
        pthread_cond_init(&bio_step_cond[j],NULL); 步骤条件变量
        bio_jobs[j] = listCreate();  任务列表
        bio_pending[j] = 0;
    }

    /* Set the stack size as by default it may be small in some system */
    设置栈的默认大小(4M),因为在一些系统中栈可能太小
    pthread_attr_init(&attr); 初始化线程属性
    pthread_attr_getstacksize(&attr,&stacksize); 获取线程栈的大小
    if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */ 吐槽,这个世界到处都是Solaris系统的补丁
    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; 如果线程的栈空间小于4M,那么一直翻倍,直到大于等于4M为止
    pthread_attr_setstacksize(&attr, stacksize); 重置线程栈空间

    /* Ready to spawn our threads. We use the single argument the thread
     * function accepts in order to pass the job ID the thread is
     * responsible of. */
准备生产我们的线程,我们使用接受单个参数的线程函数,目的是为了传递任务线程负责的任务ID.
    for (j = 0; j < BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { 创建新线程
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread; 获取新线程ID
    }
}

创建单个后台任务
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
    struct bio_job *job = zmalloc(sizeof(*job));

    job->time = time(NULL);
    job->arg1 = arg1;
    job->arg2 = arg2;
    job->arg3 = arg3;
    pthread_mutex_lock(&bio_mutex[type]); 针对这个类型加线程互斥锁
    listAddNodeTail(bio_jobs[type],job); 将创建的新任务挂到对应类型的任务列表上
    bio_pending[type]++; 该类型挂起任务+1
    pthread_cond_signal(&bio_newjob_cond[type]);给该种类型的发送信号,解除阻塞
    pthread_mutex_unlock(&bio_mutex[type]); 解锁,这样可以继续执行
}

处理后台任务
void *bioProcessBackgroundJobs(void *arg) {
    struct bio_job *job;
    unsigned long type = (unsigned long) arg;
    sigset_t sigset;

    /* Check that the type is within the right interval. */
    检查类型是否在规定的范围内
    if (type >= BIO_NUM_OPS) { 只有3种类型,超过就是有问题
        serverLog(LL_WARNING,
            "Warning: bio thread started with wrong type %lu",type);
        return NULL;
    }

    switch (type) {
    case BIO_CLOSE_FILE:
        redis_set_thread_title("bio_close_file");
        break;
    case BIO_AOF_FSYNC:
        redis_set_thread_title("bio_aof_fsync");
        break;
    case BIO_LAZY_FREE:
        redis_set_thread_title("bio_lazy_free");
        break;
    }

    redisSetCpuAffinity(server.bio_cpulist); 设置亲和CPU

    /* Make the thread killable at any time, so that bioKillThreads()
     * can work reliably. */ 确认线程可以在任何时候被停止,这样函数bioKillThreads就能可靠的工作
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); 
提出请求。线程在取消请求(pthread_cancel)发出后会继续运行,直到到达某个取消点(CancellationPoint)
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); 设置本线程取消动作的执行时机,立即执行取消动作(退出)

    pthread_mutex_lock(&bio_mutex[type]);
    /* Block SIGALRM so we are sure that only the main thread will
     * receive the watchdog signal. */
阻止SIGALRM,这样我们就可以确保只有主线程才会收到看门狗信号
    sigemptyset(&sigset);清空信号集
    sigaddset(&sigset, SIGALRM);添加警告信号
    if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        serverLog(LL_WARNING,
            "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));

    while(1) {
        listNode *ln;

        /* The loop always starts with the lock hold. */ 循环开始的时候需要锁住
        if (listLength(bio_jobs[type]) == 0) { 如果没有挂起的任务,就开始等待
            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]); 设置等待条件(这里函数pthread_cond_wait会释放锁)
            continue; 继续循环,直到等待条件满足
        }
        /* Pop the job from the queue. */ 从挂起队列中弹出任务
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        /* It is now possible to unlock the background system as we know have
         * a stand alone job structure to process.*/
现在可以解锁后台系统,因为我们知道有一个独立的作业结构要处理
        pthread_mutex_unlock(&bio_mutex[type]);解锁

        /* Process the job accordingly to its type. */ 根据任务类型处理任务
        if (type == BIO_CLOSE_FILE) {
            close((long)job->arg1);
        } else if (type == BIO_AOF_FSYNC) {
            redis_fsync((long)job->arg1);
        } else if (type == BIO_LAZY_FREE) {
            /* What we free changes depending on what arguments are set:  
            我们释放的内容取决于设置的参数
             * arg1 -> free the object at pointer. 参数1 在指针处释放对象
             * arg2 & arg3 -> free two dictionaries (a Redis DB). 参数2和3 释放两个字典(一个redis数据库)
             * only arg3 -> free the skiplist. */ 只有参数3  释放跳表
            if (job->arg1)
                lazyfreeFreeObjectFromBioThread(job->arg1);
            else if (job->arg2 && job->arg3)
                lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
            else if (job->arg3)
                lazyfreeFreeSlotsMapFromBioThread(job->arg3);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        zfree(job); 处理了任务,就可以释放了

        /* Lock again before reiterating the loop, if there are no longer
         * jobs to process we'll block again in pthread_cond_wait(). */
在进入循环迭代之前再次锁住,如果没有挂起的任务需要处理,我们将在函数pthread_cond_wait中再次阻塞
        pthread_mutex_lock(&bio_mutex[type]);
        listDelNode(bio_jobs[type],ln); 删除节点
        bio_pending[type]--;挂起任务减少1个

        /* Unblock threads blocked on bioWaitStepOfType() if any. */
        解除在函数bioWaitStepOfType中阻塞的线程
        pthread_cond_broadcast(&bio_step_cond[type]);
    }
}

/* Return the number of pending jobs of the specified type. */
返回特定类型挂起任务的数量
unsigned long long bioPendingJobsOfType(int type) {
    unsigned long long val;
    pthread_mutex_lock(&bio_mutex[type]); 先锁住,不锁住值会变化
    val = bio_pending[type]; 再获取值
    pthread_mutex_unlock(&bio_mutex[type]); 解锁
    return val;
}

/* If there are pending jobs for the specified type, the function blocks
 * and waits that the next job was processed. Otherwise the function
 * does not block and returns ASAP.
如果存在指定类型的挂起任务,函数阻塞和等待下个任务的处理。否则不阻塞并且尽快返回。
 * The function returns the number of jobs still to process of the
 * requested type.
函数返回仍要处理的请求类型的作业数
 * This function is useful when from another thread, we want to wait
 * a bio.c thread to do more work in a blocking way.
当我们从另一个线程等待bio.c线程以阻塞方式执行更多工作时,此函数非常有用
 */
unsigned long long bioWaitStepOfType(int type) {
    unsigned long long val;
    pthread_mutex_lock(&bio_mutex[type]); 锁定
    val = bio_pending[type];
    if (val != 0) {
        pthread_cond_wait(&bio_step_cond[type],&bio_mutex[type]); 等待步骤的条件变量
        val = bio_pending[type];
    }
    pthread_mutex_unlock(&bio_mutex[type]); 解锁
    return val;
}

/* Kill the running bio threads in an unclean way. This function should be
 * used only when it's critical to stop the threads for some reason.
 * Currently Redis does this only on crash (for instance on SIGSEGV) in order
 * to perform a fast memory check without other threads messing with memory. */
用粗鲁的方式终止运行的bio线程。仅当出于某种原因停止线程非常迫切时,才应使用此函数(这个函数是迫不得已而用的)
目前,Redis仅在崩溃时(例如在SIGSEGV上)执行此操作,以便执行快速内存检查,而不让其他线程干扰内存
void bioKillThreads(void) {
    int err, j;

    for (j = 0; j < BIO_NUM_OPS; j++) {
        if (bio_threads[j] && pthread_cancel(bio_threads[j]) == 0) {  取消线程
            if ((err = pthread_join(bio_threads[j],NULL)) != 0) { 等待线程结束
                serverLog(LL_WARNING,
                    "Bio thread for job type #%d can be joined: %s",
                        j, strerror(err));
            } else {
                serverLog(LL_WARNING,
                    "Bio thread for job type #%d terminated",j);
            }
        }
    }
}
***************************************************************************************************

 

上一篇:c++项目中 锁的使用


下一篇:Android手机做无线中继路由器