Redis基础机制分析

因为原《Redis底层对象实现原理分析》太大了,所以被拆解出来介绍Redis基础设施的相关实现,包括:

  1. redisDb,以及在这上面的增删改查
  2. Redis的expire和evict机制
  3. Redis的事件机制
  4. Redis的主从复制(一部分)
    注意,很多实现在引入主从复制之后都变得非常复杂,有很多边边角角要考虑,这也导致Redis的代码相比3.0版本要难看很多。本文对主从复制的涉及,局限于帮助理解实现。
    本文介绍的部分比如propagate机制。

本文中不介绍的是,它们在系列的其他文章中讲解:

  1. Redis的对象实现
  2. Redis Sentinel
  3. Redis Cluster
  4. Redis AOF/RDB

Redis源码结构

在3.0版本中,redis的主要结构都定义在redis.h中,在新版本中,它们被放到了server.h中。

我们主要介绍

  1. 一些常用的类
    1. redisServer
    2. redisDb
    3. redisObject
      包含添加对象的逻辑
  2. 删除逻辑
    包含对同步删除和异步删除的讨论。
  3. 查找逻辑
  4. expire
  5. evict
  6. propagate
  7. 事件机制
  8. 内存管理

Redis Server

这个章节中介绍Redis数据库顶层键的架构和增删改查的实现,主要包括:

redisDb类

1
2
3
4
5
6
7
8
9
10
11
typedef struct redisDb {
dict *dict; /* 数据库键空间 */
dict *expires; /* 键的过期时间,字典的键为键,字典的值为过期时间 */
dict *blocking_keys; /* 用来服务诸如BLPOP的命令,记录目前被阻塞的键 */
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
long long avg_ttl; /* 数据库键的平均TTL,统计信息 */
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;

可以发现,redisDb本身就依赖dictlist等Redis底层结构的实现,说明Redis的复用性还是很好的。

client类

client类对应了3.0版本中的redisClient类。因为Redis对IO是多路复用的,所以需要为每个客户端连接维护一个状态,所以client实际上类似于session一样,是在服务器端维护的一个状态。而真正的Redis客户端定义在redis-cli.c这个文件里面。

1
2
3
4
5
6
7
// server.h

typedef struct client {
uint64_t id; /* Client incremental unique ID. */
connection *conn;
int resp; /* RESP protocol version. Can be 2 or 3. */
redisDb *db; /* Pointer to currently SELECTed DB. */

redisServer类

server是一个全局对象,它的类型是redisServer

1
2
3
4
5
// server.h
extern struct redisServer server;

// server.c
struct redisServer server; /* Server global state */

Redis基础类

增删改查涉及的系统梳理

  1. DB部分
    更新dirty
  2. Cluster部分
  3. 事件部分
    signalModifiedKey:包含通知WATCH列表、通知客户端更新缓存
    notifyKeyspaceEvent:通过PUBLUSH发送消息
  4. 主从复制/持久化部分
    propagate对应命令(在call中处理)
  5. Module部分

Redis Object

诸如dictsds之类的对象,在db层面实际上是用redisObject封装的,需要的时候通过robj->ptr获取实际需要的指针。

1
2
3
4
5
6
7
8
9
10
11
12
typedef struct redisObject {
unsigned type:4; // 由OBJ_的值指定
unsigned encoding:4; // 由OBJ_ENCODING_的值指定
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
int refcount;
void *ptr;
} robj;

// server.h
#define LRU_BITS 24

Redis对象的类型是用OBJ_宏来列出的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// server.h
/* The actual Redis Object */
#define OBJ_STRING 0 /* String object. */
#define OBJ_LIST 1 /* List object. */
#define OBJ_SET 2 /* Set object. */
#define OBJ_ZSET 3 /* Sorted set object. */
#define OBJ_HASH 4 /* Hash object. */

/* The "module" object type is a special one that signals that the object
* is one directly managed by a Redis module. In this case the value points
* to a moduleValue struct, which contains the object value (which is only
* handled by the module itself) and the RedisModuleType struct which lists
* function pointers in order to serialize, deserialize, AOF-rewrite and
* free the object.
*
* Inside the RDB file, module types are encoded as OBJ_MODULE followed
* by a 64 bit module type ID, which has a 54 bits module-specific signature
* in order to dispatch the loading to the right module, plus a 10 bits
* encoding version. */
#define OBJ_MODULE 5 /* Module object. */
#define OBJ_STREAM 6 /* Stream object. */

Redis对象实际使用的内部结构是用OBJ_ENCODING_宏来表示的,如前文所列举的,同一个对象可能有不同的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// server.h
// OBJ_ENCODING_RAW是普通的SDS
#define OBJ_ENCODING_RAW 0 /* Raw representation */
#define OBJ_ENCODING_INT 1 /* Encoded as integer */
#define OBJ_ENCODING_HT 2 /* Encoded as hash table */
#define OBJ_ENCODING_ZIPMAP 3 /* Encoded as zipmap */
#define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old list encoding. */
#define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define OBJ_ENCODING_INTSET 6 /* Encoded as intset */
#define OBJ_ENCODING_SKIPLIST 7 /* Encoded as skiplist */
// embstr是对短字符串的一种优化编码
#define OBJ_ENCODING_EMBSTR 8 /* Embedded sds string encoding */
#define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */
#define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree of listpacks */

引用计数

有两种特殊的引用计数值:

  1. OBJ_SHARED_REFCOUNT
    makeObjectShared函数生成,在这种情况下这个对象是immutable的,因此可以不加锁地进行访问。这种对象也不受incrRefCount/decrRefCount控制。
    注意,这种对象设为immutable是合理的,它的一个通常作用是共享小整数对象,例如Redis会共享0到9999。
  2. OBJ_STATIC_REFCOUNT
    一般由initStaticStringObject宏生成。看上去这个一般用在在栈上面分配的临时对象的refcount,我对此也不是很确定。
    1
    2
    3
    4
    #define OBJ_SHARED_REFCOUNT INT_MAX     /* Global object never destroyed. */
    #define OBJ_STATIC_REFCOUNT (INT_MAX-1) /* Object allocated in the stack. */
    // 第一个有特殊含义的refcount值
    #define OBJ_FIRST_SPECIAL_REFCOUNT OBJ_STATIC_REFCOUNT

一般来说,使用引用计数可能存在循环引用的问题。Redis巧妙地避免了这个问题,首先在Redis的所有redisObject里面,只有String会被嵌入到其他类型中,也就是说ZSET等其他的数据类型不会互相引用(在Geo等新数据结构里面也是这样的么?)。而Redis对String类型引入对象共享机制,保证了不会产生互相引用。

1
2
3
4
5
6
7
8
9
10
11
void incrRefCount(robj *o) {
if (o->refcount < OBJ_FIRST_SPECIAL_REFCOUNT) {
o->refcount++;
} else {
if (o->refcount == OBJ_SHARED_REFCOUNT) {
/* Nothing to do: this refcount is immutable. */
} else if (o->refcount == OBJ_STATIC_REFCOUNT) {
serverPanic("You tried to retain an object allocated in the stack");
}
}
}

decrRefCount还负责销毁对象,步骤是freeXXXObject,然后在zfree。前者用来释放o->ptr指向的对象的内存,后者用来释放o的内存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void decrRefCount(robj *o) {
if (o->refcount == 1) {
switch(o->type) {
case OBJ_STRING: freeStringObject(o); break;
case OBJ_LIST: freeListObject(o); break;
case OBJ_SET: freeSetObject(o); break;
case OBJ_ZSET: freeZsetObject(o); break;
case OBJ_HASH: freeHashObject(o); break;
case OBJ_MODULE: freeModuleObject(o); break;
case OBJ_STREAM: freeStreamObject(o); break;
default: serverPanic("Unknown object type"); break;
}
zfree(o);
} else {
if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");
if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount--;
}
}

我们可以这样理解incr/decrRefCount,如果我们创建或者复制一个对象,就要incr,如果我们要删除一个对象就要decr。

创建对象

通过createObject创建对象,refcount设为1。encoding设为OBJ_ENCODING_RAW,也就是普通SDS字符串。传入的typeOBJ_宏的某个特定值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
robj *createObject(int type, void *ptr) {
robj *o = zmalloc(sizeof(*o));
o->type = type;
o->encoding = OBJ_ENCODING_RAW;
o->ptr = ptr;
o->refcount = 1;

/* Set the LRU to the current lruclock (minutes resolution), or
* alternatively the LFU counter. */
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
} else {
o->lru = LRU_CLOCK();
}
return o;
}

一般在创建完对象后,还需要通过dbAdd将它插入到数据库里面。

1
2
3
4
5
6
7
8
9
10
11
12
/* Add the key to the DB. It's up to the caller to increment the reference
* counter of the value if needed.
*
* The program is aborted if the key already exists. */
void dbAdd(redisDb *db, robj *key, robj *val) {
sds copy = sdsdup(key->ptr);
int retval = dictAdd(db->dict, copy, val);

serverAssertWithInfo(NULL,key,retval == DICT_OK);
signalKeyAsReady(db, key, val->type);
if (server.cluster_enabled) slotToKeyAdd(key->ptr);
}

对象装箱拆箱

这个都是返回一个“新”对象,这里“新”的意思是在使用完这个对象都应该decrRefCount
如果是原生encoding储存的,就直接返回。

1
2
3
4
5
6
7
8
9
#define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR)

robj *getDecodedObject(robj *o) {
robj *dec;

if (sdsEncodedObject(o)) {
incrRefCount(o);
return o;
}

如果使用SDS保存的整数,实际上里面是个long long,那么就需要先ll2string把这个转换成字符串。

1
2
3
4
5
6
7
8
9
10
    if (o->type == OBJ_STRING && o->encoding == OBJ_ENCODING_INT) {
char buf[32];

ll2string(buf,32,(long)o->ptr);
dec = createStringObject(buf,strlen(buf));
return dec;
} else {
serverPanic("Unknown encoding type");
}
}

Redis Command

redisCommand对象

redisCommand对象有新旧很多种版本,新旧版本中存在一些区别,例如sflag的内容,我们以新版本为主。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// server.h
struct redisCommand {
char *name;
redisCommandProc *proc;
int arity;
char *sflags; /* Flags as string representation, one char per flag. */
uint64_t flags; /* The actual flags, obtained from the 'sflags' field. */
/* Use a function to determine keys arguments in a command line.
* 通常用在Redis Cluster转发过程中 */
redisGetKeysProc *getkeys_proc;
/* What keys should be loaded in background when calling this command? */
int firstkey; /* The first argument that's a key (0 = no keys) */
int lastkey; /* The last argument that's a key */
int keystep; /* The step between first and last key */
long long microseconds, calls;

command id,是从0开始递增的,作用是检查ACL。一个connection在执行命令前,服务器先要检查第id位有没有设置,如果设置了,说明这个connection有对应的权限。

1
2
    int id;
};

解释如下:

  1. sflags是字符串格式的,表示这个命令的一些特性
    例如
    write对应CMD_WRITE
    read-only对应CMD_READONLY
  2. flags是通过populateCommandTableParseFlagssflags生成的二进制表示。详见server.h中的CMD_定义,我们在下面会讲解。
  3. 下面是key三元组:firstkey表示第一个key参数的位置,lastkey表示最后一个key参数的位置,keystep表示key参数步长。通过上面三个参数,可以拿到所有的key。通常发生在getKeysFromCommandgetKeysUsingCommandTable函数调用链中。引入这个三元组的目的是有一些指令(如msetmsetnxkeystep取2)是支持在一个命令中对多个key/value对进行赋值的。我们需要注意的是诸如ZADD的指令虽然可以同时添加很多个(score, member)对,但是实际上他们是对一个key添加的,所以它们的三元组都是1。
  4. getkeys_proc表示从命令中判断命令的key,实际上就是当firstkeylastkeykeystep不能描述的时候,就会用到这个,返回一个int*表示所有key。例如后面举的eval的例子。
  5. microseconds表示该命令的调用总时间
  6. calls表示该命令的调用总次数
  7. id是在运行时给每个指令分配的id

flags枚举

从sflags可以解析得到flags,枚举如下:

  1. CMD_WRITE (1ULL<<0)
  2. CMD_READONLY (1ULL<<1)
    对应read-only,一般包括所有的非特殊的命令,例如返回keys的值,或者返回一些其他信息,例如TIME等。诸如admin、transaction相关的信息,也不会被标记为readonly,因为他们会影响服务器状态。
    只读命令和非只读命令在主从复制时,是不一样的
  3. CMD_DENYOOM (1ULL<<2)
    对应use-memory,表示这个命令可能导致内存增加。需要在发生OOM的时候拒绝掉。
  4. CMD_MODULE (1ULL<<3)
  5. CMD_ADMIN (1ULL<<4)
    对应admin,诸如SAVE或者SHUTDOWN的命令。
  6. CMD_PUBSUB (1ULL<<5)
    SUBSCRIBE、UNSUBSCRIBE
  7. CMD_NOSCRIPT (1ULL<<6)
    这样的命令不能在lua脚本中使用,例如AUTH、SAVE等。
  8. CMD_RANDOM (1ULL<<7)
    对应random,有的命令即使在相同的情况下的运行结果也是不确定的,诸如SPOP、RANDOMKEY。
  9. CMD_SORT_FOR_SCRIPT (1ULL<<8)
    对应to-sort,需要对输出序列进行排序。
  10. CMD_LOADING (1ULL<<9)
    在服务器启动载入过程中可以执行的命令。如果没标记该项目的命令,启动过程中不能执行。
  11. CMD_STALE (1ULL<<10)
  12. CMD_SKIP_MONITOR (1ULL<<11)
    no-monitor,不自动将这个命令propagate到MONITOR。
  13. CMD_SKIP_SLOWLOG (1ULL<<12)
    no-slowlog,不自动将这个命令propagate到slowlog。比如EXEC、AUTH之类的命。
  14. CMD_ASKING (1ULL<<13)
  15. CMD_FAST (1ULL<<14)
    这个命令是O(1)或者O(log(N))复杂度的,他们不会延误执行。注意所有可能导致DEL操作的并不是FAST命令,例如SET。
  16. CMD_NO_AUTH (1ULL<<15)

Demo

我们结合一个具体的定义来了解这个结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// server.c
struct redisCommand redisCommandTable[] = {
{"module",moduleCommand,-2, // -2表示大于等于2个参数
"admin no-script",
0,NULL,0,0,0,0,0,0},

{"get",getCommand,2, // 这个叫get的指令对应到void getCommand(client *c),有2个参数
"read-only fast @string", // sflags 中是只读的,fast表示命令执行时间超过阈值时,会记录延迟事件。
0, // flags
NULL, // getkeys_proc
1, // firstkey
1, // lastkey
1, // keystep
0, // microseconds
0, // calls
0 // id
},

/* Note that we can't flag set as fast, since it may perform an
* implicit DEL of a large key. */
{"set",setCommand,-3,
"write use-memory @string",
0,NULL,1,1,1,0,0,0},

{"setnx",setnxCommand,3,
"write use-memory fast @string",
0,NULL,1,1,1,0,0,0},

{"eval",evalCommand,-3,
"no-script @scripting",
0,
evalGetKeys, // eval无法通过key三元组描述,所以这里指定一个特殊的getkeys_proc
0,0,0,
0,0,0},

{"zadd",zaddCommand,-4,
"write use-memory fast @sortedset",
0,NULL,1,1,1,0,0,0},
...

命令的处理顺序

  1. call
    1. processCommand
      1. processCommandAndResetClient
        1. processInputBuffer
          1. readQueryFromClient
          2. handleClientsWithPendingReadsUsingThreads
        2. handleClientsWithPendingReadsUsingThreads
          1. stopThreadedIO
          2. beforeSleep

processCommand

这个函数很复杂:

  1. 通过call执行命令
  2. 准备从客户端进行一次读取

返回C_OK表示这个客户端还存在,否则表示这个客户端没了。

首先需要特别处理quit命令。

1
2
3
4
5
6
7
8
9
10
11
12
int processCommand(client *c) {
moduleCallCommandFilters(c);

/* The QUIT command is handled separately. Normal command procs will
* go through checking for replication and QUIT will cause trouble
* when FORCE_REPLICATION is enabled and would be implemented in
* a regular command proc. */
if (!strcasecmp(c->argv[0]->ptr,"quit")) {
addReply(c,shared.ok);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
return C_ERR;
}

下面通过lookupCommand查找对应的命令结构,并处理找不到或者命令格式错误的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/* Now lookup the command and check ASAP about trivial error conditions
* such as wrong arity, bad command name and so forth. */
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) {
sds args = sdsempty();
int i;
for (i=1; i < c->argc && sdslen(args) < 128; i++)
args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
rejectCommandFormat(c,"unknown command `%s`, with args beginning with: %s",
(char*)c->argv[0]->ptr, args);
sdsfree(args);
return C_OK;
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
rejectCommandFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return C_OK;
}

判断命令的性质,是只读的,还是可写的等性质。

1
2
3
4
5
6
7
8
int is_write_command = (c->cmd->flags & CMD_WRITE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
int is_denyoom_command = (c->cmd->flags & CMD_DENYOOM) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_DENYOOM));
int is_denystale_command = !(c->cmd->flags & CMD_STALE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_STALE));
int is_denyloading_command = !(c->cmd->flags & CMD_LOADING) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_LOADING));

