#ifndef __BIO_H #define __BIO_H /* Exported API */ 供调用的API void bioInit(void); 后台IO初始化 void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3); 创建后台IO任务 unsigned long long bioPendingJobsOfType(int type); 根据类型挂起后台任务 unsigned long long bioWaitStepOfType(int type); 根据类型等待任务 time_t bioOlderJobOfType(int type); void bioKillThreads(void);停止IO线程 /* Background job opcodes */ 后台作业操作码 #define BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */ 延迟进行关闭的系统调用 #define BIO_AOF_FSYNC 1 /* Deferred AOF fsync. */ 延迟将内存中的已修改的数据保存到存储设备 #define BIO_LAZY_FREE 2 /* Deferred objects freeing. */ 延迟对象释放 #define BIO_NUM_OPS 3 #endif ****************************************************************************************** /* Background I/O service for Redis. Redis的后台I/O服务 * * This file implements operations that we need to perform in the background. * Currently there is only a single operation, that is a background close(2) * system call. This is needed as when the process is the last owner of a * reference to a file closing it means unlinking it, and the deletion of the * file is slow, blocking the server. 这个文件实现了那些我们需要在后台执行的操作。当前只实现了一个操作,就是后台关闭的系统调用。 当进程是最后一个该文件的拥有者,当关闭该进程时意味着失去该文件的联系,直接删除该文件比较慢,会阻塞服务, 这行情况下就需要这个后台关闭系统调用。 * In the future we'll either continue implementing new things we need or * we'll switch to libeio. However there are probably long term uses for this * file as we may want to put here Redis specific background tasks (for instance * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL * implementation). 将来我们或者会继续实现我们需要的新接口,或者转而使用libeio库。然而很长一个时期我们将使用这个文件 因为我们可能希望在这里放置特定于Redis的后台任务(例如,我们不可能需要非阻塞FLUSHDB/FLUSHALL实现) * DESIGN 设计 * ------ * * The design is trivial, we have a structure representing a job to perform * and a different thread and job queue for every job type. 这个设计很平凡,我们有一个表示要执行的作业的结构,每个作业类型有一个不同的线程和作业队列。 * Every thread waits for new jobs in its queue, and process every job * sequentially. 每个线程在其队列中等待新作业,并按顺序处理每个作业 * Jobs of the same type are guaranteed to be processed from the least * recently inserted to the most recently inserted (older jobs processed * first). 同一类型的作业保证先进先出(先处理较旧的作业) * Currently there is no way for the creator of the job to be notified about * the completion of the operation, this will only be added when/if needed. 目前无法通知任务创建者操作已完成,如果有有需要可以添加 #include "server.h" #include "bio.h" static pthread_t bio_threads[BIO_NUM_OPS]; io线程数组 static pthread_mutex_t bio_mutex[BIO_NUM_OPS]; 线程互斥锁数组 static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS]; 新任务条件变量数组 static pthread_cond_t bio_step_cond[BIO_NUM_OPS]; 步骤的条件变量数组 static list *bio_jobs[BIO_NUM_OPS]; 任务列表数组 /* The following array is used to hold the number of pending jobs for every * OP type. This allows us to export the bioPendingJobsOfType() API that is * useful when the main thread wants to perform some operation that may involve * objects shared with the background thread. The main thread will just wait * that there are no longer jobs of this type to be executed before performing * the sensible operation. This data is also useful for reporting. */ 下面这个数组是用来保持每种不同类型等待任务的数量。 这允许我们使用API bioPendingJobsOfType,当主线程想要执行一些可能涉及与后台线程共享的对象的操作时,该API非常有用。 static unsigned long long bio_pending[BIO_NUM_OPS]; 保存每种类型等待任务数量的数组 /* This structure represents a background Job. It is only used locally to this * file as the API does not expose the internals at all. */ 这个结构表示一个后台任务。它只是用在这个文件api中,不对外暴露 struct bio_job { time_t time; /* Time at which the job was created. */ 任务创建的时间 /* Job specific arguments pointers. If we need to pass more than three * arguments we can just pass a pointer to a structure or alike. */ 任务特定的参数指针。如果我们需要传入超过三个参数,我们可以传入一个指向结构或者类似结构的指针 void *arg1, *arg2, *arg3; }; void *bioProcessBackgroundJobs(void *arg); void lazyfreeFreeObjectFromBioThread(robj *o); void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2); void lazyfreeFreeSlotsMapFromBioThread(zskiplist *sl); /* Make sure we have enough stack to perform all the things we do in the * main thread. */ 确保我们有足够的堆栈来执行我们在主线程中所做的所有事情。 #define REDIS_THREAD_STACK_SIZE (1024*1024*4) /* Initialize the background system, spawning the thread. */ 初始化后台系统,生成线程 void bioInit(void) { pthread_attr_t attr; pthread_t thread; size_t stacksize; int j; /* Initialization of state vars and objects */ 初始化状态变量和对象 for (j = 0; j < BIO_NUM_OPS; j++) { pthread_mutex_init(&bio_mutex[j],NULL); 初始化锁 pthread_cond_init(&bio_newjob_cond[j],NULL); 新任务条件变量 pthread_cond_init(&bio_step_cond[j],NULL); 步骤条件变量 bio_jobs[j] = listCreate(); 任务列表 bio_pending[j] = 0; } /* Set the stack size as by default it may be small in some system */ 设置栈的默认大小(4M),因为在一些系统中栈可能太小 pthread_attr_init(&attr); 初始化线程属性 pthread_attr_getstacksize(&attr,&stacksize); 获取线程栈的大小 if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */ 吐槽,这个世界到处都是Solaris系统的补丁 while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; 如果线程的栈空间小于4M,那么一直翻倍,直到大于等于4M为止 pthread_attr_setstacksize(&attr, stacksize); 重置线程栈空间 /* Ready to spawn our threads. We use the single argument the thread * function accepts in order to pass the job ID the thread is * responsible of. */ 准备生产我们的线程,我们使用接受单个参数的线程函数,目的是为了传递任务线程负责的任务ID. for (j = 0; j < BIO_NUM_OPS; j++) { void *arg = (void*)(unsigned long) j; if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { 创建新线程 serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs."); exit(1); } bio_threads[j] = thread; 获取新线程ID } } 创建单个后台任务 void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) { struct bio_job *job = zmalloc(sizeof(*job)); job->time = time(NULL); job->arg1 = arg1; job->arg2 = arg2; job->arg3 = arg3; pthread_mutex_lock(&bio_mutex[type]); 针对这个类型加线程互斥锁 listAddNodeTail(bio_jobs[type],job); 将创建的新任务挂到对应类型的任务列表上 bio_pending[type]++; 该类型挂起任务+1 pthread_cond_signal(&bio_newjob_cond[type]);给该种类型的发送信号,解除阻塞 pthread_mutex_unlock(&bio_mutex[type]); 解锁,这样可以继续执行 } 处理后台任务 void *bioProcessBackgroundJobs(void *arg) { struct bio_job *job; unsigned long type = (unsigned long) arg; sigset_t sigset; /* Check that the type is within the right interval. */ 检查类型是否在规定的范围内 if (type >= BIO_NUM_OPS) { 只有3种类型,超过就是有问题 serverLog(LL_WARNING, "Warning: bio thread started with wrong type %lu",type); return NULL; } switch (type) { case BIO_CLOSE_FILE: redis_set_thread_title("bio_close_file"); break; case BIO_AOF_FSYNC: redis_set_thread_title("bio_aof_fsync"); break; case BIO_LAZY_FREE: redis_set_thread_title("bio_lazy_free"); break; } redisSetCpuAffinity(server.bio_cpulist); 设置亲和CPU /* Make the thread killable at any time, so that bioKillThreads() * can work reliably. */ 确认线程可以在任何时候被停止,这样函数bioKillThreads就能可靠的工作 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); 提出请求。线程在取消请求(pthread_cancel)发出后会继续运行,直到到达某个取消点(CancellationPoint) pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); 设置本线程取消动作的执行时机,立即执行取消动作(退出) pthread_mutex_lock(&bio_mutex[type]); /* Block SIGALRM so we are sure that only the main thread will * receive the watchdog signal. */ 阻止SIGALRM,这样我们就可以确保只有主线程才会收到看门狗信号 sigemptyset(&sigset);清空信号集 sigaddset(&sigset, SIGALRM);添加警告信号 if (pthread_sigmask(SIG_BLOCK, &sigset, NULL)) serverLog(LL_WARNING, "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno)); while(1) { listNode *ln; /* The loop always starts with the lock hold. */ 循环开始的时候需要锁住 if (listLength(bio_jobs[type]) == 0) { 如果没有挂起的任务,就开始等待 pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]); 设置等待条件(这里函数pthread_cond_wait会释放锁) continue; 继续循环,直到等待条件满足 } /* Pop the job from the queue. */ 从挂起队列中弹出任务 ln = listFirst(bio_jobs[type]); job = ln->value; /* It is now possible to unlock the background system as we know have * a stand alone job structure to process.*/ 现在可以解锁后台系统,因为我们知道有一个独立的作业结构要处理 pthread_mutex_unlock(&bio_mutex[type]);解锁 /* Process the job accordingly to its type. */ 根据任务类型处理任务 if (type == BIO_CLOSE_FILE) { close((long)job->arg1); } else if (type == BIO_AOF_FSYNC) { redis_fsync((long)job->arg1); } else if (type == BIO_LAZY_FREE) { /* What we free changes depending on what arguments are set: 我们释放的内容取决于设置的参数 * arg1 -> free the object at pointer. 参数1 在指针处释放对象 * arg2 & arg3 -> free two dictionaries (a Redis DB). 参数2和3 释放两个字典(一个redis数据库) * only arg3 -> free the skiplist. */ 只有参数3 释放跳表 if (job->arg1) lazyfreeFreeObjectFromBioThread(job->arg1); else if (job->arg2 && job->arg3) lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3); else if (job->arg3) lazyfreeFreeSlotsMapFromBioThread(job->arg3); } else { serverPanic("Wrong job type in bioProcessBackgroundJobs()."); } zfree(job); 处理了任务,就可以释放了 /* Lock again before reiterating the loop, if there are no longer * jobs to process we'll block again in pthread_cond_wait(). */ 在进入循环迭代之前再次锁住,如果没有挂起的任务需要处理,我们将在函数pthread_cond_wait中再次阻塞 pthread_mutex_lock(&bio_mutex[type]); listDelNode(bio_jobs[type],ln); 删除节点 bio_pending[type]--;挂起任务减少1个 /* Unblock threads blocked on bioWaitStepOfType() if any. */ 解除在函数bioWaitStepOfType中阻塞的线程 pthread_cond_broadcast(&bio_step_cond[type]); } } /* Return the number of pending jobs of the specified type. */ 返回特定类型挂起任务的数量 unsigned long long bioPendingJobsOfType(int type) { unsigned long long val; pthread_mutex_lock(&bio_mutex[type]); 先锁住,不锁住值会变化 val = bio_pending[type]; 再获取值 pthread_mutex_unlock(&bio_mutex[type]); 解锁 return val; } /* If there are pending jobs for the specified type, the function blocks * and waits that the next job was processed. Otherwise the function * does not block and returns ASAP. 如果存在指定类型的挂起任务,函数阻塞和等待下个任务的处理。否则不阻塞并且尽快返回。 * The function returns the number of jobs still to process of the * requested type. 函数返回仍要处理的请求类型的作业数 * This function is useful when from another thread, we want to wait * a bio.c thread to do more work in a blocking way. 当我们从另一个线程等待bio.c线程以阻塞方式执行更多工作时,此函数非常有用 */ unsigned long long bioWaitStepOfType(int type) { unsigned long long val; pthread_mutex_lock(&bio_mutex[type]); 锁定 val = bio_pending[type]; if (val != 0) { pthread_cond_wait(&bio_step_cond[type],&bio_mutex[type]); 等待步骤的条件变量 val = bio_pending[type]; } pthread_mutex_unlock(&bio_mutex[type]); 解锁 return val; } /* Kill the running bio threads in an unclean way. This function should be * used only when it's critical to stop the threads for some reason. * Currently Redis does this only on crash (for instance on SIGSEGV) in order * to perform a fast memory check without other threads messing with memory. */ 用粗鲁的方式终止运行的bio线程。仅当出于某种原因停止线程非常迫切时,才应使用此函数(这个函数是迫不得已而用的) 目前,Redis仅在崩溃时(例如在SIGSEGV上)执行此操作,以便执行快速内存检查,而不让其他线程干扰内存 void bioKillThreads(void) { int err, j; for (j = 0; j < BIO_NUM_OPS; j++) { if (bio_threads[j] && pthread_cancel(bio_threads[j]) == 0) { 取消线程 if ((err = pthread_join(bio_threads[j],NULL)) != 0) { 等待线程结束 serverLog(LL_WARNING, "Bio thread for job type #%d can be joined: %s", j, strerror(err)); } else { serverLog(LL_WARNING, "Bio thread for job type #%d terminated",j); } } } } ***************************************************************************************************