Implementation of Redis Persistence

Redis provides two persistence mechanisms, RDB and AOF:

  1. RDB saves a binary snapshot of the database.
  2. AOF logs every write command to the database in protocol text form.

This article examines the implementation of both mechanisms, including how they interact with master-replica replication. Since persistence is built on Redis's I/O abstraction layer RIO, RIO is introduced as well.
As part of this series of Redis source code analyses, this article uses the same Redis version as the articles on Redis object implementation internals and Redis Sentinel internals.

RIO

rioInitWithFile wraps a stdio FILE* in a rio object.

void rioInitWithFile(rio *r, FILE *fp) {
    *r = rioFileIO;
    r->io.file.fp = fp;
    r->io.file.buffered = 0;
    r->io.file.autosync = 0;
}

The two remaining fields deserve explanation:

  1. buffered
    Counts the bytes written since the last fsync.
  2. autosync
    After every autosync bytes written, an fsync is issued.
    It can be configured through the rioSetAutoSync function.

bio

Redis offloads time-consuming I/O operations to background threads, hence the name background I/O (bio).

Creating an I/O job

The following job types can be handed off to bio:

  1. BIO_CLOSE_FILE
    A deferred close(2).
  2. BIO_AOF_FSYNC
    A deferred AOF fsync.
  3. BIO_LAZY_FREE
    A deferred memory release.

For each type, Redis maintains a job queue bio_jobs[type], a mutex bio_mutex[type], and a condition variable bio_newjob_cond[type].
Creating a job is straightforward: acquire the lock for the job type, append the job to the tail of bio_jobs[type], then signal the condition variable.
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
    struct bio_job *job = zmalloc(sizeof(*job));

    job->time = time(NULL);
    job->arg1 = arg1;
    job->arg2 = arg2;
    job->arg3 = arg3;
    pthread_mutex_lock(&bio_mutex[type]);
    listAddNodeTail(bio_jobs[type],job);
    bio_pending[type]++;
    pthread_cond_signal(&bio_newjob_cond[type]);
    pthread_mutex_unlock(&bio_mutex[type]);
}

Background processing

As bioInit shows, the void *arg passed to each thread actually carries the job type as an integer. Redis creates one dedicated thread for each job type.

