写在前面
这一章节涉及的内容比较简单,主要是为了讲清楚redis server端如何维持和client相关的连接的,基本从以下几个方面展开描述:
- redis server端采用何种数据结构维护连接信息
- redis server端创建和client相关连接的过程
- redis client相关的数据结构定义
redis server核心数据结构
redis server作为redis最核心的数据结构,基本上redis所有相关的信息都会在这里有体现,这里列出几大块。
- 服务本身运行信息
- 网络监听连接等
- AOF/RDB持久化信息
- redis慢查询日志
- 主从同步信息
- 集群cluster信息
redis客户端管理相关的数据结构主要有
- list *clients; // 一个链表,保存了所有客户端状态结构
- list *clients_to_close; // 链表,保存了所有待关闭的客户端
- list *slaves, *monitors; // 链表,保存了所有从服务器,以及所有监视器
redis server端使用列表来管理客户端发起的连接,以list保存所有客户端发起的连接,以及待关闭的客户端连接。
struct redisServer {
// serverCron() 每秒调用的次数
int hz; /* serverCron() calls frequency in hertz */
// 数据库
redisDb *db;
// 事件状态
aeEventLoop *el;
/* redis 网络相关的数据结构 */
// TCP 监听端口
int port; /* TCP listening port */
int tcp_backlog; /* TCP listen() backlog */
// 地址
char *bindaddr[REDIS_BINDADDR_MAX]; /* Addresses we should bind to */
// 地址数量
int bindaddr_count; /* Number of addresses in server.bindaddr[] */
// 一个链表,保存了所有客户端状态结构
list *clients; /* List of active clients */
// 链表,保存了所有待关闭的客户端
list *clients_to_close; /* Clients to close asynchronously */
// 链表,保存了所有从服务器,以及所有监视器
list *slaves, *monitors; /* List of slaves and MONITORs */
/* RDB / AOF 载入信息相关数据结构 */
// 这个值为真时,表示服务器正在进行载入
int loading; /* We are loading data from disk if true */
// 正在载入的数据的大小
off_t loading_total_bytes;
// 已载入数据的大小
off_t loading_loaded_bytes;
// 开始进行载入的时间
time_t loading_start_time;
off_t loading_process_events_interval_bytes;
/* Fast pointers to often looked up command */
// 常用命令的快捷连接
struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
*rpopCommand;
/* server端相关的统计信息 */
// 服务器启动时间
time_t stat_starttime; /* Server start time */
// 已处理命令的数量
long long stat_numcommands; /* Number of processed commands */
// 服务器接到的连接请求数量
long long stat_numconnections; /* Number of connections received */
// 已过期的键数量
long long stat_expiredkeys; /* Number of expired keys */
/* 慢查询日志 */
// 保存了所有慢查询日志的链表
list *slowlog; /* SLOWLOG list of commands */
// 下一条慢查询日志的 ID
long long slowlog_entry_id; /* SLOWLOG current entry ID */
// 服务器配置 slowlog-log-slower-than 选项的值
long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */
// 服务器配置 slowlog-max-len 选项的值
unsigned long slowlog_max_len;
size_t resident_set_size;
// 最后一次进行抽样的时间
long long ops_sec_last_sample_time;
// 最后一次抽样时,服务器已执行命令的数量
long long ops_sec_last_sample_ops;
// 抽样结果
long long ops_sec_samples[REDIS_OPS_SEC_SAMPLES];
// 数组索引,用于保存抽样结果,并在需要时回绕到 0
int ops_sec_idx;
/* AOF 持久化信息*/
// AOF 状态(开启/关闭/可写)
int aof_state; /* REDIS_AOF_(ON|OFF|WAIT_REWRITE) */
// 所使用的 fsync 策略(每个写入/每秒/从不)
int aof_fsync; /* Kind of fsync() policy */
char *aof_filename; /* Name of the AOF file */
int aof_no_fsync_on_rewrite; /* Don't fsync if a rewrite is in prog. */
int aof_rewrite_perc; /* Rewrite AOF if % growth is > M and... */
off_t aof_rewrite_min_size; /* the AOF file is at least N bytes. */
// 最后一次执行 BGREWRITEAOF 时, AOF 文件的大小
off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */
// AOF 文件的当前字节大小
off_t aof_current_size; /* AOF current size. */
int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */
/* RDB 持久化信息 */
// 自从上次 SAVE 执行以来,数据库被修改的次数
long long dirty; /* Changes to DB from the last save */
// BGSAVE 执行前的数据库被修改次数
long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */
// 负责执行 BGSAVE 的子进程的 ID
// 没在执行 BGSAVE 时,设为 -1
pid_t rdb_child_pid; /* PID of RDB saving child */
struct saveparam *saveparams; /* Save points array for RDB */
int saveparamslen; /* Number of saving points */
char *rdb_filename; /* Name of RDB file */
int rdb_compression; /* Use compression in RDB? */
int rdb_checksum; /* Use RDB checksum? */
/* 日志相关信息 */
char *logfile; /* Path of log file */
int syslog_enabled; /* Is syslog enabled? */
char *syslog_ident; /* Syslog ident */
int syslog_facility; /* Syslog facility */
/* 主从复制 master的配置信息 */
int slaveseldb; /* Last SELECTed DB in replication output */
// 全局复制偏移量(一个累计值)
long long master_repl_offset; /* Global replication offset */
// 主服务器发送 PING 的频率
int repl_ping_slave_period; /* Master pings the slave every N seconds */
/* 主从赋值 slave的配置信息 */
// 主服务器的验证密码
char *masterauth; /* AUTH with this password with master */
// 主服务器的地址
char *masterhost; /* Hostname of master */
// 主服务器的端口
int masterport; /* Port of master */
// 超时时间
int repl_timeout; /* Timeout after N seconds of master idle */
/* 发布订阅信息 */
// 字典,键为频道,值为链表
// 链表中保存了所有订阅某个频道的客户端
// 新客户端总是被添加到链表的表尾
dict *pubsub_channels; /* Map channels to list of subscribed clients */
// 这个链表记录了客户端订阅的所有模式的名字
list *pubsub_patterns; /* A list of pubsub_patterns */
int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
xor of REDIS_NOTIFY... flags. */
/* 集群信息 */
int cluster_enabled; /* Is cluster enabled? */
mstime_t cluster_node_timeout; /* Cluster node timeout. */
char *cluster_configfile; /* Cluster auto-generated config file name. */
struct clusterState *cluster; /* State of the cluster */
int cluster_migration_barrier; /* Cluster replicas migration barrier. */
/* 脚本信息 */
// Lua 环境
lua_State *lua; /* The Lua interpreter. We use just one for all clients */
// 复制执行 Lua 脚本中的 Redis 命令的伪客户端
redisClient *lua_client; /* The "fake client" to query Redis from Lua */
// 当前正在执行 EVAL 命令的客户端,如果没有就是 NULL
redisClient *lua_caller; /* The client running EVAL right now, or NULL */
};
redis 客户端创建过程
当客户端第一次向server发起连接的时候,也就是在监听socket的accept的处理函数当中acceptCommonHandler的内部通过createClient创建对应client连接的socket。
/*
* TCP 连接 accept 处理器
*/
#define MAX_ACCEPTS_PER_CALL 1000
static void acceptCommonHandler(int fd, int flags) {
// 创建客户端
redisClient *c;
if ((c = createClient(fd)) == NULL) {
redisLog(REDIS_WARNING,
"Error registering fd event for the new client: %s (fd=%d)",
strerror(errno),fd);
close(fd); /* May be already closed, just ignore errors */
return;
}
// 如果新添加的客户端令服务器的最大客户端数量达到了
// 那么向新客户端写入错误信息,并关闭新客户端
// 先创建客户端,再进行数量检查是为了方便地进行错误信息写入
if (listLength(server.clients) > server.maxclients) {
char *err = "-ERR max number of clients reached\r\n";
/* That's a best effort error message, don't check write errors */
if (write(c->fd,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
// 更新拒绝连接数
server.stat_rejected_conn++;
freeClient(c);
return;
}
// 更新连接次数
server.stat_numconnections++;
// 设置 FLAG
c->flags |= flags;
}
createClient内部进行按照以下步骤创建了client的socket并添加list列表当中。
- 分配redisClient的内存大小
- 根据fd执行aeCreateFileEvent将socket添加到对应的eventloop当中实现事件监听
- 初始化一堆client相关的属性
- 执行listAddNodeTail方法将client添加到客户端相关的列表当中
/*
* 创建一个新客户端
*/
redisClient *createClient(int fd) {
// 分配空间
redisClient *c = zmalloc(sizeof(redisClient));
// 当 fd 不为 -1 时,创建带网络连接的客户端
// 如果 fd 为 -1 ,那么创建无网络连接的伪客户端
// 因为 Redis 的命令必须在客户端的上下文中使用,所以在执行 Lua 环境中的命令时
// 需要用到这种伪终端
if (fd != -1) {
// 非阻塞
anetNonBlock(NULL,fd);
// 禁用 Nagle 算法
anetEnableTcpNoDelay(NULL,fd);
// 设置 keep alive
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
// 绑定读事件到事件 loop (开始接收命令请求)
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}
// 初始化各个属性
// 默认数据库
selectDb(c,0);
// 套接字
c->fd = fd;
// 名字
c->name = NULL;
// 回复缓冲区的偏移量
c->bufpos = 0;
// 查询缓冲区
c->querybuf = sdsempty();
// 查询缓冲区峰值
c->querybuf_peak = 0;
// 命令请求的类型
c->reqtype = 0;
// 命令参数数量
c->argc = 0;
// 命令参数
c->argv = NULL;
// 当前执行的命令和最近一次执行的命令
c->cmd = c->lastcmd = NULL;
// 查询缓冲区中未读入的命令内容数量
c->multibulklen = 0;
// 读入的参数的长度
c->bulklen = -1;
// 已发送字节数
c->sentlen = 0;
// 状态 FLAG
c->flags = 0;
// 创建时间和最后一次互动时间
c->ctime = c->lastinteraction = server.unixtime;
// 认证状态
c->authenticated = 0;
// 复制状态
c->replstate = REDIS_REPL_NONE;
// 复制偏移量
c->reploff = 0;
// 通过 ACK 命令接收到的偏移量
c->repl_ack_off = 0;
// 通过 AKC 命令接收到偏移量的时间
c->repl_ack_time = 0;
// 客户端为从服务器时使用,记录了从服务器所使用的端口号
c->slave_listening_port = 0;
// 回复链表
c->reply = listCreate();
// 回复链表的字节量
c->reply_bytes = 0;
// 回复缓冲区大小达到软限制的时间
c->obuf_soft_limit_reached_time = 0;
// 回复链表的释放和复制函数
listSetFreeMethod(c->reply,decrRefCountVoid);
listSetDupMethod(c->reply,dupClientReplyValue);
// 阻塞类型
c->btype = REDIS_BLOCKED_NONE;
// 阻塞超时
c->bpop.timeout = 0;
// 造成客户端阻塞的列表键
c->bpop.keys = dictCreate(&setDictType,NULL);
// 在解除阻塞时将元素推入到 target 指定的键中
// BRPOPLPUSH 命令时使用
c->bpop.target = NULL;
c->bpop.numreplicas = 0;
c->bpop.reploffset = 0;
c->woff = 0;
// 进行事务时监视的键
c->watched_keys = listCreate();
// 订阅的频道和模式
c->pubsub_channels = dictCreate(&setDictType,NULL);
c->pubsub_patterns = listCreate();
c->peerid = NULL;
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
// 如果不是伪客户端,那么添加到服务器的客户端链表中
if (fd != -1) listAddNodeTail(server.clients,c);
// 初始化客户端的事务状态
initClientMultiState(c);
// 返回客户端
return c;
}
redisClient对应的数据结构定义,很多变量我们看着都很熟悉。
- 跟请求相关的变量包括:(redisDb *db)、(int argc)、( robj **argv)
- 跟订阅相关的变量包括:(dict *pubsub_channels)、(list *pubsub_patterns)。
/*
*
* 因为 I/O 复用的缘故,需要为每个客户端维持一个状态。
*
* 多个客户端状态被服务器用链表连接起来。
*/
typedef struct redisClient {
// 套接字描述符
int fd;
// 当前正在使用的数据库
redisDb *db;
// 当前正在使用的数据库的 id (号码)
int dictid;
// 客户端的名字
robj *name; /* As set by CLIENT SETNAME */
// 查询缓冲区
sds querybuf;
// 查询缓冲区长度峰值
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size */
// 参数数量
int argc;
// 参数对象数组
robj **argv;
// 记录被客户端执行的命令
struct redisCommand *cmd, *lastcmd;
// 请求的类型:内联命令还是多条命令
int reqtype;
// 剩余未读取的命令内容数量
int multibulklen; /* number of multi bulk arguments left to read */
// 命令内容的长度
long bulklen; /* length of bulk argument in multi bulk request */
// 回复链表
list *reply;
// 回复链表中对象的总大小
unsigned long reply_bytes; /* Tot bytes of objects in reply list */
// 已发送字节,处理 short write 用
int sentlen; /* Amount of bytes already sent in the current
buffer or object being sent. */
// 创建客户端的时间
time_t ctime; /* Client creation time */
// 客户端最后一次和服务器互动的时间
time_t lastinteraction; /* time of the last interaction, used for timeout */
// 客户端的输出缓冲区超过软性限制的时间
time_t obuf_soft_limit_reached_time;
// 客户端状态标志
int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */
// 当 server.requirepass 不为 NULL 时
// 代表认证的状态
// 0 代表未认证, 1 代表已认证
int authenticated; /* when requirepass is non-NULL */
// 复制状态
int replstate; /* replication state if this is a slave */
// 用于保存主服务器传来的 RDB 文件的文件描述符
int repldbfd; /* replication DB file descriptor */
// 读取主服务器传来的 RDB 文件的偏移量
off_t repldboff; /* replication DB file offset */
// 主服务器传来的 RDB 文件的大小
off_t repldbsize; /* replication DB file size */
sds replpreamble; /* replication DB preamble. */
// 主服务器的复制偏移量
long long reploff; /* replication offset if this is our master */
// 从服务器最后一次发送 REPLCONF ACK 时的偏移量
long long repl_ack_off; /* replication ack offset, if this is a slave */
// 从服务器最后一次发送 REPLCONF ACK 的时间
long long repl_ack_time;/* replication ack time, if this is a slave */
// 主服务器的 master run ID
// 保存在客户端,用于执行部分重同步
char replrunid[REDIS_RUN_ID_SIZE+1]; /* master run id if this is a master */
// 从服务器的监听端口号
int slave_listening_port; /* As configured with: SLAVECONF listening-port */
// 事务状态
multiState mstate; /* MULTI/EXEC state */
// 阻塞类型
int btype; /* Type of blocking op if REDIS_BLOCKED. */
// 阻塞状态
blockingState bpop; /* blocking state */
// 最后被写入的全局复制偏移量
long long woff; /* Last write global replication offset. */
// 被监视的键
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
// 这个字典记录了客户端所有订阅的频道
// 键为频道名字,值为 NULL
// 也即是,一个频道的集合
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
// 链表,包含多个 pubsubPattern 结构
// 记录了所有订阅频道的客户端的信息
// 新 pubsubPattern 结构总是被添加到表尾
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
sds peerid; /* Cached peer ID. */
/* Response buffer */
// 回复偏移量
int bufpos;
// 回复缓冲区
char buf[REDIS_REPLY_CHUNK_BYTES];
}