redis 学习笔记——数据同步、事务

redis主从同步
     redis支持简单易用的主从复制(master-slave replication)功能,该功能也是redis高可用性实现的基础。
 
  • redis复制原理
     redis的节点都会有一个backlog内存缓冲区用于数据同步,其中slave的backlog缓冲区会一直存在,master的backlog缓冲区当master与最后一个slave断开连接一段时间后就会被free掉。
 
     redis的backlog是一个环形缓冲区,feedReplicationBacklog函数由master调用,负责将数据写入到backlog缓冲区中。
///********* redis/src/server.h  ****************
struct redisServer {
.....
    char *repl_backlog;             /* Replication backlog for partial syncs */
    long long repl_backlog_size;    /* Backlog circular buffer size */
    long long repl_backlog_histlen; /* Backlog actual data length */
    long long repl_backlog_idx;     /* Backlog circular buffer current offset,
                                       that is the next byte will'll write to.*/
    long long repl_backlog_off;     /* Replication "master offset" of first
                                       byte in the replication backlog buffer.*/
.....
}
 
///********* redis/src/replication.c  ****************
void feedReplicationBacklog(void *ptr, size_t len) {
    unsigned char *p = ptr;
 
    server.master_repl_offset += len;
 
    /* This is a circular buffer, so write as much data we can at every
     * iteration and rewind the "idx" index if we reach the limit. */
    while(len) {
        size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
        if (thislen > len) thislen = len;
        memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
        server.repl_backlog_idx += thislen;
        if (server.repl_backlog_idx == server.repl_backlog_size)
            server.repl_backlog_idx = 0;
        len -= thislen;
        p += thislen;
        server.repl_backlog_histlen += thislen;
    }
    if (server.repl_backlog_histlen > server.repl_backlog_size)
        server.repl_backlog_histlen = server.repl_backlog_size;
    /* Set the offset of the first byte we have in the backlog. */
    server.repl_backlog_off = server.master_repl_offset -
                              server.repl_backlog_histlen + 1;
}

  

     master当收到写操作时,就会调用replicationFeedSlaves函数将这类操作写入到backlog中同时推送给各个slave。
///********* redis/src/replication.c  ****************
 
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
     .....
 /* Write the command to the replication backlog if any. */
    if (server.repl_backlog) {
        char aux[LONG_STR_SIZE+3];
 
        /* Add the multi bulk reply length. */
        aux[0] = '*';
        len = ll2string(aux+1,sizeof(aux)-1,argc);
        aux[len+1] = '\r';
        aux[len+2] = '\n';
        feedReplicationBacklog(aux,len+3);
 
        for (j = 0; j < argc; j++) {
            long objlen = stringObjectLen(argv[j]);
 
            /* We need to feed the buffer with the object as a bulk reply
             * not just as a plain string, so create the $..CRLF payload len
             * and add the final CRLF */
            aux[0] = '$';
            len = ll2string(aux+1,sizeof(aux)-1,objlen);
            aux[len+1] = '\r';
            aux[len+2] = '\n';
            feedReplicationBacklog(aux,len+3);
            feedReplicationBacklogWithObject(argv[j]);
            feedReplicationBacklog(aux+len+1,2);
        }
    }
     /* Write the command to every slave. */
    listRewind(slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;
 
        /* Don't feed slaves that are still waiting for BGSAVE to start */
        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
 
        /* Feed slaves that are waiting for the initial SYNC (so these commands
         * are queued in the output buffer until the initial SYNC completes),
         * or are already in sync with the master. */
 
        /* Add the multi bulk length. */
        addReplyMultiBulkLen(slave,argc);
 
        /* Finally any additional argument that was not stored inside the
         * static buffer if any (from j to argc). */
        for (j = 0; j < argc; j++)
            addReplyBulk(slave,argv[j]);
    }
}
     slave节点向master节点申请数据同步时,会附带一个master_replid(避免master重启,master重启后master_replid 会改变,保证数据源唯一)以及 已接收数据的offset。
char master_replid[CONFIG_RUN_ID_SIZE+1];  /* Master PSYNC runid. */
long long master_initial_offset;           /* Master PSYNC offset. */
    • 如果slave节点是新添加或者重启后的,那么就会将offset设置为-1,发送“PSYNC ? -1”给master,master会返回master_replid 、全局的复制offset。然后slave和master就会进行全量重同步。
    • 如果slave与master短时间(在master的backlog没有free之前)断开连接又重新连接,slave会将已获取数据的offset和master_replid通过PSYNC命令发送过去,master收到后会比较接收到master_replid与自身的server.replid是否相同以及请求的psync_offset是否在master保存的backlog的缓冲区范围内;
///********* redis/src/replication.c  ****************  
 