进行auth和ACL检查。
auth也就是登录状态检查。
ACL,即Access Control List,有一系列条件规则组成,用来具体控制某些用户是否可以运行某些命令。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
...
/* Check if the user is authenticated. This check is skipped in case
* the default user is flagged as "nopass" and is active. */
int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) ||
(DefaultUser->flags & USER_FLAG_DISABLED)) &&
!c->authenticated;
if (auth_required) {
/* AUTH and HELLO and no auth modules are valid even in
* non-authenticated state. */
if (!(c->cmd->flags & CMD_NO_AUTH)) {
rejectCommand(c,shared.noautherr);
return C_OK;
}
}

/* Check if the user can run this command according to the current
* ACLs. */
int acl_keypos;
int acl_retval = ACLCheckCommandPerm(c,&acl_keypos);
if (acl_retval != ACL_OK) {
addACLLogEntry(c,acl_retval,acl_keypos,NULL);
if (acl_retval == ACL_DENIED_CMD)
rejectCommandFormat(c,
"-NOPERM this user has no permissions to run "
"the '%s' command or its subcommand", c->cmd->name);
else
rejectCommandFormat(c,
"-NOPERM this user has no permissions to access "
"one of the keys used as arguments");
return C_OK;
}
...

如果启用了Redis Cluster,就要进行转发。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
...
/* If cluster is enabled perform the cluster redirection here.
* However we don't perform the redirection if:
* 1) The sender of this command is our master.
* 2) The command has no key arguments. */
if (server.cluster_enabled &&
!(c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_LUA &&
server.lua_caller->flags & CLIENT_MASTER) &&
!(!cmdHasMovableKeys(c->cmd) && c->cmd->firstkey == 0 &&
c->cmd->proc != execCommand))
{
int hashslot;
int error_code;
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
&hashslot,&error_code);
if (n == NULL || n != server.cluster->myself) {
if (c->cmd->proc == execCommand) {
discardTransaction(c);
} else {
flagTransaction(c);
}
clusterRedirectClient(c,n,hashslot,error_code);
return C_OK;
}
}
...

处理oom相关行为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
...
/* Handle the maxmemory directive.
*
* Note that we do not want to reclaim memory if we are here re-entering
* the event loop since there is a busy Lua script running in timeout
* condition, to avoid mixing the propagation of scripts with the
* propagation of DELs due to eviction. */
if (server.maxmemory && !server.lua_timedout) {
int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
/* freeMemoryIfNeeded may flush slave output buffers. This may result
* into a slave, that may be the active client, to be freed. */
if (server.current_client == NULL) return C_ERR;

int reject_cmd_on_oom = is_denyoom_command;
/* If client is in MULTI/EXEC context, queuing may consume an unlimited
* amount of memory, so we want to stop that.
* However, we never want to reject DISCARD, or even EXEC (unless it
* contains denied commands, in which case is_denyoom_command is already
* set. */
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand &&
c->cmd->proc != discardCommand) {
reject_cmd_on_oom = 1;
}

if (out_of_memory && reject_cmd_on_oom) {
rejectCommand(c, shared.oomerr);
return C_OK;
}

/* Save out_of_memory result at script start, otherwise if we check OOM
* untill first write within script, memory used by lua stack and
* arguments might interfere. */
if (c->cmd->proc == evalCommand || c->cmd->proc == evalShaCommand) {
server.lua_oom = out_of_memory;
}
}

/* Make sure to use a reasonable amount of memory for client side
* caching metadata. */
if (server.tracking_clients) trackingLimitUsedSlots();

/* Don't accept write commands if there are problems persisting on disk
* and if this is a master instance. */
int deny_write_type = writeCommandsDeniedByDiskError();
if (deny_write_type != DISK_ERROR_TYPE_NONE &&
server.masterhost == NULL &&
(is_write_command ||c->cmd->proc == pingCommand))
{
if (deny_write_type == DISK_ERROR_TYPE_RDB)
rejectCommand(c, shared.bgsaveerr);
else
rejectCommandFormat(c,
"-MISCONF Errors writing to the AOF file: %s",
strerror(server.aof_last_write_errno));
return C_OK;
}

/* Don't accept write commands if there are not enough good slaves and
* user configured the min-slaves-to-write option. */
if (server.masterhost == NULL &&
server.repl_min_slaves_to_write &&
server.repl_min_slaves_max_lag &&
is_write_command &&
server.repl_good_slaves_count < server.repl_min_slaves_to_write)
{
rejectCommand(c, shared.noreplicaserr);
return C_OK;
}

/* Don't accept write commands if this is a read only slave. But
* accept write commands if this is our master. */
if (server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) &&
is_write_command)
{
rejectCommand(c, shared.roslaveerr);
return C_OK;
}

/* Only allow a subset of commands in the context of Pub/Sub if the
* connection is in RESP2 mode. With RESP3 there are no limits. */
if ((c->flags & CLIENT_PUBSUB && c->resp == 2) &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {
rejectCommandFormat(c,
"Can't execute '%s': only (P)SUBSCRIBE / "
"(P)UNSUBSCRIBE / PING / QUIT are allowed in this context",
c->cmd->name);
return C_OK;
}

/* Only allow commands with flag "t", such as INFO, SLAVEOF and so on,
* when slave-serve-stale-data is no and we are a slave with a broken
* link with master. */
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
server.repl_serve_stale_data == 0 &&
is_denystale_command)
{
rejectCommand(c, shared.masterdownerr);
return C_OK;
}

/* Loading DB? Return an error if the command has not the
* CMD_LOADING flag. */
if (server.loading && is_denyloading_command) {
rejectCommand(c, shared.loadingerr);
return C_OK;
}

