Redis主从复制

Redis Sentinel实现原理分析这篇文。Sentinel是为主从复制服务的,所以在这篇文章里面,我们反过来讲一下主从复制的实现。

主从复制涉及到RDB等机制,其中持久化部分在Redis持久化机制实现中介绍。

Redis主从复制流程简介

Redis Sentinel 是对主从复制流程而言的,所以先要理解主从复制的大概流程。这里需要注意,主从复制并不是 Redis Cluster。

  1. Slave 接受到 SLAVEOF 命令
  2. Slave 连接 Master
  3. Slave PING Master
  4. 鉴权
  5. Slave 发送 SYNC/PSYNC 命令

PSYNC命令用法

PSYNC 命令如下所示,其中:

  1. replicationid 表示我们断线重连前 Master 服务器的 id
  2. offset 表示 Slave 接受到最后命令的偏移量,以字节计算
1
2
3
4
// 旧
PSYNC runid offset
// 新
PSYNC replicationid offset

这里还有个特殊用法,表示我们要触发一次全量复制。

1
PSYNC ? -1

在 Redis 2.8后,提供了 PSYNC,这个命令能够支持全量复制和部分复制。这样在 Slave 断线重连之后,就可以部分复制,从而节省 Master 的计算资源和带宽。
在 Redis 4.0 版本后,优化了增量复制,主要包括:

  1. 重启后,也可以进行部分复制
    之前这种情况,重启后会丢失 runid,从而触发 PSYNC ? -1
  2. 当 Slave 被 promote 称为 Master 后,其他 Slave 可以从新 Master 处复制

主要流程

主要类以及常数

在服务器类中定义了 masterhost,表示 Master 节点的地址。如果是 NULL,表示自己就是 Master。所以这个字段会被用来判断是 Master 还是 Slave

1
2
3
4
5
6
7
// server.h
struct redisServer {
char *masteruser; /* AUTH with this user and masterauth with master */
char *masterauth; /* AUTH with this password with master */
char *masterhost; /* Hostname of master */
int masterport; /* Port of master */
}

介绍下repl_state的状态:

  1. REPL_STATE_NONE 0
    表示现在是SLAVEOF NO ONE的
  2. REPL_STATE_CONNECT 1
    replicationCron判断,如果处于这个状态,表示现在要去尝试连接Master了。
  3. REPL_STATE_CONNECTING 2

下面的状态是握手过程中的状态:

  1. REPL_STATE_RECEIVE_PONG 3
  2. REPL_STATE_SEND_AUTH 4
  3. REPL_STATE_RECEIVE_AUTH 5
  4. REPL_STATE_SEND_PORT 6
  5. REPL_STATE_RECEIVE_PORT 7
  6. REPL_STATE_SEND_IP 8
  7. REPL_STATE_RECEIVE_IP 9
  8. REPL_STATE_SEND_CAPA 10
  9. REPL_STATE_RECEIVE_CAPA 11
  10. REPL_STATE_SEND_PSYNC 12
  11. REPL_STATE_RECEIVE_PSYNC 13

下面状态是握手完毕的状态:

  1. REPL_STATE_TRANSFER 14
  2. REPL_STATE_CONNECTED 15

连接建立流程

  1. connectWithMaster

Full Sync流程

Partial Sync流程

代码解释 Slave部分

connectWithMaster: 建立套接口连接

connConnect系列函数,以及connection类封装了有关网络连接的库。
实际上connConnect通过anetTcpNonBlockBestEffortBindConnect尝试建立一个非阻塞的套接字,此时connect函数可能返回EINPROGRESS表示连接还在建立过程中,但我们其实可以不用等。通过aeCreateFileEvent将这个Socket描述符加入到事件循环里面,等到这个套接字可以写之后,会触发对应的回调。
等到连接建立后,回调会通过connSocketEventHandler被唤起。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int connectWithMaster(void) {
server.repl_transfer_s = server.tls_replication ? connCreateTLS() : connCreateSocket();
if (connConnect(server.repl_transfer_s, server.masterhost, server.masterport,
NET_FIRST_BIND_ADDR, syncWithMaster) == C_ERR) {
serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
connGetLastError(server.repl_transfer_s));
connClose(server.repl_transfer_s);
server.repl_transfer_s = NULL;
return C_ERR;
}


server.repl_transfer_lastio = server.unixtime;
server.repl_state = REPL_STATE_CONNECTING;
serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
return C_OK;
}

syncWithMaster: 握手以及准备传输RDB

在连接完毕后,connectWithMaster会回调syncWithMaster,此时状态是REPL_STATE_CONNECTING

1
2
3
4
5
void syncWithMaster(connection *conn) {
char tmpfile[256], *err = NULL;
int dfd = -1, maxtries = 5;
int psync_result;
...

检查一下,如果现在又是SLAVEOF NO ONE了,就把这个连接关掉。

1
2
3
4
5
6
7
8
...
/* If this event fired after the user turned the instance into a master
* with SLAVEOF NO ONE we must just return ASAP. */
if (server.repl_state == REPL_STATE_NONE) {
connClose(conn);
return;
}
...

因为是非阻塞的连接,所以我们要检查一下现在连接的状态。如果失败,就goto error,里面内容是重置状态,例如,server.repl_state会被重置为REPL_STATE_CONNECT

1
2
3
4
5
6
7
8
9
...
/* Check for errors in the socket: after a non blocking connect() we
* may find that the socket is in error state. */
if (connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_WARNING,"Error condition on socket for SYNC: %s",
connGetLastError(conn));
goto error;
}
...

