redis aof持久化

redis aof缓存数据结构

 redis用于存储aof内存数据的数据结构是aof_buf数据结构,所有数据先追加到内存的aof_buf后,再通过定时任务检查是否能够持久化到磁盘文件当中。

struct redisServer {
    // AOF 缓冲区
    sds aof_buf;      /* AOF buffer, written before entering the event loop */


redis aof内存化

 redis aof内存化的操作主要有以下三部曲:

  • 执行redis命令后开始保存数据至内存当中的aof_buf当中
  • 将执行的命令解析成redis的命令格式// 例如 $3\r\nSET\r\n$3\r\nKEY\r\n$5\r\nVALUE\r\n
  • 保存数据至aof_buf当中
/* 
 * 将指定命令(以及执行该命令的上下文,比如数据库 id 等信息)传播到 AOF 和 slave 。
  * FLAG 可以是以下标识的 xor :
 * + REDIS_PROPAGATE_NONE (no propagation of command at all)
 *   不传播
 * + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
 *   传播到 AOF
 * + REDIS_PROPAGATE_REPL (propagate into the replication link)
 *   传播到 slave
 */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    // 传播到 AOF
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
}



 执行的操作主要是解析成redis的命令格式并保存到内存的aof_buf当中。

/*
 * 将命令追加到 AOF 文件中,
 * 如果 AOF 重写正在进行,那么也将命令追加到 AOF 重写缓存中。
 */
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds buf = sdsempty();
    robj *tmpargv[3];

    /* 
     * 使用 SELECT 命令,显式设置数据库,确保之后的命令被设置到正确的数据库
     */
    if (dictid != server.aof_selected_db) {
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);

        server.aof_selected_db = dictid;
    }

    // EXPIRE 、 PEXPIRE 和 EXPIREAT 命令
    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT 
         *
         * 将 EXPIRE 、 PEXPIRE 和 EXPIREAT 都翻译成 PEXPIREAT
         */
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);

    // SETEX 和 PSETEX 命令
    } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        /* Translate SETEX/PSETEX to SET and PEXPIREAT 
         *
         * 将两个命令都翻译成 SET 和 PEXPIREAT
         */

        // SET
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);

        // PEXPIREAT
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);

    // 其他命令
    } else {
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }

    /* 
     * 将命令追加到 AOF 缓存中,
     * 在重新进入事件循环之前,这些命令会被冲洗到磁盘上,
     * 并向客户端返回一个回复。
     */
    if (server.aof_state == REDIS_AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

    /* 
     * 如果 BGREWRITEAOF 正在进行,
     * 那么我们还需要将命令追加到重写缓存中,
     * 从而记录当前正在重写的 AOF 文件和数据库当前状态的差异。
     */
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

    // 释放
    sdsfree(buf);
}



 解析成redis的命令格式:例如 $3\r\nSET\r\n$3\r\nKEY\r\n$5\r\nVALUE\r\n

/*
 * 根据传入的命令和命令参数,将它们还原成协议格式。
 */
sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
    char buf[32];
    int len, j;
    robj *o;

    // 重建命令的个数,格式为 *<count>\r\n
    // 例如 *3\r\n
    buf[0] = '*';
    len = 1+ll2string(buf+1,sizeof(buf)-1,argc);
    buf[len++] = '\r';
    buf[len++] = '\n';
    dst = sdscatlen(dst,buf,len);

    // 重建命令和命令参数,格式为 $<length>\r\n<content>\r\n
    // 例如 $3\r\nSET\r\n$3\r\nKEY\r\n$5\r\nVALUE\r\n
    for (j = 0; j < argc; j++) {
        o = getDecodedObject(argv[j]);

        // 组合 $<length>\r\n
        buf[0] = '$';
        len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));
        buf[len++] = '\r';
        buf[len++] = '\n';
        dst = sdscatlen(dst,buf,len);

        // 组合 <content>\r\n
        dst = sdscatlen(dst,o->ptr,sdslen(o->ptr));
        dst = sdscatlen(dst,"\r\n",2);

        decrRefCount(o);
    }

    // 返回重建后的协议内容
    return dst;
}