/* Lua script too slow? Only allow a limited number of commands.
* Note that we need to allow the transactions commands, otherwise clients
* sending a transaction with pipelining without error checking, may have
* the MULTI plus a few initial commands refused, then the timeout
* condition resolves, and the bottom-half of the transaction gets
* executed, see Github PR #7022. */
if (server.lua_timedout &&
c->cmd->proc != authCommand &&
c->cmd->proc != helloCommand &&
c->cmd->proc != replconfCommand &&
c->cmd->proc != multiCommand &&
c->cmd->proc != discardCommand &&
c->cmd->proc != watchCommand &&
c->cmd->proc != unwatchCommand &&
!(c->cmd->proc == shutdownCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
!(c->cmd->proc == scriptCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
{
rejectCommand(c, shared.slowscripterr);
return C_OK;
}
...

下面才是真正的执行,对于非multi,会调用call

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
...
/* Exec the command */
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
call(c,CMD_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnKeys();
}
return C_OK;
}

call

call就是调用指令的函数,有一系列的flag:

  1. CMD_CALL_NONE
  2. CMD_CALL_SLOWLOG
    检查指令执行的速度,是否记录到slow log中呢?
  3. CMD_CALL_STATS
    Populate command stats.
  4. CMD_CALL_PROPAGATE_AOF
    如果对数据有改动(可以通过server.dirty字段看出),或者client有一个强迫propagate的CLIENT_FORCE_AOF,就加到AOF上。
    相应的,如果client设置了CLIENT_PREVENT_AOF_PROP,那么即使数据集变动了,也不会写AOF。
    注意,无论client设置了什么,如果没有CMD_CALL_PROPAGATE_AOF,那么永远不会写AOF。
  5. CMD_CALL_PROPAGATE_REPL
    同理,但是对Slave。同样有CLIENT_FORCE_REPL/CLIENT_PREVENT_REPL_PROP
  6. CMD_CALL_PROPAGATE
    相当于PROPAGATE_AOF|PROPAGATE_REPL
  7. CMD_CALL_FULL
    相当于SLOWLOG|STATS|PROPAGATE

call主要就是用c->cmd->proc(c)执行命令,后者实际上就是xxxCommand()这样的命令。

1
2
3
4
5
6
void call(client *c, int flags) {
long long dirty;
ustime_t start, duration;
int client_old_flags = c->flags;
struct redisCommand *real_cmd = c->cmd;
...

fixed_time_expire在expire机制中见到过的,如果有命令在执行过程中,这个值就不是0。
还会把除了ADMIN之外的命令发送给MONITOR,ADMIN命令展示出来太危险了。

1
2
3
4
5
6
7
8
9
10
11
12
...
server.fixed_time_expire++;

/* Send the command to clients in MONITOR mode if applicable.
* Administrative commands are considered too dangerous to be shown. */
if (listLength(server.monitors) &&
!server.loading &&
!(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
{
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
...

下面是一些初始化和执行工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
...
/* Initialization: clear the flags that must be set by the command on
* demand, and initialize the array for additional commands propagation. */
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
redisOpArray prev_also_propagate = server.also_propagate;
redisOpArrayInit(&server.also_propagate);

/* Call the command. */
dirty = server.dirty;
updateCachedTime(0);
start = server.ustime;
c->cmd->proc(c);
...

在执行后,统计数据库被修改的次数dirty。在《Redis底层对象实现原理分析》中看到,比如我新加一个元素,或者修改一个元素,都会导致dirty增加。也就对应了数据的改变。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
...
duration = ustime()-start;
dirty = server.dirty-dirty;
if (dirty < 0) dirty = 0;

/* When EVAL is called loading the AOF we don't want commands called
* from Lua to go into the slowlog or to populate statistics. */
if (server.loading && c->flags & CLIENT_LUA)
flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);

/* If the caller is Lua, we want to force the EVAL caller to propagate
* the script if the command flag or client flag are forcing the
* propagation. */
if (c->flags & CLIENT_LUA && server.lua_caller) {
if (c->flags & CLIENT_FORCE_REPL)
server.lua_caller->flags |= CLIENT_FORCE_REPL;
if (c->flags & CLIENT_FORCE_AOF)
server.lua_caller->flags |= CLIENT_FORCE_AOF;
}
...

记录延迟信息,并记录slowlog。其中latencyAddSampleIfNeeded在适当的时候调用latencyAddSample

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
...
/* Log the command into the Slow log if needed, and populate the
* per-command statistics that we show in INFO commandstats. */
if (flags & CMD_CALL_SLOWLOG && !(c->cmd->flags & CMD_SKIP_SLOWLOG)) {
char *latency_event = (c->cmd->flags & CMD_FAST) ?
"fast-command" : "command";
latencyAddSampleIfNeeded(latency_event,duration/1000);
slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
}

if (flags & CMD_CALL_STATS) {
/* use the real command that was executed (cmd and lastamc) may be
* different, in case of MULTI-EXEC or re-written commands such as
* EXPIRE, GEOADD, etc. */
real_cmd->microseconds += duration;
real_cmd->calls++;
}
...

下面处理propagate的情况,这个对应了CALL_开头的一些规则,就不详解了。最终会计算得到一个propagate_flags传给propagate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
...
/* Propagate the command into the AOF and replication link */
if (flags & CMD_CALL_PROPAGATE &&
(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
{
int propagate_flags = PROPAGATE_NONE;

/* Check if the command operated changes in the data set. If so
* set for replication / AOF propagation. */
if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);

/* If the client forced AOF / replication of the command, set
* the flags regardless of the command effects on the data set. */
if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;

/* However prevent AOF / replication propagation if the command
* implementations called preventCommandPropagation() or similar,
* or if we don't have the call() flags to do so. */
if (c->flags & CLIENT_PREVENT_REPL_PROP ||
!(flags & CMD_CALL_PROPAGATE_REPL))
propagate_flags &= ~PROPAGATE_REPL;
if (c->flags & CLIENT_PREVENT_AOF_PROP ||
!(flags & CMD_CALL_PROPAGATE_AOF))
propagate_flags &= ~PROPAGATE_AOF;

/* Call propagate() only if at least one of AOF / replication
* propagation is needed. Note that modules commands handle replication
* in an explicit way, so we never replicate them automatically. */
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}
...

在结束之后,我们需要还原一下有关propagate的相关flag,因为call可能被递归调用。
【Q】我觉得这里一个典型的例子就是这里的multi、exec。

1
2
3
4
5
/* Restore the old replication flags, since call() can be executed
* recursively. */
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
c->flags |= client_old_flags &
(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);

alsoPropagate函数可以往server.also_propagate里面加一些其他的op。下面就处理alsoPropagate的逻辑,也就是当propagate完当前的命令之后,还可以再去propagate一些命令。并且这些命令不被CLIENT_PREVENT_PROP影响。

1
2
3
4
5
6
if (server.also_propagate.numops) {
int j;
redisOp *rop;

if (flags & CMD_CALL_PROPAGATE) {
int multi_emitted = 0;

如果说已经被包在了MULTI里面,就不在继续包在also_propagate里面propagate了。
execCommandPropagateMulti实际上就是下面的propagate调用。这里的shared.multi或者shared.exec实际上是缓存的字符串对象EXECMULTI,减少频繁的内存分配的作用。

1
2
3
4
5
6
7
8
9
void execCommandPropagateMulti(client *c) {
propagate(server.multiCommand,c->db->id,&shared.multi,1,
PROPAGATE_AOF|PROPAGATE_REPL);
}

void execCommandPropagateExec(client *c) {
propagate(server.execCommand,c->db->id,&shared.exec,1,
PROPAGATE_AOF|PROPAGATE_REPL);
}

接下来就是做propagate。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
...
/* Wrap the commands in server.also_propagate array,
* but don't wrap it if we are already in MULTI context,
* in case the nested MULTI/EXEC.
*
* And if the array contains only one command, no need to
* wrap it, since the single command is atomic. */
if (server.also_propagate.numops > 1 &&
!(c->cmd->flags & CMD_MODULE) &&
!(c->flags & CLIENT_MULTI) &&
!(flags & CMD_CALL_NOWRAP))
{
execCommandPropagateMulti(c);
multi_emitted = 1;
}

for (j = 0; j < server.also_propagate.numops; j++) {
rop = &server.also_propagate.ops[j];
int target = rop->target;
/* Whatever the command wish is, we honor the call() flags. */
if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
if (target)
propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
}

if (multi_emitted) {
execCommandPropagateExec(c);
}
}
redisOpArrayFree(&server.also_propagate);
}
server.also_propagate = prev_also_propagate;

这个应该是和客户端缓存有关的,如果client提供了keys tracking功能,要通知。这个函数里面维护了一个tracking invalidation表,这样客户端会收到一个invalidation信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
    /* If the client has keys tracking enabled for client side caching,
* make sure to remember the keys it fetched via this command. */
if (c->cmd->flags & CMD_READONLY) {
client *caller = (c->flags & CLIENT_LUA && server.lua_caller) ?
server.lua_caller : c;
if (caller->flags & CLIENT_TRACKING &&
!(caller->flags & CLIENT_TRACKING_BCAST))
{
trackingRememberKeys(caller);
}
}

server.fixed_time_expire--;
server.stat_numcommands++;
}

删除逻辑实现

为了理解下面论述中涉及到的expire相关实现,我们需要先介绍一些UNLINKDEL的实现。
delGenericCommand的实现是比较Legacy的,从c->argv中读取所有需要被删除的key,然后调用dbAsyncDelete或者dbSyncDelete。

1
2
3
4
5
6
7
8
9
10
/* This command implements DEL and LAZYDEL. */
void delGenericCommand(client *c, int lazy) {
int numdel = 0, j;

for (j = 1; j < c->argc; j++) {
expireIfNeeded(c->db,c->argv[j]);
int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
dbSyncDelete(c->db,c->argv[j]);
if (deleted) {
...

容易发现,删除有两种模式:异步(lazy)删除和同步删除。异步删除的情况包括:

  1. delete逻辑
    1. delGenericCommand中传入lazy
      如果是unlink命令,那么一定是异步删除。
      如果是del命令,则取决于server.lazyfree_lazy_user_del
    2. dbDelete中设置了server.lazyfree_lazy_server_del
  2. expire逻辑
    1. expireIfNeeded中如果设置server.lazyfree_lazy_expire,则使用异步删除
      对应了Redis的lazy过期策略。
    2. activeExpireCycleTryExpire中如果设置server.lazyfree_lazy_expire,则使用异步删除
      对应着Redis的定期循环,主动过期策略。
    3. expireGenericCommand中如果设置server.lazyfree_lazy_expire,则使用异步删除
      直接运行expire命令,主动检查一下有没有过期。
  3. evict逻辑
    1. freeMemoryIfNeeded中如果设置server.lazyfree_lazy_eviction,则使用异步删除
  4. 其他
    1. RM_UnlinkKey

这些lazyfree_lazy_开头的配置,默认都是0。也就是说这些情况下默认都是同步删除。

同步删除的情况类似,除了“其他”中发生了变化:

  1. 其他
    1. rdbLoadRio

下面的代码会进行事件通知,我们将专门进行介绍

1
2
3
4
5
6
7
8
9
10
11
// 【续】delGenericCommand函数
...
signalModifiedKey(c,c->db,c->argv[j]);
notifyKeyspaceEvent(NOTIFY_GENERIC,
"del",c->argv[j],c->db->id);
server.dirty++;
numdel++;
}
}
addReplyLongLong(c,numdel);
}

del和unlink的唯一区别是,unlink一定是lazy删除的,但是del取决于配置lazyfree_lazy_user_del

1
2
3
4
5
6
7
void delCommand(client *c) {
delGenericCommand(c,server.lazyfree_lazy_user_del);
}

void unlinkCommand(client *c) {
delGenericCommand(c,1);
}

同步删除

看简单的同步实现。
首先,如果db->expires非空,从db->expires里面删除key,实际上是删除的过期时间。
这里有个注释,说从db->expires中删除一个entry不会释放key->ptr这个sds,因为它和db->dict是共享的。这里应该说的是在setExpire里面往db->expires添加key的时候,加的实际上是指向db->dict中的指针。
但果真是这样的么?继续看dictDelete最终调用dictGenericDelete。查看dictDelete实际上是dictGenericDelete的实现(在“dict的其他相关方法”这个章节中介绍),发现新版本的代码肯定会调用dictFreeKey(Redis3.0里面有个dictFreeEntryKey,不要混淆了)。

1
2
3
4
5
/* Delete a key, value, and associated expiration entry if any, from the DB */
int dbSyncDelete(redisDb *db, robj *key) {
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);

检查dictFreeKey的实现发现,这个函数调用keyDestructor,它似乎一定会导致对应sds的析构。看上去和上面的注释是矛盾的。

1
2
3
4
// dict.h
#define dictFreeKey(d, entry) \
if ((d)->type->keyDestructor) \
(d)->type->keyDestructor((d)->privdata, (entry)->key)

究竟是怎么回事呢?我们看下keyptrDictType和dbDictType这两个dict类型就有了答案。原来对于db->expires,它实际的类型就没有设置keyDestructor,所以不会析构key。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// server.c
server.db[j].dict = dictCreate(&dbDictType,NULL);
server.db[j].expires = dictCreate(&keyptrDictType,NULL);

/* Db->expires */
dictType keyptrDictType = {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
NULL, /* key destructor */
NULL /* val destructor */
};

/* Db->dict, keys are sds strings, vals are Redis objects. */
dictType dbDictType = {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
dictSdsDestructor, /* key destructor */
dictObjectDestructor /* val destructor */
};

可以说一下#define DICT_NOTUSED(V) ((void) V)是经典的关闭编译器unused variable warning的办法。

1
2
3
4
5
6
7
// server.c
void dictSdsDestructor(void *privdata, void *val)
{
DICT_NOTUSED(privdata);

sdsfree(val);
}

下面接着看dbSyncDelete的逻辑,刚才是删除的db->expires,还需要删除db->dict
此外server.cluster_enabled的情况进行了额外的处理。

1
2
3
4
5
6
7
8
9
    // 再从db->dict里面删除key
if (dictDelete(db->dict,key->ptr) == DICT_OK) {
// Redis Cluster相关函数
if (server.cluster_enabled) slotToKeyDel(key->ptr);
return 1;
} else {
return 0;
}
}

异步删除

异步删除的核心是调用dictUnlink而不是dictDelete
前面的是大差不差的,删除db->expires里面的字段,因为他们的dictType不一样,他们的析构行为(keyDestructor)也不一样。这就导致expire可以直接dictDelete。

1
2
3
4
5
6
7
8
9
10
11
12
// lazyfree.c

/* Delete a key, value, and associated expiration entry if any, from the DB.
* If there are enough allocations to free the value object may be put into
* a lazy free list instead of being freed synchronously. The lazy free list
* will be reclaimed in a different bio.c thread. */
#define LAZYFREE_THRESHOLD 64
int dbAsyncDelete(redisDb *db, robj *key) {
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
...

下面调用dictUnlink而不是dictDelete了。这里注意区别一下dictUnlink和前面提到的UNLINK命令。dictUnlink的作用是将对应的key从dict中删除,但不会释放对应的结构,而是直接返回。

1
2
3
4
5
// 续dbAsyncDelete
/* If the value is composed of a few allocations, to free in a lazy way
* is actually just slower... So under a certain limit we just free
* the object synchronously. */
dictEntry *de = dictUnlink(db->dict,key->ptr);

拿到这个de,我们手动来析构。会首先使用lazyfreeGetFreeEffort来计算析构的代价,如果代价过高,就将这个对象放到lazy free list里面让它后台去析构。不然的话就在后面的代码中同步析构,这是因为如果对象很小,那么再搞这一套异步反而更耗时。

1
2
3
4
5
6
// 续dbAsyncDelete
...
if (de) {
robj *val = dictGetVal(de);
size_t free_effort = lazyfreeGetFreeEffort(val);
...

先来看lazy的实现,如果算出来值得,那么就lazy。但这里还有个特殊情况我们不能异步删除,根据注释,如果这个对象是被共享的(val->refcount就是一个大于1的值),我们不能就直接把它现在就回收掉。这个倒不经常发生,但确实Redis的一些实现代码会用incrRefCount来保护对象,然后调用dbDelete。在这种情况下我们会fall through到下面dictFreeUnlinkedEntry的调用,它的最终效果相当于直接调用decrRefCount
经过了上述的判断,我们就可以使用bioCreateBackgroundJob来异步删除了。

1
2
3
4
5
6
7
8
9
// 续dbAsyncDelete
...
if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
atomicIncr(lazyfree_objects,1);
bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
dictSetVal(db->dict,de,NULL);
}
}
...

下面就是同步删除的实现。dictFreeUnlinkedEntry这一块就是给之前nofree没做的事情擦一下屁股,包含调用dictFreeKey啥的来释放key和value所占用的内存。
slotToKeyDel这个是Redis Cluster的实现逻辑,用来算出来这个key在哪个slot上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 续dbAsyncDelete
...
/* Release the key-val pair, or just the key if we set the val
* field to NULL in order to lazy free it later. */
if (de) {
dictFreeUnlinkedEntry(db->dict,de);
if (server.cluster_enabled) slotToKeyDel(key->ptr);
return 1;
} else {
return 0;
}
}

// dict.c
void dictFreeUnlinkedEntry(dict *d, dictEntry *he) {
if (he == NULL) return;
dictFreeKey(d, he);
dictFreeVal(d, he);
zfree(he);
}

这里面涉及的几个函数来讲解一下

lazyfreeGetFreeEffort

这个函数计算并返回释放一个对象的代价。返回值不一定是这个对象对应的内存分配次数,但是和这个量成比例的。具体来说:

  1. 对于字符串,函数永远返回1。
  2. 对于用诸如哈希表等数据结构表示的聚合对象,返回组成该对象元素的数量。
  3. 对于只需要一次内存分配就产生的对象,认为是独立的一个对象,即使实际上是由多个造成的。
  4. 对于列表对象,返回quicklist里面的元素数量。
1
2
3
4
5
6
7
size_t lazyfreeGetFreeEffort(robj *obj) {
if (obj->type == OBJ_LIST) {
quicklist *ql = obj->ptr;
return ql->len;
} else if (obj->type == OBJ_SET && obj->encoding == OBJ_ENCODING_HT) {
dict *ht = obj->ptr;
return dictSize(ht);

对于ZSET,如果是跳表实现,就返回跳表的长度。如果是ziplist实现就返回1?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
    } else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST){
zset *zs = obj->ptr;
return zs->zsl->length;
} else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HT) {
dict *ht = obj->ptr;
return dictSize(ht);
} else if (obj->type == OBJ_STREAM) {
size_t effort = 0;
stream *s = obj->ptr;

/* Make a best effort estimate to maintain constant runtime. Every macro
* node in the Stream is one allocation. */
effort += s->rax->numnodes;

/* Every consumer group is an allocation and so are the entries in its
* PEL. We use size of the first group's PEL as an estimate for all
* others. */
if (s->cgroups) {
raxIterator ri;
streamCG *cg;
raxStart(&ri,s->cgroups);
raxSeek(&ri,"^",NULL,0);
/* There must be at least one group so the following should always
* work. */
serverAssert(raxNext(&ri));
cg = ri.data;
effort += raxSize(s->cgroups)*(1+raxSize(cg->pel));
raxStop(&ri);
}
return effort;
} else {
return 1; /* Everything else is a single allocation. */
}
}

atomicIncr

是一个原子操作,更新lazyfree里面的一个static变量lazyfree_objects。根据不同的操作系统的支持,有三种实现:
如果支持atomic语义

1
#define atomicIncr(var,count) __atomic_add_fetch(&var,(count),__ATOMIC_RELAXED)

如果有sync语义,一般是gcc的一个内置宏

1
#define atomicIncr(var,count) __sync_add_and_fetch(&var,(count))

如果什么都没有,用mutex,mutex的名字是变量名加上_mutex,这些mutex随着变量名一起被定义,只是可能不会被用到,如lazyfree_objects_mutex。

1
2
3
4
5
#define atomicIncr(var,count) do { \
pthread_mutex_lock(&var ## _mutex); \
var += (count); \
pthread_mutex_unlock(&var ## _mutex); \
} while(0)

bioCreateBackgroundJob

所有的bio开头的函数表示Redis的Background IO服务。根据注释,将来也许会迁移到libeio。

1
2
3
4
5
6
7
8
9
10
11
12
13
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
struct bio_job *job = zmalloc(sizeof(*job));

job->time = time(NULL);
job->arg1 = arg1;
job->arg2 = arg2;
job->arg3 = arg3;
pthread_mutex_lock(&bio_mutex[type]);
listAddNodeTail(bio_jobs[type],job);
bio_pending[type]++;
pthread_cond_signal(&bio_newjob_cond[type]);
pthread_mutex_unlock(&bio_mutex[type]);
}

dictSetVal

见dict相关

slotToKeyDel

见 Redis Cluster 相关

查找

lookUpKey相关方法根据查找目的是读或者写区分了lookupKeyReadlookupKeyWrite两个方向的函数,此外还根据是否WithFlags或者OrReply派生出其他几种函数。

对于lookupKeyWrite来讲,有一个副作用,就是会先检查一下要不要expire,如果需要就直接expire掉。
对于lookupKeyRead来讲,也要处理expire的问题,但是因为涉及到主从复制的问题,所以要进行额外处理。【Q】为什么不需要对写处理呢?我想应该是因为只有Master处理写,处理完再发指令给Slave。

直接介绍带Flags的版本。

1
2
3
4
5
6
7
robj *lookupKeyRead(redisDb *db, robj *key) {
return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
}

robj *lookupKeyWrite(redisDb *db, robj *key) {
return lookupKeyWriteWithFlags(db, key, LOOKUP_NONE);
}

WithFlags目前只包含了LOOKUP_NONELOOKUP_NOTOUCH两个选项。:

  1. LOOKUP_NONE
  2. LOOKUP_NOTOUCH
    表示这次访问不要更新LRU啥的,例如type这样的命令就带上这个参数。

lookupKeyReadWithFlags

下面查看lookupKeyReadWithFlags的实现,相比于写要复杂点,因为要处理键过期的时候读的问题。

1
2
3
4
5
robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
robj *val;

if (expireIfNeeded(db,key) == 1) {
...

Master的masterhost肯定是NULL,这是一个经典判定。首先考虑Master的情况,如果key过期了,那么就直接安全地返回NULL,并且触发一个keymiss事件。这里注释上说在Master情况下,expireIfNeeded返回0当且只当这个key不存在。
为什么强调Master呢,实际上可以结合expireIfNeeded的实现来看。提前说一下,对Slave而言expireIfNeeded不会真的让key过期并删除,而只是返回key在逻辑上是过期的,而真正的过期是由Master来同步的,其目的是保持Slave和Master的一致。

1
2
3
4
5
6
...
/* Key expired. If we are in the context of a master, expireIfNeeded()
* returns 0 only when the key does not exist at all, so it's safe
* to return NULL ASAP. */
if (server.masterhost == NULL) {
...

更新统计信息,server.stat_keyspace_misses可以通过INFO keyspace_misses命令来查看。

1
2
3
4
5
6
...
server.stat_keyspace_misses++;
notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
return NULL;
}
...

上面是对Master情况的处理,下面是对Slave的情况。我们已经知道,Slave并不会真的删除过期key,而是等待Master的Del指令。所以即使expireIfNeeded返回1表示过期,
但根据注释,对Slave而言,作为一个额外的安全措施,如果相关指令是只读的,还是可以在这里安全地返回NULL。Redis的说法是:对于只读命令,这样可以向client提供一个更加一致性的行为。这个会包含GETS,当使用Slave来扩容读的时候。我的理解就是尽管slave上还没有删除,但是过期就是过期,我们要和Master一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
...
if (server.current_client &&
server.current_client != server.master &&
server.current_client->cmd &&
server.current_client->cmd->flags & CMD_READONLY)
{
server.stat_keyspace_misses++;
notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
return NULL;
}
}
val = lookupKey(db,key,flags);
if (val == NULL) {
server.stat_keyspace_misses++;
notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
}
else
server.stat_keyspace_hits++;
return val;
}