下面是一个状态机的实现。我们在Sentinel中已经见过类似的了,Redis中状态机的实现就是,对于状态X,表示状态X前一个状态已经处理完了,目前正在处理状态X的工作。当状态机处理完一个状态后,在最后将状态设置为下一个要做的事情。也就是我们不用类似X_FINISHED这样的状态,因为X_FINISHED根据完成的情形不同,可能有多种状态转移
【REPL_STATE_CONNECTING】这个状态下,我们尝试发送一个同步命令PING,然后直接修改状态到REPL_STATE_RECEIVE_PONG。如果这个同步命令发送有问题,就直接goto error了,不会走到下面流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
...
/* Send a PING to check the master is able to reply without errors. */
if (server.repl_state == REPL_STATE_CONNECTING) {
serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");
/* Delete the writable event so that the readable event remains
* registered and we can wait for the PONG reply. */
connSetReadHandler(conn, syncWithMaster);
connSetWriteHandler(conn, NULL);
server.repl_state = REPL_STATE_RECEIVE_PONG;
/* Send the PING, don't check for errors at all, we have the timeout
* that will take care about this. */
err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PING",NULL);
if (err) goto write_error;
return;
}
...

【REPL_STATE_RECEIVE_PONG】我们只要收到对PING的回复,就进入了REPL_STATE_RECEIVE_PONG状态,但这个回复未必是PONG,也可能是一个AUTH错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
...
/* Receive the PONG command. */
if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);

/* We accept only two replies as valid, a positive +PONG reply
* (we just check for "+") or an authentication error.
* Note that older versions of Redis replied with "operation not
* permitted" instead of using a proper error code, so we test
* both. */
if (err[0] != '+' &&
strncmp(err,"-NOAUTH",7) != 0 &&
strncmp(err,"-NOPERM",7) != 0 &&
strncmp(err,"-ERR operation not permitted",28) != 0)
{
serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err);
sdsfree(err);
goto error;
} else {
serverLog(LL_NOTICE,
"Master replied to PING, replication can continue...");
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_AUTH;
}
...

【REPL_STATE_SEND_AUTH】如果需要AUTH认证,我们就发送AUTH,进入REPL_STATE_RECEIVE_AUTH。否则直接进入REPL_STATE_SEND_PORT

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
...
/* AUTH with the master if required. */
if (server.repl_state == REPL_STATE_SEND_AUTH) {
if (server.masteruser && server.masterauth) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"AUTH",
server.masteruser,server.masterauth,NULL);
if (err) goto write_error;
server.repl_state = REPL_STATE_RECEIVE_AUTH;
return;
} else if (server.masterauth) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"AUTH",server.masterauth,NULL);
if (err) goto write_error;
server.repl_state = REPL_STATE_RECEIVE_AUTH;
return;
} else {
server.repl_state = REPL_STATE_SEND_PORT;
}
}
...

【REPL_STATE_RECEIVE_AUTH】如果验证通过,就进入REPL_STATE_SEND_PORT

1
2
3
4
5
6
7
8
9
10
11
12
13
...
/* Receive AUTH reply. */
if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {
err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
if (err[0] == '-') {
serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err);
sdsfree(err);
goto error;
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_PORT;
}
...

【REPL_STATE_SEND_PORT】下面一步,我们需要发送我们当前的端口,进入REPL_STATE_RECEIVE_PORT状态。
在发送完之后,我们在主节点执行INFO replication,会在其中显示我们反馈的port。
【Q】slave_announce_portport的区别是什么?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
...
/* Set the slave port, so that Master's INFO command can list the
* slave listening port correctly. */
if (server.repl_state == REPL_STATE_SEND_PORT) {
int port;
if (server.slave_announce_port) port = server.slave_announce_port;
else if (server.tls_replication && server.tls_port) port = server.tls_port;
else port = server.port;
sds portstr = sdsfromlonglong(port);
err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",
"listening-port",portstr, NULL);
sdsfree(portstr);
if (err) goto write_error;
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_PORT;
return;
}
...

【REPL_STATE_RECEIVE_PORT】接下来,我们用类似的办法发送IP,这里注意,如果没有指定slave_announce_ip就直接跳转到REPL_STATE_SEND_CAPA,否则跳转到REPL_STATE_SEND_IP

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
...
/* Receive REPLCONF listening-port reply. */
if (server.repl_state == REPL_STATE_RECEIVE_PORT) {
err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF listening-port. */
if (err[0] == '-') {
serverLog(LL_NOTICE,"(Non critical) Master does not understand "
"REPLCONF listening-port: %s", err);
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_IP;
}

/* Skip REPLCONF ip-address if there is no slave-announce-ip option set. */
if (server.repl_state == REPL_STATE_SEND_IP &&
server.slave_announce_ip == NULL)
{
server.repl_state = REPL_STATE_SEND_CAPA;
}

/* Set the slave ip, so that Master's INFO command can list the
* slave IP address port correctly in case of port forwarding or NAT. */
if (server.repl_state == REPL_STATE_SEND_IP) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",
"ip-address",server.slave_announce_ip, NULL);
if (err) goto write_error;
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_IP;
return;
}

