Redis 设计与实现 (四)--事件、客户端

事件

一、文件事件

  文件事件处理器使用I/O多路复用程序来同时监听多个套接字,

  监听套接字,分配对应的处理事件。

  四个组成部分:套接字 、I/O多路复用 、 文件事件分派器 、 事件处理器

  连接应答处理器:redis服务器初始化,将连接应答处理器和服务器监听套接字的事件惯量,当客户端使用connect 函数链接服务器,套接字产生事件,触发连接应答处理器。

  命令请求处理器:客户端向服务器发送命令请求的时候,套接字产生事件,触发命令请求处理器处理请求。

  命令回复处理器:服务器命令回复传给客户端,服务器将命令回复处理器和事件关联,客户端准备接收回传命令后,触发事件引发回复处理器执行。

二、时间事件

  当被监听的套接字准备好执行链接应答、读取、写入、关闭等操作时,

  与操作相对性的文件事件就会产生。

  定时事件:指定时间执行一次程序

  周期事件:隔一定时间执行一次程序

  id -- 服务器为时间事件创建的全局唯一ID,递增

  when -- 毫秒,记录事件的触发时间

  timeProc -- 时间事件处理器

客户端

struct redisServer {
/* General */
pid_t pid; /* Main process pid. */
char *configfile; /* Absolute config file path, or NULL */
int hz; /* serverCron() calls frequency in hertz */
redisDb *db;
dict *commands; /* Command table */
dict *orig_commands; /* Command table before command renaming. */
aeEventLoop *el;
unsigned lruclock:REDIS_LRU_BITS; /* Clock for LRU eviction */
int shutdown_asap; /* SHUTDOWN needed ASAP */
int activerehashing; /* Incremental rehash in serverCron() */
char *requirepass; /* Pass for AUTH command, or NULL */
char *pidfile; /* PID file path */
int arch_bits; /* 32 or 64 depending on sizeof(PORT_LONG) */
int cronloops; /* Number of times the cron function run */
char runid[REDIS_RUN_ID_SIZE+]; /* ID always different at every exec. */
int sentinel_mode; /* True if this instance is a Sentinel. */
/* Networking */
int port; /* TCP listening port */
int tcp_backlog; /* TCP listen() backlog */
char *bindaddr[REDIS_BINDADDR_MAX]; /* Addresses we should bind to */
int bindaddr_count; /* Number of addresses in server.bindaddr[] */
char *unixsocket; /* UNIX socket path */
mode_t unixsocketperm; /* UNIX socket permission */
int ipfd[REDIS_BINDADDR_MAX]; /* TCP socket file descriptors */
int ipfd_count; /* Used slots in ipfd[] */
int sofd; /* Unix socket file descriptor */
int cfd[REDIS_BINDADDR_MAX];/* Cluster bus listening socket */
int cfd_count; /* Used slots in cfd[] */
list *clients; /* List of active clients */
list *clients_to_close; /* Clients to close asynchronously */
list *slaves, *monitors; /* List of slaves and MONITORs */
redisClient *current_client; /* Current client, only used on crash report */
int clients_paused;

一、输入缓冲区

  客户端状态的输入缓冲区(sds  querybuf)用于保存客户端发送的命令请求

  命令:robj **argv  命令参数

       int    argc  命令参数个数

     从服务器协议中分析参数和个数,根据argv[0]查找对应的命令函数 

二、输出缓冲区

  执行命令后的回复被保存再客户端状态的输出缓冲区里面。

  --固定缓冲区,长度较小的回复

  --可变缓冲区,长度较大的回复

三、身份验证

  int  authenticated

  ==0 未验证  ; ==1 客户端通过了验证

四、时间

  timt_t    ctime  --创建客户端的时间,可计算连接时长(秒)

  timt_t  lastinteraction  --客户端和服务器最后一次通信时间,可用来计算客户端空转时间

  time_t    obuf_soft_limit_reached_time  --输出缓冲区第一次到达软性限制的时间

五、客户端的创建与关闭

  服务器使用不同的方式来创建和关闭不同类型的客户端

  5.1 创建普通客户端

  5.2 关闭普通客户端

    --客户端进程退出或者杀死

    --客户端向服务器发送不符合协议格式的命令请求

