Redis定义了一个叫做sdshdr(SDS or simple dynamic string)的数据结构。SDS不仅用于 保存字符串,还用来当做缓冲区,例如AOF缓冲区或输入缓冲区等。如下所示,整数len和free分别表示buf数组中已使用的长度和剩余可用的长度,buf是一个原生C字符串,以\0结尾。
sds就是sdshdr中char buf[]的别名,后面能看到,各种操作函数的入参和返回值都是sds而非sdshdr。那sdshdr中的len和free如何访问到?不然不就白定义了吗?答案就是灵活的指针,各种SDS的函数都是:struct sdshdr *sh = (void *)(s - (sizeof(struct sdshdr))),通过指针偏移计算出sdshdr的首地址。
关于字符串末尾的“\0”始终要注意:len和free中都没把“\0”算进去,例如为字符串”hello”创建sds,最终buf总长度是6,len为5,free为0。
/* 类型别名,用于指向 sdshdr 的 buf 属性 */
typedef char *sds;
/* 保存字符串对象的结构 */
struct sdshdr {
// buf 中已占用空间的长度
int len;
// buf 中剩余可用空间的长度
int free;
// 数据空间
char buf[];
};
有了SDS就可以统一接口,所有字符串函数的操作对象都是SDS而非原生的C字符串。下面分析一些典型的字符串操作,看看SDS是如何实现的。代码注释来自于huangz所著Redis 3.0详细注释版。
Redis内部使用sdsnew()函数,并在其基础上提供了sdsnew()和sdsempty()两个函数。在sdsnew()中,若传入字符串init若为NULL,则创建一个buf只有1个字节的空SDS,否则buf长度是字符串长度加1。len置为字符串长度initlen,free置为0。注意最终返回buf部分,而不是整个sdshdr,这在前面介绍过的sdshdr.h中有定义:typedef char *sds。
/* 根据给定字符串 init ,创建一个包含同样字符串的 sds */
sds sdsnew(const char *init) {
size_t initlen = (init == NULL) ? 0 : strlen(init);
return sdsnewlen(init, initlen);
}
/* 创建并返回一个只保存了空字符串 "" 的 sds */
sds sdsempty(void) {
return sdsnewlen("", 0);
}
/* 根据给定的初始化字符串 init 和字符串长度 initlen 创建一个新的 sds */
sds sdsnewlen(const void *init, size_t initlen) {
struct sdshdr *sh;
// 根据是否有初始化内容,选择适当的内存分配方式
// T = O(N)
if (init) {
// zmalloc 不初始化所分配的内存
sh = zmalloc(sizeof(struct sdshdr)+initlen+1);
} else {
// zcalloc 将分配的内存全部初始化为 0
sh = zcalloc(sizeof(struct sdshdr)+initlen+1);
}
// 内存分配失败,返回
if (sh == NULL) return NULL;
// 设置初始化长度
sh->len = initlen;
// 新 sds 不预留任何空间
sh->free = 0;
// 如果有指定初始化内容,将它们复制到 sdshdr 的 buf 中
// 以 \0 结尾,T = O(N)
if (initlen && init)
memcpy(sh->buf, init, initlen);
sh->buf[initlen] = ‘\0‘;
// 返回 buf 部分,而不是整个 sdshdr
return (char*)sh->buf;
}
Redis需要的是一个可以修改的字符串值,所以SDS是可变的。当对SDS中的字符串修改时,长度难免会长了或者短了,这时就要根据需要对SDS进行扩展。Redis的算法比较简单,甚至与Java中ArrayList有些相像。空间不足时重新分配一块大小翻倍的内存,缩小时则维持不变。有一点区别是:在Redis中,如果要扩展的长度超过1MB,那么只会额外分配1MB而不会翻倍。比如修改一个字符串到10MB了,那最终SDS占用空间只是 10MB + 1MB + 1B(“\0”)。
/* 最大预分配长度 */
#define SDS_MAX_PREALLOC (1024*1024)
/* 对 sds 中 buf 的长度进行扩展,确保在函数执行之后,
buf 至少会有 addlen + 1 长度的空余空间(额外的 1 字节是为 \0 准备的)*/
sds sdsMakeRoomFor(sds s, size_t addlen) {
struct sdshdr *sh, *newsh;
// 获取 s 目前的空余空间长度
size_t free = sdsavail(s);
size_t len, newlen;
// s 目前的空余空间已经足够,无须再进行扩展,直接返回
if (free >= addlen) return s;
// 获取 s 目前已占用空间的长度
len = sdslen(s);
sh = (void*) (s-(sizeof(struct sdshdr)));
// s 最少需要的长度
newlen = (len + addlen);
// 根据新长度,为 s 分配新空间所需的大小
if (newlen < SDS_MAX_PREALLOC)
// 如果新长度小于 SDS_MAX_PREALLOC,那么为它分配两倍于所需长度的空间
newlen *= 2;
else
// 否则,分配长度为目前长度加上 SDS_MAX_PREALLOC
newlen += SDS_MAX_PREALLOC;
// T = O(N)
newsh = zrealloc(sh, sizeof(struct sdshdr) + newlen + 1);
// 内存不足,分配失败,返回
if (newsh == NULL) return NULL;
// 更新 sds 的空余长度
newsh->free = newlen - len;
// 返回 sds
return newsh->buf;
}
/* 返回 sds 实际保存的字符串的长度,T = O(1) */
static inline size_t sdslen(const sds s) {
struct sdshdr *sh = (void*)(s - (sizeof(struct sdshdr)));
return sh->len;
}
/* 返回 sds 可用空间的长度,T = O(1) */
static inline size_t sdsavail(const sds s) {
struct sdshdr *sh = (void*)(s - (sizeof(struct sdshdr)));
return sh->free;
}
类似ArrayList的shrink方法,SDS也能收缩,重新分配一块仅仅能容纳字符串的空间。
/* 回收 sds 中的空闲空间,回收不会对 sds 中保存的字符串内容做任何修改 */
sds sdsRemoveFreeSpace(sds s) {
struct sdshdr *sh;
sh = (void*) (s-(sizeof(struct sdshdr)));
// 进行内存重分配,让 buf 的长度仅仅足够保存字符串内容,T = O(N)
sh = zrealloc(sh, sizeof(struct sdshdr)+sh->len+1);
// 空余空间为 0
sh->free = 0;
return sh->buf;
}
SDS也有类似原生C字符串的拼接、拷贝等函数。这里就不一一列举了。
C标准库的strlen函数的时间复杂度是O(n),因为它要遍历到字符串末尾的“\0”才会停止计数,将当前计数器的值作为字符串的长度返回。这在要求高性能的Redis中是无法容忍的。手头正好有glibc-2.15的源码,打开string文件夹下的strlen.c,发现前面的说法并不准确,标准库还是尽可能做了优化,总的思想就是利用数据对齐做快速的“字”查找:
/* Return the length of the null-terminated string STR. Scan for
the null terminator quickly by testing four bytes at a time. */
size_t
strlen (str)
const char *str;
{
const char *char_ptr;
const unsigned long int *longword_ptr;
unsigned long int longword, himagic, lomagic;
/* Handle the first few characters by reading one character at a time.
Do this until CHAR_PTR is aligned on a longword boundary. */
for (char_ptr = str;
((unsigned long int) char_ptr & (sizeof (longword) - 1)) != 0;
++char_ptr)
if (*char_ptr == ‘\0‘)
return char_ptr - str;
/* All these elucidatory comments refer to 4-byte longwords,
but the theory applies equally well to 8-byte longwords. */
longword_ptr = (unsigned long int *) char_ptr;
/* Bits 31, 24, 16, and 8 of this number are zero. Call these bits
the "holes." Note that there is a hole just to the left of
each byte, with an extra at the end:
bits: 01111110 11111110 11111110 11111111
bytes: AAAAAAAA BBBBBBBB CCCCCCCC DDDDDDDD
The 1-bits make sure that carries propagate to the next 0-bit.
The 0-bits provide holes for carries to fall into. */
himagic = 0x80808080L;
lomagic = 0x01010101L;
if (sizeof (longword) > 4)
{
/* 64-bit version of the magic. */
/* Do the shift in two steps to avoid a warning if long has 32 bits. */
himagic = ((himagic << 16) << 16) | himagic;
lomagic = ((lomagic << 16) << 16) | lomagic;
}
if (sizeof (longword) > 8)
abort ();
/* Instead of the traditional loop which tests each character,
we will test a longword at a time. The tricky part is testing
if *any of the four* bytes in the longword in question are zero. */
for (;;)
{
longword = *longword_ptr++;
if (((longword - lomagic) & ~longword & himagic) != 0)
{
/* Which of the bytes was the zero? If none of them were, it was
a misfire; continue the search. */
const char *cp = (const char *) (longword_ptr - 1);
if (cp[0] == 0)
return cp - str;
if (cp[1] == 0)
return cp - str + 1;
if (cp[2] == 0)
return cp - str + 2;
if (cp[3] == 0)
return cp - str + 3;
if (sizeof (longword) > 4)
{
if (cp[4] == 0)
return cp - str + 4;
if (cp[5] == 0)
return cp - str + 5;
if (cp[6] == 0)
return cp - str + 6;
if (cp[7] == 0)
return cp - str + 7;
}
}
}
}
SDS中的buf并没有“标新立异”,而是老老实实地按照原生C字符串的要求保存,末尾放置“\0”。这样做最大的好处就是标准库中许许多多好用的字符串操作函数又都可以重用了,避免这繁琐的工作(就算Redis作者有代码“洁癖”,也不会到标准库都不用的程度吧…)。
缓冲区溢出问题可是个安全界的老问题了。因为C字符串不记录自身长度,所以strcat()一旦拷贝过多的字节就会导致原始字符串空间被覆盖。考虑下面的例子,字符串s1为”Redis”,字符串s2为”MongoDB”,拼接”hello”到s1末尾时会发生什么?
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
char s1[] = "Redis";
char s2[] = "MongoDB";
int main(int argc, char const *argv[])
{
int s2len, i;
// Save original length of s2
s2len = strlen(s2);
// Cancat to s1
strcat(s1, "hello");
// Print s1 and s2
printf("s1 = [%s]\n", s1);
printf("s2 = [");
for (i = 0; i < s2len; ++i)
printf("%c", s2[i]);
printf("]\n");
return 0;
}
因为s1和s2编译链接后是相邻的(保存在.data段),所以拼接”hello”到s1末尾时会导致s1溢出,覆盖了s2的部分内容。最终打印出的s1和s2就是下面这个样子:
[root@vm Temp]# gcc -o bufoverflow bufoverflow.c
[root@vm Temp]# ./bufoverflow
s1 = [Redishello]
s2 = [elloDB]
这里顺带复习一个关于字符串常量和数组的基础知识。当我们将s1从char s1[]改为char *s1再编译运行时,就会报错异常终止,这又是为什么?对比bufoverflow-old(s1是数组)和bufoverflow(s1是指针),通过objdump查看.data段,发现char *的s1并没有保存在.data里,而是放在了.rodata中,因为char *是字符串常量不允许修改。所以 修改只读段.rodata导致了段错误。但是当我们对可读写区域,如各种buf数据,“搞破坏”时错误就悄悄发生了:
[root@vm Temp]# ./buffoverflow
Segmentation fault (core dumped)
[root@vm Temp]# objdump -j .data -s bufoverflow-old
Contents of section .data:
600a40 00000000 52656469 73004d6f 6e676f44 ....Redis.MongoD
600a50 42000000 B...
[root@vm Temp]# objdump -j .data -s bufoverflow
Contents of section .data:
600a58 00000000 00000000 88074000 00000000 ..........@.....
600a68 4d6f6e67 6f444200 MongoDB.
[root@BC-VM-edce4ac67d304079868c0bb265337bd4 Temp]# objdump -s bufoverflow
...
Contents of section .rodata:
400778 01000200 00000000 00000000 00000000 ................
400788 52656469 73006865 6c6c6f00 7331203d Redis.hello.s1 =
400798 205b2573 5d0a0073 32203d20 5b005d00 [%s]..s2 = [.].
...
前面在[1.2.2 扩展SDS空间](###1.2.2 扩展SDS空间)中分析过,新建的SDS会分配能恰好容纳字符串的空间。但后续对字符串修改导致SDS扩容时,Redis会为其留出一部分的预留空间。这样就避免了频繁调用zmalloc()和zfree()。
C字符串因为以末尾的“\0”空字符作为标记,所以C字符串只能保存文本数据。若用来保存图片、音视频、压缩文件等二进制数据的话,若其中含有哪怕一个空字符,就会导致错误。尽管Redis重用C标准库中字符串的基本操作函数,但为了不限制自身的应用场景,Redis保证SDS中的字符串是二进制安全的,即可以保存各种二进制数据的。
SDS中已经用单独的变量len保存了字符串的实际长度,所以解决方法就很简单了:所有SDS的API都使用len属性的值而不是“\0”来判断字符串是否结束。也就是说不管SDS的buf中保存的是不是普通文本数据,Redis都会假设它是二进制数据,以最安全的方式处理它们。同时,当SDS保存的确实是文本数据时,Redis还能享受到C标准库带来的便捷。
redisServer结构中保存了当前Redis实例的所有全局状态,其中一项aof_buf就是用作AOF缓冲区的SDS。
struct redisServer {
/* General */
...
// 数据库
redisDb *db;
// 事件状态
aeEventLoop *el;
...
/* Networking */
...
/* Fields used only for stats */
...
/* slowlog */
...
/* Configuration */
...
/* AOF persistence */
...
// AOF 缓冲区
sds aof_buf; /* AOF buffer, written before entering the event loop */
...
/* RDB persistence */
...
/* Logging */
...
/* Replication (master) */
...
/* Replication (slave) */
...
/* Replication script cache. */
...
/* Pubsub */
...
/* Cluster */
...
/* Scripting */
...
/* Assert & bug reporting */
...
};
在表示客户端连接的redisClient结构中,有一个叫做queryBuf的SDS属性,保存的就是来自远程客户端的命令请求。因为一个redisClient反复使用这一个缓冲区,所以SDS会根据每次请求的内容动态地缩小或扩大。
/* 因为 I/O 复用的缘故,需要为每个客户端维持一个状态。
多个客户端状态被服务器用链表连接起来。*/
typedef struct redisClient {
// 套接字描述符
int fd;
// 当前正在使用的数据库
redisDb *db;
// 当前正在使用的数据库的 id (号码)
int dictid;
// 客户端的名字
robj *name; /* As set by CLIENT SETNAME */
// 查询缓冲区
sds querybuf;
...
} redisClient;
下面简单分析一下Redis是如何使用查询缓冲区SDS的。首先,main()函数中调用initServer()初始化epoll创建连接的事件函数(都在redis.c中)acceptTcpHandler()负责接收TCP请求,它调用acceptCommonHandler()->createClient()创建出redisClient(在networking.c中)。因为采用的是异步的epoll机制,所以此时,queryBuf属性只是初始化为sdsempty(),同时绑定读事件处理函数readQueryFromClient(),这才是真正解析请求的地方!
/* 读取客户端的查询缓冲区内容 */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *c = (redisClient*) privdata;
int nread, readlen;
size_t qblen;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
// 设置服务器的当前客户端
server.current_client = c;
// 读入长度(默认为 16 MB)
readlen = REDIS_IOBUF_LEN;
/* If this is a multi bulk request, and we are processing a bulk reply
* that is large enough, try to maximize the probability that the query
* buffer contains exactly the SDS string representing the object, even
* at the risk of requiring more read(2) calls. This way the function
* processMultiBulkBuffer() can avoid copying buffers to create the
* Redis Object representing the argument. */
if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= REDIS_MBULK_BIG_ARG)
{
int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);
if (remaining < readlen) readlen = remaining;
}
// 获取查询缓冲区当前内容的长度
// 如果读取出现 short read ,那么可能会有内容滞留在读取缓冲区里面
// 这些滞留内容也许不能完整构成一个符合协议的命令,
qblen = sdslen(c->querybuf);
// 如果有需要,更新缓冲区内容长度的峰值(peak)
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
// 为查询缓冲区分配空间
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
// 读入内容到查询缓存
nread = read(fd, c->querybuf+qblen, readlen);
...
}
原文地址:http://blog.csdn.net/dc_726/article/details/46270921