lookupKeyWriteWithFlags

首先查看lookupKeyWriteWithFlags的实现,直接先检查下expire,然后调用lookupKey。这里的expireIfNeeded也是Redis的lazy过期策略的实现,在每次查找的时候都会调用,检查这个键是不是已经过期了。
不同于lookupKeyReadWithFlags,这里就不会统计keymiss啥的了。

1
2
3
4
5
6
7
8
9
10
// db.c
/* Lookup a key for write operations, and as a side effect, if needed, expires
* the key if its TTL is reached.
*
* Returns the linked value object if the key exists or NULL if the key
* does not exist in the specified DB. */
robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) {
expireIfNeeded(db,key);
return lookupKey(db,key,flags);
}

lookUpKey

lookUpKey的主要内容包括从db里面找到对应的key,并且维护LRU或LFU。它是一个较为底层的 API。

1
2
3
4
5
6
7
8
9
10
// db.c
robj *lookupKey(redisDb *db, robj *key, int flags) {
// 从db中获得key对应的entry
dictEntry *de = dictFind(db->dict,key->ptr);
if (de) {
// 如果找到了,就取出val
robj *val = dictGetVal(de);

// 如果没有设置
...

下面是LRU和LFU的实现,更新每个key的访问情况,从而方便后续evict。详细见有关updateLFU的实现见”Redis的LRU和LFU实现”这一章节。
但先要做一些判断:

  1. 如果设置了LOOKUP_NOTOUCH
  2. 如果有子进程正在进行保存,就不进行LFU操作,以免破坏COW。
1
2
3
4
5
6
7
8
9
10
11
12
13
...
if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)){
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
updateLFU(val);
} else {
val->lru = LRU_CLOCK();
}
}
return val;
} else {
return NULL;
}
}

expire

如何判断键已过期?

诸如EXPIRE/RENAME等的实现中会调用setExpire函数设置过期时间。setExpire会把每个键的过期时间都被存在db->expires这个字典里面。
通过getExpire可以从字典中读取到过期时间。

getExpire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/* Return the expire time of the specified key, or -1 if no expire
* is associated with this key (i.e. the key is non volatile) */
long long getExpire(redisDb *db, robj *key) {
dictEntry *de;

/* No expire? return ASAP */
if (dictSize(db->expires) == 0 ||
(de = dictFind(db->expires,key->ptr)) == NULL) return -1;

/* The entry was found in the expire dict, this means it should also
* be present in the main dict (safety check). */
serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
return dictGetSignedIntegerVal(de);
}

keyIsExpired

keyIsExpired作用是判断某个键有没有过期。主要功能就是比较现在的时间,和获得的key的过期时间。被expireIfNeeded调用,

1
2
3
4
5
6
7
8
9
10
/* Check if the key is expired. */
int keyIsExpired(redisDb *db, robj *key) {
mstime_t when = getExpire(db,key);
mstime_t now;

if (when < 0) return 0; /* No expire for this key */

/* Don't expire anything while loading. It will be done later. */
if (server.loading) return 0;
...

下面一段代码的目的是:如果在执行lua脚本,将时间设置成脚本执行开始的时间,这样在脚本执行过程中就不会expire。这么做的原因是源自Github上面的Issue1525。作者发现这个脚本在Master和Slave上的执行是不一样的。原因是在Master上第一次执行可能key存在,第二次就不存在了。这导致incr实际只被执行了一次。但是因为此时Master会合成一个DEL指令,让Slave也删除并过期这个Key。此时,如果相同的脚本运行在Slave上面,那么incr一次也不会被执行。

1
2
3
4
5
6
7
8
9
if redis.call("exists",KEYS[1]) == 1
then
redis.call("incr","mycounter")
end

if redis.call("exists",KEYS[1]) == 1
then
return redis.call("incr","mycounter")
end

为了保障向Slave和AOF的propagate是一致的,首先在执行lua脚本的时候,要禁止expire(就是这里的行为);但是在执行脚本之前,先要对涉及的key做下expireIfNeeded

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
...
if (server.lua_caller) {
now = server.lua_time_start;
}
/* If we are in the middle of a command execution, we still want to use
* a reference time that does not change: in that case we just use the
* cached time, that we update before each call in the call() function.
* This way we avoid that commands such as RPOPLPUSH or similar, that
* may re-open the same key multiple times, can invalidate an already
* open object in a next call, if the next call will see the key expired,
* while the first did not. */
else if (server.fixed_time_expire > 0) {
now = server.mstime;
}
/* For the other cases, we want to use the most fresh time we have. */
else {
now = mstime();
}

/* The key expired if the current (virtual or real) time is greater
* than the expire time of the key. */
return now > when;
}

expireIfNeeded

expireIfNeeded用来删除过期的键,它是被动expire的关键步骤。返回0表示键有效(键未过期,或永不过期),否则返回1表示已经过期并被删除。
对于Master,如果找到的键是expire的,会被从数据库中evict掉。并且会导致想AOF和Slave流propagate一条DEL或者UNLINK指令。

1
2
3
4
5
6
7
// db.c
/* This function is called when we are going to perform some operation
* in a given key, but such key may be already logically expired even if
* it still exists in the database. The main way this function is called
* is via lookupKey*() family of functions.
*/
int expireIfNeeded(redisDb *db, robj *key) {

如果没有过期,就返回0

1
2
3
...
if (!keyIsExpired(db,key)) return 0;
...

首先,通过keyIsExpired检测是不是已经过期了,如果还没有过期,上面就直接返回0了,再往下就是处理过期的情况。
根据注释,如果Redis运行在主从模式下,并且是在Slave上,expireIfNeeded直接返回,而不是继续删除键。这是因为Slave上的key过期是由Master控制的,Slave并不直接处理key的过期。Master会发送一个同步的DEL命令给Slave来删除某个键,Slave等到那时候再删除,这样做的目的是出于一致性的考量
但尽管如此,对Slave调用expireIfNeeded也应该返回一个正确的值,也就是这个时候键应不应该过期。因此,Slave上是先过期,然后再删除键的,这其中存在一个窗口时间,因为Slave还没有来得及收到并处理Master的DEL
下面肯定对应了已经过期的情况了。

1
2
3
...
if (server.masterhost != NULL) return 1;
...

下面负责通知删除事件,这里还出现了propagateExpire函数,我们也统一在后面讲解

1
2
3
4
5
6
7
...
server.stat_expiredkeys++;
// 向AOF文件和Slave节点传播过期信息,实际会调用propagate函数
propagateExpire(db,key,server.lazyfree_lazy_expire);
// 发送事件通知
notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",key,db->id);
...

下面是真正的过期删除的过程。这里根据server.lazyfree_lazy_expire的配置,可以选择异步删除或者同步删除,这类似于上面讨论过的UNLINKDEL的实现。事实上在expireGenericCommand上就可以看到对应的映射关系。

1
2
3
4
5
...
int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) : dbSyncDelete(db,key);
if (retval) signalModifiedKey(NULL,db,key);
return retval;
}

主动expire实现