    --客户端设置了timeout 配置选项

    --客户端发送的命令请求大小超过了输出缓冲区限制大小,默认1G  (硬性限制和软性限制)

  5.3 Lua脚本的伪客户端

    服务器初始化创建负责执行Lua脚本中包含redis命令的伪客户端

  5.4 AOF 文件的伪客户端

    服务器再载入AOF文件时,创建用于执行AOF文件包含的redis命令的伪客户端,并在载入完成后关闭。

六、服务器

  6.1 命令请求的执行过程

  客户端发送命令 set key value  收到回复 ok

  1、客户端发送 set key value

  2、服务端接收处理客户端的命令请求,产生回复ok

  3、将命令回复给客户端

  4、客户端接收回复,打印展示

  6.2 发送命令请求

  用户-> 请求-> 客户端-> 协议转换-> 服务器

  6.3读取命令请求

  读取命令->保存到客户状态的缓冲区->分析命令->调用命令执行器

  6.4 将命令回复发给客户端

  6.5客户端接收并打印命令回复

  服务器->客户端->用户

七、serverCron函数

  serverCron 默认100毫秒执行一次,管理服务器资源。

  7.1、更新服务器时间缓存

  因为获取系统当前时间操作比较频繁,所以缓存系统时间。

  所以缓存当前时间:

  unixtime //秒级

  mstime  //毫秒

  7.2、更新LRU时钟

  lruclock  服务器的LRU时钟

  7.3、更新服务器每秒执行命令次数

  trackOperationsPerSecond  //估算服务器再最近一秒钟的处理请求数量

  7.4、更新服务器内存峰值记录

  程序会查看当前使用的内存的数量

  7.5、处理sigterm信号

  7.6、管理客户端资源

  超时--释放资源

  超出输入缓冲区长度--释放资源

  7.7、管理服务器资源

  删除过期键,收缩字典

  7.8、执行延迟的bgrewriteaof

  执行延迟的重写aof操作

  7.9、检查持久化操作的运行状态

  rdb_child_pid --  bgsave命令子进程id

  aof_child_pid  --bgrewriteaof命令子进程id

  可以检查是否有正在执行以上命令。

  7.10 AOF缓冲区的内容写入AOF文件

  启动持久化,会将缓冲区内容写入到aof文件

  7.11 关闭异步客户端

  7.12 增加 cronloops 计数器值

  cronloops --记录serverCron函数执行次数

八、初始化服务器

  redis 服务器启动到接受客户端命令,需要一系列初始化和设置过程。

  8.1 初始化服务器状态结构

