本文将通过源代码对 Redis 的 zset(sorted set)的实现原理进行分析。
Redis 源码在:https://github.com/redis/redis。
xxxxxxxxxx
...
Inside server.c you can find code that handles other vital things of the Redis server:
* ...
* ...
* ...
* The global variable `redisCommandTable` defines all the Redis commands, specifying the name of the command, the function implementing the command, the number of arguments required, and other properties of each command.
...
src/server.c 源文件中定义的全局变量 redisCommandTable
用于定义所有 Redis 命令,指定命令的名称、实现命令的函数、命令需要的参数的个数、命令的其它属性。
x
struct redisCommand redisCommandTable[] = {
{"zadd",zaddCommand,-4,
"write use-memory fast @sortedset",
0,NULL,1,1,1,0,0,0},
};
接下来都将以 zadd 命令为例进行说明,zadd 命令的实现函数是 zaddCommand(),它的声明在 src/server.h 头文件中:
x
/* Compact list data structure */
/* Sorted set object. */
/* Raw representation */
/* Encoded as ziplist */
/* Encoded as skiplist */
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
int refcount;
void *ptr;
} robj;
typedef struct zset {
dict *dict;
zskiplist *zsl;
} zset;
robj *createObject(int type, void *ptr);
robj *createZsetObject(void);
robj *createZsetZiplistObject(void);
void zaddCommand(client *c);
如果对 Makefile 不了解,可以先阅读:http://timd.cn/makefile/
x
REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX)
REDIS_SERVER_OBJ=... object.o t_zset.o ...
all: $(REDIS_SERVER_NAME) ...
# redis-server
$(REDIS_SERVER_NAME): $(REDIS_SERVER_OBJ)
$(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a $(FINAL_LIBS)
%.o: %.c .make-prerequisites
$(REDIS_CC) -MMD -o $@ -c $<
通过 Makefile 可以看出与 zset 相关的命令由 src/t_zset.c 源文件中定义的函数实现。
x
void zaddGenericCommand(client *c, int flags) {
/* 该循环退出后,scoreidx 被设置为第一个 score-element 对的 score 部分在参数列表中的索引 */
/* Parse options. At the end 'scoreidx' is set to the argument position
* of the score of the first score-element pair. */
scoreidx = 2;
while(scoreidx < c->argc) {
scoreidx++;
}
/* 从 db 中查找 sorted set */
zobj = lookupKeyWrite(c->db,key);
/* 如果 sorted set 不存在,则创建它 */
if (zobj == NULL) {
/* 如果 zset_max_ziplist_entries 选项被设置为 0,或
* 第一个元素的长度大于 zset_max_ziplist_value 选项,
* 则创建使用 Skiplist 编码的 sorted set;
* 否则,创建使用 Ziplist 编码的 sorted set
*/
if (server.zset_max_ziplist_entries == 0 ||
server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
{
zobj = createZsetObject();
} else {
zobj = createZsetZiplistObject();
}
/* 将新创建的 sorted set 即 zobj 对象添加到数据库中 */
dbAdd(c->db,key,zobj);
}
/* 调用 zsetAdd 将所有 score-element 对添加到 sorted set 中 */
for (j = 0; j < elements; j++) {
double newscore;
score = scores[j];
int retflags = flags;
ele = c->argv[scoreidx+1+j*2]->ptr;
int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
}
}
/* zaddCommand() 是 zadd 命令的实现函数,它调用 zaddGenericCommand() */
void zaddCommand(client *c) {
zaddGenericCommand(c,ZADD_NONE);
}
createZsetObject() 和 createZsetZiplistObject() 的实现在 src/object.c 源文件中:
x
robj *createObject(int type, void *ptr) {
robj *o = zmalloc(sizeof(*o));
o->type = type;
o->encoding = OBJ_ENCODING_RAW;
o->ptr = ptr;
o->refcount = 1;
/* Set the LRU to the current lruclock (minutes resolution), or
* alternatively the LFU counter. */
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
} else {
o->lru = LRU_CLOCK();
}
return o;
}
robj *createZsetObject(void) {
zset *zs = zmalloc(sizeof(*zs));
robj *o;
zs->dict = dictCreate(&zsetDictType,NULL);
zs->zsl = zslCreate();
o = createObject(OBJ_ZSET,zs);
o->encoding = OBJ_ENCODING_SKIPLIST;
return o;
}
robj *createZsetZiplistObject(void) {
unsigned char *zl = ziplistNew();
robj *o = createObject(OBJ_ZSET,zl);
o->encoding = OBJ_ENCODING_ZIPLIST;
return o;
}
可见当使用 Skiplist 编码时,用到了 dict 和 Skiplist;当使用 Ziplist 编码时,用到了 Ziplist。后面将对 Skiplist 和 Ziplist 进行分析。
接下来看 zsetAdd() 的流程:
x
int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
/* Turn options into simple to check vars. */
int incr = (*flags & ZADD_INCR) != 0;
int nx = (*flags & ZADD_NX) != 0;
int xx = (*flags & ZADD_XX) != 0;
int gt = (*flags & ZADD_GT) != 0;
int lt = (*flags & ZADD_LT) != 0;
*flags = 0; /* We'll return our response flags. */
double curscore;
/* NaN as input is an error regardless of all the other parameters. */
if (isnan(score)) {
*flags = ZADD_NAN;
return 0;
}
/* 根据 sorted set 的编码更新它 */
/* Update the sorted set according to its encoding. */
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
/* 下面是 Ziplist 编码的 sorted set 的更新流程 */
/* if 语句块中是元素已经在 sorted set 中的情况 */
if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
/* 如果设置了 nx 标记,则返回,因为相同的元素已经存在 */
/* NX? Return, same element already exists. */
if (nx) {
*flags |= ZADD_NOP;
return 1;
}
/* 如果设置了 incr 标记,那么计算新 score */
/* Prepare the score for the increment if needed. */
if (incr) {
score += curscore;
if (isnan(score)) {
*flags |= ZADD_NAN;
return 0;
}
if (newscore) *newscore = score;
}
/* 当 score 有变动时,移除、重新插入元素 */
/* Remove and re-insert when score changed. */
if (score != curscore &&
/* LT? Only update if score is less than current. */
(!lt || score < curscore) &&
/* GT? Only update if score is greater than current. */
(!gt || score > curscore))
{
zobj->ptr = zzlDelete(zobj->ptr,eptr);
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
*flags |= ZADD_UPDATED;
}
return 1;
/* 更新已存在的元素的分值时,Ziplist 的长度不会发生改变,
* 所以不会触发转换成 SkipList 的操作
*/
/* 如果元素不存在,且未设置 xx 标记,则插入元素 */
} else if (!xx) {
/* 将元素和 score 插入到 Ziplist */
/* Optimize: check if the element is too large or the list
* becomes too long *before* executing zzlInsert. */
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
/* 如果 Ziplist 中的元素数超过 zset_max_ziplist_entries 选项,
* 或新插入的元素的长度超过 zset_max_ziplist_value 选项,
* 则将 sorted set 转换成 SkipList 编码
*/
if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries ||
sdslen(ele) > server.zset_max_ziplist_value)
zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
if (newscore) *newscore = score;
*flags |= ZADD_ADDED;
return 1;
/* 否则,直接返回 */
} else {
*flags |= ZADD_NOP;
return 1;
}
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplistNode *znode;
dictEntry *de;
/* 通过哈希表判断元素是否已经在 sorted set 中 */
de = dictFind(zs->dict,ele);
/* 下面是元素已经在 sorted set 中的情况,与使用 Ziplist 编码的情况类似 */
if (de != NULL) {
/* NX? Return, same element already exists. */
if (nx) {
*flags |= ZADD_NOP;
return 1;
}
curscore = *(double*)dictGetVal(de);
/* Prepare the score for the increment if needed. */
if (incr) {
score += curscore;
if (isnan(score)) {
*flags |= ZADD_NAN;
return 0;
}
if (newscore) *newscore = score;
}
/* Remove and re-insert when score changes. */
if (score != curscore &&
/* LT? Only update if score is less than current. */
(!lt || score < curscore) &&
/* GT? Only update if score is greater than current. */
(!gt || score > curscore))
{
znode = zslUpdateScore(zs->zsl,curscore,ele,score);
/* Note that we did not removed the original element from
* the hash table representing the sorted set, so we just
* update the score. */
dictGetVal(de) = &znode->score; /* Update score ptr. */
*flags |= ZADD_UPDATED;
}
return 1;
} else if (!xx) {
/* 复制元素 */
ele = sdsdup(ele);
/* 将 score-ele 对插入到跳表中 */
znode = zslInsert(zs->zsl,score,ele);
/* 将 ele 和 score 之间的映射关系保存到哈希表中 */
serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
*flags |= ZADD_ADDED;
if (newscore) *newscore = score;
return 1;
} else {
*flags |= ZADD_NOP;
return 1;
}
} else {
serverPanic("Unknown sorted set encoding");
}
return 0; /* Never reached. */
}
综上得出:
sorted set 的编码可以是 Skiplist,也可以是 Ziplist。并且只有当下面的两个条件都满足时,才会使用 Ziplist 编码:
如果 sorted set 的编码是 Ziplist,并且新插入的元素导致上面的两个条件不再同时满足,那么 Redis 会将其编码转换成 SkipList。
如果对跳表不了解,可以先阅读:http://timd.cn/data-structure/skiplist/
以下是 src/server.h 中定义的两个结构体:
xxxxxxxxxx
/* Dynamic safe strings */
/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
sds ele;
double score;
struct zskiplistNode *backward;
struct zskiplistLevel {
struct zskiplistNode *forward;
unsigned long span;
} level[];
} zskiplistNode;
typedef struct zskiplist {
struct zskiplistNode *header, *tail;
unsigned long length;
int level;
} zskiplist;
其中:
zskiplistNode 是跳表的节点:
ele 表示跳表中的元素
score 表示元素的分值,跳表根据该值对节点进行排序
backward 指向前一个节点,头节点和第一节点的前一个节点是 NULL
数组 level:
level 的长度表示节点占据的高度,头节点的高度是 ZSKIPLIST_MAXLEVEL,其它节点的高度在 1 和 ZSKIPLIST_MAXLEVEL 之间(包含 1 和 ZSKIPLIST_MAXLEVEL)
分量 level[i] 包含 2 个域:
zskiplist 是跳表:
下面是一个Redis SkipList 的示例(未画 backward 指针):
下面分析几个与跳表有关的操作:
1,创建跳表:
xxxxxxxxxx
/* Create a skiplist node with the specified number of levels.
* The SDS string 'ele' is referenced by the node after the call. */
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
zskiplistNode *zn =
zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score;
zn->ele = ele;
return zn;
}
/* Create a new skiplist. */
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
zsl = zmalloc(sizeof(*zsl));
zsl->level = 1;
zsl->length = 0;
/* 创建头节点,并对其初始化 */
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}
其中:
2,向跳表中插入新节点:
xxxxxxxxxx
/* Returns a random level for the new skiplist node we are going to create.
* The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
* (both inclusive), with a powerlaw-alike distribution where higher
* levels are less likely to be returned. */
int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
/* Insert a new node in the skiplist. Assumes the element does not already
* exist (up to the caller to enforce that). The skiplist takes ownership
* of the passed SDS string 'ele'. */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
/* Redis 的 Skiplist 允许不同元素具有相同的分值,并且可以保证不会重复插入相同的元素,
* 因为 zslInsert() 的调用方应该在插入元素之前检测其是否在哈希表中。
*/
/* we assume the element is not already inside, since we allow duplicated
* scores, reinserting the same element should never happen since the
* caller of zslInsert() should test in the hash table if the element is
* already inside or not. */
level = zslRandomLevel();
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
x = zslCreateNode(level,score,ele);
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}
其中:
下面对 zslInsert() 的代码进行分析:
从最高层向下,逐层寻找每层的插入位置,并计算插入位置到头节点的距离
随机生成新节点占据的层数 level(1 到 ZSKIPLIST_MAXLEVEL 之间)
如果 level 大于跳表的当前高度,那么更新 update[]、rank[],重置跳表的高度
创建新节点 x
从最底层向上,调整插入点和新节点的 forward 指针,以及 span 值,直到 level
递增未达到的层级的 span 值
调整 x 以及 x 的 forward 的 backward 指针;如果 x 是最后一个节点,将跳表的 tail 指向它
增加跳表长度
关于查找、删除等操作,以后有机会再补充。通过这两个函数已经可以对 Redis Skiplist 有完整的了解了。
Ziplist 的源码在 src/ziplist.c 源文件中。
1,Ziplist 的总体布局
<zlbytes> <zltail> <zllen> <entry> <entry> ... <entry> <zlend>
注意:除非另外规定,否则所有域都以小端(little endian)格式存储。
<uint32_t zlbytes> 是用于保存 Ziplist 占用的字节数的无符号整数(包含 zlbytes 域本身的四个字节)。存储该值的目的是为了能够在不遍历 Ziplist 的情况下,改变整个结构的大小。
<uint32_t zltail> 是 Ziplist 中最后一个 entry 的偏移。它允许在不遍历整个列表的情况下,在远端执行弹出操作。
<uint32_t zllen> 是entry 的数量。当 entry 的数量超过 2^16-2 时,该值被设置为 2^16-1,此时如果想要知道列表中有多少条目,需要遍历整个列表。
<uint32_t zlend> 是代表 Ziplist 的结尾的特殊 entry。它被编码为等于 255 的单字节。任何常规 entry 的首字节都不会被设置为 255。
2,Ziplist Entry
Ziplist 中的 entry 以包含 2 部分信息的元数据开头:
因此,完整的 entry 的结构如下:
<prevlen> <encoding> <entry-data>
有时编码也代表 entry 本身。此时,可以省略 <entry-data> 部分:
<prevlen> <encoding>
前一个 entry 的长度 <prevlen> 的编码方式如下:
因此几乎所有 entry 的编码都如下:
<prevlen from 0 to 253> <encoding> <entry-data>
如果前一个 entry 的长度大于 253 字节,那么将使用如下的编码:
0xFE <4 bytes unsigned little endian prevlen> <encoding> <entry-data>
entry 的编码依赖于 entry 的内容:
第一个字节足够用于确定 entry 的种类,不同类型和编码的概述如下:
|00pppppp| - 1 字节
长度小于或等于 63 字节(6 位)的字符串值。
"pppppp" 表示 6 位无符号长度
|01pppppp|qqqqqqqq| - 2 字节
长度小于或等于 16383 字节(14 位)的字符串值。
注意:以大端格式存储这个 14 位数值
|10000000|qqqqqqqq|rrrrrrrr|ssssssss|tttttttt| - 5 字节
长度大于或等于 16384 字节的字符串值。
只有第一个字节后面的 4 个字节表示该长度(最大可达 2^32-1)。第一个字节的低 6 位未被使用,且都被设置为 0。
注意:以大端格式存储这个 32位数值
|11000000| - 3 字节
编码为 int16_t(2 字节) 的整数
|11010000| - 5 字节
编码为 int32_t(4 字节) 的整数
|11100000| - 9 字节
编码为 int64_t(8 字节) 的整数
|11110000| - 4 字节
编码为 24 位(3 字节)有符号的整数
|11111110| - 2 字节
编码为 8 位(1 字节)有符号的整数
|1111xxxx| - xxxx 在 0001 和 1101 之间
|11111111| - 代表 Ziplist 结束的特殊 entry
在 Ziplist 头中,所有整数都以小端格式存储。