if (prepareClientToWrite(c) != REDIS_OK) return;prepareClientToWrite函数在每次发送响应时调用,并且要在向响应buffer添加数据之前。只有在返回REDIS_OK时,才能向响应buffer输出内容。在响应buffer为空时(固定buffer以及链表均为空)时,向事件循环注册写事件,回调函数是sendReplyToClient。
/* This function is called every time we are going to transmit new data
 * to the client. The behavior is the following:
 *
 * If the client should receive new data (normal clients will) the function
 * returns REDIS_OK, and make sure to install the write handler in our event
 * loop so that when the socket is writable new data gets written.
 *
 * If the client should not receive new data, because it is a fake client,
 * a master, a slave not yet online, or because the setup of the write handler
 * failed, the function returns REDIS_ERR.
 *
 * Typically gets called every time a reply is built, before adding more
 * data to the clients output buffers. If the function returns REDIS_ERR no
 * data should be appended to the output buffers. */
int prepareClientToWrite(redisClient *c) {
    if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;
    if ((c->flags & REDIS_MASTER) &&
        !(c->flags & REDIS_MASTER_FORCE_REPLY)) return REDIS_ERR;
    if (c->fd <= 0) return REDIS_ERR; /* Fake client */
    // <MM>
    // 在没有向output buf输出之前,才可以注册write event handler
    // </MM>
    if (c->bufpos == 0 && listLength(c->reply) == 0 &&
        (c->replstate == REDIS_REPL_NONE ||
         c->replstate == REDIS_REPL_ONLINE) &&
        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
        sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
    return REDIS_OK;
}	接下来,需要将响应内容添加到output buffer中。总体思路是,先尝试向固定buffer添加,添加失败的话,在尝试添加到响应链表。    if (obj->encoding == REDIS_ENCODING_RAW) {
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
            _addReplyObjectToList(c,obj);	先看一下添加到固定buffer的逻辑。这个函数很简单,主要检查两个地方,一是输出链表是否为空,二是buffer的剩余空间是否足够大。如果两个都满足,则可以添加到buffer。最后,拷贝内容,并更新bufpos(用于指向buffer中下一个空闲位置)。int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
    size_t available = sizeof(c->buf)-c->bufpos;
    if (c->flags & REDIS_CLOSE_AFTER_REPLY) return REDIS_OK;
    /* If there already are entries in the reply list, we cannot
     * add anything more to the static buffer. */
    if (listLength(c->reply) > 0) return REDIS_ERR;
    /* Check that the buffer has enough space available for this string. */
    if (len > available) return REDIS_ERR;
    memcpy(c->buf+c->bufpos,s,len);
    c->bufpos+=len;
    return REDIS_OK;
}	接下来是添加到输出链表函数_addReplyObjectToList。逻辑也比较简单,就是链表追加的操作。如果链表为空的话,直接在尾部添加一个节点。redisClient->reply_bytes记录输出链表总大小,这里更新一下。    if (listLength(c->reply) == 0) {
        incrRefCount(o);
        listAddNodeTail(c->reply,o);
        c->reply_bytes += zmalloc_size_sds(o->ptr);
    }        /* Append to this object when possible. */
        if (tail->ptr != NULL &&
            sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)
        {
            c->reply_bytes -= zmalloc_size_sds(tail->ptr);
            tail = dupLastObjectIfNeeded(c->reply);
            tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));
            c->reply_bytes += zmalloc_size_sds(tail->ptr);
        }	如果超过REDIS_REPLY_CHUNK_BYTES,则新增一个节点,并追加到尾部。        else {
            incrRefCount(o);
            listAddNodeTail(c->reply,o);
            c->reply_bytes += zmalloc_size_sds(o->ptr);
        }	如果不限制客户端的响应buffer,那么大量的活跃客户端就会导致redis的内存爆掉(比如在操作频繁的实例上,打开了很多的monitor客户端)。所以需要限制响应buffer的大小,在超过限制时,关闭该客户端。_addReplyObjectToList的最后一行代码就是完成这个功能的。asyncCloseClientOnOutputBufferLimitReached(c);这个函数就是检查一下整个reply链表的内存大小是否超过软限制或硬限制。然后异步关闭该客户端(添加到链表,在serverCron进行关闭),之所以异步关闭,是因为没有立即停止请求处理,后续还有可能向响应buffer输出内容。具体的条件是大小超过硬限制,或者间隔n秒连续超过软限制。
/* Asynchronously close a client if soft or hard limit is reached on the
 * output buffer size. The caller can check if the client will be closed
 * checking if the client REDIS_CLOSE_ASAP flag is set.
 *
 * Note: we need to close the client asynchronously because this function is
 * called from contexts where the client can‘t be freed safely, i.e. from the
 * lower level functions pushing data inside the client output buffers. */