databasesCron可以看到,如果开启了主动expire,并且自己是master,则会定时运行activeExpireCycle。
介绍参数
active_expire_effort默认值为1,表示避免有超过10%的过期key,同时CPU占用不超过25%
config_keys_per_loop表示

1
2
3
4
5
6
7
8
9
10
11
12
void activeExpireCycle(int type) {
unsigned long
effort = server.active_expire_effort-1, /* Rescale from 0 to 9. */
config_keys_per_loop = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP +
ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP/4*effort,
config_cycle_fast_duration = ACTIVE_EXPIRE_CYCLE_FAST_DURATION +
ACTIVE_EXPIRE_CYCLE_FAST_DURATION/4*effort,
config_cycle_slow_time_perc = ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC +
2*effort,
config_cycle_acceptable_stale = ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE-
effort;
...

下面是几个全局变量:

  1. timelimit_exit表示是否已经超时了。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    ...
    /* This function has some global state in order to continue the work
    * incrementally across calls. */
    static unsigned int current_db = 0; /* Last DB tested. */
    static int timelimit_exit = 0; /* Time limit hit in previous call? */
    static long long last_fast_cycle = 0; /* When last fast cycle ran. */

    int j, iteration = 0;
    int dbs_per_call = CRON_DBS_PER_CALL;
    long long start = ustime(), timelimit, elapsed;
    ...

如果所有的clients停止了,那么我们的主动expire循环也要停止,从而保持数据库是静态的。没搞懂为啥这么设计。

1
2
3
4
5
6
...
/* When clients are paused the dataset should be static not just from the
* POV of clients not being able to write, but also from the POV of
* expires and evictions of keys not being performed. */
if (clientsArePaused()) return;
...

在这里,Redis的主动过期策略分为了fast和slow两个模式。第一种在key比较少的情况下尝试是用较少的cpu,一旦这些过期的键的数量小于某个给定值,就退出。第二种更激进一点,以减少内存占用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
...
if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
/* Don't start a fast cycle if the previous cycle did not exit
* for time limit, unless the percentage of estimated stale keys is
* too high. Also never repeat a fast cycle for the same period
* as the fast cycle total duration itself. */
if (!timelimit_exit &&
server.stat_expired_stale_perc < config_cycle_acceptable_stale)
return;

if (start < last_fast_cycle + (long long)config_cycle_fast_duration*2)
return;

last_fast_cycle = start;
}
...

每次扫多少db呢?默认dbs_per_call为CRON_DBS_PER_CALL,即16:

  1. dbs_per_call不能超过总的db数。
  2. 如果timelimit_exit,需要扫描全部db
    我的理解是如果上次active expire都超时了,说明肯定有很多expire key等待清理,我们全部做一遍,以免占用太多内存。
1
2
3
4
5
6
7
8
9
10
11
...
/* We usually should test CRON_DBS_PER_CALL per iteration, with
* two exceptions:
*
* 1) Don't test more DBs than we have.
* 2) If last time we hit the time limit, we want to scan all DBs
* in this iteration, as there is work to do in some DB and we don't want
* expired keys to use memory for too much time. */
if (dbs_per_call > server.dbnum || timelimit_exit)
dbs_per_call = server.dbnum;
...

在这里通过计算耗时,来限制active expire循环对CPU的占用。默认CPU限制是ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC。我们最多在这个函数中只能用timelimit这么多微秒server.hz指的是表示一秒钟被触发多少次,config_cycle_slow_time_perc是个CPU的百分比,也就是每次迭代中只能用config_cycle_slow_time_perc/100这么久。因为每次迭代的耗时是1/server.hz秒,即1000000/server.hz微秒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
...
/* We can use at max 'config_cycle_slow_time_perc' percentage of CPU
* time per iteration. Since this function gets called with a frequency of
* server.hz times per second, the following is the max amount of
* microseconds we can spend in this function. */
timelimit = config_cycle_slow_time_perc*1000000/server.hz/100;
timelimit_exit = 0;
if (timelimit <= 0) timelimit = 1;

if (type == ACTIVE_EXPIRE_CYCLE_FAST)
timelimit = config_cycle_fast_duration; /* in microseconds. */

/* Accumulate some global stats as we expire keys, to have some idea
* about the number of keys that are already logically expired, but still
* existing inside the database. */
long total_sampled = 0;
long total_expired = 0;
...

外层的循环,遍历所有的数据库。如果timelimit_exit为1,说明内层循环中已经发现执行超时了,外层循坏也退出。

1
2
3
4
5
6
7
8
9
10
11
12
...
for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
/* Expired and checked in a single loop. */
unsigned long expired, sampled;

redisDb *db = server.db+(current_db % server.dbnum);

/* Increment the DB now so we are sure if we run out of time
* in the current DB we'll restart from the next. This allows to
* distribute the time evenly across DBs. */
current_db++;
...

内层的循环,如果每次循环结束,还是有很高的没有处理的过期的key,就需要继续做。但我们也不能一直这么做下去,所以每过16次,就会检查是否超过timelimit。如果是的话,就设置timelimit_exit为1,然后退出当前循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
...
do {
unsigned long num, slots;
long long now, ttl_sum;
int ttl_samples;
iteration++;

/* If there is nothing to expire try next DB ASAP. */
if ((num = dictSize(db->expires)) == 0) {
db->avg_ttl = 0;
break;
}
slots = dictSlots(db->expires);
now = mstime();

/* When there are less than 1% filled slots, sampling the key
* space is expensive, so stop here waiting for better times...
* The dictionary will be resized asap. */
if (num && slots > DICT_HT_INITIAL_SIZE &&
(num*100/slots < 1)) break;

/* The main collection cycle. Sample random keys among keys
* with an expire set, checking for expired ones. */
expired = 0;
sampled = 0;
ttl_sum = 0;
ttl_samples = 0;

// 每个db最多抽样这么多个
if (num > config_keys_per_loop)
num = config_keys_per_loop;

/* Here we access the low level representation of the hash table
* for speed concerns: this makes this code coupled with dict.c,
* but it hardly changed in ten years.
*
* Note that certain places of the hash table may be empty,
* so we want also a stop condition about the number of
* buckets that we scanned. However scanning for free buckets
* is very fast: we are in the cache line scanning a sequential
* array of NULL pointers, so we can scan a lot more buckets
* than keys in the same time. */
long max_buckets = num*20;
long checked_buckets = 0;

while (sampled < num && checked_buckets < max_buckets) {
for (int table = 0; table < 2; table++) {
if (table == 1 && !dictIsRehashing(db->expires)) break;

unsigned long idx = db->expires_cursor;
idx &= db->expires->ht[table].sizemask;
dictEntry *de = db->expires->ht[table].table[idx];
long long ttl;

/* Scan the current bucket of the current table. */
checked_buckets++;
while(de) {
/* Get the next entry now since this entry may get
* deleted. */
dictEntry *e = de;
de = de->next;

ttl = dictGetSignedIntegerVal(e)-now;
if (activeExpireCycleTryExpire(db,e,now)) expired++;
if (ttl > 0) {
/* We want the average TTL of keys yet
* not expired. */
ttl_sum += ttl;
ttl_samples++;
}
sampled++;
}
}
db->expires_cursor++;
}
total_expired += expired;
total_sampled += sampled;

/* Update the average TTL stats for this database. */
if (ttl_samples) {
long long avg_ttl = ttl_sum/ttl_samples;

/* Do a simple running average with a few samples.
* We just use the current estimate with a weight of 2%
* and the previous estimate with a weight of 98%. */
if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
}

// 这里就是检查并设置timelimit_exit
if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
elapsed = ustime()-start;
if (elapsed > timelimit) {
timelimit_exit = 1;
server.stat_expired_time_cap_reached_count++;
break;
}
}
} while (sampled == 0 ||
(expired*100/sampled) > config_cycle_acceptable_stale);
}

elapsed = ustime()-start;
server.stat_expire_cycle_time_used += elapsed;
latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);

/* Update our estimate of keys existing but yet to be expired.
* Running average with this sample accounting for 5%. */
double current_perc;
if (total_sampled) {
current_perc = (double)total_expired/total_sampled;
} else
current_perc = 0;
server.stat_expired_stale_perc = (current_perc*0.05)+
(server.stat_expired_stale_perc*0.95);
}

propagateExpire

在前面的代码中,还看到propagateExpire的使用。我们知道,在主从结构下,键实际的expire操作是在Master完成的。在expire之后,Master会发送DEL指令给Slave和AOF,也就是这个函数。
在注释中还指出,因为AOF,以及Master到Slave的连接都是保证有序的,所以即使有操作去写已经失效的key,都能保证结果是一致的。

1
2
3
4
5
6
7
8
9
10
11
12
13
void propagateExpire(redisDb *db, robj *key, int lazy) {
robj *argv[2];

argv[0] = lazy ? shared.unlink : shared.del;
argv[1] = key;
incrRefCount(argv[0]);
incrRefCount(argv[1]);

propagate(server.delCommand,db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);

decrRefCount(argv[0]);
decrRefCount(argv[1]);
}

propagate机制

在 expire 中,提到了 propagate 函数,因此这里也顺便介绍一些 propagate 机制。
propagate 机制是 Redis 主从复制逻辑的一部分。通常来说,Redis主从复制包含两个机制

  1. sync/psync
    用来处理 sync 和 psync 指令,也就是刚开始来个全量同步,将 Slave 的状态更新至 Master 当前状态。
  2. propagate
    将指令从 Master 同步到 Slave 或者 AOF 文件。

propagate 机制将特定的指令传播给 AOF 或者 Slave,这些指令有下面几种:

  1. PROPAGATE_NONE
    压根就不传播。
  2. PROPAGATE_AOF
    如果开启了 AOF,就传播给 AOF。此时就会调用 AOF 的主入口函数 feedAppendOnlyFile。RDB 和 AOF 机制在专门的文章介绍。
  3. PROPAGATE_REPL
    传播给 Slave。同样调用 replicationFeedSlaves 函数。

根据注释,不能够在各个 command 的实现代码中使用这个函数,因为它不会 wrap the resulting commands in MULTI/EXEC,如果需要,应该用 alsoPropagatepreventCommandPropagationforceCommandPropagation 等。
However for functions that need to (also) propagate out of the context of a command execution, for example when serving a blocked client, you want to use propagate().

1
2
3
4
5
6
7
8
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
{
if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
feedAppendOnlyFile(cmd,dbid,argv,argc);
if (flags & PROPAGATE_REPL)
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

evict实现

介绍

Redis对当前执行环节的判断

  1. server.masterhost == NULL
    常常被用来判断是不是 Master 服务器
  2. server.current_client != server.master
    根据注释,这是指的服务器的当前客户端,仅用于崩溃报告。
  3. sentinelRedisInstance->flags & (SRI_MASTER|SRI_SLAVE)
  4. sentinelRedisInstance->slave_master_host

大家都知道,Redis里面有下面几种evict policy

  1. noeviction
    这是默认情况。
    内存爆了,就直接报错。
  2. allkeys-lru
    对所有的键做LRU。
  3. allkeys-lfu
    对所有的键做LFU。
  4. allkeys-random
    对所有的key做随机删除。
  5. volatile-lru/volatile-lfu/volatile-random
    这是对有expire的键做对应的操作。
  6. volatile-ttl
    删除剩余生命最短的键。

而对应的实现,就在freeMemoryIfNeeded中。根据注释,这个函数被定时调用,当发现超出最大使用内存后,就会释放相关内存。如果释放内存成功,或者我们不需要释放内存,那么返回C_OK;如果我们没有能够释放足够的内存,那么返回C_ERR。总之一堆废话。。。其实想了解的是这几个问题:

  1. 如何计算现在已经使用了多少内存?
  2. 如何实现LFU和LRU?
  3. 释放内存会对其他模块产生什么影响?

LRU和LFU的一般实现及优缺点讨论

LRU

对于 LRU,一个队列就行了,把最近用到的元素放到队列尾部,需要 evict 的时候就弹出头部,一般用双向队列就行。但这样查找一个 Key 就变成 O(n) 的了,但这也不难,只需要用一个 map 记录一下对应元素在队列中的位置就行。也就是说,用hash+双向链表来维护。hash 用来实现 O(1)查询,双向链表用来维护顺序。

Redis并没有采用这个办法来维护一个LRU,显然内存开销很大,这是值得的么?Redis 有专门的文章中讨论这个事情。他们的结论是当 maxmemory-samples 数为10的时候,也就是随机选10个里面去掉一个,这样的近似 LRU 算法的性能已经很好了。

因此,Redis 实际上是记录了最后一次访问某个 key 的时间戳的。当然这倒不是因为复用 LFU 的空间,毕竟 LRU 是先有的。此外,还有一个 evictionPoolEntry。这个 pool 的容量是16,里面的 key 是按照 LRU 有序排列的。

Redis 构造了一个负载。顺序地从头到尾访问,此时第一个 key 就是理想 LRU 的 candidate。然后再加入 50% 的 key,从而让 50% 的老 key 被 eveit 掉。比对四种算法。图中浅灰色点为被 evict 的 key,灰色点为没有被 evict 掉的 key,绿色点是新增加的 key。

  1. 理论上的 LRU 中,前一半的 old key 全部被 evict 掉了。后一半的没有被 evict 掉,保持深灰色。
  2. Redis 3.0 因为使用了 pool,所以比 2.8 好一些。

LFU

实现 LFU 时需要记录对应的访问次数。在淘汰时选择最少访问次数的键值对。此时队列的性质就不够用了,但可以考虑下面的方案

  1. 用优先队列,把访问次数作为 key,大不了手动实现一个二叉堆嘛。
  2. 用一个双层链表,第一层是从0开始的访问次数,第二层是具有这个访问次数的所有键值对的开链表。为了节约空间,第一层可以是哈希表的形式。

Redis的LRU和LFU实现

本章介绍了 Redis 对 LRU 和 LFU 数据结构的维护,这是必要的前置知识。
这一部分实现在先前介绍过的 lookupKey 函数中。Redis 对每个 robj 对象去维护了一个 lru:LRU_BITS字段。在3.0版本,这个字段被用来存储当前秒级别的时间戳,服务于 LRU,后续版本还会服务 LFU。具体选择 LRU 还是 LFU 是根据 server.maxmemory_policy来定的。

1
2
3
4
5
6
7
8
// In lookUpKey
if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)){
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
updateLFU(val);
} else {
val->lru = LRU_CLOCK();
}
}