/* Receive REPLCONF ip-address reply. */
if (server.repl_state == REPL_STATE_RECEIVE_IP) {
err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF listening-port. */
if (err[0] == '-') {
serverLog(LL_NOTICE,"(Non critical) Master does not understand "
"REPLCONF ip-address: %s", err);
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_CAPA;
}
...

【REPL_STATE_SEND_CAPA】发送IP的过程很类似,我们就不说了。下面这一对状态是REPL_STATE_SEND_CAPA,用来发送Slave的容量。这一对状态结束之后,进入REPL_STATE_SEND_PSYNC状态。
【Q】这个容量指的是什么?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
...
/* Inform the master of our (slave) capabilities.
*
* EOF: supports EOF-style RDB transfer for diskless replication.
* PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
*
* The master will ignore capabilities it does not understand. */
if (server.repl_state == REPL_STATE_SEND_CAPA) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",
"capa","eof","capa","psync2",NULL);
if (err) goto write_error;
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_CAPA;
return;
}

/* Receive CAPA reply. */
if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF capa. */
if (err[0] == '-') {
serverLog(LL_NOTICE,"(Non critical) Master does not understand "
"REPLCONF capa: %s", err);
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_PSYNC;
}
...

【REPL_STATE_SEND_PSYNC】下面,我们尝试PSYNC。主要就是调用若干次slaveTryPartialResynchronization:第一次传0进去,让它发PSYNC指令,并且设置状态为REPL_STATE_RECEIVE_PSYNC;后面就不断地传1进去,查询结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
...
/* Try a partial resynchonization. If we don't have a cached master
* slaveTryPartialResynchronization() will at least try to use PSYNC
* to start a full resynchronization so that we get the master run id
* and the global offset, to try a partial resync at the next
* reconnection attempt. */
if (server.repl_state == REPL_STATE_SEND_PSYNC) {
if (slaveTryPartialResynchronization(conn,0) == PSYNC_WRITE_ERROR) {
err = sdsnew("Write error sending the PSYNC command.");
goto write_error;
}
server.repl_state = REPL_STATE_RECEIVE_PSYNC;
return;
}

/* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC. */
if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) {
serverLog(LL_WARNING,"syncWithMaster(): state machine error, "
"state should be RECEIVE_PSYNC but is %d",
server.repl_state);
goto error;
}

psync_result = slaveTryPartialResynchronization(conn,1);
...

下面查看返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
...
if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */

/* If the master is in an transient error, we should try to PSYNC
* from scratch later, so go to the error path. This happens when
* the server is loading the dataset or is not connected with its
* master and so forth. */
if (psync_result == PSYNC_TRY_LATER) goto error;

/* Note: if PSYNC does not return WAIT_REPLY, it will take care of
* uninstalling the read handler from the file descriptor. */

if (psync_result == PSYNC_CONTINUE) {
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization.");
if (server.supervised_mode == SUPERVISED_SYSTEMD) {
redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Partial Resynchronization accepted. Ready to accept connections.\n");
redisCommunicateSystemd("READY=1\n");
}
return;
}

/* PSYNC failed or is not supported: we want our slaves to resync with us
* as well, if we have any sub-slaves. The master may transfer us an
* entirely different data set and we have no way to incrementally feed
* our slaves after that. */
disconnectSlaves(); /* Force our slaves to resync with us as well. */
freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
...

如果PSYNC能支持,我们前面就返回了,下面对于不支持的情况,我们就得用老的SYNC方法。在开始传输后,进入REPL_STATE_TRANSFER状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
...
/* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
* and the server.master_replid and master_initial_offset are
* already populated. */
if (psync_result == PSYNC_NOT_SUPPORTED) {
serverLog(LL_NOTICE,"Retrying with SYNC...");
if (connSyncWrite(conn,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
serverLog(LL_WARNING,"I/O error writing to MASTER: %s",
strerror(errno));
goto error;
}
}
...

如果不支持无盘加载,那么就要在磁盘上创建一个临时文件。
查看函数useDisklessLoad,无盘加载需要满足:

  1. repl_diskless_load配置
  2. 所有的模块都能处理读错误
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
...
/* Prepare a suitable temp file for bulk transfer */
if (!useDisklessLoad()) {
while(maxtries--) {
snprintf(tmpfile,256,
"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
if (dfd != -1) break;
sleep(1);
}
if (dfd == -1) {
serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno));
goto error;
}
server.repl_transfer_tmpfile = zstrdup(tmpfile);
server.repl_transfer_fd = dfd;
}
...

下面非阻塞地进行SYNC,设置读取SYNC数据的回调readSyncBulkPayload,如果成功就切换状态为REPL_STATE_TRANSFER
这里,我们设置了repl_transfer_size为1,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
...
/* Setup the non blocking download of the bulk file. */
if (connSetReadHandler(conn, readSyncBulkPayload)
== C_ERR)
{
char conninfo[CONN_INFO_LEN];
serverLog(LL_WARNING,
"Can't create readable event for SYNC: %s (%s)",
strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo)));
goto error;
}

server.repl_state = REPL_STATE_TRANSFER;
server.repl_transfer_size = -1;
server.repl_transfer_read = 0;
server.repl_transfer_last_fsync_off = 0;
server.repl_transfer_lastio = server.unixtime;
return;
...

下面是错误处理,需要将状态重置为等待连接的REPL_STATE_CONNECT

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
...
error:
if (dfd != -1) close(dfd);
connClose(conn);
server.repl_transfer_s = NULL;
if (server.repl_transfer_fd != -1)
close(server.repl_transfer_fd);
if (server.repl_transfer_tmpfile)
zfree(server.repl_transfer_tmpfile);
server.repl_transfer_tmpfile = NULL;
server.repl_transfer_fd = -1;
server.repl_state = REPL_STATE_CONNECT;
return;