void *bioProcessBackgroundJobs(void *arg) {
    struct bio_job *job;
    unsigned long type = (unsigned long) arg;
    sigset_t sigset;

    /* Check that the type is within the right interval. */
    if (type >= BIO_NUM_OPS) {
        serverLog(LL_WARNING,
            "Warning: bio thread started with wrong type %lu",type);
        return NULL;
    }

    switch (type) {
    case BIO_CLOSE_FILE:
        redis_set_thread_title("bio_close_file");
        break;
    case BIO_AOF_FSYNC:
        redis_set_thread_title("bio_aof_fsync");
        break;
    case BIO_LAZY_FREE:
        redis_set_thread_title("bio_lazy_free");
        break;
    }

redisSetCpuAffinity accepts a string such as "0,2,3", "0,2-3", or "0-20:2" and pins the thread to the specified CPUs.
In addition, the thread must be made cancelable at any time, so that it can be terminated asynchronously.

    redisSetCpuAffinity(server.bio_cpulist);

    /* Make the thread killable at any time, so that bioKillThreads()
     * can work reliably. */
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

Next comes signal setup; note that bio_mutex[type] is locked here and remains held on entry to the main loop.

    pthread_mutex_lock(&bio_mutex[type]);
    /* Block SIGALRM so we are sure that only the main thread will
     * receive the watchdog signal. */
    sigemptyset(&sigset);
    sigaddset(&sigset, SIGALRM);
    if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        serverLog(LL_WARNING,
            "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));

The loop below is a classic producer-consumer pattern, with this function acting as the consumer. If the queue is empty, the thread waits on the condition variable bio_newjob_cond[type], passing bio_mutex[type] along with it, because pthread_cond_wait atomically releases the mutex while waiting and reacquires it on wakeup. If the queue is not empty, the thread reads the head of the queue without actually popping it, then unlocks the mutex.

    while(1) {
        listNode *ln;

        /* The loop always starts with the lock hold. */
        if (listLength(bio_jobs[type]) == 0) {
            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
            continue;
        }
        /* Pop the job from the queue. */
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        /* It is now possible to unlock the background system as we know have
         * a stand alone job structure to process.*/
        pthread_mutex_unlock(&bio_mutex[type]);

The job is then dispatched according to its type.

        /* Process the job accordingly to its type. */
        if (type == BIO_CLOSE_FILE) {
            close((long)job->arg1);
        } else if (type == BIO_AOF_FSYNC) {
            redis_fsync((long)job->arg1);
        } else if (type == BIO_LAZY_FREE) {
            /* What we free changes depending on what arguments are set:
             * arg1 -> free the object at pointer.
             * arg2 & arg3 -> free two dictionaries (a Redis DB).
             * only arg3 -> free the skiplist. */
            if (job->arg1)
                lazyfreeFreeObjectFromBioThread(job->arg1);
            else if (job->arg2 && job->arg3)
                lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
            else if (job->arg3)
                lazyfreeFreeSlotsMapFromBioThread(job->arg3);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        zfree(job);

Once the job has been processed, the corresponding node is popped from the queue.

        /* Lock again before reiterating the loop, if there are no longer
         * jobs to process we'll block again in pthread_cond_wait(). */
        pthread_mutex_lock(&bio_mutex[type]);
        listDelNode(bio_jobs[type],ln);
        bio_pending[type]--;

        /* Unblock threads blocked on bioWaitStepOfType() if any. */
        pthread_cond_broadcast(&bio_step_cond[type]);
    }
}

RDB

The RDB call chain, from callee up to caller, looks like this:

  1. startSaving
    1. rdbSave
      1. flushAllDataAndResetRDB
        1. flushallCommand
          The FLUSHALL command
      2. saveCommand
        The SAVE command
      3. rdbSaveBackground
        1. bgsaveCommand
          The BGSAVE command

rdbSave

Looking at the signature: the function dumps the dataset to the file filename, with rsi supplying auxiliary save information.

int rdbSave(char *filename, rdbSaveInfo *rsi)

It first creates a temporary RDB file. Writing to a temporary file guards against a crash mid-save leaving a truncated or corrupt RDB behind: the file is renamed to its final name only once the dump has completed successfully.

snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());

If creating the file fails, an error is logged:

serverLog(LL_WARNING,
    "Failed opening the RDB file %s (in server root dir %s) "
    "for saving: %s",
    filename,
    cwdp ? cwdp : "unknown",
    strerror(errno));

Then comes the actual dump. A rio object rdb is created and the function startSaving is called.

rioInitWithFile(&rdb,fp);
startSaving(RDBFLAGS_NONE);

startSaving fires a module event based on the rdbflags passed in; the event module itself is not covered here. Note the additional getpid() check against server.pid, which distinguishes a background RDB (running in a forked child) from a synchronous one.

void startSaving(int rdbflags) {
    /* Fire the persistence modules end event. */
    int subevent;
    if (rdbflags & RDBFLAGS_AOF_PREAMBLE)
        subevent = REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START;
    else if (getpid()!=server.pid)
        subevent = REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START;
    else
        subevent = REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START;
    moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,subevent,NULL);
}

The possible rdbflags values are:

  1. RDBFLAGS_NONE
    Used by rdbSave.
  2. RDBFLAGS_AOF_PREAMBLE
    The RDB is being written as the preamble of an AOF rewrite.
  3. RDBFLAGS_REPLICATION
    The RDB is being written for master-replica replication.
  4. RDBFLAGS_ALLOW_DUP
    Allows duplicate keys when loading.

If rdb_save_incremental_fsync (incremental flushing) is enabled, the rio's autosync field is set; REDIS_AUTOSYNC_BYTES defaults to 32 MB. Clearly, flushing only once every 32 MB means that a system crash at the wrong moment can lose data, so Redis's durability is not fully guaranteed here, a trade-off we also encountered in the discussion of InnoDB.

if (server.rdb_save_incremental_fsync)
    rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);

The core logic is rdbSaveRio, which is discussed separately below.

if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) {
    errno = error;
    goto werr;
}

Next, fflush moves the C library buffer into the kernel, and fsync forces the data to disk. Since an RDB dump is closer to writing a checkpoint than writing a log, it is flushed once at the end rather than through autosync accounting.

/* Make sure data will not remain on the OS's output buffers */
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;

Finally, rename gives the file its final name and stopSaving(1) fires the success event. If rename fails, the failure event is fired instead and unlink removes the temporary file.

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
        char *cwdp = getcwd(cwd,MAXPATHLEN);
        serverLog(...);
        unlink(tmpfile);
        stopSaving(0);
        return C_ERR;
    }

    serverLog(LL_NOTICE,"DB saved on disk");
    server.dirty = 0;
    server.lastsave = time(NULL);
    server.lastbgsave_status = C_OK;
    stopSaving(1);
    return C_OK;

werr:
    serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
    fclose(fp);
    unlink(tmpfile);
    stopSaving(0);
    return C_ERR;

rdbSaveRio

First, a look at the contents of dump.rdb, which normally lives in the working directory given by the dir configuration option.

REDIS0006þ^@^@^AcÀ^B^@^AbÀ^A^@^AaÀ^@ÿ<92>?6Äx^B±Ä