真正的 evict 的时刻是 freeMemoryIfNeeded 函数。

维护了的LRU或者是LFU在evictionPoolPopulate中起作用,会分别根据estimateObjectIdleTime255-LFUDecrAndReturn(e)进行排序。

LRU

LRU_CLOCK这里会选择是直接用server.lruclock(也是在serverCron里面调用getLRUClock设置的),或者直接自己调用一次getLRUClock。这个比较是怎么来的呢?有必要介绍一下,毕竟诸如run_with_period里面也有这样的比较。
首先,在文章中已经介绍过,server.hz指的是表示一秒钟被触发多少次。那么1000/server.hz就表示触发1次要多少毫秒。LRU_CLOCK_RESOLUTION的默认值是1000,表示时钟精度是1000毫秒调用一次。所以只要LRU的精度小于server调用的精度,就可以复用server.lruclock,从而少调用一次getLRUClock。
【Q】岂不是大多数情况下都可以复用server的时钟?毕竟hz不会为0.5啊。

1
2
3
4
5
6
7
8
9
10
11
12
13
// server.h
#define LRU_BITS 24

// evict.c
unsigned int LRU_CLOCK(void) {
unsigned int lruclock;
if (1000/server.hz <= LRU_CLOCK_RESOLUTION) {
lruclock = server.lruclock;
} else {
lruclock = getLRUClock();
}
return lruclock;
}

LFU

在访问一个对象的时候,用 updateLFU 更新 lru 字段。这个函数会在高16位存一个分钟级别的时间戳 ldt,在低8位存访问计数 counter。这两个值被存放在一个字段中完全是为了节省空间和复用字段,其组合后的值整体上没有排序意义。
执行 LFU 策略时更新 lru 字段需要注意两点,即要根据时间衰减,但也要根据访问次数增长

  1. 首先,通过 LFUDecrAndReturn,减少 counter
    减少的值与当前时间和记录的 ldt 的差值有关。也就是隔得时间越长,减的越多。
  2. 然后,通过 LFULogIncr以一定概率增加 counter。
  3. 最后,将最新的 counter 和 ldt 重新组装起来存入 lru 中。
1
2
3
4
5
6
7
8
9
10
// db.c
/*
* Firstly, decrement the counter if the decrement time is reached.
* Then logarithmically increment the counter, and update the access time. */
void updateLFU(robj *val) {
unsigned long counter = LFUDecrAndReturn(val);
counter = LFULogIncr(counter);
// 组装lru字段
val->lru = (LFUGetTimeInMinutes()<<8) | counter;
}

衰减counter

LFUDecrAndReturn 返回 counter,和当前对象的 frequency 正相关。这个函数在返回前会先根据当前访问时刻和上次记录的 ldt 的差值来减少 counter 的值。
counter 具体减少多少由 server.lfu_decay_time 决定。它是个衰变因子,默认是1,这时候对 counter 的减少就是经过的分钟数。如果将它设为更大的值,则 counter 减少的量会变少。它还有个特殊值0,表示 counter 只能加不能减。
每次衰减会对 counter 减去 num_periods,知道减少到0为止。

1
2
3
4
5
6
7
8
9
10
11
12
13
// evict.c
unsigned long LFUDecrAndReturn(robj *o) {
unsigned long ldt = o->lru >> 8; // 取出老的分钟时间戳
unsigned long counter = o->lru & 255; // 取出老的计数
unsigned long num_periods = server.lfu_decay_time ?
LFUTimeElapsed(ldt) / server.lfu_decay_time :
0;
if (num_periods)
counter = (num_periods > counter) ? 0 : counter - num_periods;
return counter;
}
// config.c
createIntConfig("lfu-decay-time", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.lfu_decay_time, 1, INTEGER_CONFIG, NULL, NULL)

简单介绍 LFUTimeElapsed,用来计算从 ldt 开始经过了多少分钟。

1
2
3
4
5
6
7
// 获得从ldt开始经过了多少分钟
unsigned long LFUTimeElapsed(unsigned long ldt) {
unsigned long now = LFUGetTimeInMinutes();
if (now >= ldt) return now-ldt;
// 如果now小了,就当成已经wrap了刚好一次,这个和estimateObjectIdleTime的实现是类似的
return 65535-ldt+now;
}

【Q】其实我觉得这里有个优化点,可能每次访问的时候 num_periods 都是 0.9 这样的数,结果导致每次都不衰减。我想对于这种情况,最好就不要更新 ldt,让它和下一次的攒起来一起算。

增加counter

介绍 counter 随访问次数的增长 LFULogIncr
每次访问都需要增加访问计数,但未必每次都自增,而是随机出一个 r,当它小于阈值 p 后才会自增 counter
其中p的值是1.0/(baseval*lfu_log_factor+1),其中 basevalmax(0,counter-LFU_INIT_VAL)

可以发现 p 是在 0 和 1 之间的。但进一步讨论下 baseval 和 p 的关系:

  1. baseval=0,p=1
  2. baseval=1,p=0.09
  3. baseval=10,p=0.009

可以看到:

  1. counter 越大,counter 的自增概率就越小。
  2. lfu_log_factor 越大,counter 的自增概率就越小
    其实 counter 的增长和实际的访问次数是成对数关系的。如果我们需要存储很高的访问频次,可以设置更大的 lfu_log_factor
    这样的设计使得区区 8 bits 足够存储很大的命中次数。

在更新版本的redis.conf中,列出了不同 lfu_log_factor 取值下,若干次 hit 之后,counter 增加的数量。

因为r是随机取的,所以可能用数学计算挺困难的。

1
2
3
4
5
6
7
8
9
10
11
+--------+------------+------------+------------+------------+------------+
| factor | 100 hits | 1000 hits | 100K hits | 1M hits | 10M hits |
+--------+------------+------------+------------+------------+------------+
| 0 | 104 | 255 | 255 | 255 | 255 |
+--------+------------+------------+------------+------------+------------+
| 1 | 18 | 49 | 255 | 255 | 255 |
+--------+------------+------------+------------+------------+------------+
| 10 | 10 | 18 | 142 | 255 | 255 |
+--------+------------+------------+------------+------------+------------+
| 100 | 8 | 11 | 49 | 143 | 255 |
+--------+------------+------------+------------+------------+------------+

还需要特别介绍下 LFU_INIT_VAL,每个对象在初始化时,对应的 counter 是 LFU_INIT_VAL 即 5。没有这个,新生对象就会在 LFUDecrAndReturn 的时候,因为 counter 很小被很快淘汰掉。

当然,在 LFULogIncr 时需要将它还原回实际的值来计算 p。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// server.h
#define LFU_INIT_VAL 5

// evict.c
uint8_t LFULogIncr(uint8_t counter) {
// 确保不会回绕
if (counter == 255) return 255;
// 随机数r
double r = (double)rand()/RAND_MAX;
double baseval = counter - LFU_INIT_VAL;
if (baseval < 0) baseval = 0;
double p = 1.0/(baseval*server.lfu_log_factor+1);
// p小于该随机数r才增长
if (r < p) counter++;
return counter;
}
// config.c
createIntConfig("lfu-log-factor", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.lfu_log_factor, 10, INTEGER_CONFIG, NULL, NULL)

evictionPoolEntry和evictionPoolPopulate

来看 evictionPoolPopulate 这个函数,它作用是往 evictionPool 里面加一些 evictionPoolEntry。一个 evictionPoolEntry 表示数据库中的某个 key。一系列 evictionPoolEntry 组成一个 evictionPool。在 evictionPool 中的 entry 都是按照 idle 排序的,从小到大升序排列,最左边的 idle time 最小。因此 evict 的时候可以只看最左边的 pool。

1
2
3
4
5
6
7
8
9
10
#define EVPOOL_SIZE 16
#define EVPOOL_CACHED_SDS_SIZE 255
struct evictionPoolEntry {
unsigned long long idle; /* Object idle time (inverse frequency for LFU) */
sds key; /* Key name. */
sds cached; /* Cached SDS object for key name. */
int dbid; /* Key DB number. */
};

static struct evictionPoolEntry *EvictionPoolLRU;
  1. idle
    表示每个对象的空闲时间。pool 里面只能加入具有更大 idle time 的键。如果还有空余空间,就始终加入。
  2. cached
    这是一个有趣的优化
    如果 key 的长度比较小,它就会被存在预分配好空间的 cached 结构中,从而避免在 key 中分配空间的开销。
  3. dbid
    表示这个键所属的数据库。

如何根据 LRU 或者 LFU 计算 idle 呢?

  1. 如果采用LRU
    调用 estimateObjectIdleTime 函数计算,实际上就是乘以一个 LRU_CLOCK_RESOLUTION。这里实现上还处理了一下回绕 wrap 的情况。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    unsigned long long estimateObjectIdleTime(robj *o) {
    unsigned long long lruclock = LRU_CLOCK();
    if (lruclock >= o->lru) {
    return (lruclock - o->lru) * LRU_CLOCK_RESOLUTION;
    } else {
    return (lruclock + (LRU_CLOCK_MAX - o->lru)) *
    LRU_CLOCK_RESOLUTION;
    }
    }
  2. 如果采用LFU
    这里要反向一下,就是用255减一下 LFUDecrAndReturn(o)。因为idle和访问频率是相反的。

下面是 evictionPoolPopulate 的具体实现。
输入参数:

  1. sampledict 表示从哪个 dict 里面进行采样,根据策略不同,可能是 dict(allkeys 策略) 或者 expire(volatile 策略)。
  2. keydict 只能是对应的 dict。因为 expire 里面只是存一个”引用”。
1
2
3
4
void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
int j, k, count;
dictEntry *samples[server.maxmemory_samples];
...

dictGetSomeKeys 函数从 dict 里面任意取出若干个entry。server.maxmemory_samples 默认被设置成5个,对应于前几章的论述,后面应该会改成10个吧。在取出这些 entry 到 samples后,挨个尝试将它们插入到 pool 中。

1
2
3
4
5
6
7
8
9
10
11
...
count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
for (j = 0; j < count; j++) {
unsigned long long idle;
sds key;
robj *o;
dictEntry *de;

de = samples[j];
key = dictGetKey(de);
...

下面的一段代码计算这个对象的 idle 时间。具体的策略在前面讲过了,但首先需要考虑当前的 dict 可能是 expire,这样就要回 dict 表再查一次。得到对应的 keydict 中的 entry 即 de,以及 key 对应的 val 即 o

  1. 回表的情况
    显然,只要 sampledict 不等于 keydict 就需要回表。因为此时 sampledict 肯定是 expire。
  2. 不回表的情况就可以直接取 val
  3. 特殊情况:volatile-ttl 策略
    前面两种情况都是用的 keydict 中对应的 entry、key 和 val,但这里直接用 sampledict
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
...
if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
if (sampledict != keydict) de = dictFind(keydict, key);
o = dictGetVal(de);
}

/* Calculate the idle time according to the policy. This is called
* idle just because the code initially handled LRU, but is in fact
* just a score where an higher score means better candidate. */
if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
idle = estimateObjectIdleTime(o);
} else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
idle = 255-LFUDecrAndReturn(o);
} else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
/* In this case the sooner the expire the better. */
idle = ULLONG_MAX - (long)dictGetVal(de);
} else {
serverPanic("Unknown eviction policy in evictionPoolPopulate()");
}
...

上面计算出了当前 key 对应的 idle 时间。接下来将元素插入到 pool 中。这是一个类似插入排序的过程。
首先,找到第一个空 bucket,或者找到第一个 idle <= pool[k].idle ,可以插到它前面。

下面的循环能够跳过所有不满足以上条件的情况。【Q】这里能直接二分么?

1
2
3
4
5
6
7
8
9
...
/* Insert the element inside the pool.
* First, find the first empty bucket or the first populated
* bucket that has an idle time smaller than our idle time. */
k = 0;
while (k < EVPOOL_SIZE &&
pool[k].key &&
pool[k].idle < idle) k++;
...

下面处理两种特殊情况:

  1. 我们在处理最左边的桶,此时待插入 key 的 idle 比 pool 里面所有的 idle 都要小,但没有空余的格子了。
    这说明这个 entry 非常新,我们没必要将它插入到 pool 中,所以直接诶 continue 掉。
  2. bucket 完全空的情况,可以直接用最左边的格子。