write_error: /* Handle sendSynchronousCommand(SYNC_CMD_WRITE) errors. */
serverLog(LL_WARNING,"Sending command to master in replication handshake: %s", err);
sdsfree(err);
goto error;
}

slaveTryPartialResynchronization: PSYNC分支

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

/* Try a partial resynchronization with the master if we are about to reconnect.
* If there is no cached master structure, at least try to issue a
* "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
* command in order to obtain the master run id and the master replication
* global offset.
*
* This function is designed to be called from syncWithMaster(), so the
* following assumptions are made:
*
* 1) We pass the function an already connected socket "fd".
* 2) This function does not close the file descriptor "fd". However in case
* of successful partial resynchronization, the function will reuse
* 'fd' as file descriptor of the server.master client structure.
*
* The function is split in two halves: if read_reply is 0, the function
* writes the PSYNC command on the socket, and a new function call is
* needed, with read_reply set to 1, in order to read the reply of the
* command. This is useful in order to support non blocking operations, so
* that we write, return into the event loop, and read when there are data.
*
* When read_reply is 0 the function returns PSYNC_WRITE_ERR if there
* was a write error, or PSYNC_WAIT_REPLY to signal we need another call
* with read_reply set to 1. However even when read_reply is set to 1
* the function may return PSYNC_WAIT_REPLY again to signal there were
* insufficient data to read to complete its work. We should re-enter
* into the event loop and wait in such a case.
*
* The function returns:
*
* PSYNC_CONTINUE: If the PSYNC command succeeded and we can continue.
* PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed.
* In this case the master run_id and global replication
* offset is saved.
* PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
* the caller should fall back to SYNC.
* PSYNC_WRITE_ERROR: There was an error writing the command to the socket.
* PSYNC_WAIT_REPLY: Call again the function with read_reply set to 1.
* PSYNC_TRY_LATER: Master is currently in a transient error condition.
*
* Notable side effects:
*
* 1) As a side effect of the function call the function removes the readable
* event handler from "fd", unless the return value is PSYNC_WAIT_REPLY.
* 2) server.master_initial_offset is set to the right value according
* to the master reply. This will be used to populate the 'server.master'
* structure replication offset.
*/
  1. PSYNC_WRITE_ERROR 0
    套接口不可写。
  2. PSYNC_WAIT_REPLY 1
    需要read_erply设置为1,并调用函数。
  3. PSYNC_CONTINUE 2
  4. PSYNC_FULLRESYNC 3
    表示虽然支持PSYNC,但现在仍然需要一次Full SYNC。在这情况下,我们需要保存Master的runid和offset。
  5. PSYNC_NOT_SUPPORTED 4
    不支持PSYNC。
  6. PSYNC_TRY_LATER 5
    暂时连不上Master,要重试。
1
2
3
4
int slaveTryPartialResynchronization(connection *conn, int read_reply) {
char *psync_replid;
char psync_offset[32];
sds reply;

首先,是写部分。这里的写,指的是往连接里面发送PSYNC指令:

  1. 如果我们缓存了server.masterserver.cached_master
    通常是在replicationCacheMaster中设置的
  2. 如果是第一次连
    发送
    1
    PSYNC ? -1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
    /* Writing half */
if (!read_reply) {
/* Initially set master_initial_offset to -1 to mark the current
* master run_id and offset as not valid. Later if we'll be able to do
* a FULL resync using the PSYNC command we'll set the offset at the
* right value, so that this information will be propagated to the
* client structure representing the master into server.master. */
server.master_initial_offset = -1;

if (server.cached_master) {
psync_replid = server.cached_master->replid;
snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
} else {
serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
psync_replid = "?";
memcpy(psync_offset,"-1",3);
}

/* Issue the PSYNC command */
reply = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PSYNC",psync_replid,psync_offset,NULL);
if (reply != NULL) {
serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
sdsfree(reply);
connSetReadHandler(conn, NULL);
return PSYNC_WRITE_ERROR;
}
return PSYNC_WAIT_REPLY;
}
...

读出Master的回复,如果是空,我们就返回继续等待PSYNC_WAIT_REPLY

1
2
3
4
5
6
7
8
9
10
11
12
...
/* Reading half */
reply = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
if (sdslen(reply) == 0) {
/* The master may send empty newlines after it receives PSYNC
* and before to reply, just to keep the connection alive. */
sdsfree(reply);
return PSYNC_WAIT_REPLY;
}

connSetReadHandler(conn, NULL);
...

如果回复是+FULLRESYNC,表示需要一次Full SYNC。

1
2
3
4
5
6
7
8
9
10
11
12
13
...
if (!strncmp(reply,"+FULLRESYNC",11)) {
char *replid = NULL, *offset = NULL;

/* FULL RESYNC, parse the reply in order to extract the run id
* and the replication offset. */
replid = strchr(reply,' ');
if (replid) {
replid++;
offset = strchr(replid,' ');
if (offset) offset++;
}
...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
...
if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {
serverLog(LL_WARNING,
"Master replied with wrong +FULLRESYNC syntax.");
/* This is an unexpected condition, actually the +FULLRESYNC
* reply means that the master supports PSYNC, but the reply
* format seems wrong. To stay safe we blank the master
* replid to make sure next PSYNCs will fail. */
memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1);
} else {
memcpy(server.master_replid, replid, offset-replid-1);
server.master_replid[CONFIG_RUN_ID_SIZE] = '\0';
server.master_initial_offset = strtoll(offset,NULL,10);
serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
server.master_replid,
server.master_initial_offset);
}
/* We are going to full resync, discard the cached master structure. */
replicationDiscardCachedMaster();
sdsfree(reply);
return PSYNC_FULLRESYNC;
}
...