As usual, start with the function declaration.

int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
    dictIterator *di = NULL;
    dictEntry *de;
    char magic[10];
    int j;
    uint64_t cksum;
    size_t processed = 0;

First the magic string is written, followed by the global auxiliary fields and those of every module.

if (server.rdb_checksum)
    rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr;
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;

Then each database j is dumped in turn.

for (j = 0; j < server.dbnum; j++) {
    redisDb *db = server.db+j;
    dict *d = db->dict;
    if (dictSize(d) == 0) continue;

A safe iterator is obtained here, meaning rehashing is paused for as long as the iterator exists.

    di = dictGetSafeIterator(d);

    /* Write the SELECT DB opcode */

The RDB_OPCODE_SELECTDB opcode is written, together with some metadata:

  1. The number of the current db.
  2. The size of the current db.
  3. The size of the current db's expires dictionary.

Each piece of metadata is identified by the RDB_OPCODE_* value written just before it.

    if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
    if (rdbSaveLen(rdb,j) == -1) goto werr;

    /* Write the RESIZE DB opcode. */
    uint64_t db_size, expires_size;
    db_size = dictSize(db->dict);
    expires_size = dictSize(db->expires);
    if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
    if (rdbSaveLen(rdb,db_size) == -1) goto werr;
    if (rdbSaveLen(rdb,expires_size) == -1) goto werr;

Next, the iterator is walked to store the actual data.

    /* Iterate this DB writing every entry */
    while((de = dictNext(di)) != NULL) {
        sds keystr = dictGetKey(de);
        robj key, *o = dictGetVal(de);
        long long expire;

        initStaticStringObject(key,keystr);

getExpire fetches a key's expiry time, which must be written to the RDB along with the key. Expiry times are stored separately in db->expires, so they have to be looked up here and saved alongside the key-value pair.

        expire = getExpire(db,&key);
        if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;

        /* When this RDB is produced as part of an AOF rewrite, move
         * accumulated diff from parent to child while rewriting in
         * order to have a smaller final write. */
        if (rdbflags & RDBFLAGS_AOF_PREAMBLE &&
            rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
        {
            processed = rdb->processed_bytes;
            aofReadDiffFromParent();
        }
    }
    dictReleaseIterator(di);
    di = NULL; /* So that we don't release it again on error. */
}

The next block persists the Lua script cache whenever replication information is being stored: after a restart, a successful PSYNC may replay EVALSHA commands from the replication backlog, so the referenced scripts must survive in the RDB.

/* If we are storing the replication information on disk, persist
 * the script cache as well: on successful PSYNC after a restart, we need
 * to be able to process any EVALSHA inside the replication backlog the
 * master will send us. */
if (rsi && dictSize(server.lua_scripts)) {
    di = dictGetIterator(server.lua_scripts);
    while((de = dictNext(di)) != NULL) {
        robj *body = dictGetVal(de);
        if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
            goto werr;
    }
    dictReleaseIterator(di);
    di = NULL; /* So that we don't release it again on error. */
}

if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;

Finally, an EOF opcode and a CRC64 checksum are written.

    /* EOF opcode */
    if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;

    /* CRC64 checksum. It will be zero if checksum computation is disabled, the
     * loading code skips the check in this case. */
    cksum = rdb->cksum;
    memrev64ifbe(&cksum);
    if (rioWrite(rdb,&cksum,8) == 0) goto werr;
    return C_OK;

werr:
    if (error) *error = errno;
    if (di) dictReleaseIterator(di);
    return C_ERR;
}

flushAllDataAndResetRDB

void flushAllDataAndResetRDB(int flags) {
    server.dirty += emptyDb(-1,flags,NULL);
    if (server.rdb_child_pid != -1) killRDBChild();
    if (server.saveparamslen > 0) {
        /* Normally rdbSave() will reset dirty, but we don't want this here
         * as otherwise FLUSHALL will not be replicated nor put into the AOF. */
        int saved_dirty = server.dirty;
        rdbSaveInfo rsi, *rsiptr;
        rsiptr = rdbPopulateSaveInfo(&rsi);
        rdbSave(server.rdb_filename,rsiptr);
        server.dirty = saved_dirty;
    }
    server.dirty++;
#if defined(USE_JEMALLOC)
    /* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
     * for large databases, flushdb blocks for long anyway, so a bit more won't
     * harm and this way the flush and purge will be synchroneus. */
    if (!(flags & EMPTYDB_ASYNC))
        jemalloc_purge();
#endif
}

AOF

The AOF call chain, from callee up to caller, looks like this:

  1. feedAppendOnlyFile
    1. propagate
      Typically called from the various modules with the PROPAGATE_AOF|PROPAGATE_REPL flags.

When a key expires, propagateExpire is called, which feeds the expiry message to the AOF as well.

