if (prepareClientToWrite(c) != REDIS_OK) return;prepareClientToWrite函数在每次发送响应时调用,并且要在向响应buffer添加数据之前。只有在返回REDIS_OK时,才能向响应buffer输出内容。在响应buffer为空时(固定buffer以及链表均为空)时,向事件循环注册写事件,回调函数是sendReplyToClient。
/* This function is called every time we are going to transmit new data * to the client. The behavior is the following: * * If the client should receive new data (normal clients will) the function * returns REDIS_OK, and make sure to install the write handler in our event * loop so that when the socket is writable new data gets written. * * If the client should not receive new data, because it is a fake client, * a master, a slave not yet online, or because the setup of the write handler * failed, the function returns REDIS_ERR. * * Typically gets called every time a reply is built, before adding more * data to the clients output buffers. If the function returns REDIS_ERR no * data should be appended to the output buffers. */ int prepareClientToWrite(redisClient *c) { if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK; if ((c->flags & REDIS_MASTER) && !(c->flags & REDIS_MASTER_FORCE_REPLY)) return REDIS_ERR; if (c->fd <= 0) return REDIS_ERR; /* Fake client */ // <MM> // 在没有向output buf输出之前,才可以注册write event handler // </MM> if (c->bufpos == 0 && listLength(c->reply) == 0 && (c->replstate == REDIS_REPL_NONE || c->replstate == REDIS_REPL_ONLINE) && aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c) == AE_ERR) return REDIS_ERR; return REDIS_OK; }接下来,需要将响应内容添加到output buffer中。总体思路是,先尝试向固定buffer添加,添加失败的话,在尝试添加到响应链表。
if (obj->encoding == REDIS_ENCODING_RAW) { if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK) _addReplyObjectToList(c,obj);先看一下添加到固定buffer的逻辑。这个函数很简单,主要检查两个地方,一是输出链表是否为空,二是buffer的剩余空间是否足够大。如果两个都满足,则可以添加到buffer。最后,拷贝内容,并更新bufpos(用于指向buffer中下一个空闲位置)。
int _addReplyToBuffer(redisClient *c, char *s, size_t len) { size_t available = sizeof(c->buf)-c->bufpos; if (c->flags & REDIS_CLOSE_AFTER_REPLY) return REDIS_OK; /* If there already are entries in the reply list, we cannot * add anything more to the static buffer. */ if (listLength(c->reply) > 0) return REDIS_ERR; /* Check that the buffer has enough space available for this string. */ if (len > available) return REDIS_ERR; memcpy(c->buf+c->bufpos,s,len); c->bufpos+=len; return REDIS_OK; }接下来是添加到输出链表函数_addReplyObjectToList。逻辑也比较简单,就是链表追加的操作。如果链表为空的话,直接在尾部添加一个节点。redisClient->reply_bytes记录输出链表总大小,这里更新一下。
if (listLength(c->reply) == 0) { incrRefCount(o); listAddNodeTail(c->reply,o); c->reply_bytes += zmalloc_size_sds(o->ptr); }
/* Append to this object when possible. */ if (tail->ptr != NULL && sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES) { c->reply_bytes -= zmalloc_size_sds(tail->ptr); tail = dupLastObjectIfNeeded(c->reply); tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr)); c->reply_bytes += zmalloc_size_sds(tail->ptr); }如果超过REDIS_REPLY_CHUNK_BYTES,则新增一个节点,并追加到尾部。
else { incrRefCount(o); listAddNodeTail(c->reply,o); c->reply_bytes += zmalloc_size_sds(o->ptr); }如果不限制客户端的响应buffer,那么大量的活跃客户端就会导致redis的内存爆掉(比如在操作频繁的实例上,打开了很多的monitor客户端)。所以需要限制响应buffer的大小,在超过限制时,关闭该客户端。_addReplyObjectToList的最后一行代码就是完成这个功能的。
/* Asynchronously close a client if soft or hard limit is reached on the * output buffer size. The caller can check if the client will be closed * checking if the client REDIS_CLOSE_ASAP flag is set. * * Note: we need to close the client asynchronously because this function is * called from contexts where the client can‘t be freed safely, i.e. from the * lower level functions pushing data inside the client output buffers. */ void asyncCloseClientOnOutputBufferLimitReached(redisClient *c) { // <MM> // assert不超过4GB - 64KB // </MM> redisAssert(c->reply_bytes < ULONG_MAX-(1024*64)); if (c->reply_bytes == 0 || c->flags & REDIS_CLOSE_ASAP) return; if (checkClientOutputBufferLimits(c)) { sds client = catClientInfoString(sdsempty(),c); freeClientAsync(c); redisLog(REDIS_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client); sdsfree(client); } }上面就是响应内容输出到buffer的过程,下面看一下发送buffer的过程。
while(c->bufpos > 0 || listLength(c->reply)) { ….. }循环内部首先是尝试发送固定buffer,totwritten记录一次sendReplyToClient函数发送数据的总大小。redisClient->sentlen在发送固定buffer和链表时是复用的。在发送固定buffer时,如果sentlen等于bufpos,则固定buffer发送完,接下来发送reply链表。
if (c->bufpos > 0) { // <MM> // buffer不空 // </MM> nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen); if (nwritten <= 0) break; c->sentlen += nwritten; totwritten += nwritten; /* If the buffer was sent, set bufpos to zero to continue with * the remainder of the reply. */ if (c->sentlen == c->bufpos) { c->bufpos = 0; c->sentlen = 0; } }发送reply链表时,会从链表头部开始依次发送。首先,取头结点,计算对象的长度。
// <MM> // 将reply链表的头结点发送 // </MM> o = listNodeValue(listFirst(c->reply)); objlen = sdslen(o->ptr); objmem = zmalloc_size_sds(o->ptr);如果节点内容为空,则直接将该节点删除。
if (objlen == 0) { listDelNode(c->reply,listFirst(c->reply)); continue; }接下来调用write系统调用,sentlen表示当前该对象已发送的长度。每次调用write发送尚未发送的内容(objlen - c->sentlen)。同样,这里需要更新totwritten。
nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen); if (nwritten <= 0) break; c->sentlen += nwritten; totwritten += nwritten;如果sentlen==objlen,说明这个reply节点的数据发送完成,需要删除该节点。
/* If we fully sent the object on head go to the next one */ if (c->sentlen == objlen) { listDelNode(c->reply,listFirst(c->reply)); c->sentlen = 0; c->reply_bytes -= objmem; }为了所有客户端的公平,redis限制每次事件循环中,每个客户端最大发送REDIS_MAX_WRITE_PER_EVENT(64KB)。但是,这也有例外,如果当前redis设置了最大内存,并且已经超过该限制,这里会尽量多得发送数据,以尽快释放空间。
/* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT * bytes, in a single threaded server it‘s a good idea to serve * other clients as well, even if a very large request comes from * super fast link that is always able to accept data (in real world * scenario think about ‘KEYS *‘ against the loopback interface). * * However if we are over the maxmemory limit we ignore that and * just deliver as much data as it is possible to deliver. */ if (totwritten > REDIS_MAX_WRITE_PER_EVENT && (server.maxmemory == 0 || zmalloc_used_memory() < server.maxmemory)) break;跳出while循环,首先会判断write是否出错。如果出错,记录日志并停止这个客户端。
if (nwritten == -1) { if (errno == EAGAIN) { nwritten = 0; } else { redisLog(REDIS_VERBOSE, "Error writing to client: %s", strerror(errno)); freeClient(c); return; } }如果发送出数据,则认为该客户端还是活跃的更新一下redisClient->lastinteraction(用于清理timeout的客户端)。
if (totwritten > 0) { /* For clients representing masters we don‘t count sending data * as an interaction, since we always send REPLCONF ACK commands * that take some time to just fill the socket output buffer. * We just rely on data / pings received for timeout detection. */ if (!(c->flags & REDIS_MASTER)) c->lastinteraction = server.unixtime; }最后,判断一下响应内容是不是全部发送完。如果发送完,则将删除写事件。否则,下一轮事件循环继续发送响应内容,指导完成为止。
// <MM> // 当client的响应buf输出完毕后,删除对应的write event // </MM> if (c->bufpos == 0 && listLength(c->reply) == 0) { c->sentlen = 0; aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); /* Close connection after entire reply has been sent. */ if (c->flags & REDIS_CLOSE_AFTER_REPLY) freeClient(c); }请求和响应的过程已经完成,下一篇介绍一下AOF。