否则,我们可以部分同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
...
if (!strncmp(reply,"+CONTINUE",9)) {
/* Partial resync was accepted. */
serverLog(LL_NOTICE,
"Successful partial resynchronization with master.");

/* Check the new replication ID advertised by the master. If it
* changed, we need to set the new ID as primary ID, and set or
* secondary ID as the old master ID up to the current offset, so
* that our sub-slaves will be able to PSYNC with us after a
* disconnection. */
char *start = reply+10;
char *end = reply+9;
while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++;
...

这里new表示Master端传来的runid。如果和我们当前的server.replid不一样,我们要重新设置一下,并且将老的server.replid复制给server.replid2
【Q】这里涉及到三个replid,他们的区别是什么呢?

  1. server.replid
  2. server.replid2
  3. server.cached_master->replid
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
...
if (end-start == CONFIG_RUN_ID_SIZE) {
char new[CONFIG_RUN_ID_SIZE+1];
memcpy(new,start,CONFIG_RUN_ID_SIZE);
new[CONFIG_RUN_ID_SIZE] = '\0';

if (strcmp(new,server.cached_master->replid)) {
/* Master ID changed. */
serverLog(LL_WARNING,"Master replication ID changed to %s",new);

/* Set the old ID as our ID2, up to the current offset+1. */
memcpy(server.replid2,server.cached_master->replid,
sizeof(server.replid2));
server.second_replid_offset = server.master_repl_offset+1;

/* Update the cached master ID and our own primary ID to the
* new one. */
memcpy(server.replid,new,sizeof(server.replid));
memcpy(server.cached_master->replid,new,sizeof(server.replid));

如果当前Slave有Sub Slave,全部断开,让他们重新走PSYNC流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
...
/* Disconnect all the sub-slaves: they need to be notified. */
disconnectSlaves();
}
}

/* Setup the replication to continue. */
sdsfree(reply);
replicationResurrectCachedMaster(conn);

/* If this instance was restarted and we read the metadata to
* PSYNC from the persistence file, our replication backlog could
* be still not initialized. Create it. */
if (server.repl_backlog == NULL) createReplicationBacklog();
return PSYNC_CONTINUE;
}

/* If we reach this point we received either an error (since the master does
* not understand PSYNC or because it is in a special state and cannot
* serve our request), or an unexpected reply from the master.
*
* Return PSYNC_NOT_SUPPORTED on errors we don't understand, otherwise
* return PSYNC_TRY_LATER if we believe this is a transient error. */

if (!strncmp(reply,"-NOMASTERLINK",13) ||
!strncmp(reply,"-LOADING",8))
{
serverLog(LL_NOTICE,
"Master is currently unable to PSYNC "
"but should be in the future: %s", reply);
sdsfree(reply);
return PSYNC_TRY_LATER;
}

if (strncmp(reply,"-ERR",4)) {
/* If it's not an error, log the unexpected event. */
serverLog(LL_WARNING,
"Unexpected reply to PSYNC from master: %s", reply);
} else {
serverLog(LL_NOTICE,
"Master does not support PSYNC or is in "
"error state (reply: %s)", reply);
}
sdsfree(reply);
replicationDiscardCachedMaster();
return PSYNC_NOT_SUPPORTED;
}
...

readSyncBulkPayload: SYNC分支 接受RDB

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
/* Asynchronously read the SYNC payload we receive from a master */
#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
void readSyncBulkPayload(connection *conn) {
char buf[PROTO_IOBUF_LEN];
ssize_t nread, readlen, nwritten;
int use_diskless_load = useDisklessLoad();
redisDb *diskless_load_backup = NULL;
int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC :
EMPTYDB_NO_FLAGS;
off_t left;

/* Static vars used to hold the EOF mark, and the last bytes received
* form the server: when they match, we reached the end of the transfer. */
static char eofmark[CONFIG_RUN_ID_SIZE];
static char lastbytes[CONFIG_RUN_ID_SIZE];
static int usemark = 0;

/* If repl_transfer_size == -1 we still have to read the bulk length
* from the master reply. */
if (server.repl_transfer_size == -1) {
if (connSyncReadLine(conn,buf,1024,server.repl_syncio_timeout*1000) == -1) {
serverLog(LL_WARNING,
"I/O error reading bulk count from MASTER: %s",
strerror(errno));
goto error;
}

if (buf[0] == '-') {
serverLog(LL_WARNING,
"MASTER aborted replication with an error: %s",
buf+1);
goto error;
} else if (buf[0] == '\0') {
/* At this stage just a newline works as a PING in order to take
* the connection live. So we refresh our last interaction
* timestamp. */
server.repl_transfer_lastio = server.unixtime;
return;
} else if (buf[0] != '$') {
serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
goto error;
}

/* There are two possible forms for the bulk payload. One is the
* usual $<count> bulk format. The other is used for diskless transfers
* when the master does not know beforehand the size of the file to
* transfer. In the latter case, the following format is used:
*
* $EOF:<40 bytes delimiter>
*
* At the end of the file the announced delimiter is transmitted. The
* delimiter is long and random enough that the probability of a
* collision with the actual file content can be ignored. */
if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) {
usemark = 1;
memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE);
memset(lastbytes,0,CONFIG_RUN_ID_SIZE);
/* Set any repl_transfer_size to avoid entering this code path
* at the next call. */
server.repl_transfer_size = 0;
serverLog(LL_NOTICE,
"MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF %s",
use_diskless_load? "to parser":"to disk");
} else {
usemark = 0;
server.repl_transfer_size = strtol(buf+1,NULL,10);
serverLog(LL_NOTICE,
"MASTER <-> REPLICA sync: receiving %lld bytes from master %s",
(long long) server.repl_transfer_size,
use_diskless_load? "to parser":"to disk");
}
return;
}

