标签:释放 table 转换 date update 缓冲区 entry ash strerror
服务器执行写命令后,会将执行的写指令追加到 aof_buf 缓冲区:
struct redisServer{ // AOF 缓冲区 sds aof_buf; // ...... }
追加到 aof_buf 缓冲区的命令是按照一定的协议格式保存的,catAppendOnlyGenericCommand 函数负责将命令转换为协议格式。从这个函数的实现可以清楚的看出协议格式是如何生成的。其具体格式为:
以 SET msg "hello" 为例,生成的协议格式应为:*3\r\n$3\r\nSET\r\n$3\r\nmsg\r\n$5\r\nhello\r\n
/* * 根据传入的命令和命令参数,将它们还原成协议格式。 */ sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) { char buf[32]; int len, j; robj *o; // 重建命令的个数,格式为 *<count>\r\n // 例如 *3\r\n buf[0] = ‘*‘; len = 1+ll2string(buf+1,sizeof(buf)-1,argc); buf[len++] = ‘\r‘; buf[len++] = ‘\n‘; dst = sdscatlen(dst,buf,len); // 重建命令和命令参数,格式为 $<length>\r\n<content>\r\n // 例如 $3\r\nSET\r\n$3\r\nKEY\r\n$5\r\nVALUE\r\n for (j = 0; j < argc; j++) { o = getDecodedObject(argv[j]); // 组合 $<length>\r\n buf[0] = ‘$‘; len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr)); buf[len++] = ‘\r‘; buf[len++] = ‘\n‘; dst = sdscatlen(dst,buf,len); // 组合 <content>\r\n dst = sdscatlen(dst,o->ptr,sdslen(o->ptr)); dst = sdscatlen(dst,"\r\n",2); decrRefCount(o); } // 返回重建后的协议内容 return dst; }
Redis服务器进程是一个事件循环,在每一个事件循环中,都会调用 flushAppendOnlyFile 来决定是否将 aof_buf 缓冲区中的数据写入AOF文件中,而 flushAppendOnlyFile 函数的行为取决于服务器配置 appendfsync 选项。
appendfsync 选项 | 函数行为 | 出现故障时丢失的数据量 |
always | aof_buf内容写入并同步到AOF文件 | 一个事件循环产生的命令数据 |
everysec | aof_buf内容写入AOF文件,每隔1秒同步AOF文件 | 一秒钟的命令数据 |
no | aof_buf内容写入AOF文件,何时同步由操作系统决定 | 上次同步AOF文件之后的所有命令数据 |
载入AOF文件时,服务器通过 createFakeClient 来创建一个伪客户端执行AOF文件中保存的写命令。
struct redisClient *createFakeClient(void) { struct redisClient *c = zmalloc(sizeof(*c)); selectDb(c,0); c->fd = -1; c->name = NULL; c->querybuf = sdsempty(); c->querybuf_peak = 0; c->argc = 0; c->argv = NULL; c->bufpos = 0; c->flags = 0; c->btype = REDIS_BLOCKED_NONE; /* * 将客户端设置为正在等待同步的附属节点,这样客户端就不会发送回复了。 */ c->replstate = REDIS_REPL_WAIT_BGSAVE_START; c->reply = listCreate(); c->reply_bytes = 0; c->obuf_soft_limit_reached_time = 0; c->watched_keys = listCreate(); c->peerid = NULL; listSetFreeMethod(c->reply,decrRefCountVoid); listSetDupMethod(c->reply,dupClientReplyValue); initClientMultiState(c); return c; }
AOF重写由 rewriteAppendOnlyFile 实现:
int rewriteAppendOnlyFile(char *filename) { dictIterator *di = NULL; dictEntry *de; rio aof; FILE *fp; char tmpfile[256]; int j; long long now = mstime(); /* * 创建临时文件 * 注意这里创建的文件名和 rewriteAppendOnlyFileBackground() 创建的文件名稍有不同 */ snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid()); fp = fopen(tmpfile,"w"); if (!fp) { redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno)); return REDIS_ERR; } // 初始化文件 io rioInitWithFile(&aof,fp); // 设置每写入 REDIS_AOF_AUTOSYNC_BYTES 字节 // 就执行一次 FSYNC // 防止缓存中积累太多命令内容,造成 I/O 阻塞时间过长 if (server.aof_rewrite_incremental_fsync) rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES); // 遍历所有数据库 for (j = 0; j < server.dbnum; j++) { char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; redisDb *db = server.db+j; // 指向键空间 dict *d = db->dict; if (dictSize(d) == 0) continue; // 创建键空间迭代器 di = dictGetSafeIterator(d); if (!di) { fclose(fp); return REDIS_ERR; } /* * 首先写入 SELECT 命令,确保之后的数据会被插入到正确的数据库上 */ if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr; if (rioWriteBulkLongLong(&aof,j) == 0) goto werr; / * 遍历数据库所有键,并通过命令将它们的当前状态(值)记录到新 AOF 文件中 */ while((de = dictNext(di)) != NULL) { sds keystr; robj key, *o; long long expiretime; // 取出键 keystr = dictGetKey(de); // 取出值 o = dictGetVal(de); initStaticStringObject(key,keystr); // 取出过期时间 expiretime = getExpire(db,&key); /* * 如果键已经过期,那么跳过它,不保存 */ if (expiretime != -1 && expiretime < now) continue; /* Save the key and associated value * * 根据值的类型,选择适当的命令来保存值 */ if (o->type == REDIS_STRING) { /* Emit a SET command */ char cmd[]="*3\r\n$3\r\nSET\r\n"; if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; /* Key and value */ if (rioWriteBulkObject(&aof,&key) == 0) goto werr; if (rioWriteBulkObject(&aof,o) == 0) goto werr; } else if (o->type == REDIS_LIST) { if (rewriteListObject(&aof,&key,o) == 0) goto werr; } else if (o->type == REDIS_SET) { if (rewriteSetObject(&aof,&key,o) == 0) goto werr; } else if (o->type == REDIS_ZSET) { if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr; } else if (o->type == REDIS_HASH) { if (rewriteHashObject(&aof,&key,o) == 0) goto werr; } else { redisPanic("Unknown object type"); } /* Save the expire time * * 保存键的过期时间 */ if (expiretime != -1) { char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n"; // 写入 PEXPIREAT expiretime 命令 if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; if (rioWriteBulkObject(&aof,&key) == 0) goto werr; if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr; } } // 释放迭代器 dictReleaseIterator(di); } // 冲洗并关闭新 AOF 文件 if (fflush(fp) == EOF) goto werr; if (aof_fsync(fileno(fp)) == -1) goto werr; if (fclose(fp) == EOF) goto werr; /* * 原子地改名,用重写后的新 AOF 文件覆盖旧 AOF 文件 */ if (rename(tmpfile,filename) == -1) { redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno)); unlink(tmpfile); return REDIS_ERR; } redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed"); return REDIS_OK; werr: fclose(fp); unlink(tmpfile); redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno)); if (di) dictReleaseIterator(di); return REDIS_ERR; }
int rewriteAppendOnlyFileBackground(void) { pid_t childpid; long long start; // 子进程 if ((childpid = fork()) == 0) { char tmpfile[256]; // 创建临时文件,并进行 AOF 重写 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) { size_t private_dirty = zmalloc_get_private_dirty(); if (private_dirty) { redisLog(REDIS_NOTICE, "AOF rewrite: %zu MB of memory used by copy-on-write", private_dirty/(1024*1024)); } // 发送重写成功信号 exitFromChild(0); } else { // 发送重写失败信号 exitFromChild(1); } } else { // 父进程 // ...... } return REDIS_OK; }
AOF重写的操作是在服务器进程的周期操作函数 serverCron 中进行的,在 serverCron 函数中,服务器进程会接收子进程发来的信号(子进程的退出信号),当服务器进程检查到负责AOF重写的子进程退出时,会将AOF重写缓冲区的数据写入AOF文件末尾。
AOF重写缓冲区的存在是为了解决AOF重写产生的AOF文件与当前数据库状态不一致的问题。服务器在创建子进程进行AOF重写后,每执行一个写指令,不仅会将该写指令追加到 aof_buf 缓冲区,还会追加到 AOF重写缓冲区。所以当AOF重写结束后,只要将AOF重写缓冲区的数据追加到新的AOF文件中,就可以保证AOF文件保存的数据库状态和当前数据库状态一致。
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { // ...... // 接收子进程发来的信号,非阻塞 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) { int exitcode = WEXITSTATUS(statloc); int bysignal = 0; if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc); // BGSAVE 执行完毕 if (pid == server.rdb_child_pid) { backgroundSaveDoneHandler(exitcode,bysignal); // BGREWRITEAOF 执行完毕 } else if (pid == server.aof_child_pid) { backgroundRewriteDoneHandler(exitcode,bysignal); } else { redisLog(REDIS_WARNING,"Warning, detected child with unmatched pid: %ld",(long)pid); } updateDictResizePolicy(); } // ...... } void backgroundRewriteDoneHandler(int exitcode, int bysignal) { // ...... // 将累积的重写缓存写入到临时文件中 // 这个函数调用的 write 操作会阻塞主进程 if (aofRewriteBufferWrite(newfd) == -1) { redisLog(REDIS_WARNING,"Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno)); close(newfd); goto cleanup; } // ...... }
标签:释放 table 转换 date update 缓冲区 entry ash strerror