int masterTryPartialResynchronization(client *c) {
......
if (strcasecmp(master_replid, server.replid) &&
        (strcasecmp(master_replid, server.replid2) ||
         psync_offset > server.second_replid_offset))
    {
        /* Run id "?" is used by slaves that want to force a full resync. */
        if (master_replid[0] != '?') {
            if (strcasecmp(master_replid, server.replid) &&
                strcasecmp(master_replid, server.replid2))
            {
                serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
                    "Replication ID mismatch (Slave asked for '%s', my "
                    "replication IDs are '%s' and '%s')",
                    master_replid, server.replid, server.replid2);
            } else {
                serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
                    "Requested offset for second ID was %lld, but I can reply "
                    "up to %lld", psync_offset, server.second_replid_offset);
            }
        } else {
            serverLog(LL_NOTICE,"Full resync requested by slave %s",
                replicationGetSlaveName(c));
        }
        goto need_full_resync;
    }
 
    /* We still have the data our slave is asking for? */
    if (!server.repl_backlog ||
        psync_offset < server.repl_backlog_off ||
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
    {
        serverLog(LL_NOTICE,
            "Unable to partial resync with slave %s for lack of backlog (Slave request was: %lld).", replicationGetSlaveName(c), psync_offset);
        if (psync_offset > server.master_repl_offset) {
            serverLog(LL_WARNING,
                "Warning: slave %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
        }
        goto need_full_resync;
    }
    ........
     //进行部分同步
}

  

  • 全量重同步(SYNC)
          这种同步方式一般是在PSYNC执行失败后触发的。SYNC有两种方式:Disk-backed和Diskless。
    • disk-backed:在接到slave的SYNC请求后,会fork一个子进程用来将内存中的数据写入RDB文件,同时会将新来的请求保存在一个临时的内存缓冲区中。待RDB文件完成后,将临时缓冲区的数据与原有的内存数据进行合并并释放临时缓冲区。在写RDB文件的过程中,新来的SYNC请求都会被放到一个队列中,当RDB文件完成后将RDB文件内容发送给队列中的所有slave。
    • diskless:在接收到slave的SYNC的请求后,会等待一段时间(也可以配置不等待),等待过程中新来的SYNC请求也都会被放到等待队列中。master会与等待队列中的slave建立连接,将数据直接发送给这些slave。(这种方式的优点在于不写RDB文件,避免了磁盘I/O开销,提升了效率)
 
  • 部分同步(PSYNC)

master在收到slave发来的PSYNC请求(异常情况上面已经讨论过了,这里不再考虑),master会比较slave发来offset与master当前backlog中的offset,将backlog中比slave多出的数据传输给slave。(注:目前redis 4.0 提出的PSYNC 2.0本人还没有深入研究,回头有时间将相关的知识分享出来)

 
     为了保证数据的安全、一致性,可以通过配置当slave满足一定条件时才进行set操作。因为redis使用异步写的方式复制,master发送的写数据不一定能够被slave接收到。redis有以下特性:
    • slave每秒都会ping master一次,并report 复制的情况
    • master记录各个slave最后一次ping的timestamp
    • 用户配置允许的网络延迟最大值min-slaves-max-lag,以及执行写操作的slave的数量 min-slaves-to-write
     如果min-slaves-to-write个slave的网络延迟低于min-slaves-max-lag,master就会进行写操作,否则就返回客户端写失败。
 
 
redis 事务(transaction)
     事务可以一次执行多个命令,有两个重要的特征:1、事务是一个单独的隔离操作:事务中的所有命令都会序列化、顺序的执行;事务在执行的过程中不会被其他客户端发来的命令中断。2、事务是一个原子操作,即要么执行完毕要么全部都不执行。
     redis的事务涉及到的命令有:MULTI, EXEC, DISCARD, WATCH。
    • EXEC: 负责触发并执行事务中的所有命令。EXEC命令的回复是一个数组,数组中的每一个元素都是执行事务中的命令所产生的回复,其中回复的顺序和命令发送、执行的先后顺序是一致的。当客户端处于事务状态时,所有传入的命令都会返回一个内容为QUEUED的状态回复。
    • MULTI:用于开启一个事务,总是返回OK;MULTI执行之后,客户端可以继续向服务器发送任意多条命令,这些命令被放到一个队列中,不会被立刻执行,当EXEC命令被调用时,队列中的命令依次被执行。
    • DISCARD:清空任务队列,并放弃执行事务,并且客户端从事务状态中退出。
    • WATCH:为redis事务提供check-and-set(CAS)行为;如果一些key被WATCH,那么对这些key的操作会被监视;如果一个被监视的key在EXEC执行前被修改了,那么整个事务都会被取消,EXEC批量返回空回复(null multi-bulk reply)来表示事务已经失败。单个WATCH命令可以监控多个key。
 
  • 事务错误
               使用事务的过程中可能会遇到错误:
    1. 事务在执行EXEC前,入队的命令可能出错。例如,命令出现语法错误或者其他更为严重的错误,诸如内存超过最大限制之类的错误。
    2. 命令可能在EXEC调用后失败。例如,事务中的命令可能处理了错误类型的键导致事务失败。
       对于第一种事务失败,可以检查命令入队列时的返回值,如果发现有命令在入队时失败,那么大部分客户端就会停止并取消这个事务的。服务器会对命令入队失败的情况进行记录,并在客户端调用EXEC命令时,拒绝执行并自动放弃这个事务。对于EXEC命令执行后产生的错误,并没有对对其进行特殊的处理:即使某个/些命令在执行时产生错误,事务中的其他命令依然会继续执行。
     
  • 回滚(roll back)
           redis并不支持操作回滚。1、redis命令的错误只会因为错误的命令(语法)才失败,这些问题是因为编程错误造成的,应该有开发人员来解决。2、不对回滚进行支持,可以是redis内部保持简单、快速。
 
  • 用CAS(check-and-set)实现乐观锁
          用WATCH来监控要修改的key,然后通过EXEC来执行事务。如果WATCH执行后、事务EXEC前,key被修改,则当前客户端的事务就会失败。程序接下来就会不断重复这个过程,知道事务成功执行为止。这种形式的锁被成为乐观锁。对key的监视从WATCH命令执行开始,到EXEC被调用时(不考虑EXEC的执行结果)结束。UNWATCH可以手动取消对key的监控。
 
  • 脚本和事务
          redis中的脚本也是一种事务,而且比事务来的更简单,并且速度更快。当需要用事务的,推荐用脚本的方式。
上一篇:咏南中间件新增MORMOT移动端演示


下一篇:Echars 6大公共组件详解