if (!use_diskless_load) {
/* Read the data from the socket, store it to a file and search
* for the EOF. */
if (usemark) {
readlen = sizeof(buf);
} else {
left = server.repl_transfer_size - server.repl_transfer_read;
readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
}

nread = connRead(conn,buf,readlen);
if (nread <= 0) {
if (connGetState(conn) == CONN_STATE_CONNECTED) {
/* equivalent to EAGAIN */
return;
}
serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
(nread == -1) ? strerror(errno) : "connection lost");
cancelReplicationHandshake(1);
return;
}
server.stat_net_input_bytes += nread;

/* When a mark is used, we want to detect EOF asap in order to avoid
* writing the EOF mark into the file... */
int eof_reached = 0;

if (usemark) {
/* Update the last bytes array, and check if it matches our
* delimiter. */
if (nread >= CONFIG_RUN_ID_SIZE) {
memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,
CONFIG_RUN_ID_SIZE);
} else {
int rem = CONFIG_RUN_ID_SIZE-nread;
memmove(lastbytes,lastbytes+nread,rem);
memcpy(lastbytes+rem,buf,nread);
}
if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0)
eof_reached = 1;
}

/* Update the last I/O time for the replication transfer (used in
* order to detect timeouts during replication), and write what we
* got from the socket to the dump file on disk. */
server.repl_transfer_lastio = server.unixtime;
if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) {
serverLog(LL_WARNING,
"Write error or short write writing to the DB dump file "
"needed for MASTER <-> REPLICA synchronization: %s",
(nwritten == -1) ? strerror(errno) : "short write");
goto error;
}
server.repl_transfer_read += nread;

/* Delete the last 40 bytes from the file if we reached EOF. */
if (usemark && eof_reached) {
if (ftruncate(server.repl_transfer_fd,
server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
{
serverLog(LL_WARNING,
"Error truncating the RDB file received from the master "
"for SYNC: %s", strerror(errno));
goto error;
}
}

/* Sync data on disk from time to time, otherwise at the end of the
* transfer we may suffer a big delay as the memory buffers are copied
* into the actual disk. */
if (server.repl_transfer_read >=
server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
{
off_t sync_size = server.repl_transfer_read -
server.repl_transfer_last_fsync_off;
rdb_fsync_range(server.repl_transfer_fd,
server.repl_transfer_last_fsync_off, sync_size);
server.repl_transfer_last_fsync_off += sync_size;
}

/* Check if the transfer is now complete */
if (!usemark) {
if (server.repl_transfer_read == server.repl_transfer_size)
eof_reached = 1;
}

/* If the transfer is yet not complete, we need to read more, so
* return ASAP and wait for the handler to be called again. */
if (!eof_reached) return;
}

/* We reach this point in one of the following cases:
*
* 1. The replica is using diskless replication, that is, it reads data
* directly from the socket to the Redis memory, without using
* a temporary RDB file on disk. In that case we just block and
* read everything from the socket.
*
* 2. Or when we are done reading from the socket to the RDB file, in
* such case we want just to read the RDB file in memory. */
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");

/* We need to stop any AOF rewriting child before flusing and parsing
* the RDB, otherwise we'll create a copy-on-write disaster. */
if (server.aof_state != AOF_OFF) stopAppendOnly();

/* When diskless RDB loading is used by replicas, it may be configured
* in order to save the current DB instead of throwing it away,
* so that we can restore it in case of failed transfer. */
if (use_diskless_load &&
server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB)
{
/* Create a backup of server.db[] and initialize to empty
* dictionaries */
diskless_load_backup = disklessLoadMakeBackups();
}
/* We call to emptyDb even in case of REPL_DISKLESS_LOAD_SWAPDB
* (Where disklessLoadMakeBackups left server.db empty) because we
* want to execute all the auxiliary logic of emptyDb (Namely,
* fire module events) */
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);

/* Before loading the DB into memory we need to delete the readable
* handler, otherwise it will get called recursively since
* rdbLoad() will call the event loop to process events from time to
* time for non blocking loading. */
connSetReadHandler(conn, NULL);
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (use_diskless_load) {
rio rdb;
rioInitWithConn(&rdb,conn,server.repl_transfer_size);

/* Put the socket in blocking mode to simplify RDB transfer.
* We'll restore it when the RDB is received. */
connBlock(conn);
connRecvTimeout(conn, server.repl_timeout*1000);
startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION);

if (rdbLoadRio(&rdb,RDBFLAGS_REPLICATION,&rsi) != C_OK) {
/* RDB loading failed. */
stopLoading(0);
serverLog(LL_WARNING,
"Failed trying to load the MASTER synchronization DB "
"from socket");
cancelReplicationHandshake(1);
rioFreeConn(&rdb, NULL);
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
/* Restore the backed up databases. */
disklessLoadRestoreBackups(diskless_load_backup,1,
empty_db_flags);
} else {
/* Remove the half-loaded data in case we started with
* an empty replica. */
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
}

/* Note that there's no point in restarting the AOF on SYNC
* failure, it'll be restarted when sync succeeds or the replica
* gets promoted. */
return;
}
stopLoading(1);