redis aof持久化

 serverCron内部定期执行flushAppendOnlyFile,这里的if判断是判断是否延迟执行,暂且忽略这个判断而认为每次都会进行aof持久化。

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    // 根据 AOF 政策,
    // 考虑是否需要将 AOF 缓冲区中的内容写入到 AOF 文件中
    if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
}



 执行aof内存数据持久化的过程,考虑异常情况的情况主要分为以下步骤:

  • 写入aof_buf数据到aof_fd代表的aof持久化文件当中
  • 处理aof写入异常的情况,尝试修复失败后会移除失败的命令
  • 更新aof相关的统计参数
  • 如果aof_buf数据过大那么就情况aof_buf的内容
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;

    // 缓冲区中没有任何内容,直接返回
    if (sdslen(server.aof_buf) == 0) return;

    // 策略为每秒 FSYNC 
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        // 是否有 SYNC 正在后台进行?
        sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;

    // 每秒 fsync ,并且强制写入为假
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {

        /* 
         * 当 fsync 策略为每秒钟一次时, fsync 在后台执行。
         *
         * 如果后台仍在执行 FSYNC ,那么我们可以延迟写操作一两秒
         * (如果强制执行 write 的话,服务器主线程将阻塞在 write 上面)
         */
        if (sync_in_progress) {

            // 有 fsync 正在后台进行 。。。

            if (server.aof_flush_postponed_start == 0) {
                /* 
                 * 前面没有推迟过 write 操作,这里将推迟写操作的时间记录下来
                 * 然后就返回,不执行 write 或者 fsync
                 */
                server.aof_flush_postponed_start = server.unixtime;
                return;

            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                /* 
                 * 如果之前已经因为 fsync 而推迟了 write 操作
                 * 但是推迟的时间不超过 2 秒,那么直接返回
                 * 不执行 write 或者 fsync
                 */
                return;

            }

            /* 
             * 如果后台还有 fsync 在执行,并且 write 已经推迟 >= 2 秒
             * 那么执行写操作(write 将被阻塞)
             */
            server.aof_delayed_fsync++;
        }
    }

    /* 
     * 执行到这里,程序会对 AOF 文件进行写入。
     *
     * 清零延迟 write 的时间记录
     */
    server.aof_flush_postponed_start = 0;

    /* 
     * 执行单个 write 操作,如果写入设备是物理的话,那么这个操作应该是原子的
     *
     * 当然,如果出现像电源中断这样的不可抗现象,那么 AOF 文件也是可能会出现问题的
     * 这时就要用 redis-check-aof 程序来进行修复。
     */
    nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    if (nwritten != (signed)sdslen(server.aof_buf)) {

        static time_t last_write_error_log = 0;
        int can_log = 0;

        // 将日志的记录频率限制在每行 AOF_WRITE_LOG_ERROR_RATE 秒
        if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
            can_log = 1;
            last_write_error_log = server.unixtime;
        }

        // 如果写入出错,那么尝试将该情况写入到日志里面
        if (nwritten == -1) {
            if (can_log) {
                redisLog(REDIS_WARNING,"Error writing to the AOF file: %s",
                    strerror(errno));
                server.aof_last_write_errno = errno;
            }
        } else {
            if (can_log) {
                redisLog(REDIS_WARNING,"Short write while writing to "
                                       "the AOF file: (nwritten=%lld, "
                                       "expected=%lld)",
                                       (long long)nwritten,
                                       (long long)sdslen(server.aof_buf));
            }

            // 尝试移除新追加的不完整内容
            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
                if (can_log) {
                    redisLog(REDIS_WARNING, "Could not remove short write "
                             "from the append-only file.  Redis may refuse "
                             "to load the AOF the next time it starts.  "
                             "ftruncate: %s", strerror(errno));
                }
            } else {
                /* If the ftrunacate() succeeded we can set nwritten to
                 * -1 since there is no longer partial data into the AOF. */
                nwritten = -1;
            }
            server.aof_last_write_errno = ENOSPC;
        }

        // 处理写入 AOF 文件时出现的错误
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
            redisLog(REDIS_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
            exit(1);
        } else {
            server.aof_last_write_status = REDIS_ERR;

            /* Trim the sds buffer if there was a partial write, and there
             * was no way to undo it with ftruncate(2). */
            if (nwritten > 0) {
                server.aof_current_size += nwritten;
                sdsrange(server.aof_buf,nwritten,-1);
            }
            return; /* We'll try again on the next call... */
        }
    } else {
        /* Successful write(2). If AOF was in error state, restore the
         * OK state and log the event. */
        // 写入成功,更新最后写入状态
        if (server.aof_last_write_status == REDIS_ERR) {
            redisLog(REDIS_WARNING,
                "AOF write error looks solved, Redis can write again.");
            server.aof_last_write_status = REDIS_OK;
        }
    }

    // 更新写入后的 AOF 文件大小
    server.aof_current_size += nwritten;

    /* 
     * 如果 AOF 缓存的大小足够小的话,那么重用这个缓存,
     * 否则的话,释放 AOF 缓存。
     */
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        // 清空缓存中的内容,等待重用
        sdsclear(server.aof_buf);
    } else {
        // 释放缓存
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }

    /* 
     * 如果 no-appendfsync-on-rewrite 选项为开启状态,
     * 并且有 BGSAVE 或者 BGREWRITEAOF 正在进行的话,
     * 那么不执行 fsync 
     */
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
            return;

    // 总是执行 fsnyc
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
         aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */

        // 更新最后一次执行 fsnyc 的时间
        server.aof_last_fsync = server.unixtime;

    // 策略为每秒 fsnyc ,并且距离上次 fsync 已经超过 1 秒
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        // 放到后台执行
        if (!sync_in_progress) aof_background_fsync(server.aof_fd);
        // 更新最后一次执行 fsync 的时间
        server.aof_last_fsync = server.unixtime;
    }
}
上一篇:一张图看懂DTS数据同步最新价格调整


下一篇:java调用com组件将office文件转换成pdf (同发csdn)