feedAppendOnlyFile

Looking at the parameters first: dictid is the id of the Redis database the command targets.

void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {

The various cat...Command helpers below reassemble a textual command from the operation that was performed.

sds buf = sdsempty();
robj *tmpargv[3];

/* The DB this command was targeting is not the same as the last command
 * we appended. To issue a SELECT command is needed. */
if (dictid != server.aof_selected_db) {
    char seldb[64];

    snprintf(seldb,sizeof(seldb),"%d",dictid);
    buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
        (unsigned long)strlen(seldb),seldb);
    server.aof_selected_db = dictid;
}

if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
    cmd->proc == expireatCommand) {
    /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
    buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
    /* Translate SETEX/PSETEX to SET and PEXPIREAT */
    ...
} else if (cmd->proc == setCommand && argc > 3) {
    int i;
    robj *exarg = NULL, *pxarg = NULL;
    for (i = 3; i < argc; i ++) {
        if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1];
        if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1];
    }
    serverAssert(!(exarg && pxarg));

    if (exarg || pxarg) {
        /* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */
        buf = catAppendOnlyGenericCommand(buf,3,argv);
        if (exarg)
            buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1],
                exarg);
        if (pxarg)
            buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1],
                pxarg);
    } else {
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }
} else {
    /* All the other commands don't need translation or need the
     * same translation already operated in the command vector
     * for the replication itself. */
    buf = catAppendOnlyGenericCommand(buf,argc,argv);
}

sdscatlen appends buf to the end of server.aof_buf, much like a concat; "cat" here is short for concatenate, although the name can mislead readers into parsing it as "category".

/* Append to the AOF buffer. This will be flushed on disk just before
 * of re-entering the event loop, so before the client will get a
 * positive reply about the operation performed. */
if (server.aof_state == AOF_ON)
    server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

    /* If a background append only file rewriting is in progress we want to
     * accumulate the differences between the child DB and the current one
     * in a buffer, so that when the child process will do its work we
     * can append the differences to the new append only file. */
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

    sdsfree(buf);
}

aofRewriteBufferAppend

/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
    listNode *ln = listLast(server.aof_rewrite_buf_blocks);
    aofrwblock *block = ln ? ln->value : NULL;

    while(len) {
        /* If we already got at least an allocated block, try appending
         * at least some piece into it. */
        if (block) {
            unsigned long thislen = (block->free < len) ? block->free : len;
            if (thislen) {  /* The current block is not already full. */
                memcpy(block->buf+block->used, s, thislen);
                block->used += thislen;
                block->free -= thislen;
                s += thislen;
                len -= thislen;
            }
        }

        if (len) { /* First block to allocate, or need another block. */
            int numblocks;

            block = zmalloc(sizeof(*block));
            block->free = AOF_RW_BUF_BLOCK_SIZE;
            block->used = 0;
            listAddNodeTail(server.aof_rewrite_buf_blocks,block);

            /* Log every time we cross more 10 or 100 blocks, respectively
             * as a notice or warning. */
            numblocks = listLength(server.aof_rewrite_buf_blocks);
            if (((numblocks+1) % 10) == 0) {
                int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :
                                                         LL_NOTICE;
                serverLog(level,"Background AOF buffer size: %lu MB",
                    aofRewriteBufferSize()/(1024*1024));
            }
        }
    }

    /* Install a file event to send data to the rewrite child if there is
     * not one already. */
    if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
        aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
            AE_WRITABLE, aofChildWriteDiffData, NULL);
    }
}

catAppendOnlyExpireAtCommand

Let's pick one of the cat...Command helpers and analyze it.

sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, robj *seconds) {
    long long when;
    robj *argv[3];

    /* Make sure we can use strtoll */
    seconds = getDecodedObject(seconds);
    when = strtoll(seconds->ptr,NULL,10);
    /* Convert argument into milliseconds for EXPIRE, SETEX, EXPIREAT */
    if (cmd->proc == expireCommand || cmd->proc == setexCommand ||
        cmd->proc == expireatCommand)
    {
        when *= 1000;
    }
    /* Convert into absolute time for EXPIRE, PEXPIRE, SETEX, PSETEX */
    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == setexCommand || cmd->proc == psetexCommand)
    {
        when += mstime();
    }

Redis's reference counting rules can be hard to follow, because some objects are released by the callee rather than the caller; here, however, everything is released by the caller via decrRefCount.

    decrRefCount(seconds);
    argv[0] = createStringObject("PEXPIREAT",9);
    argv[1] = key;
    argv[2] = createStringObjectFromLongLong(when);
    buf = catAppendOnlyGenericCommand(buf, 3, argv);
    decrRefCount(argv[0]);
    decrRefCount(argv[2]);
    return buf;
}