/* RDB loading succeeded if we reach this point. */
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
/* Delete the backup databases we created before starting to load
* the new RDB. Now the RDB was loaded with success so the old
* data is useless. */
disklessLoadRestoreBackups(diskless_load_backup,0,empty_db_flags);
}

/* Verify the end mark is correct. */
if (usemark) {
if (!rioRead(&rdb,buf,CONFIG_RUN_ID_SIZE) ||
memcmp(buf,eofmark,CONFIG_RUN_ID_SIZE) != 0)
{
serverLog(LL_WARNING,"Replication stream EOF marker is broken");
cancelReplicationHandshake(1);
rioFreeConn(&rdb, NULL);
return;
}
}

/* Cleanup and restore the socket to the original state to continue
* with the normal replication. */
rioFreeConn(&rdb, NULL);
connNonBlock(conn);
connRecvTimeout(conn,0);
} else {
/* Ensure background save doesn't overwrite synced data */
if (server.rdb_child_pid != -1) {
serverLog(LL_NOTICE,
"Replica is about to load the RDB file received from the "
"master, but there is a pending RDB child running. "
"Killing process %ld and removing its temp file to avoid "
"any race",
(long) server.rdb_child_pid);
killRDBChild();
}

/* Rename rdb like renaming rewrite aof asynchronously. */
int old_rdb_fd = open(server.rdb_filename,O_RDONLY|O_NONBLOCK);
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
serverLog(LL_WARNING,
"Failed trying to rename the temp DB into %s in "
"MASTER <-> REPLICA synchronization: %s",
server.rdb_filename, strerror(errno));
cancelReplicationHandshake(1);
if (old_rdb_fd != -1) close(old_rdb_fd);
return;
}
/* Close old rdb asynchronously. */
if (old_rdb_fd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)old_rdb_fd,NULL,NULL);

if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) {
serverLog(LL_WARNING,
"Failed trying to load the MASTER synchronization "
"DB from disk");
cancelReplicationHandshake(1);
if (server.rdb_del_sync_files && allPersistenceDisabled()) {
serverLog(LL_NOTICE,"Removing the RDB file obtained from "
"the master. This replica has persistence "
"disabled");
bg_unlink(server.rdb_filename);
}
/* Note that there's no point in restarting the AOF on sync failure,
it'll be restarted when sync succeeds or replica promoted. */
return;
}

/* Cleanup. */
if (server.rdb_del_sync_files && allPersistenceDisabled()) {
serverLog(LL_NOTICE,"Removing the RDB file obtained from "
"the master. This replica has persistence "
"disabled");
bg_unlink(server.rdb_filename);
}

zfree(server.repl_transfer_tmpfile);
close(server.repl_transfer_fd);
server.repl_transfer_fd = -1;
server.repl_transfer_tmpfile = NULL;
}

/* Final setup of the connected slave <- master link */
replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
server.repl_state = REPL_STATE_CONNECTED;
server.repl_down_since = 0;

/* Fire the master link modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
REDISMODULE_SUBEVENT_MASTER_LINK_UP,
NULL);

/* After a full resynchroniziation we use the replication ID and
* offset of the master. The secondary ID / offset are cleared since
* we are starting a new history. */
memcpy(server.replid,server.master->replid,sizeof(server.replid));
server.master_repl_offset = server.master->reploff;
clearReplicationId2();

/* Let's create the replication backlog if needed. Slaves need to
* accumulate the backlog regardless of the fact they have sub-slaves
* or not, in order to behave correctly if they are promoted to
* masters after a failover. */
if (server.repl_backlog == NULL) createReplicationBacklog();
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success");

if (server.supervised_mode == SUPERVISED_SYSTEMD) {
redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Finished with success. Ready to accept connections.\n");
redisCommunicateSystemd("READY=1\n");
}

/* Send the initial ACK immediately to put this replica in online state. */
if (usemark) replicationSendAck();

/* Restart the AOF subsystem now that we finished the sync. This
* will trigger an AOF rewrite, and when done will start appending
* to the new file. */
if (server.aof_enabled) restartAOFAfterSYNC();
return;

error:
cancelReplicationHandshake(1);
return;
}

主事件循环

replicationCron

主要代码位于replication.c中。
主函数replicationCronserverCron触发,每隔一秒钟触发一次。

1
run_with_period(1000) replicationCron();

下面查看主函数。

1
2
3
4
// replication.c
void replicationCron(void) {
static long long replication_cron_loops = 0;
...

首先,下面是几个超时判断:

  1. 建立连接过程中超时
  2. 传输过程中超时
  3. 心跳/数据超时
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
...
/* Non blocking connection timeout? */
if (server.masterhost &&
(server.repl_state == REPL_STATE_CONNECTING ||
slaveIsInHandshakeState()) &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
cancelReplicationHandshake(1);
}

/* Bulk transfer I/O timeout? */
if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
cancelReplicationHandshake(1);
}

/* Timed out master when we are an already connected slave? */
if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
(time(NULL)-server.master->lastinteraction) > server.repl_timeout)
{
serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
freeClient(server.master);
}
...

判断是否需要连接Master。
connectWithMaster这个函数会将状态设置为REPL_STATE_CONNECTING,并设置回调syncWithMaster

1
2
3
4
5
6
7
8
9
...

/* Check if we should connect to a MASTER */
if (server.repl_state == REPL_STATE_CONNECT) {
serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
server.masterhost, server.masterport);
connectWithMaster();
}
...

