The Redis server is responsible for establishing network connections with multiple clients, processing the command requests they send, storing in its databases the data produced by executing those commands, and keeping itself running through proper resource management.
Take the SET command as an example:
redis> SET key value
redis> OK
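On the wire this exchange uses the Redis protocol (RESP): the client sends a multi-bulk request and the server answers with a status reply. A minimal illustration of the bytes involved:
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n    <- request: array of 3 bulk strings
+OK\r\n                                            <- reply: status line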
When the connection socket between a client and the server becomes readable because the client has written to it, the server calls the command request handler, which performs the following steps:
① Read the protocol-encoded command request from the socket and save it into the input buffer of the client state.
② Parse the command request in the buffer, extract the command arguments and their count, and save them into the client state's argv and argc fields.
③ Call the command executor to run the command specified by the client (the relevant client-state fields are sketched below).
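For reference, the "client state" mentioned above is the redisClient structure defined in redis.h. The trimmed sketch below shows only the fields involved in command processing; the real structure contains many more fields:
typedef struct redisClient {
    int fd;                   /* client socket file descriptor */
    sds querybuf;             /* input buffer: raw protocol read from the socket */
    int argc;                 /* number of arguments of the current command */
    robj **argv;              /* argument objects; argv[0] is the command name */
    struct redisCommand *cmd; /* command looked up in the command table */
    list *reply;              /* output buffer: reply objects waiting to be sent */
    /* ...many more fields... */
} redisClient;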
(1) Look up the command implementation
Using the client state's argv[0], look up the specified command in the command table (command table), and store the command found in the client state's cmd field.
The command table is a dictionary whose keys are command names such as "set", "get", "del", and so on, and whose values are redisCommand structures; each redisCommand records the implementation details of one Redis command.
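A rough sketch of redisCommand and of the lookup performed by the command executor, condensed from the redis.h/redis.c source this post quotes (only a few representative fields are shown):
struct redisCommand {
    char *name;             /* command name, e.g. "set" */
    redisCommandProc *proc; /* pointer to the implementation function */
    int arity;              /* expected argc; negative means "at least -arity" */
    char *sflags;           /* flags as a string, e.g. "wm" = write + may use memory */
    int flags;              /* actual flag bits computed from sflags */
    /* ... */
};

/* Look up a command by name in server.commands (sketch). */
struct redisCommand *lookupCommand(sds name) {
    return dictFetchValue(server.commands, name);
}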
(2) Perform preparation checks, for example verifying that the command exists, that the number of arguments is valid, and that the client has authenticated when a password is required.
(3) Call the command's implementation function:
client->cmd->proc(client);
(4) Perform follow-up work
The implementation function stores the command reply in the client's output buffer and associates the command reply handler with the client's socket. When the socket becomes writable, the server runs the reply handler, which sends the reply stored in the output buffer back to the client.
Once the reply has been sent, the reply handler clears the client state's output buffer, getting ready for the next command request.
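The step "associate the command reply handler with the socket" boils down to registering a write event for the client's fd. A simplified sketch of prepareClientToWrite (networking.c); the real function performs several additional checks (Lua fake client, replication state, and so on):
int prepareClientToWrite(redisClient *c) {
    if (c->fd <= 0) return REDIS_ERR;   /* fake client (Lua/AOF loading): no socket */
    /* Install sendReplyToClient as the AE_WRITABLE handler only when the
     * output buffers were empty, so the event is registered at most once. */
    if (c->bufpos == 0 && listLength(c->reply) == 0 &&
        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
                          sendReplyToClient, c) == AE_ERR)
        return REDIS_ERR;
    return REDIS_OK;
}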
The serverCron function of the Redis server runs every 100 milliseconds by default. It manages the server's resources and keeps the server running smoothly: it updates server state information, handles the SIGTERM signal, manages client resources and database state, and checks whether persistence operations need to be performed. Its main duties are:
Updating the server's statistics, such as the time, memory usage, and database usage.
Cleaning up expired key-value pairs in the databases.
Closing and cleaning up clients whose connections have failed.
Attempting AOF or RDB persistence operations when needed.
If the server is a master, periodically synchronizing its slaves.
If the server is running in cluster mode, periodically synchronizing with and testing connections to the rest of the cluster.
redis.c/serverCron function:
/* This is our timer interrupt, called server.hz times per second.
 *
 * Here is where we do a number of things that need to be done asynchronously.
 * For instance:
 *
 * - Active expired keys collection (it is also performed in a lazy way on
 * lookup).
 *
 * - Software watchdog.
 *
 * - Update some statistic.
 *
 * - Incremental rehashing of the DBs hash tables.
 *
 * - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
 *
 * - Clients timeout of different kinds.
 *
 * - Replication reconnection.
 *
 * - Many more...
 *
 * Everything directly called here will be called server.hz times per second,
 * so in order to throttle execution of things we want to do less frequently
 * a macro is used: run_with_period(milliseconds) { .... }
 */
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int j;
REDIS_NOTUSED(eventLoop);
REDIS_NOTUSED(id);
REDIS_NOTUSED(clientData);
/* Software watchdog: deliver the SIGALRM that will reach the signal
 * handler if we don't return here fast enough. */
if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
/* Update the time cache. */
updateCachedTime();
// Sample the number of commands executed by the server (ops/sec tracking)
run_with_period(100) trackOperationsPerSecond();
/* We have just REDIS_LRU_BITS bits per object for LRU information.
 * So we use an (eventually wrapping) LRU clock.
 *
 * Note that even if the counter wraps it's not a big problem,
 * everything will still work but some object will appear younger
 * to Redis. However for this to happen a given object should never be
 * touched for all the time needed to the counter to wrap, which is
 * not likely.
 *
 * Note that you can change the resolution altering the
 * REDIS_LRU_CLOCK_RESOLUTION define. */
server.lruclock = getLRUClock();
/* Record the max memory used since the server was started. */
if (zmalloc_used_memory() > server.stat_peak_memory)
server.stat_peak_memory = zmalloc_used_memory();
/* Sample the RSS here since this is a relatively slow call. */
server.resident_set_size = zmalloc_get_rss();
/* We received a SIGTERM, shutting down here in a safe way, as it is
 * not ok doing so inside the signal handler. */
if (server.shutdown_asap) {
// Try to shut the server down
if (prepareForShutdown(0) == REDIS_OK) exit(0);
// If shutdown failed, log a warning and clear the flag
redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
server.shutdown_asap = 0;
}
/* Show some info about non-empty databases */
// Periodically log per-database key statistics
run_with_period(5000) {
for (j = 0; j < server.dbnum; j++) {
long long size, used, vkeys;
// number of slots in the main hash table
size = dictSlots(server.db[j].dict);
// number of keys in the database
used = dictSize(server.db[j].dict);
// number of keys with an expire set
vkeys = dictSize(server.db[j].expires);
// log the counts
if (used || vkeys) {
redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
/* dictPrintStats(server.dict); */
}
}
}
/* Show information about connected clients */
// If the server is not running in Sentinel mode, log client connection info
if (!server.sentinel_mode) {
run_with_period(5000) {
redisLog(REDIS_VERBOSE,
"%lu clients connected (%lu slaves), %zu bytes in use",
listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves),
zmalloc_used_memory());
}
}
/* We need to do a few operations on clients asynchronously. */
// Check clients: close timed-out clients and free unused query buffers
clientsCron();
/* Handle background operations on Redis databases. */
// Perform background work on the databases (active expiry, rehashing, ...)
databasesCron();
/* Start a scheduled AOF rewrite if this was requested by the user while
* a BGSAVE was in progress. */
// If neither a BGSAVE nor a BGREWRITEAOF is in progress and a
// BGREWRITEAOF has been scheduled, start it now
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
server.aof_rewrite_scheduled)
{
rewriteAppendOnlyFileBackground();
}
/* Check if a background saving or AOF rewrite in progress terminated. */
// Check whether a BGSAVE or BGREWRITEAOF child process has terminated
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
int statloc;
pid_t pid;
// Reap the child process, without blocking
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
int exitcode = WEXITSTATUS(statloc);
int bysignal = 0;
if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
// BGSAVE finished
if (pid == server.rdb_child_pid) {
backgroundSaveDoneHandler(exitcode,bysignal);
// BGREWRITEAOF finished
} else if (pid == server.aof_child_pid) {
backgroundRewriteDoneHandler(exitcode,bysignal);
} else {
redisLog(REDIS_WARNING,
"Warning, detected child with unmatched pid: %ld",
(long)pid);
}
updateDictResizePolicy();
}
} else {
/* If there is not a background saving/rewrite in progress check if
 * we have to save/rewrite now */
// No BGSAVE or BGREWRITEAOF is running, so check whether one is needed:
// iterate over the save conditions to see whether a BGSAVE must be started
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;
/* Save if we reached the given amount of changes,
 * the given amount of seconds, and if the latest bgsave was
 * successful or if, in case of an error, at least
 * REDIS_BGSAVE_RETRY_DELAY seconds already elapsed. */
// Check whether this save condition has been satisfied
if (server.dirty >= sp->changes &&
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
REDIS_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == REDIS_OK))
{
redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds);
// Perform the BGSAVE
rdbSaveBackground(server.rdb_filename);
break;
}
}
/* Trigger an AOF rewrite if needed */
// Trigger a BGREWRITEAOF if needed
if (server.rdb_child_pid == -1 &&
server.aof_child_pid == -1 &&
server.aof_rewrite_perc &&
// the AOF file is larger than the minimum size required for BGREWRITEAOF
server.aof_current_size > server.aof_rewrite_min_size)
{
// size of the AOF file after the last rewrite (or at startup)
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
// growth of the current AOF size relative to base, as a percentage
long long growth = (server.aof_current_size*100/base) - 100;
// if the growth exceeds the configured percentage, run BGREWRITEAOF
if (growth >= server.aof_rewrite_perc) {
redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
// Perform the BGREWRITEAOF
rewriteAppendOnlyFileBackground();
}
}
}
// Depending on the AOF fsync policy, check whether the contents of the
// AOF buffer need to be flushed to the AOF file
/* AOF postponed flush: Try at every cron cycle if the slow fsync
* completed. */
if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
/* AOF write errors: in this case we have a buffer to flush as well and
 * clear the AOF error in case of success to make the DB writable again,
 * however to try every second is enough in case of 'hz' is set to
 * an higher frequency. */
run_with_period(1000) {
if (server.aof_last_write_status == REDIS_ERR)
flushAppendOnlyFile(0);
}
/* Close clients that need to be closed asynchronous */
// Close the clients that need to be closed asynchronously
freeClientsInAsyncFreeQueue();
/* Clear the paused clients flag if needed. */
clientsArePaused(); /* Don't check return value, just use the side effect. */
/* Replication cron function -- used to reconnect to master and
 * to detect transfer failures. */
// Replication cron: reconnect to the master, send ACKs to the master,
// detect transfer failures, disconnect timed-out slaves, and so on
run_with_period(1000) replicationCron();
/* Run the Redis Cluster cron. */
// If the server is running in cluster mode, run the cluster cron
run_with_period(100) {
if (server.cluster_enabled) clusterCron();
}
/* Run the Sentinel timer if we are in sentinel mode. */
// If the server is running in Sentinel mode, run the Sentinel main function
run_with_period(100) {
if (server.sentinel_mode) sentinelTimer();
}
/* Cleanup expired MIGRATE cached sockets. */
run_with_period(1000) {
migrateCloseTimedoutSockets();
}
// Increment the cron loop counter
server.cronloops++;
return 1000/server.hz;
}
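The run_with_period macro used above is defined in redis.c; it is essentially an if statement that lets a block run only on the cron iterations matching the requested period:
/* Run code inside serverCron() with the given period in milliseconds.
 * The actual resolution depends on server.hz. */
#define run_with_period(_ms_) \
    if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))
With the default hz of 10, serverCron fires every 100 ms, so run_with_period(1000) executes its body on every 10th call; the return value 1000/server.hz re-arms the time event for the next cycle.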
redis.h/redisServer structure, the server state:
struct redisServer {
/* General */
char *configfile; /* Absolute config file path, or NULL */
int hz; /* serverCron() calls frequency in hertz */
redisDb *db; /* The databases */
dict *commands; /* Command table (affected by rename-command) */
dict *orig_commands; /* Command table before command renaming. */
aeEventLoop *el; /* Event loop state */
unsigned lruclock:REDIS_LRU_BITS; /* Clock for LRU eviction */
int shutdown_asap; /* SHUTDOWN needed ASAP */
int activerehashing; /* Incremental rehash in serverCron() */
char *requirepass; /* Pass for AUTH command, or NULL */
char *pidfile; /* PID file path */
int arch_bits; /* 32 or 64 depending on sizeof(long) */
int cronloops; /* Number of times the cron function run */
char runid[REDIS_RUN_ID_SIZE+1]; /* ID always different at every exec. */
int sentinel_mode; /* True if this instance is a Sentinel. */
/* Networking */
int port; /* TCP listening port */
int tcp_backlog; /* TCP listen() backlog */
char *bindaddr[REDIS_BINDADDR_MAX]; /* Addresses we should bind to */
int bindaddr_count; /* Number of addresses in server.bindaddr[] */
char *unixsocket; /* UNIX socket path */
mode_t unixsocketperm; /* UNIX socket permission */
int ipfd[REDIS_BINDADDR_MAX]; /* TCP socket file descriptors */
int ipfd_count; /* Used slots in ipfd[] */
int sofd; /* Unix socket file descriptor */
int cfd[REDIS_BINDADDR_MAX];/* Cluster bus listening socket */
int cfd_count; /* Used slots in cfd[] */
list *clients; /* List of active clients */
list *clients_to_close; /* Clients to close asynchronously */
list *slaves, *monitors; /* List of slaves and MONITORs */
redisClient *current_client; /* Current client, only used on crash report */
int clients_paused; /* True if clients are currently paused */
mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
dict *migrate_cached_sockets;/* MIGRATE cached sockets */
/* RDB / AOF loading information */
int loading; /* We are loading data from disk if true */
off_t loading_total_bytes; /* Total bytes of the data being loaded */
off_t loading_loaded_bytes; /* Bytes already loaded */
time_t loading_start_time; /* Time when loading started */
off_t loading_process_events_interval_bytes;
/* Fast pointers to often looked up command */
struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
*rpopCommand;
/* Fields used only for stats */
time_t stat_starttime; /* Server start time */
long long stat_numcommands; /* Number of processed commands */
long long stat_numconnections; /* Number of connections received */
long long stat_expiredkeys; /* Number of expired keys */
long long stat_evictedkeys; /* Number of evicted keys (maxmemory) */
long long stat_keyspace_hits; /* Number of successful lookups of keys */
long long stat_keyspace_misses; /* Number of failed lookups of keys */
size_t stat_peak_memory; /* Max used memory record */
long long stat_fork_time; /* Time needed to perform latest fork() */
long long stat_rejected_conn; /* Clients rejected because of maxclients */
long long stat_sync_full; /* Number of full resyncs with slaves. */
long long stat_sync_partial_ok; /* Number of accepted PSYNC requests. */
long long stat_sync_partial_err;/* Number of unaccepted PSYNC requests. */
/* slowlog */
list *slowlog; /* SLOWLOG list of commands */
long long slowlog_entry_id; /* SLOWLOG current entry ID */
long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */
unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */
size_t resident_set_size; /* RSS sampled in serverCron(). */
/* The following two are used to track instantaneous "load" in terms
 * of operations per second. */
long long ops_sec_last_sample_time; /* Timestamp of last sample (in ms) */
long long ops_sec_last_sample_ops; /* numcommands in last sample */
long long ops_sec_samples[REDIS_OPS_SEC_SAMPLES]; /* Sampling results */
int ops_sec_idx; /* Index into ops_sec_samples, wraps back to 0 */
/* Configuration */
int verbosity; /* Loglevel in redis.conf */
int maxidletime; /* Client timeout in seconds */
int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. */
int active_expire_enabled; /* Can be disabled for testing purposes. */
size_t client_max_querybuf_len; /* Limit for client query buffer length */
int dbnum; /* Total number of configured DBs */
int daemonize; /* True if running as a daemon */
/* Output buffer limits; the array has REDIS_CLIENT_LIMIT_NUM_CLASSES
 * entries, one per client class: normal, slave, pubsub, and so on. */
clientBufferLimitsConfig client_obuf_limits[REDIS_CLIENT_LIMIT_NUM_CLASSES];
/* AOF persistence */
int aof_state; /* REDIS_AOF_(ON|OFF|WAIT_REWRITE) */
int aof_fsync; /* Kind of fsync() policy (always/everysec/no) */
char *aof_filename; /* Name of the AOF file */
int aof_no_fsync_on_rewrite; /* Don't fsync if a rewrite is in prog. */
int aof_rewrite_perc; /* Rewrite AOF if % growth is > M and... */
off_t aof_rewrite_min_size; /* the AOF file is at least N bytes. */
off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */
off_t aof_current_size; /* AOF current size. */
int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */
pid_t aof_child_pid; /* PID if rewriting process */
list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */
sds aof_buf; /* AOF buffer, written before entering the event loop */
int aof_fd; /* File descriptor of currently selected AOF file */
int aof_selected_db; /* Currently selected DB in AOF */
time_t aof_flush_postponed_start; /* UNIX time of postponed AOF flush */
time_t aof_last_fsync; /* UNIX time of last fsync() */
time_t aof_rewrite_time_last; /* Time used by last AOF rewrite run. */
time_t aof_rewrite_time_start; /* Current AOF rewrite start time. */
int aof_lastbgrewrite_status; /* REDIS_OK or REDIS_ERR */
unsigned long aof_delayed_fsync; /* delayed AOF fsync() counter */
int aof_rewrite_incremental_fsync;/* fsync incrementally while rewriting? */
int aof_last_write_status; /* REDIS_OK or REDIS_ERR */
int aof_last_write_errno; /* Valid if aof_last_write_status is ERR */
/* RDB persistence */
long long dirty; /* Changes to DB from the last save */
long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */
pid_t rdb_child_pid; /* PID of RDB saving child, or -1 if none */
struct saveparam *saveparams; /* Save points array for RDB */
int saveparamslen; /* Number of saving points */
char *rdb_filename; /* Name of RDB file */
int rdb_compression; /* Use compression in RDB? */
int rdb_checksum; /* Use RDB checksum? */
time_t lastsave; /* Unix time of last successful save */
time_t lastbgsave_try; /* Unix time of last attempted bgsave */
time_t rdb_save_time_last; /* Time used by last RDB save run. */
time_t rdb_save_time_start; /* Current RDB save start time. */
int lastbgsave_status; /* REDIS_OK or REDIS_ERR */
int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */
/* Propagation of commands in AOF / replication */
redisOpArray also_propagate; /* Additional command to propagate. */
/* Logging */
char *logfile; /* Path of log file */
int syslog_enabled; /* Is syslog enabled? */
char *syslog_ident; /* Syslog ident */
int syslog_facility; /* Syslog facility */
/* Replication (master) */
int slaveseldb; /* Last SELECTed DB in replication output */
long long master_repl_offset; /* Global replication offset */
int repl_ping_slave_period; /* Master pings the slave every N seconds */
char *repl_backlog; /* Replication backlog for partial syncs */
long long repl_backlog_size; /* Backlog circular buffer size */
long long repl_backlog_histlen; /* Backlog actual data length */
long long repl_backlog_idx; /* Backlog circular buffer current offset */
long long repl_backlog_off; /* Replication offset of first byte in the
backlog buffer. */
time_t repl_backlog_time_limit; /* Time without slaves after the backlog
gets released. */
time_t repl_no_slaves_since; /* We have no slaves since that time.
Only valid if server.slaves len is 0. */
int repl_min_slaves_to_write; /* Min number of slaves to write. */
int repl_min_slaves_max_lag; /* Max lag of <count> slaves to write. */
int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */
/* Replication (slave) */
char *masterauth; /* AUTH with this password with master */
char *masterhost; /* Hostname of master */
int masterport; /* Port of master */
int repl_timeout; /* Timeout after N seconds of master idle */
redisClient *master; /* Client that is master for this slave */
redisClient *cached_master; /* Cached master to be reused for PSYNC. */
int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
int repl_state; /* Replication status if the instance is a slave */
off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */
int repl_transfer_s; /* Slave -> Master SYNC socket */
int repl_transfer_fd; /* Slave -> Master SYNC temp file descriptor */
char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */
time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */
int repl_serve_stale_data; /* Serve stale data when link is down? */
int repl_slave_ro; /* Slave is read only? */
time_t repl_down_since; /* Unix time at which link with master went down */
int repl_disable_tcp_nodelay; /* Disable TCP_NODELAY after SYNC? */
int slave_priority; /* Reported in INFO and used by Sentinel. */
char repl_master_runid[REDIS_RUN_ID_SIZE+1]; /* Master run id for PSYNC. */
long long repl_master_initial_offset; /* Master PSYNC offset. */
/* Replication script cache. */
dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */
list *repl_scriptcache_fifo; /* First in, first out LRU eviction. */
int repl_scriptcache_size; /* Max number of elements. */
/* Synchronous replication. */
list *clients_waiting_acks; /* Clients waiting in WAIT command. */
int get_ack_from_slaves; /* If true we send REPLCONF GETACK. */
/* Limits */
int maxclients; /* Max number of simultaneous clients */
unsigned long long maxmemory; /* Max number of memory bytes to use */
int maxmemory_policy; /* Policy for key eviction */
int maxmemory_samples; /* Precision of random sampling */
/* Blocked clients */
unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */
list *unblocked_clients; /* list of clients to unblock before next loop */
list *ready_keys; /* List of readyList structures for BLPOP & co */
/* Sort parameters - qsort_r() is only available under BSD so we
 * have to take this state global, in order to pass it to sortCompare() */
int sort_desc;
int sort_alpha;
int sort_bypattern;
int sort_store;
/* Zip structure config, see redis.conf for more information */
size_t hash_max_ziplist_entries;
size_t hash_max_ziplist_value;
size_t list_max_ziplist_entries;
size_t list_max_ziplist_value;
size_t set_max_intset_entries;
size_t zset_max_ziplist_entries;
size_t zset_max_ziplist_value;
size_t hll_sparse_max_bytes;
time_t unixtime; /* Unix time sampled every cron cycle. */
long long mstime; /* Like 'unixtime' but with milliseconds resolution. */
/* Pubsub */
dict *pubsub_channels; /* Map channels to lists of subscribed clients;
new subscribers are appended to the tail */
list *pubsub_patterns; /* A list of the patterns subscribed by clients */
int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
xor of REDIS_NOTIFY... flags. */
/* Cluster */
int cluster_enabled; /* Is cluster enabled? */
mstime_t cluster_node_timeout; /* Cluster node timeout. */
char *cluster_configfile; /* Cluster auto-generated config file name. */
struct clusterState *cluster; /* State of the cluster */
int cluster_migration_barrier; /* Cluster replicas migration barrier. */
/* Scripting */
lua_State *lua; /* The Lua interpreter. We use just one for all clients */
redisClient *lua_client; /* The "fake client" to query Redis from Lua */
redisClient *lua_caller; /* The client running EVAL right now, or NULL */
dict *lua_scripts; /* A dictionary of SHA1 -> Lua scripts */
mstime_t lua_time_limit; /* Script timeout in milliseconds */
mstime_t lua_time_start; /* Start time of script, milliseconds time */
int lua_write_dirty; /* True if a write command was called during the
execution of the current script. */
int lua_random_dirty; /* True if a random command was called during the
execution of the current script. */
int lua_timedout; /* True if we reached the time limit for script
execution. */
int lua_kill; /* Kill the script if true. */
/* Assert & bug reporting */
char *assert_failed;
char *assert_file;
int assert_line;
int bug_report_start; /* True if bug report header was already logged. */
int watchdog_period; /* Software watchdog period in ms. 0 = off */
};
===========================================
The first step in initializing the server is to create an instance variable named server, of type struct redisServer, as the server state, and to set default values for its attributes.
//redis.h
extern struct redisServer server;
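This extern declaration is matched by a single definition in redis.c, the one global that holds the entire server state:
//redis.c
struct redisServer server; /* server global state */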
The server variable is initialized by the redis.c/initServerConfig function:
void initServerConfig() {
int j;
// Set the server's run ID
getRandomHexChars(server.runid,REDIS_RUN_ID_SIZE);
// Default config file path (none)
server.configfile = NULL;
// Default server frequency (serverCron calls per second)
server.hz = REDIS_DEFAULT_HZ;
// Terminate the run ID with a null byte
server.runid[REDIS_RUN_ID_SIZE] = '\0';
// Architecture the server is running on
server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
// Default server port
server.port = REDIS_SERVERPORT;
server.tcp_backlog = REDIS_TCP_BACKLOG;
server.tcp_backlog = REDIS_TCP_BACKLOG;
server.bindaddr_count = 0;
server.unixsocket = NULL;
server.unixsocketperm = REDIS_DEFAULT_UNIX_SOCKET_PERM;
server.ipfd_count = 0;
server.sofd = -1;
server.dbnum = REDIS_DEFAULT_DBNUM;
server.verbosity = REDIS_DEFAULT_VERBOSITY;
server.maxidletime = REDIS_MAXIDLETIME;
server.tcpkeepalive = REDIS_DEFAULT_TCP_KEEPALIVE;
server.active_expire_enabled = 1;
server.client_max_querybuf_len = REDIS_MAX_QUERYBUF_LEN;
server.saveparams = NULL;
server.loading = 0;
server.logfile = zstrdup(REDIS_DEFAULT_LOGFILE);
server.syslog_enabled = REDIS_DEFAULT_SYSLOG_ENABLED;
server.syslog_ident = zstrdup(REDIS_DEFAULT_SYSLOG_IDENT);
server.syslog_facility = LOG_LOCAL0;
server.daemonize = REDIS_DEFAULT_DAEMONIZE;
server.aof_state = REDIS_AOF_OFF;
server.aof_fsync = REDIS_DEFAULT_AOF_FSYNC;
server.aof_no_fsync_on_rewrite = REDIS_DEFAULT_AOF_NO_FSYNC_ON_REWRITE;
server.aof_rewrite_perc = REDIS_AOF_REWRITE_PERC;
server.aof_rewrite_min_size = REDIS_AOF_REWRITE_MIN_SIZE;
server.aof_rewrite_base_size = 0;
server.aof_rewrite_scheduled = 0;
server.aof_last_fsync = time(NULL);
server.aof_rewrite_time_last = -1;
server.aof_rewrite_time_start = -1;
server.aof_lastbgrewrite_status = REDIS_OK;
server.aof_delayed_fsync = 0;
server.aof_fd = -1;
server.aof_selected_db = -1; /* Make sure the first time will not match */
server.aof_flush_postponed_start = 0;
server.aof_rewrite_incremental_fsync = REDIS_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC;
server.pidfile = zstrdup(REDIS_DEFAULT_PID_FILE);
server.rdb_filename = zstrdup(REDIS_DEFAULT_RDB_FILENAME);
server.aof_filename = zstrdup(REDIS_DEFAULT_AOF_FILENAME);
server.requirepass = NULL;
server.rdb_compression = REDIS_DEFAULT_RDB_COMPRESSION;
server.rdb_checksum = REDIS_DEFAULT_RDB_CHECKSUM;
server.stop_writes_on_bgsave_err = REDIS_DEFAULT_STOP_WRITES_ON_BGSAVE_ERROR;
server.activerehashing = REDIS_DEFAULT_ACTIVE_REHASHING;
server.notify_keyspace_events = 0;
server.maxclients = REDIS_MAX_CLIENTS;
server.bpop_blocked_clients = 0;
server.maxmemory = REDIS_DEFAULT_MAXMEMORY;
server.maxmemory_policy = REDIS_DEFAULT_MAXMEMORY_POLICY;
server.maxmemory_samples = REDIS_DEFAULT_MAXMEMORY_SAMPLES;
server.hash_max_ziplist_entries = REDIS_HASH_MAX_ZIPLIST_ENTRIES;
server.hash_max_ziplist_value = REDIS_HASH_MAX_ZIPLIST_VALUE;
server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES;
server.list_max_ziplist_value = REDIS_LIST_MAX_ZIPLIST_VALUE;
server.set_max_intset_entries = REDIS_SET_MAX_INTSET_ENTRIES;
server.zset_max_ziplist_entries = REDIS_ZSET_MAX_ZIPLIST_ENTRIES;
server.zset_max_ziplist_value = REDIS_ZSET_MAX_ZIPLIST_VALUE;
server.hll_sparse_max_bytes = REDIS_DEFAULT_HLL_SPARSE_MAX_BYTES;
server.shutdown_asap = 0;
server.repl_ping_slave_period = REDIS_REPL_PING_SLAVE_PERIOD;
server.repl_timeout = REDIS_REPL_TIMEOUT;
server.repl_min_slaves_to_write = REDIS_DEFAULT_MIN_SLAVES_TO_WRITE;
server.repl_min_slaves_max_lag = REDIS_DEFAULT_MIN_SLAVES_MAX_LAG;
server.cluster_enabled = 0;
server.cluster_node_timeout = REDIS_CLUSTER_DEFAULT_NODE_TIMEOUT;
server.cluster_migration_barrier = REDIS_CLUSTER_DEFAULT_MIGRATION_BARRIER;
server.cluster_configfile = zstrdup(REDIS_DEFAULT_CLUSTER_CONFIG_FILE);
server.lua_caller = NULL;
server.lua_time_limit = REDIS_LUA_TIME_LIMIT;
server.lua_client = NULL;
server.lua_timedout = 0;
server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
server.loading_process_events_interval_bytes = (1024*1024*2);
// Initialize the LRU clock
server.lruclock = getLRUClock();
// Reset and set up the RDB save conditions
resetServerSaveParams();
appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
/* Replication related */
// Initialize the replication-related state
server.masterauth = NULL;
server.masterhost = NULL;
server.masterport = 6379;
server.master = NULL;
server.cached_master = NULL;
server.repl_master_initial_offset = -1;
server.repl_state = REDIS_REPL_NONE;
server.repl_syncio_timeout = REDIS_REPL_SYNCIO_TIMEOUT;
server.repl_serve_stale_data = REDIS_DEFAULT_SLAVE_SERVE_STALE_DATA;
server.repl_slave_ro = REDIS_DEFAULT_SLAVE_READ_ONLY;
server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
server.repl_disable_tcp_nodelay = REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY;
server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY;
server.master_repl_offset = 0;
/* Replication partial resync backlog */
// Initialize the backlog used by the PSYNC command
server.repl_backlog = NULL;
server.repl_backlog_size = REDIS_DEFAULT_REPL_BACKLOG_SIZE;
server.repl_backlog_histlen = 0;
server.repl_backlog_idx = 0;
server.repl_backlog_off = 0;
server.repl_backlog_time_limit = REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT;
server.repl_no_slaves_since = time(NULL);
/* Client output buffer limits */
// Set the client output buffer limits
for (j = 0; j < REDIS_CLIENT_LIMIT_NUM_CLASSES; j++)
server.client_obuf_limits[j] = clientBufferLimitsDefaults[j];
/* Double constants initialization */
// Initialize floating point constants
R_Zero = 0.0;
R_PosInf = 1.0/R_Zero;
R_NegInf = -1.0/R_Zero;
R_Nan = R_Zero/R_Zero;
/* Command table -- we initialize it here as it is part of the
 * initial configuration, since command names may be changed via
 * redis.conf using the rename-command directive. */
// Initialize the command table here, because the configuration file
// read next may rename some of these commands
server.commands = dictCreate(&commandTableDictType,NULL);
server.orig_commands = dictCreate(&commandTableDictType,NULL);
populateCommandTable();
server.delCommand = lookupCommandByCString("del");
server.multiCommand = lookupCommandByCString("multi");
server.lpushCommand = lookupCommandByCString("lpush");
server.lpopCommand = lookupCommandByCString("lpop");
server.rpopCommand = lookupCommandByCString("rpop");
/* Slow log */
// Initialize the slow query log options
server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
server.slowlog_max_len = REDIS_SLOWLOG_MAX_LEN;
/* Debugging */
// Initialize the debugging fields
server.assert_failed = "<no assertion failed>";
server.assert_file = "<no file>";
server.assert_line = 0;
server.bug_report_start = 0;
server.watchdog_period = 0;
}
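The save conditions appended above are stored as saveparam entries in server.saveparams. A sketch of the structure and of appendServerSaveParams, condensed from redis.h/redis.c:
struct saveparam {
    time_t seconds; /* "save <seconds> <changes>": the time window */
    int changes;    /* number of modifications required within that window */
};

/* Append one save point to server.saveparams (no error handling shown). */
void appendServerSaveParams(time_t seconds, int changes) {
    server.saveparams = zrealloc(server.saveparams,
        sizeof(struct saveparam)*(server.saveparamslen+1));
    server.saveparams[server.saveparamslen].seconds = seconds;
    server.saveparams[server.saveparamslen].changes = changes;
    server.saveparamslen++;
}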
$ redis-server --port 10086      # override the listening port on the command line
$ redis-server redis.conf        # start the server with a configuration file
After initializing the server variable with initServerConfig, the server loads the configuration options and the configuration file supplied by the user and modifies the relevant attributes of server according to those settings; an illustrative fragment is shown below.
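An illustrative redis.conf fragment; the directive names are standard, the values are only examples:
port 10086
daemonize yes
save 900 1
save 300 10
save 60 10000
appendonly yes
appendfsync everysec
maxmemory 100mb
maxmemory-policy allkeys-lru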
initServerConfig is mainly responsible for initializing plain attributes, while initServer is mainly responsible for initializing data structures.
redis.c/initServer
void initServer() {
int j;
// Set up the signal handlers
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
setupSignalHandlers();
// Set up syslog
if (server.syslog_enabled) {
openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
server.syslog_facility);
}
// Initialize and create the data structures
server.current_client = NULL;
server.clients = listCreate();
server.clients_to_close = listCreate();
server.slaves = listCreate();
server.monitors = listCreate();
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
server.unblocked_clients = listCreate();
server.ready_keys = listCreate();
server.clients_waiting_acks = listCreate();
server.get_ack_from_slaves = 0;
server.clients_paused = 0;
// Create the shared objects
createSharedObjects();
adjustOpenFilesLimit();
server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
/* Open the TCP listening socket for the user commands. */
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
exit(1);
/* Open the listening Unix domain socket. */
if (server.unixsocket != NULL) {
unlink(server.unixsocket); /* don't care if this fails */
server.sofd = anetUnixServer(server.neterr,server.unixsocket,
server.unixsocketperm, server.tcp_backlog);
if (server.sofd == ANET_ERR) {
redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr);
exit(1);
}
anetNonBlock(NULL,server.sofd);
}
/* Abort if there are no listening sockets at all. */
if (server.ipfd_count == 0 && server.sofd < 0) {
redisLog(REDIS_WARNING, "Configured to not listen anywhere, exiting.");
exit(1);
}
/* Create the Redis databases, and initialize other internal state. */
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType,NULL);
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
server.db[j].ready_keys = dictCreate(&setDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
server.db[j].eviction_pool = evictionPoolAlloc();
server.db[j].id = j;
server.db[j].avg_ttl = 0;
}
// Create the Pub/Sub data structures
server.pubsub_channels = dictCreate(&keylistDictType,NULL);
server.pubsub_patterns = listCreate();
listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
server.cronloops = 0;
server.rdb_child_pid = -1;
server.aof_child_pid = -1;
aofRewriteBufferReset();
server.aof_buf = sdsempty();
server.lastsave = time(NULL); /* At startup we consider the DB saved. */
server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */
server.rdb_save_time_last = -1;
server.rdb_save_time_start = -1;
server.dirty = 0;
resetServerStats();
/* A few stats we don't want to reset: server startup time, and peak mem. */
server.stat_starttime = time(NULL);
server.stat_peak_memory = 0;
server.resident_set_size = 0;
server.lastbgsave_status = REDIS_OK;
server.aof_last_write_status = REDIS_OK;
server.aof_last_write_errno = 0;
server.repl_good_slaves_count = 0;
updateCachedTime();
/* Create the serverCron() time event, that's our main way to process
 * background operations. */
// Create the time event for serverCron()
if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
redisPanic("Can't create the serverCron time event.");
exit(1);
}
/* Create an event handler for accepting new connections in TCP and Unix
 * domain sockets. */
// Associate the connection-accept handler with the TCP listening sockets,
// so that the server can accept and answer client connect() calls
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
redisPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
// Associate the accept handler with the Unix domain socket
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");
/* Open the AOF file if needed. */
// If AOF persistence is enabled, open or create the AOF file
if (server.aof_state == REDIS_AOF_ON) {
server.aof_fd = open(server.aof_filename,
O_WRONLY|O_APPEND|O_CREAT,0644);
if (server.aof_fd == -1) {
redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
strerror(errno));
exit(1);
}
}
/* 32 bit instances are limited to 4GB of address space, so if there is
 * no explicit limit in the user provided configuration we set a limit
 * at 3 GB using maxmemory with 'noeviction' policy. This avoids
 * useless crashes of the Redis instance for out of memory. */
if (server.arch_bits == 32 && server.maxmemory == 0) {
redisLog(REDIS_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");
server.maxmemory = 3072LL*(1024*1024); /* 3 GB */
server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION;
}
// If the server was started in cluster mode, initialize the cluster
if (server.cluster_enabled) clusterInit();
// Initialize the replication script cache
replicationScriptCacheInit();
// Initialize the scripting system
scriptingInit();
// Initialize the slow log
slowlogInit();
// Initialize the background I/O (BIO) system
bioInit();
}
After the server variable has been fully initialized, the server loads the RDB file or the AOF file to restore the database state.
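This loading step is driven by loadDataFromDisk (redis.c). A condensed sketch, with failure handling omitted; when AOF is enabled it is preferred over the RDB file because it normally holds the more complete dataset:
void loadDataFromDisk(void) {
    long long start = ustime();
    if (server.aof_state == REDIS_AOF_ON) {
        /* Replay the append only file */
        if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
            redisLog(REDIS_NOTICE, "DB loaded from append only file: %.3f seconds",
                (float)(ustime()-start)/1000000);
    } else {
        /* Load the RDB snapshot */
        if (rdbLoad(server.rdb_filename) == REDIS_OK)
            redisLog(REDIS_NOTICE, "DB loaded from disk: %.3f seconds",
                (float)(ustime()-start)/1000000);
    }
}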
As the final step, the server prints a log line such as
21 Nov 22:43:49.084 * The server is now ready to accept connections on port 6379
and starts running the server's event loop, as sketched below.
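The tail of main() in redis.c looks essentially like this; aeMain blocks here, dispatching file and time events until the server shuts down:
aeSetBeforeSleepProc(server.el,beforeSleep); /* hook run before each poll */
aeMain(server.el);                           /* the event loop proper */
aeDeleteEventLoop(server.el);                /* cleanup after aeStop() */
return 0;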
Original post: http://blog.csdn.net/yvhqbat/article/details/51513815