1
2
3
4
5
6
7
8
9
...
if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) {
/* Can't insert if the element is < the worst element we have
* and there are no empty buckets. */
continue;
} else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
/* Inserting into empty position. No setup needed before insert. */
} else {
...

注意,keycache 有点类似于自引用结构的关系,但其实不是。因为 key 和 cached 实际上都是 sds,也就是个 char*。移动或者复制 sds,只是移动指针,并没有改变指向的内容。所以只需要保证只要它不释放就行
当然,发散一下,如果 key 是一个指向 cachedsds*,那就真的是自引用结构了。但也并不需要绑定 key 和它可能指向的 cached 在一个结构中。因为如下所示,它们可以是一一对应的

1
2
| key 1 | key 2 | key 3 | nul k |
| nul c | cac 3 | cac 2 | cac 1 |

接下来看最一般的情况。此时 k 是第一个满足 idle <= pool[k].idle,我们的新 entry 应该插入在 k 之前。下面就插入排序,把待插入的 de 插入,然后把原来 k 以及之后的数字往右边移动。这里使用了 memmove,它能自动检测 src 内存和 dest 内存重叠的情况并处理,是更安全的 memcpy。
又可以细分为两种情况:

  1. 最右边还有空位,将 [k,) 整体右移一格,新 entry 预计插入 k
    尽管这时候最右边的 cached 是空,但还是要备份。否则就会泄露掉那一块内存。
  2. 最右边没有空位,将整个数组右移一格,新entry预计插入在k-1
    此时最左边是idle最小的,将它从pool里面去掉,换成idle更大的。
    但同时,也要保证最左边的cached不被意外释放。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
...
if (pool[EVPOOL_SIZE-1].key == NULL) {
/* Free space on the right? Insert at k shifting
* all the elements from k to end to the right. */

sds cached = pool[EVPOOL_SIZE-1].cached;
memmove(pool+k+1,pool+k,
sizeof(pool[0])*(EVPOOL_SIZE-k-1));
pool[k].cached = cached;
} else {
/* No free space on right? Insert at k-1 */
k--;
/* Shift all elements on the left of k (included) to the
* left, so we discard the element with smaller idle time. */
sds cached = pool[0].cached; /* Save SDS before overwriting. */
if (pool[0].key != pool[0].cached) sdsfree(pool[0].key);
memmove(pool,pool+1,sizeof(pool[0])*k);
pool[k].cached = cached;
}
}
...

下面,将新entry插入pool中。这里说明下cached的使用:

  1. 如果key的长度大于EVPOOL_CACHED_SDS_SIZE
    则复制keypool[k].key
  2. 如果key的长度较小,就可以尝试做优化,将它放在cached中,然后让把cache赋值给key,从而避免复制底层的字符串。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
...
// 尝试复用pool entry中的cached SDS。因为内存分配和回收还是开销比较大的。
int klen = sdslen(key);
if (klen > EVPOOL_CACHED_SDS_SIZE) {
pool[k].key = sdsdup(key);
} else {
memcpy(pool[k].cached,key,klen+1);
sdssetlen(pool[k].cached,klen);
pool[k].key = pool[k].cached;
}
pool[k].idle = idle;
pool[k].dbid = dbid;
}
}

dictGetSomeKeys

dictGetSomeKeys这个函数,是对一个dict来说的,而不是对db来说的。
它不保证一定返回正好count个,也不保证返回的元素都不重复。返回值被存到des里面,需要保证这个数组至少能容纳count个。
取出来的指针存在des中返回。des必须预分配至少count个空间,尽管函数可能未必能取到count个。其原因可能是本来就没那么多个,或者我们经过多轮迭代没添加完。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* This function samples the dictionary to return a few keys from random
* locations.
*
* Note that this function is not suitable when you need a good distribution
* of the returned items, but only when you need to "sample" a given number
* of continuous elements to run some kind of algorithm or to produce
* statistics. However the function is much faster than dictGetRandomKey()
* at producing N elements. */
unsigned int dictGetSomeKeys(dict *d, dictEntry **des, unsigned int count) {
unsigned long j; /* internal hash table id, 0 or 1. */
unsigned long tables; /* 1 or 2 tables? */
unsigned long stored = 0, maxsizemask;
unsigned long maxsteps;

if (dictSize(d) < count) count = dictSize(d);
maxsteps = count*10;

首先,执行一点渐进式rehash。然后将maxsizemask设置为所有ht(没有rehash是1个,有是2个)的最大容量。
i设置为随机一个位置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
...
/* Try to do a rehashing work proportional to 'count'. */
for (j = 0; j < count; j++) {
if (dictIsRehashing(d))
_dictRehashStep(d);
else
break;
}

tables = dictIsRehashing(d) ? 2 : 1;
maxsizemask = d->ht[0].sizemask;
if (tables > 1 && maxsizemask < d->ht[1].sizemask)
maxsizemask = d->ht[1].sizemask;
/* Pick a random point inside the larger table. */
unsigned long i = random() & maxsizemask;
...

下面进入主循环,循环条件有两个,一个是取满count个,一个是执行最多maxsteps=count*10次。
在每一次迭代中,对所有的ht(1或2个)进行处理。

1
2
3
4
5
...
unsigned long emptylen = 0; /* Continuous empty entries so far. */
while(stored < count && maxsteps--) {
for (j = 0; j < tables; j++) {
...

涉及到rehashidx相关的逻辑,表示在每次进入dictRehash函数的时候,首先ht[0].table[rehashidx]这个桶。如果现在在rehash过程中,到d->rehashidx为止的所有index都已经被访问过了。实际上这些桶里面都空(not populated)了,因此我们可以跳过ht[0]里面$[0,idx-1]$这个区间的关卡,直接去看ht[1]里面的。这其实是一个优化,在dictRehash实现中,也有对空桶跳过的优化。
特别地,如果iht[1]里面也已经超了,这就表示截止到rehashidx两个表里面都没有了。【Q】为什么可以认为ht[1]中的rehashidx之前的也不需要判定了呢?或者说,为啥两个ht可以共享一个i呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
...
if (tables == 2 && j == 0 && i < (unsigned long) d->rehashidx) {
if (i >= d->ht[1].size)
i = d->rehashidx;
else
continue;
}
if (i >= d->ht[j].size) continue; /* Out of range for this table. */
dictEntry *he = d->ht[j].table[i];

/* Count contiguous empty buckets, and jump to other
* locations if they reach 'count' (with a minimum of 5). */
if (he == NULL) {
// 我们会统计遇到连续空桶的数量,如果超过了5个,就重新随机一个位置。
emptylen++;
if (emptylen >= 5 && emptylen > count) {
i = random() & maxsizemask;
emptylen = 0;
}
} else {
// 否则,我们使用桶里面所有的元素
emptylen = 0;
while (he) {
*des = he;
des++;
he = he->next;
stored++;
if (stored == count) return stored;
}
}
}
...

在主循环结束后,会自增i的值。

1
2
3
4
5
...
i = (i+1) & maxsizemask;
}
return stored;
}

主逻辑freeMemoryIfNeeded

freeMemoryIfNeeded函数是evict的主要逻辑。
首先,如果是从服务器,并且配置了server.repl_slave_ignore_maxmemory就忽略。

1
2
3
4
5
6
int freeMemoryIfNeeded(void) {
int keys_freed = 0;
/* By default replicas should ignore maxmemory
* and just be masters exact copies. */
if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK;
...

下面就来计算占用了多少内存mem_reported,主要函数getMaxmemoryState我们放在后面单独讲解。mem_reported表示总共用了多少内存,mem_tofree表示应该释放多少内存(不算Slave和AOF的缓存)。
clientsArePaused的检查,有点奇怪。根据注释,它的意思是,如果client都被pause了,那么数据就是静止的。不仅对于所有的client是这样,对于还没有做expire和evict的所有key也是这样。我觉得这应该是一个优化,防止在这种情况下再走下面的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
...
size_t mem_reported, mem_tofree, mem_freed;
mstime_t latency, eviction_latency, lazyfree_latency;
long long delta;
int slaves = listLength(server.slaves);
int result = C_ERR;

if (clientsArePaused()) return C_OK;
if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK)
return C_OK;

mem_freed = 0;
...

latencyStartMonitor这个宏和stopwatch一样。

1
2
3
...
latencyStartMonitor(latency);
...

下面开始根据淘汰政策maxmemory_policy进行讨论,如果是noeviction,那就直接返回。

1
2
3
4
...
if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
goto cant_free; /* We need to free memory, but policy forbids. */
...

整个内存释放过程是多次的,因此用一个循环来。

1
2
3
4
5
6
7
8
9
10
...
while (mem_freed < mem_tofree) {
int j, k, i;
static unsigned int next_db = 0;
sds bestkey = NULL;
int bestdbid;
redisDb *db;
dict *dict;
dictEntry *de;
...

处理要排序的情况

第一个 if 用来处理所有需要排序的情况。查看代码,要用 while 循环去找 bestkey,原因是可能从 pool 里面找到的 key 不存在了,【Q】可是究竟什么情况下会发生这个情况呢?
循环里面的过程就是我们去遍历整个数据库里面的所有db,如果它的dict或者expires不为空,则调用evictionPoolPopulate。这个函数会往pool里面加入一些key。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
...
if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
{
struct evictionPoolEntry *pool = EvictionPoolLRU;

while(bestkey == NULL) {
unsigned long total_keys = 0, keys;

/* We don't want to make local-db choices when expiring keys,
* so to start populate the eviction pool sampling keys from
* every DB. */
for (i = 0; i < server.dbnum; i++) {
db = server.db+i;
dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
db->dict : db->expires;
if ((keys = dictSize(dict)) != 0) {
evictionPoolPopulate(i, dict, db->dict, pool);
total_keys += keys;
}
}
if (!total_keys) break; /* No keys to evict. */
...

下面遍历整个 pool,找到最合适的一个。解释几个问题:

  1. 为什么要从尾往头遍历?
    在对evictionPool的介绍中提到,它是有序的,最左边的idle time最小,最右边的最大,因此优先淘汰右边的。
  2. 为什么要有bestdbid?将key之间的比较转化为数据库之间的比较么?
  3. server.db[pool[k].dbid]是什么鬼?
    实际上是要选择pool[k].dbid这个db。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
...
/* Go backward from best to worst element to evict. */
for (k = EVPOOL_SIZE-1; k >= 0; k--) {
if (pool[k].key == NULL) continue;
bestdbid = pool[k].dbid;

if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
de = dictFind(server.db[pool[k].dbid].dict,
pool[k].key);
} else {
de = dictFind(server.db[pool[k].dbid].expires,
pool[k].key);
}

/* Remove the entry from the pool. */
if (pool[k].key != pool[k].cached)
sdsfree(pool[k].key);
pool[k].key = NULL;
pool[k].idle = 0;

/* If the key exists, is our pick. Otherwise it is
* a ghost and we need to try the next element. */
if (de) {
bestkey = dictGetKey(de);
break;
} else {
/* Ghost... Iterate again. */
// 这个很奇怪,什么时候会出现这种情况呢?
}
}
}
}
...

处理随机情况

第二个if,用来处理随机的情况。这个很简单,直接调用dictGetRandomKey就行,和eviction pool也没啥关系了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
...
/* volatile-random and allkeys-random policy */
else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
{
/* When evicting a random key, we try to evict a key for
* each DB, so we use the static 'next_db' variable to
* incrementally visit all DBs. */
for (i = 0; i < server.dbnum; i++) {
j = (++next_db) % server.dbnum;
db = server.db+j;
dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
db->dict : db->expires;
if (dictSize(dict) != 0) {
de = dictGetRandomKey(dict);
bestkey = dictGetKey(de);
bestdbid = j;
break;
}
}
}
...

删除元素

如果我们找到了要删除的元素bestkey,就执行删除元素过程。
首先,调用老朋友propagateExpire,这个会发送一条删除指令给AOF/Slave。

1
2
3
4
5
6
7
...
/* Finally remove the selected key. */
if (bestkey) {
db = server.db+bestdbid;
robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
propagateExpire(db,keyobj,server.lazyfree_lazy_eviction);
...

接着,我们统计这次evict释放了多少内存,就是首尾两个zmalloc_used_memory相减。这个有点粗略了,就在刚才我们还将AOF/Slave缓存单独拿出来算的呢,现在直接总内存相减了。在注释中还提到,有可能用来propagateExpire的内存比我们释放的db内存还多呢,但我们是管不了的,否则mem_freed < mem_tofree这个循环条件永远达不到了。并且,这些缓存终究会被释放的。
这里还统计了一下调用dictSyncDelete等的时间,并且通过latencyAddSampleIfNeeded放到统计里面。

1
2
3
4
5
6
7
8
9
10
11
12
13
...
delta = (long long) zmalloc_used_memory();
latencyStartMonitor(eviction_latency);
if (server.lazyfree_lazy_eviction)
dbAsyncDelete(db,keyobj);
else
dbSyncDelete(db,keyobj);
signalModifiedKey(NULL,db,keyobj);
latencyEndMonitor(eviction_latency);
latencyAddSampleIfNeeded("eviction-del",eviction_latency);
delta -= (long long) zmalloc_used_memory();
mem_freed += delta;
...

下面是一些统计性的工作。

1
2
3
4
5
6
7
...
server.stat_evictedkeys++;
notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
keyobj, db->id);
decrRefCount(keyobj);
keys_freed++;
...

我们在循环中就强制往Slave发送数据,确保即使在要传的数据都很大的情况下,我们仍然能够快速传递。
特别地,我们在while (mem_freed < mem_tofree)这个循环的最后,还会有条件地检查一下内存是不是达标。这个主要是对异步删除来说的,在这种情况下,dbAsyncDelete流程中对内存的释放未必能和我们循环这边同步起来。所以我们每释放16个键,就检查一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
...
if (slaves) flushSlavesOutputBuffers();

if (server.lazyfree_lazy_eviction && !(keys_freed % 16)) {
if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
/* Let's satisfy our stop condition. */
mem_freed = mem_tofree;
}
}
} else {
goto cant_free; /* nothing to free... */
}
}
result = C_OK;
...