如果Master支持PSYNC,就定期发送ACK。
这个ACK的作用是发送一个REPLCONF ACK命令给Master,从而通知自己当前的复制偏移。

1
2
3
4
5
6
7
8
...
/* Send ACK to master from time to time.
* Note that we do not send periodic acks to masters that don't
* support PSYNC and replication offsets. */
if (server.masterhost && server.master &&
!(server.master->flags & CLIENT_PRE_PSYNC))
replicationSendAck();
...

下面,我们对所有Slave发送PING。根据注释,如果我们连接了Slave(是不是说明当前节点是Master?),就按时PING它们。这样Slave们能够维护到Master的显式的超时时间,从而在TCP连接并没有真正丢失的时候,检查一个断线的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
...
/* If we have attached slaves, PING them from time to time.
* So slaves can implement an explicit timeout to masters, and will
* be able to detect a link disconnection even if the TCP connection
* will not actually go down. */
listIter li;
listNode *ln;
robj *ping_argv[1];

/* First, send PING according to ping_slave_period. */
if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
listLength(server.slaves))
{
/* Note that we don't send the PING if the clients are paused during
* a Redis Cluster manual failover: the PING we send will otherwise
* alter the replication offsets of master and slave, and will no longer
* match the one stored into 'mf_master_offset' state. */
int manual_failover_in_progress =
server.cluster_enabled &&
server.cluster->mf_end &&
clientsArePaused();

if (!manual_failover_in_progress) {
ping_argv[0] = createStringObject("PING",4);
replicationFeedSlaves(server.slaves, server.slaveseldb,
ping_argv, 1);
decrRefCount(ping_argv[0]);
}
}
...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
...
/* Second, send a newline to all the slaves in pre-synchronization
* stage, that is, slaves waiting for the master to create the RDB file.
*
* Also send the a newline to all the chained slaves we have, if we lost
* connection from our master, to keep the slaves aware that their
* master is online. This is needed since sub-slaves only receive proxied
* data from top-level masters, so there is no explicit pinging in order
* to avoid altering the replication offsets. This special out of band
* pings (newlines) can be sent, they will have no effect in the offset.
*
* The newline will be ignored by the slave but will refresh the
* last interaction timer preventing a timeout. In this case we ignore the
* ping period and refresh the connection once per second since certain
* timeouts are set at a few seconds (example: PSYNC response). */
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;

int is_presync =
(slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
(slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
server.rdb_child_type != RDB_CHILD_TYPE_SOCKET));

if (is_presync) {
connWrite(slave->conn, "\n", 1);
}
}
...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
...
/* Disconnect timedout slaves. */
if (listLength(server.slaves)) {
listIter li;
listNode *ln;

listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;

if (slave->replstate != SLAVE_STATE_ONLINE) continue;
if (slave->flags & CLIENT_PRE_PSYNC) continue;
if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
{
serverLog(LL_WARNING, "Disconnecting timedout replica: %s",
replicationGetSlaveName(slave));
freeClient(slave);
}
}
}

/* If this is a master without attached slaves and there is a replication
* backlog active, in order to reclaim memory we can free it after some
* (configured) time. Note that this cannot be done for slaves: slaves
* without sub-slaves attached should still accumulate data into the
* backlog, in order to reply to PSYNC queries if they are turned into
* masters after a failover. */
if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
server.repl_backlog && server.masterhost == NULL)
{
time_t idle = server.unixtime - server.repl_no_slaves_since;

if (idle > server.repl_backlog_time_limit) {
/* When we free the backlog, we always use a new
* replication ID and clear the ID2. This is needed
* because when there is no backlog, the master_repl_offset
* is not updated, but we would still retain our replication
* ID, leading to the following problem:
*
* 1. We are a master instance.
* 2. Our slave is promoted to master. It's repl-id-2 will
* be the same as our repl-id.
* 3. We, yet as master, receive some updates, that will not
* increment the master_repl_offset.
* 4. Later we are turned into a slave, connect to the new
* master that will accept our PSYNC request by second
* replication ID, but there will be data inconsistency
* because we received writes. */
changeReplicationId();
clearReplicationId2();
freeReplicationBacklog();
serverLog(LL_NOTICE,
"Replication backlog freed after %d seconds "
"without connected replicas.",
(int) server.repl_backlog_time_limit);
}
}

/* If AOF is disabled and we no longer have attached slaves, we can
* free our Replication Script Cache as there is no need to propagate
* EVALSHA at all. */
if (listLength(server.slaves) == 0 &&
server.aof_state == AOF_OFF &&
listLength(server.repl_scriptcache_fifo) != 0)
{
replicationScriptCacheFlush();
}

replicationStartPendingFork();

/* Remove the RDB file used for replication if Redis is not running
* with any persistence. */
removeRDBUsedToSyncReplicas();

/* Refresh the number of slaves with lag <= min-slaves-max-lag. */
refreshGoodSlavesCount();
replication_cron_loops++; /* Incremented with frequency 1 HZ. */
}

如果SLAVEOF自己会怎么样?

Reference

  1. http://cbsheng.github.io/posts/redis%E5%AE%9A%E6%97%B6%E4%BB%BB%E5%8A%A1servercron/
  2. https://www.cnblogs.com/kismetv/p/9236731.html
  3. https://youjiali1995.github.io/redis/replication/
  4. https://wenfh2020.com/2020/05/31/redis-replication-next/
    有注释
  5. https://redis.io/commands/psync
  6. https://zhuanlan.zhihu.com/p/44105707
  7. https://zhuanlan.zhihu.com/p/86617437
    讲解sub slave