  创建一个 redisServer结构,并赋值默认值initServerConfig函数。

struct redisServer
void initServerConfig(void) {
int j; getRandomHexChars(server.runid,REDIS_RUN_ID_SIZE);
server.configfile = NULL;
server.hz = REDIS_DEFAULT_HZ;
server.runid[REDIS_RUN_ID_SIZE] = '\0';
server.arch_bits = (sizeof(PORT_LONG) == ) ? : ;
server.port = REDIS_SERVERPORT;
server.tcp_backlog = REDIS_TCP_BACKLOG;
server.bindaddr_count = ;
server.unixsocket = NULL;
server.unixsocketperm = REDIS_DEFAULT_UNIX_SOCKET_PERM;
server.ipfd_count = ;
server.sofd = -;
server.dbnum = REDIS_DEFAULT_DBNUM;
server.verbosity = REDIS_DEFAULT_VERBOSITY;
WIN32_ONLY(setLogVerbosityLevel(server.verbosity);)
server.maxidletime = REDIS_MAXIDLETIME;
server.tcpkeepalive = REDIS_DEFAULT_TCP_KEEPALIVE;
server.active_expire_enabled = ;
server.client_max_querybuf_len = REDIS_MAX_QUERYBUF_LEN;
server.saveparams = NULL;
server.loading = ;
server.logfile = zstrdup(REDIS_DEFAULT_LOGFILE);
server.syslog_enabled = REDIS_DEFAULT_SYSLOG_ENABLED;
server.syslog_ident = zstrdup(REDIS_DEFAULT_SYSLOG_IDENT);
POSIX_ONLY(server.syslog_facility = LOG_LOCAL0;)
server.daemonize = REDIS_DEFAULT_DAEMONIZE;
server.aof_state = REDIS_AOF_OFF;
server.aof_fsync = REDIS_DEFAULT_AOF_FSYNC;
server.aof_no_fsync_on_rewrite = REDIS_DEFAULT_AOF_NO_FSYNC_ON_REWRITE;
server.aof_rewrite_perc = REDIS_AOF_REWRITE_PERC;
server.aof_rewrite_min_size = REDIS_AOF_REWRITE_MIN_SIZE;
server.aof_rewrite_base_size = ;
server.aof_rewrite_scheduled = ;
server.aof_last_fsync = time(NULL);
server.aof_rewrite_time_last = -;
server.aof_rewrite_time_start = -;
server.aof_lastbgrewrite_status = REDIS_OK;
server.aof_delayed_fsync = ;
server.aof_fd = -;
server.aof_selected_db = -; /* Make sure the first time will not match */
server.aof_flush_postponed_start = ;
server.aof_rewrite_incremental_fsync = REDIS_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC;
server.aof_load_truncated = REDIS_DEFAULT_AOF_LOAD_TRUNCATED;
server.pidfile = zstrdup(REDIS_DEFAULT_PID_FILE);
server.rdb_filename = zstrdup(REDIS_DEFAULT_RDB_FILENAME);
server.aof_filename = zstrdup(REDIS_DEFAULT_AOF_FILENAME);
server.requirepass = NULL;
server.rdb_compression = REDIS_DEFAULT_RDB_COMPRESSION;
server.rdb_checksum = REDIS_DEFAULT_RDB_CHECKSUM;
server.stop_writes_on_bgsave_err = REDIS_DEFAULT_STOP_WRITES_ON_BGSAVE_ERROR;
server.activerehashing = REDIS_DEFAULT_ACTIVE_REHASHING;
server.notify_keyspace_events = ;
server.maxclients = REDIS_MAX_CLIENTS;
server.bpop_blocked_clients = ;
server.maxmemory = REDIS_DEFAULT_MAXMEMORY;
server.maxmemory_policy = REDIS_DEFAULT_MAXMEMORY_POLICY;
server.maxmemory_samples = REDIS_DEFAULT_MAXMEMORY_SAMPLES;
server.hash_max_ziplist_entries = REDIS_HASH_MAX_ZIPLIST_ENTRIES;
server.hash_max_ziplist_value = REDIS_HASH_MAX_ZIPLIST_VALUE;
server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES;
server.list_max_ziplist_value = REDIS_LIST_MAX_ZIPLIST_VALUE;
server.set_max_intset_entries = REDIS_SET_MAX_INTSET_ENTRIES;
server.zset_max_ziplist_entries = REDIS_ZSET_MAX_ZIPLIST_ENTRIES;
server.zset_max_ziplist_value = REDIS_ZSET_MAX_ZIPLIST_VALUE;
server.hll_sparse_max_bytes = REDIS_DEFAULT_HLL_SPARSE_MAX_BYTES;
server.shutdown_asap = ;
server.repl_ping_slave_period = REDIS_REPL_PING_SLAVE_PERIOD;
server.repl_timeout = REDIS_REPL_TIMEOUT;
server.repl_min_slaves_to_write = REDIS_DEFAULT_MIN_SLAVES_TO_WRITE;
server.repl_min_slaves_max_lag = REDIS_DEFAULT_MIN_SLAVES_MAX_LAG;
server.cluster_enabled = ;
server.cluster_node_timeout = REDIS_CLUSTER_DEFAULT_NODE_TIMEOUT;
server.cluster_migration_barrier = REDIS_CLUSTER_DEFAULT_MIGRATION_BARRIER;
server.cluster_slave_validity_factor = REDIS_CLUSTER_DEFAULT_SLAVE_VALIDITY;
server.cluster_require_full_coverage = REDIS_CLUSTER_DEFAULT_REQUIRE_FULL_COVERAGE;
server.cluster_configfile = zstrdup(REDIS_DEFAULT_CLUSTER_CONFIG_FILE);
server.lua_caller = NULL;
server.lua_time_limit = REDIS_LUA_TIME_LIMIT;
server.lua_client = NULL;
server.lua_timedout = ;
server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
server.next_client_id = ; /* Client IDs, start from 1 .*/
server.loading_process_events_interval_bytes = (**); server.lruclock = getLRUClock();
resetServerSaveParams(); appendServerSaveParams(*,); /* save after 1 hour and 1 change */
appendServerSaveParams(,); /* save after 5 minutes and 100 changes */
appendServerSaveParams(,); /* save after 1 minute and 10000 changes */
/* Replication related */
server.masterauth = NULL;
server.masterhost = NULL;
server.masterport = ;
server.master = NULL;
server.cached_master = NULL;
server.repl_master_initial_offset = -;
server.repl_state = REDIS_REPL_NONE;
server.repl_syncio_timeout = REDIS_REPL_SYNCIO_TIMEOUT;
server.repl_serve_stale_data = REDIS_DEFAULT_SLAVE_SERVE_STALE_DATA;
server.repl_slave_ro = REDIS_DEFAULT_SLAVE_READ_ONLY;
server.repl_down_since = ; /* Never connected, repl is down since EVER. */
server.repl_disable_tcp_nodelay = REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY;
server.repl_diskless_sync = REDIS_DEFAULT_REPL_DISKLESS_SYNC;
server.repl_diskless_sync_delay = REDIS_DEFAULT_REPL_DISKLESS_SYNC_DELAY;
server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY;
server.master_repl_offset = ; /* Replication partial resync backlog */
server.repl_backlog = NULL;
server.repl_backlog_size = REDIS_DEFAULT_REPL_BACKLOG_SIZE;
server.repl_backlog_histlen = ;
server.repl_backlog_idx = ;
server.repl_backlog_off = ;
server.repl_backlog_time_limit = REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT;
server.repl_no_slaves_since = time(NULL); /* Client output buffer limits */
for (j = ; j < REDIS_CLIENT_TYPE_COUNT; j++)
server.client_obuf_limits[j] = clientBufferLimitsDefaults[j]; /* Double constants initialization */
R_Zero = 0.0;
R_PosInf = 1.0/R_Zero;
R_NegInf = -1.0/R_Zero;
R_Nan = R_Zero/R_Zero; /* Command table -- we initiialize it here as it is part of the
* initial configuration, since command names may be changed via
* redis.conf using the rename-command directive. */
server.commands = dictCreate(&commandTableDictType,NULL);
server.orig_commands = dictCreate(&commandTableDictType,NULL);
populateCommandTable();
server.delCommand = lookupCommandByCString("del");
server.multiCommand = lookupCommandByCString("multi");
server.lpushCommand = lookupCommandByCString("lpush");
server.lpopCommand = lookupCommandByCString("lpop");
server.rpopCommand = lookupCommandByCString("rpop"); /* Slow log */
server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
server.slowlog_max_len = REDIS_SLOWLOG_MAX_LEN; /* Latency monitor */
server.latency_monitor_threshold = REDIS_DEFAULT_LATENCY_MONITOR_THRESHOLD; /* Debugging */
server.assert_failed = "<no assertion failed>";
server.assert_file = "<no file>";
server.assert_line = ;
server.bug_report_start = ;
server.watchdog_period = ;
}

  设置服务器运行ID

  设置服务器的默认运行频率

  设置服务器的默认配置文件路径

  设置服务器运行架构

  设置服务器默认的端口号

  设置服务器的默认rdb持久化条件和aof持久化条件

  初始化服务器的LRU时钟

  创建命令表

  8.2 载入配置选项

  用户可以通过给定的配置参数或者指定配置文件路径覆盖服务器默认配置

  8.3 初始化服务器的数据结构

    server.clients  链表 , 记录所有与服务器连接的客户端状态结构

    server.db 数组,包含所有数据库

    server.pubsub_channels 字典,保存频道订阅信息

    server.pubsub_patterns 字段,保存模式订阅信息

    server.lua 执行lua脚本的lua环境

    server.slowlog 属性,保存慢查询的日志

  8.4 还原数据库状态    

    开启AOF持久化,aof文件来还原

    没有开启AOF,使用本地RDB文件还原

上一篇:去除DataTable重复数据的三种方法


下一篇:redis接入sentinelPool的配置