异常情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
...
cant_free:
/* We are here if we are not able to reclaim memory. There is only one
* last thing we can try: check if the lazyfree thread has jobs in queue
* and wait... */
if (result != C_OK) {
latencyStartMonitor(lazyfree_latency);
while(bioPendingJobsOfType(BIO_LAZY_FREE)) {
if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
result = C_OK;
break;
}
usleep(1000);
}
latencyEndMonitor(lazyfree_latency);
latencyAddSampleIfNeeded("eviction-lazyfree",lazyfree_latency);
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("eviction-cycle",latency);
return result;
}

getMaxmemoryState

这个函数获得内存的使用情况,包括:

  1. total
    总共使用的内存。
    来自zmalloc_used_memory
  2. logical
    mem_used,表示出了Slave/AOF buffer之外的内存。
    这个计算就是要减去overhead,也就是Slave/AOF buffer的内存,用freeMemoryGetNotCountedMemory计算得到的。
  3. level
    表示内存使用率
1
2
3
4
5
6
7
8
int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level) {
size_t mem_reported, mem_used, mem_tofree;

/* Check if we are over the memory usage limit. If we are not, no need
* to subtract the slaves output buffers. We can just return ASAP. */
mem_reported = zmalloc_used_memory();
if (total) *total = mem_reported;
...

上面获得了总内存量,如果没有设置最大内存,或者总内存量都没有操作,也不需要计算比例,那么就直接返回了

1
2
3
4
5
...
/* We may return ASAP if there is no need to compute the level. */
int return_ok_asap = !server.maxmemory || mem_reported <= server.maxmemory;
if (return_ok_asap && !level) return C_OK;
...

计算两个缓冲区占用的内存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
...
/* Remove the size of slaves output buffers and AOF buffer from the
* count of used memory. */
mem_used = mem_reported;
size_t overhead = freeMemoryGetNotCountedMemory();
mem_used = (mem_used > overhead) ? mem_used-overhead : 0;

/* Compute the ratio of memory usage. */
if (level) {
if (!server.maxmemory) {
*level = 0;
} else {
*level = (float)mem_used / (float)server.maxmemory;
}
}

if (return_ok_asap) return C_OK;

/* Check if we are still over the memory limit. */
if (mem_used <= server.maxmemory) return C_OK;

/* Compute how much memory we need to free. */
mem_tofree = mem_used - server.maxmemory;

if (logical) *logical = mem_used;
if (tofree) *tofree = mem_tofree;

return C_ERR;
}

Redis事件

在前面的代码中可以看到下面的语句,实际上是对主数据库c->db进行修改后,需要进行事件通知,我们将在下面介绍这几个语句的作用。

1
2
3
4
signalModifiedKey(c,c->db,c->argv[j]);
notifyKeyspaceEvent(NOTIFY_GENERIC,
"del",c->argv[j],c->db->id);
server.dirty++;

signalModifiedKey

signalModifiedKey是key被修改的钩子函数,每当数据库c->db里面的key被改动时,会调用这个函数。这里的key发生改动也包括key对应的值发生改动,这是因为从genericSetKey的实现可以看到,SET指令也会导致signalModifiedKey被调用。
此外,根据注释,每一次DB被flush时,signalFlushDb会被调用。

1
2
3
4
5
6
// db.c

void signalModifiedKey(client *c, redisDb *db, robj *key) {
touchWatchedKey(db,key);
trackingInvalidateKey(c,key);
}

touchWatchedKey

touchWatchedKey字如其名,它的作用是让WATCH这个键的事务失效。

1
2
3
4
5
6
7
/* "Touch" a key, so that if this key is being WATCHed by some client the
* next EXEC will fail. */
void touchWatchedKey(redisDb *db, robj *key) {
list *clients;
listIter li;
listNode *ln;
...

这里先特判一下,如果db->watched_keys为空就直接返回,这个用法在redis中非常常见,我猜想可能是dictFind的开销还是比较大的。

1
2
3
...
if (dictSize(db->watched_keys) == 0) return;
...

下面从db->watched_keys上拿到WATCH这个key的所有的client,并且对这个链表上的每一个client设置CLIENT_DIRTY_CAS这个flag。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
...
// 这个函数是dictFind(只能得到dictEntry)和dictGetVal的简单封装
clients = dictFetchValue(db->watched_keys, key);
if (!clients) return;

/* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
/* Check if we are already watching for this key */
listRewind(clients,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);

c->flags |= CLIENT_DIRTY_CAS;
}
}

trackingInvalidateKey

下面看另一个函数trackingInvalidateKey。这个系列的函数是在Redis6.0左右被引入的,主要用途是维护客户端缓存。

1
2
3
4
5
/* Wrapper (the one actually called across the core) to pass the key
* as object. */
void trackingInvalidateKey(client *c, robj *keyobj) {
trackingInvalidateKeyRaw(c,keyobj->ptr,sdslen(keyobj->ptr),1);
}

当key的值被改变后,在keys tracking的逻辑下,我们的任务是给每一个有可能缓存了当前keys的client发送通知。如果传入的c为空,表示这个不是一个client的场景,而是例如服务器内部做expire。
bcast参数的作用是是否要将这个key通过BCAST模式广播给client们。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/* 
* This is the case when the function is called from the Redis core once a key is modified, however
* we also call the function in order to evict keys in the key table in case
* of memory pressure: in that case the key didn't really change, so we want
* just to notify the clients that are in the table for this key, that would
* otherwise miss the fact we are no longer tracking the key for them. */
void trackingInvalidateKeyRaw(client *c, char *key, size_t keylen, int bcast) {
if (TrackingTable == NULL) return;

if (bcast && raxSize(PrefixTable) > 0)
trackingRememberKeyToBroadcast(c,key,keylen);

rax *ids = raxFind(TrackingTable,(unsigned char*)key,keylen);
if (ids == raxNotFound) return;

raxIterator ri;
raxStart(&ri,ids);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
uint64_t id;
memcpy(&id,ri.key,sizeof(id));
client *target = lookupClientByID(id);
/* Note that if the client is in BCAST mode, we don't want to
* send invalidation messages that were pending in the case
* previously the client was not in BCAST mode. This can happen if
* TRACKING is enabled normally, and then the client switches to
* BCAST mode. */
if (target == NULL ||
!(target->flags & CLIENT_TRACKING)||
target->flags & CLIENT_TRACKING_BCAST)
{
continue;
}

/* If the client enabled the NOLOOP mode, don't send notifications
* about keys changed by the client itself. */
if (target->flags & CLIENT_TRACKING_NOLOOP &&
target == c)
{
continue;
}

sendTrackingMessage(target,key,keylen,0);
}
raxStop(&ri);

/* Free the tracking table: we'll create the radix tree and populate it
* again if more keys will be modified in this caching slot. */
TrackingTableTotalItems -= raxSize(ids);
raxFree(ids);
raxRemove(TrackingTable,(unsigned char*)key,keylen,NULL);
}

notifyKeyspaceEvent

函数notifyKeyspaceEvent用来触发数据库事件,这个对应了Redis中的叫“键空间通知”/“键事件通知”的特性。这个特性是通过PUBLISH机制实现的。
简单来说,对0号数据库的键mykey执行DEL key [key ...]命令时,系统将分发两条消息,相当于执行以下两个PUBLISH channel message命令。其中__keyspace系列命令称为键空间通知(key-space notification),__keyevent系列命令称为键事件通知(key-event notification)。订阅第一个PUBLISH命令,可以接收0号数据库中所有修改键mykey的事件。订阅第二个PUBLISH命令,可以接收0号数据库中所有执行del命令的键

1
2
PUBLISH __keyspace@0__:mykey del
PUBLISH __keyevent@0__:del mykey

下面看看这个函数的具体实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) {
sds chan;
robj *chanobj, *eventobj;
int len = -1;
char buf[24];

/* If any modules are interested in events, notify the module system now.
* This bypasses the notifications configuration, but the module engine
* will only call event subscribers if the event type matches the types
* they are interested in. */
moduleNotifyKeyspaceEvent(type, event, key, dbid);

/* If notifications for this class of events are off, return ASAP. */
if (!(server.notify_keyspace_events & type)) return;

eventobj = createStringObject(event,strlen(event));

/* __keyspace@<db>__:<key> <event> notifications. */
if (server.notify_keyspace_events & NOTIFY_KEYSPACE) {
chan = sdsnewlen("__keyspace@",11);
len = ll2string(buf,sizeof(buf),dbid);
chan = sdscatlen(chan, buf, len);
chan = sdscatlen(chan, "__:", 3);
chan = sdscatsds(chan, key->ptr);
chanobj = createObject(OBJ_STRING, chan);
pubsubPublishMessage(chanobj, eventobj);
decrRefCount(chanobj);
}

/* __keyevent@<db>__:<event> <key> notifications. */
if (server.notify_keyspace_events & NOTIFY_KEYEVENT) {
chan = sdsnewlen("__keyevent@",11);
if (len == -1) len = ll2string(buf,sizeof(buf),dbid);
chan = sdscatlen(chan, buf, len);
chan = sdscatlen(chan, "__:", 3);
chan = sdscatsds(chan, eventobj->ptr);
chanobj = createObject(OBJ_STRING, chan);
pubsubPublishMessage(chanobj, key);
decrRefCount(chanobj);
}
decrRefCount(eventobj);
}

内存管理

Redis内存管理zmalloc

Redis基于zmalloc系列函数进行内存分配。
zmalloc是为了解决什么问题呢?主要是为了做到异常处理和内存统计的功能。
下面看zmalloc的实现。
可以看到,它会额外分配一个PREFIX_SIZE,用来存储额外信息。zmalloc最终返回的是(char*)ptr+PREFIX_SIZE,这个有点类似于SDS的骚操作。PREFIX_SIZE的大小是由宏来定义的,并且可以通过HAVE_MALLOC_SIZE禁用内存统计的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// zmalloc.c

#ifdef HAVE_MALLOC_SIZE
#define PREFIX_SIZE (0)
#else
#if defined(__sun) || defined(__sparc) || defined(__sparc__)
#define PREFIX_SIZE (sizeof(long long))
#else
#define PREFIX_SIZE (sizeof(size_t))
#endif
#endif

void *zmalloc(size_t size) {
void *ptr = malloc(size+PREFIX_SIZE);

if (!ptr) zmalloc_oom_handler(size);
#ifdef HAVE_MALLOC_SIZE
// 如果不记录内存分配大小
update_zmalloc_stat_alloc(zmalloc_size(ptr));
return ptr;
#else
// 如果记录内存分配大小
*((size_t*)ptr) = size;
update_zmalloc_stat_alloc(size+PREFIX_SIZE);
return (char*)ptr+PREFIX_SIZE;
#endif
}

下面仔细查看一下update_zmalloc_stat_alloc函数的实现,不出所料的话,应该是通过一个原子操作来实现更新的。实际上也果不其然,atomicIncr的实现在后面会讲到。

1
2
3
4
static size_t used_memory = 0;
pthread_mutex_t used_memory_mutex = PTHREAD_MUTEX_INITIALIZER;

#define update_zmalloc_stat_alloc(__n) atomicIncr(used_memory,(__n))

还可以看到的是一个用来处理oom的函数zmalloc_oom_handler。对于C语言来说,malloc在内存分配失败后会返回一个0指针,然后我们在进行后续操作的时候要自行判断。基本上对于oom的处理就是打印一条日志然后abort了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// zmalloc.c
static void zmalloc_default_oom(size_t size) {
fprintf(stderr, "zmalloc: Out of memory trying to allocate %zu bytes\n",
size);
fflush(stderr);
abort();
}

static void (*zmalloc_oom_handler)(size_t) = zmalloc_default_oom;

// server.c
void redisOutOfMemoryHandler(size_t allocation_size) {
serverLog(LL_WARNING,"Out Of Memory allocating %zu bytes!",
allocation_size);
serverPanic("Redis aborting for OUT OF MEMORY. Allocating %zu bytes!",
allocation_size);
}

// server.h
#define serverPanic(...) _serverPanic(__FILE__,__LINE__,__VA_ARGS__),_exit(1)
// debug.c
void _serverPanic(const char *file, int line, const char *msg, ...) {
va_list ap;
va_start(ap,msg);
char fmtmsg[256];
vsnprintf(fmtmsg,sizeof(fmtmsg),msg,ap);
va_end(ap);

bugReportStart();
serverLog(LL_WARNING,"------------------------------------------------");
serverLog(LL_WARNING,"!!! Software Failure. Press left mouse button to continue");
serverLog(LL_WARNING,"Guru Meditation: %s #%s:%d",fmtmsg,file,line);

if (server.crashlog_enabled) {
#ifdef HAVE_BACKTRACE
logStackTrace(NULL, 1);
#endif
printCrashReport();
}
bugReportEnd(0, 0);
}

void bugReportStart(void) {
pthread_mutex_lock(&bug_report_start_mutex);
if (bug_report_start == 0) {
serverLogRaw(LL_WARNING|LL_RAW,
"\n\n=== REDIS BUG REPORT START: Cut & paste starting from here ===\n");
bug_report_start = 1;
}
pthread_mutex_unlock(&bug_report_start_mutex);
}

总结

总结一下本章节中比较有意思的实现:

  1. LFU算法
  2. evictPoolEntry中,key和cached的维护
  3. 诸如keyptrDictType和dbDictType这种C形式的OOP的实现
  4. Redis对OOM的管理

Reference

  1. https://my.oschina.net/lscherish/blog/4467394
    对Redis中的LRU和LFU进行了讲解。本文吸纳了其中的部分内容并进行了修订。
  2. https://juejin.cn/post/6844903454654087182