void asyncCloseClientOnOutputBufferLimitReached(redisClient *c) {
    // <MM>
    // assert不超过4GB - 64KB
    // </MM>
    redisAssert(c->reply_bytes < ULONG_MAX-(1024*64));
    if (c->reply_bytes == 0 || c->flags & REDIS_CLOSE_ASAP) return;
    if (checkClientOutputBufferLimits(c)) {
        sds client = catClientInfoString(sdsempty(),c);
        freeClientAsync(c);
        redisLog(REDIS_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);
        sdsfree(client);
    }
}	上面就是响应内容输出到buffer的过程,下面看一下发送buffer的过程。     while(c->bufpos > 0 || listLength(c->reply)) {
          …..
     }	循环内部首先是尝试发送固定buffer,totwritten记录一次sendReplyToClient函数发送数据的总大小。redisClient->sentlen在发送固定buffer和链表时是复用的。在发送固定buffer时,如果sentlen等于bufpos,则固定buffer发送完,接下来发送reply链表。         if (c->bufpos > 0) {
            // <MM>
            // buffer不空
            // </MM>
            nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;
            /* If the buffer was sent, set bufpos to zero to continue with
             * the remainder of the reply. */
            if (c->sentlen == c->bufpos) {
                c->bufpos = 0;
                c->sentlen = 0;
            }
        }	发送reply链表时,会从链表头部开始依次发送。首先,取头结点,计算对象的长度。            // <MM>
            // 将reply链表的头结点发送
            // </MM>
            o = listNodeValue(listFirst(c->reply));
            objlen = sdslen(o->ptr);
            objmem = zmalloc_size_sds(o->ptr);	如果节点内容为空,则直接将该节点删除。            if (objlen == 0) {
                listDelNode(c->reply,listFirst(c->reply));
                continue;
            }	接下来调用write系统调用,sentlen表示当前该对象已发送的长度。每次调用write发送尚未发送的内容(objlen - c->sentlen)。同样,这里需要更新totwritten。            nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;	如果sentlen==objlen,说明这个reply节点的数据发送完成,需要删除该节点。            /* If we fully sent the object on head go to the next one */
            if (c->sentlen == objlen) {
                listDelNode(c->reply,listFirst(c->reply));
                c->sentlen = 0;
                c->reply_bytes -= objmem;
            }	为了所有客户端的公平,redis限制每次事件循环中,每个客户端最大发送REDIS_MAX_WRITE_PER_EVENT(64KB)。但是,这也有例外,如果当前redis设置了最大内存,并且已经超过该限制,这里会尽量多得发送数据,以尽快释放空间。        /* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT
         * bytes, in a single threaded server it‘s a good idea to serve
         * other clients as well, even if a very large request comes from
         * super fast link that is always able to accept data (in real world
         * scenario think about ‘KEYS *‘ against the loopback interface).
         *
         * However if we are over the maxmemory limit we ignore that and
         * just deliver as much data as it is possible to deliver. */
        if (totwritten > REDIS_MAX_WRITE_PER_EVENT &&
            (server.maxmemory == 0 ||
             zmalloc_used_memory() < server.maxmemory)) break;	跳出while循环,首先会判断write是否出错。如果出错,记录日志并停止这个客户端。    if (nwritten == -1) {
        if (errno == EAGAIN) {
            nwritten = 0;
        } else {
            redisLog(REDIS_VERBOSE,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return;
        }
    }	如果发送出数据,则认为该客户端还是活跃的更新一下redisClient->lastinteraction(用于清理timeout的客户端)。       if (totwritten > 0) {
        /* For clients representing masters we don‘t count sending data
         * as an interaction, since we always send REPLCONF ACK commands
         * that take some time to just fill the socket output buffer.
         * We just rely on data / pings received for timeout detection. */
        if (!(c->flags & REDIS_MASTER)) c->lastinteraction = server.unixtime;
    }	最后,判断一下响应内容是不是全部发送完。如果发送完,则将删除写事件。否则,下一轮事件循环继续发送响应内容,指导完成为止。    // <MM>
    // 当client的响应buf输出完毕后,删除对应的write event
    // </MM>
    if (c->bufpos == 0 && listLength(c->reply) == 0) {
        c->sentlen = 0;
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
        /* Close connection after entire reply has been sent. */
        if (c->flags & REDIS_CLOSE_AFTER_REPLY) freeClient(c);
    }	请求和响应的过程已经完成,下一篇介绍一下AOF。原文地址:http://blog.csdn.net/chosen0ne/article/details/43308387