An Analysis of the Redis Sentinel Implementation

Sentinel monitors the state of the Masters in a Redis deployment and is Redis's high-availability solution. It watches a set of Master-Slave groups (and the other Sentinels). When a Master goes offline, one of its Slaves is automatically promoted to Master.

In this article, a heading prefixed with [Continued] indicates that the logic does not really belong under the parent heading but follows the previous heading in time: for example, one heading covers how a Request is produced, the next covers how the peer handles that Request, and the next covers how the peer's Response is handled.

Overview

Sentinel usage and common commands

The command below tells Sentinel to monitor a server named mymaster at 127.0.0.1:6379, and that at least 2 Sentinels must agree the Master is down before a failover can happen. Note that 2 is not the only bound here: besides this quorum constraint, the majority condition must also be satisfied (the failover itself needs authorization from a majority of all known Sentinels).

sentinel monitor mymaster 127.0.0.1 6379 2

down-after-milliseconds is the time after which, if the server has not replied to the Sentinel's PINGs, or keeps returning invalid replies, it is flagged SDOWN. (Note that not every error reply is invalid: +PONG, -LOADING and -MASTERDOWN all count as valid replies.)

sentinel down-after-milliseconds mymaster 60000

failover-timeout is the timeout for failure recovery.

sentinel failover-timeout mymaster 180000

parallel-syncs is the maximum number of Slaves allowed to synchronize with the new Master at the same time during a failover; the smaller the number, the longer the failover takes to complete.

sentinel parallel-syncs mymaster 1

Key structures and modules

sentinelRedisInstance

A sentinelRedisInstance structure is created for every monitored Redis instance; its flags field records the instance's state:

#define SRI_MASTER  (1<<0)
#define SRI_SLAVE (1<<1)
#define SRI_SENTINEL (1<<2)

SDOWN means subjectively down: the instance is considered down by a single Sentinel. ODOWN means objectively down: the instance is considered down by the Sentinel cluster.

#define SRI_S_DOWN (1<<3)   /* Subjectively down (no quorum). */
#define SRI_O_DOWN (1<<4) /* Objectively down (confirmed by others). */
#define SRI_MASTER_DOWN (1<<5) /* A Sentinel with this flag set thinks that
its master is down. */
#define SRI_FAILOVER_IN_PROGRESS (1<<6) /* Failover is in progress for
this master. */
#define SRI_PROMOTED (1<<7) /* Slave selected for promotion. */
#define SRI_RECONF_SENT (1<<8) /* SLAVEOF <newmaster> sent. */
#define SRI_RECONF_INPROG (1<<9) /* Slave synchronization in progress. */
#define SRI_RECONF_DONE (1<<10) /* Slave synchronized with new master. */
#define SRI_FORCE_FAILOVER (1<<11) /* Force failover with master up. */
#define SRI_SCRIPT_KILL_SENT (1<<12) /* SCRIPT KILL already sent on -BUSY */

The sentinelState struct and the sentinel object

struct sentinelState {
char myid[CONFIG_RUN_ID_SIZE+1]; /* This sentinel ID. */
uint64_t current_epoch; /* Current epoch. */
dict *masters; /* Dictionary of master sentinelRedisInstances.
Key is the instance name, value is the
sentinelRedisInstance structure pointer. */
int tilt; /* Are we in TILT mode? */
int running_scripts; /* Number of scripts in execution right now. */
mstime_t tilt_start_time; /* When TITL started. */
mstime_t previous_time; /* Last time we ran the time handler. */
list *scripts_queue; /* Queue of user scripts to execute. */
char *announce_ip; /* IP addr that is gossiped to other sentinels if
not NULL. */
int announce_port; /* Port that is gossiped to other sentinels if
non zero. */
unsigned long simfailure_flags; /* Failures simulation. */
int deny_scripts_reconfig; /* Allow SENTINEL SET ... to change script
paths at runtime? */
} sentinel;

Initialization

Messages

INFO messages

  1. Discover Slave nodes
  2. Confirm master-slave relationships

PING messages

Check whether an instance (Master/Slave/Sentinel) has timed out or is returning errors, and thereby decide whether it enters the SDOWN state.

The Pub/Sub __sentinel__:hello channel

  1. Broadcast this Sentinel's own existence
  2. Send the Master's current configuration to other Sentinels for synchronization

Clients can also subscribe to Sentinel's messages, for example configuration-change notifications:

+switch-master <master name> <oldip> <oldport> <newip> <newport>

Connections

Compared with version 3.0, current versions of Redis move all connection-related logic into instanceLink.
Redis Sentinel maintains two Hiredis connections per instance, cc and pc: cc is used to send commands, while pc is used for Pub/Sub.
Why not a single connection? One explanation is that it prevents broadcast messages from being lost while the command connection is down.

typedef struct instanceLink {
redisAsyncContext *cc; /* Hiredis context for commands. */
redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */

Initialization

A link is initialized as follows:

/* Create a not yet connected link object. */
instanceLink *createInstanceLink(void) {
instanceLink *link = zmalloc(sizeof(*link));

link->refcount = 1;
link->disconnected = 1;
link->pending_commands = 0;
link->cc = NULL;
link->pc = NULL;
link->cc_conn_time = 0;
link->pc_conn_time = 0;
link->last_reconn_time = 0;
link->pc_last_activity = 0;

act_ping_time is set to the current time, even though at this point we have neither established a connection nor sent a PING. This is used to generate a timeout when the connection never succeeds.

    link->act_ping_time = mstime();
link->last_ping_time = 0;
link->last_avail_time = mstime();
link->last_pong_time = mstime();
return link;
}

Resolving addresses

The createSentinelAddr function resolves a hostname:port pair into a sentinelAddr *, which is later assigned to ri->addr.

/* Create a sentinelAddr object and return it on success.
* On error NULL is returned and errno is set to:
* ENOENT: Can't resolve the hostname.
* EINVAL: Invalid port number.
*/
sentinelAddr *createSentinelAddr(char *hostname, int port) {
char ip[NET_IP_STR_LEN];
sentinelAddr *sa;

if (port < 0 || port > 65535) {
errno = EINVAL;
return NULL;
}
if (anetResolve(NULL,hostname,ip,sizeof(ip)) == ANET_ERR) {
errno = ENOENT;
return NULL;
}
sa = zmalloc(sizeof(*sa));
sa->ip = sdsnew(ip);
sa->port = port;
return sa;
}

sentinelReconnectInstance

On every sentinelHandleRedisInstance event, sentinelReconnectInstance is called to attempt a reconnect (if the link dropped).
In version 3.0, sentinelReconnectInstance checked whether sentinelRedisInstance *ri had no network connection yet and, if so, called redisAsyncConnect and friends to establish one. The current version is somewhat different.
Note that if either of cc and pc is disconnected, ri->link->disconnected is true. In 3.0 disconnection was exposed as a separate flag, SRI_DISCONNECTED; in current versions everything connection-related lives in link.

void sentinelReconnectInstance(sentinelRedisInstance *ri) {
if (ri->link->disconnected == 0) return;

port == 0 marks an invalid address, but then why not simply reject it up front in createSentinelAddr?

if (ri->addr->port == 0) return; /* port == 0 means invalid address. */

Reconnect attempts are also rate-limited, with an interval of SENTINEL_PING_PERIOD:

instanceLink *link = ri->link;
mstime_t now = mstime();

if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) return;
ri->link->last_reconn_time = now;

The cc connection is handled first:

/* Commands connection. */
if (link->cc == NULL) {
link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
if (!link->cc->err && server.tls_replication &&
(instanceLinkNegotiateTLS(link->cc) == C_ERR)) {
sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #Failed to initialize TLS");
instanceLinkCloseConnection(link,link->cc);
} else if (link->cc->err) {
sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
link->cc->errstr);
instanceLinkCloseConnection(link,link->cc);
} else {
link->pending_commands = 0;
link->cc_conn_time = mstime();
link->cc->data = link;
redisAeAttach(server.el,link->cc);
redisAsyncSetConnectCallback(link->cc,
sentinelLinkEstablishedCallback);
redisAsyncSetDisconnectCallback(link->cc,
sentinelDisconnectCallback);
sentinelSendAuthIfNeeded(ri,link->cc);
sentinelSetClientName(ri,link->cc,"cmd");

/* Send a PING ASAP when reconnecting. */
sentinelSendPing(ri);
}
}

For Master and Slave nodes, the pc connection is handled next. One question here: a Sentinel surely needs a Pub/Sub connection to send Hello messages and configuration, so why isn't pc initialized for Sentinel instances? (Presumably because Hello messages are delivered to a peer Sentinel by PUBLISHing on its command connection; only Masters and Slaves need to be subscribed to.)

    /* Pub / Sub */
if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
if (!link->pc->err && server.tls_replication &&
(instanceLinkNegotiateTLS(link->pc) == C_ERR)) {
sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #Failed to initialize TLS");
} else if (link->pc->err) {
sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
link->pc->errstr);
instanceLinkCloseConnection(link,link->pc);
} else {
int retval;

link->pc_conn_time = mstime();
link->pc->data = link;
redisAeAttach(server.el,link->pc);
redisAsyncSetConnectCallback(link->pc,
sentinelLinkEstablishedCallback);
redisAsyncSetDisconnectCallback(link->pc,
sentinelDisconnectCallback);
sentinelSendAuthIfNeeded(ri,link->pc);
sentinelSetClientName(ri,link->pc,"pubsub");
/* Now we subscribe to the Sentinels "Hello" channel. */
retval = redisAsyncCommand(link->pc,
sentinelReceiveHelloMessages, ri, "%s %s",
sentinelInstanceMapCommand(ri,"SUBSCRIBE"),
SENTINEL_HELLO_CHANNEL);
if (retval != C_OK) {
/* If we can't subscribe, the Pub/Sub connection is useless
* and we can simply disconnect it and try again. */
instanceLinkCloseConnection(link,link->pc);
return;
}
}
}
/* Clear the disconnected status only if we have both the connections
* (or just the commands connection if this is a sentinel instance). */
if (link->cc && (ri->flags & SRI_SENTINEL || link->pc))
link->disconnected = 0;
}

What is the difference between a dropped link and a down instance?

First, let's see when link->disconnected is 1:

  1. When the link has just been created
  2. When a connection is closed via instanceLinkCloseConnection, called from:
    releaseInstanceLink
    sentinelUpdateSentinelAddressInAllMasters
    sentinelResetMaster
    sentinelReconnectInstance
    sentinelCheckSubjectivelyDown(?)
  3. On instanceLinkConnectionError, called from:
    sentinelLinkEstablishedCallback
    sentinelDisconnectCallback

Entry point

Redis's timer interrupt handler serverCron fires server.hz times per second. It asynchronously handles a lot of logic, for example:

  1. [databasesCron] Actively expiring keys (there is also lazy expiry at lookup time, which we covered earlier)
  2. The software watchdog: watchdogScheduleSignal
  3. Updating statistics, e.g. cronUpdateMemoryStats
  4. [databasesCron] Calling dictRehashMilliseconds
  5. Triggering BGSAVE / AOF rewrites
  6. [clientsCron] Client timeouts
  7. [replicationCron] Replication-related work
// server.h
#define run_with_period(_ms_) if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int j;
...
if (server.sentinel_mode) sentinelTimer();
}

sentinelTimer is Sentinel's main routine.

void sentinelTimer(void) {
sentinelCheckTiltCondition();
sentinelHandleDictOfRedisInstances(sentinel.masters);
sentinelRunPendingScripts();
sentinelCollectTerminatedScripts();
sentinelKillTimedoutScripts();

The frequency of Redis's "timer interrupt" is continuously perturbed, so every Sentinel ticks at a slightly different rate. This non-determinism prevents Sentinels from all starting at the same time and all voting at the same time, which would make elections hard to converge.
CONFIG_DEFAULT_HZ defaults to 10, i.e. a base tick period of roughly 100 ms.

    server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ;
}

The routines in detail

Since sentinelTimer covers essentially all of Redis Sentinel's logic, it gets a chapter of its own.

sentinelCheckTiltCondition

This decides whether to enter TILT mode. When the program finds that the interval between two sentinelTimer runs is negative or larger than SENTINEL_TILT_TRIGGER = 2000, it enters TILT mode. This usually means:

  1. The Sentinel process was blocked: a large dataset to load(?), heavy machine IO, or the process being stopped by a signal
  2. The system clock misbehaved
    In either case tilt is set to 1, and this Sentinel sits on the bench for SENTINEL_TILT_PERIOD.

sentinelHandleDictOfRedisInstances

This is really Sentinel's main logic. It iterates over all monitored nodes (Masters, Slaves, and other Sentinels), running sentinelHandleRedisInstance on each. If a node is SRI_MASTER, the same function is also applied to all of its Slaves and Sentinels. Note that this is not a general DFS or unbounded recursion; it goes at most two levels deep: for each Master, visit all of its Slaves and all Sentinels watching it.

void sentinelHandleDictOfRedisInstances(dict *instances) {
dictIterator *di;
dictEntry *de;
sentinelRedisInstance *switch_to_promoted = NULL;

/* There are a number of things we need to perform against every master. */
di = dictGetIterator(instances);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);

sentinelHandleRedisInstance(ri);
if (ri->flags & SRI_MASTER) {
sentinelHandleDictOfRedisInstances(ri->slaves);
sentinelHandleDictOfRedisInstances(ri->sentinels);
if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
switch_to_promoted = ri;
}
}
}

When a Master reaches SENTINEL_FAILOVER_STATE_UPDATE_CONFIG, all of its Slaves have finished synchronizing and the configuration must be updated, which is done by calling sentinelFailoverSwitchToPromotedSlave:

    if (switch_to_promoted)
sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
dictReleaseIterator(di);
}

sentinelFailoverSwitchToPromotedSlave

This function removes the old Master, master, from the table and adds the newly promoted Slave, master->promoted_slave, in its place.
master->promoted_slave points to the Slave being promoted to the new Master.

/* This function is called when the slave is in
* SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need
* to remove it from the master table and add the promoted slave instead. */
void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
sentinelRedisInstance *ref = master->promoted_slave ?
master->promoted_slave : master;

sentinelEvent(LL_WARNING,"+switch-master",master,"%s %s %d %s %d",
master->name, master->addr->ip, master->addr->port,
ref->addr->ip, ref->addr->port);

sentinelResetMasterAndChangeAddress(master,ref->addr->ip,ref->addr->port);
}

sentinelResetMasterAndChangeAddress

The main logic is sentinelResetMasterAndChangeAddress, which changes the Master's ip:port while keeping its name, and ultimately calls sentinelResetMaster.
It is typically used to handle the +switch-master message; searching the source confirms that every +switch-master sentinelEvent is followed by this function.
The function returns an error when ip:port cannot be resolved.

int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
sentinelAddr *oldaddr, *newaddr;
sentinelAddr **slaves = NULL;
int numslaves = 0, j;
dictIterator *di;
dictEntry *de;

newaddr = createSentinelAddr(ip,port);
if (newaddr == NULL) return C_ERR;

/* Make a list of slaves to add back after the reset.
* Don't include the one having the address we are switching to. */
di = dictGetIterator(master->slaves);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);

if (sentinelAddrIsEqual(slave->addr,newaddr)) continue;
slaves = zrealloc(slaves,sizeof(sentinelAddr*)*(numslaves+1));
slaves[numslaves++] = createSentinelAddr(slave->addr->ip,
slave->addr->port);
}
dictReleaseIterator(di);

/* If we are switching to a different address, include the old address
* as a slave as well, so that we'll be able to sense / reconfigure
* the old master. */
if (!sentinelAddrIsEqual(newaddr,master->addr)) {
slaves = zrealloc(slaves,sizeof(sentinelAddr*)*(numslaves+1));
slaves[numslaves++] = createSentinelAddr(master->addr->ip,
master->addr->port);
}

/* Reset and switch address. */
sentinelResetMaster(master,SENTINEL_RESET_NO_SENTINELS);
oldaddr = master->addr;
master->addr = newaddr;
master->o_down_since_time = 0;
master->s_down_since_time = 0;

/* Add slaves back. */
for (j = 0; j < numslaves; j++) {
sentinelRedisInstance *slave;

slave = createSentinelRedisInstance(NULL,SRI_SLAVE,slaves[j]->ip,
slaves[j]->port, master->quorum, master);
releaseSentinelAddr(slaves[j]);
if (slave) sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
}
zfree(slaves);

/* Release the old address at the end so we are safe even if the function
* gets the master->addr->ip and master->addr->port as arguments. */
releaseSentinelAddr(oldaddr);
sentinelFlushConfig();
return C_OK;
}

sentinelHandleRedisInstance: the monitoring half

/* Perform scheduled operations for the specified Redis instance. */
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
/* ========== MONITORING HALF ============ */
/* Every kind of instance */
sentinelReconnectInstance(ri);
sentinelSendPeriodicCommands(ri);

sentinelReconnectInstance

Covered above in the Connections section.

sentinelSendPeriodicCommands

sentinelSendPeriodicCommands sends commands to sentinelRedisInstance *ri.
link here is an instanceLink object. Its purpose is to cut down on the number of hiredis connections: if 5 Sentinels monitor 100 Masters, 5 links are created rather than 500. This is a bit puzzling at first: even if those 5 Sentinels share links, wouldn't they still need their 5*4 pairwise links plus 100 outgoing links each? (My reading: the sharing is per peer Sentinel. A Sentinel that appears in the sentinels dict of 100 different Masters is represented by 100 sentinelRedisInstances that all share one refcounted link.)

/* Send periodic PING, INFO, and PUBLISH to the Hello channel to
* the specified master or slave instance. */
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
mstime_t now = mstime();
mstime_t info_period, ping_period;
int retval;

/* Return ASAP if we have already a PING or INFO already pending, or
* in the case the instance is not properly connected. */
if (ri->link->disconnected) return;

Because INFO/PING/PUBLISH are not critically important commands, pending_commands is capped at SENTINEL_MAX_PENDING_COMMANDS. As a backstop, if too many commands pile up, the link is dropped and a reconnect is triggered anyway.

// SENTINEL_MAX_PENDING_COMMANDS = 100
if (ri->link->pending_commands >=
SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;

If ri is a Slave whose Master is in ODOWN or has a failover in progress, the INFO frequency is raised to once every 1000 ms instead of the usual SENTINEL_INFO_PERIOD = 10000 ms, since in that situation one of the Slaves may be promoted to Master by another Sentinel or by a sysadmin. Likewise, when master_link_down_time is non-zero (the Slave has presumably lost its link to the Master, interrupting replication), INFO is sent more often so that we keep a more accurate record of the disconnection time.

if ((ri->flags & SRI_SLAVE) &&
((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
(ri->master_link_down_time != 0)))
{
info_period = 1000;
} else {
info_period = SENTINEL_INFO_PERIOD;
}

Next, the PING interval is set.
down_after_period is the time after which an instance is considered SDOWN.
It equals master->down_after_period; for a Master itself, the default is SENTINEL_DEFAULT_DOWN_AFTER = 30000.

/* We ping instances every time the last received pong is older than
* the configured 'down-after-milliseconds' time, but every second
* anyway if 'down-after-milliseconds' is greater than 1 second. */
// #define SENTINEL_PING_PERIOD 1000
ping_period = ri->down_after_period;
if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;

redisAsyncCommand is then called to send INFO. We do not send INFO to Sentinels, because we do not need to know a Sentinel's Slaves or replication topology (it has none). info_refresh == 0 means we have never received an INFO reply.

/* Send INFO to masters and slaves, not sentinels. */
if ((ri->flags & SRI_SENTINEL) == 0 &&
(ri->info_refresh == 0 ||
(now - ri->info_refresh) > info_period))
{
retval = redisAsyncCommand(ri->link->cc,
sentinelInfoReplyCallback, ri, "%s",
sentinelInstanceMapCommand(ri,"INFO"));
if (retval == C_OK) ri->link->pending_commands++;
}

Next, PING is sent, and we PING all three kinds of instances. Compared to 3.0, a PING is now only sent when last_ping_time is also more than ping_period/2 in the past. What is the purpose of that? (Presumably a rate limit: even when the PONG is overdue, retry PINGs are spaced at least ping_period/2 apart.)


/* Send PING to all the three kinds of instances. */
if ((now - ri->link->last_pong_time) > ping_period &&
(now - ri->link->last_ping_time) > ping_period/2) {
sentinelSendPing(ri);
}

A Hello message is then published via Pub/Sub. It serves the two goals listed in the Messages chapter; we will examine it in detail later.

    /* PUBLISH hello messages to all the three kinds of instances. */
if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
sentinelSendHello(ri);
}
}

[Continued] sentinelInfoReplyCallback

Replies to the INFO command are handled by the registered callback sentinelInfoReplyCallback:

void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
sentinelRedisInstance *ri = privdata;
instanceLink *link = c->data;
redisReply *r;

if (!reply || !link) return;
link->pending_commands--;
r = reply;

if (r->type == REDIS_REPLY_STRING)
sentinelRefreshInstanceInfo(ri,r->str);
}

The gist of this function is the call to sentinelRefreshInstanceInfo, an enormous function that parses the information returned by INFO.

sentinelSendHello

This function sends a Hello message over Pub/Sub to ri, fulfilling the two goals from the Messages chapter. The message format is:

sentinel_ip,sentinel_port,sentinel_runid,current_epoch,master_name,master_ip,master_port,master_config_epoch

If the PUBLISH command is queued successfully, the function returns success.
In 3.0, the ip was obtained directly via anetSockName and server.port was used as-is; here it is a bit more involved.

int sentinelSendHello(sentinelRedisInstance *ri) {
char ip[NET_IP_STR_LEN];
char payload[NET_IP_STR_LEN+1024];
int retval;
char *announce_ip;
int announce_port;
sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master;
sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master);

if (ri->link->disconnected) return C_ERR;

/* Use the specified announce address if specified, otherwise try to
* obtain our own IP address. */
if (sentinel.announce_ip) {
announce_ip = sentinel.announce_ip;
} else {
if (anetSockName(ri->link->cc->c.fd,ip,sizeof(ip),NULL) == -1)
return C_ERR;
announce_ip = ip;
}
if (sentinel.announce_port) announce_port = sentinel.announce_port;
else if (server.tls_replication && server.tls_port) announce_port = server.tls_port;
else announce_port = server.port;

The configuration to broadcast is then generated; field by field, the mapping is:

sentinel_ip,sentinel_port,sentinel_runid,current_epoch         ,master_name ,master_ip      ,master_port      ,master_config_epoch
announce_ip,announce_port,sentinel.myid ,sentinel.current_epoch,master->name,master_addr->ip,master_addr->port,master_config_epoch

Then a PUBLISH command is executed:

    /* Format and send the Hello message. */
snprintf(payload,sizeof(payload),
"%s,%d,%s,%llu," /* Info about this sentinel. */
"%s,%s,%d,%llu", /* Info about current master. */
announce_ip, announce_port, sentinel.myid,
(unsigned long long) sentinel.current_epoch,
/* --- */
master->name,master_addr->ip,master_addr->port,
(unsigned long long) master->config_epoch);
retval = redisAsyncCommand(ri->link->cc,
sentinelPublishReplyCallback, ri, "%s %s %s",
sentinelInstanceMapCommand(ri,"PUBLISH"),
SENTINEL_HELLO_CHANNEL,payload);
if (retval != C_OK) return C_ERR;
ri->link->pending_commands++;
return C_OK;
}

The command's callback mainly updates last_pub_time:

void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
sentinelRedisInstance *ri = privdata;
instanceLink *link = c->data;
redisReply *r;

if (!reply || !link) return;
link->pending_commands--;
r = reply;

/* Only update pub_time if we actually published our message. Otherwise
* we'll retry again in 100 milliseconds. */
if (r->type != REDIS_REPLY_ERROR)
ri->last_pub_time = mstime();
}

sentinelHandleRedisInstance: the acting half

First comes the tilt handling.

/* ============== ACTING HALF ============= */
if (sentinel.tilt) {
if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
sentinel.tilt = 0;
sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited");
}

Next, check for the SDOWN state:

/* Every kind of instance */
sentinelCheckSubjectivelyDown(ri);

For a Master, also check whether it has entered the ODOWN state:

    /* Masters and slaves */
if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
/* Nothing so far. */
}

/* Only masters */
if (ri->flags & SRI_MASTER) {
sentinelCheckObjectivelyDown(ri);
if (sentinelStartFailoverIfNeeded(ri))
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
sentinelFailoverStateMachine(ri);
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
}
}

sentinelCheckSubjectivelyDown

First, compute how long the instance has gone unresponsive since the last PING. There is one more branch here than in 3.0, the ri->link->disconnected check; why is that?
last_avail_time is updated whenever one of the three valid replies to PING is received.
act_ping_time is the time the last PING was sent; 0 means a PONG was just received (see sentinelPingReplyCallback) and the next PING has not gone out yet.
Correspondingly there is also last_ping_time, which is not reset to 0 when a PONG arrives.

void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
mstime_t elapsed = 0;

if (ri->link->act_ping_time)
elapsed = mstime() - ri->link->act_ping_time;
else if (ri->link->disconnected)
elapsed = mstime() - ri->link->last_avail_time;

If the link's activity looks low, consider dropping and reconnecting it.
First the cc connection: if it has been connected for at least SENTINEL_MIN_LINK_RECONNECT_PERIOD and there is still a PING pending for more than half the down-after timeout, instanceLinkCloseConnection is called to close it.

// #define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000
if (ri->link->cc &&
(mstime() - ri->link->cc_conn_time) >
SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
ri->link->act_ping_time != 0 && /* There is a pending ping... */
/* The pending ping is delayed, and we did not receive
* error replies as well. */
(mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) &&
(mstime() - ri->link->last_pong_time) > (ri->down_after_period/2))
{
instanceLinkCloseConnection(ri->link,ri->link->cc);
}

Then the pc connection: close it if it has been connected for more than SENTINEL_MIN_LINK_RECONNECT_PERIOD yet has seen no activity for SENTINEL_PUBLISH_PERIOD * 3.

// #define SENTINEL_PUBLISH_PERIOD 2000
if (ri->link->pc &&
(mstime() - ri->link->pc_conn_time) >
SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
(mstime() - ri->link->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
{
instanceLinkCloseConnection(ri->link,ri->link->pc);
}

The SDOWN flag is set if either of these two conditions holds:

  1. The instance is not answering
  2. This Sentinel believes the instance is a Master, but the instance reports itself to the Sentinel as a Slave, and the role change has still not completed after down_after_period plus two SENTINEL_INFO_PERIODs.
    if (elapsed > ri->down_after_period ||
(ri->flags & SRI_MASTER &&
ri->role_reported == SRI_SLAVE &&
mstime() - ri->role_reported_time >
(ri->down_after_period+SENTINEL_INFO_PERIOD*2)))
{
/* Is subjectively down */
if ((ri->flags & SRI_S_DOWN) == 0) {
sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
ri->s_down_since_time = mstime();
ri->flags |= SRI_S_DOWN;
}
} else {

Otherwise the SDOWN state is removed. Note that, searching the source, this is the only place where SRI_S_DOWN is cleared.

        /* Is subjectively up */
if (ri->flags & SRI_S_DOWN) {
sentinelEvent(LL_WARNING,"-sdown",ri,"%@");
ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
}
}
}

sentinelCheckObjectivelyDown

Note that ODOWN uses a weak quorum, weak in the sense that the votes are collected over a time window rather than agreed at a single point in time. Strong quorum algorithms and gossip protocols are useful background here.
The loop below iterates over all Sentinels, counting those flagged SRI_MASTER_DOWN and checking whether the count reaches master->quorum.

void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
dictIterator *di;
dictEntry *de;
unsigned int quorum = 0, odown = 0;

if (master->flags & SRI_S_DOWN) {
/* Is down for enough sentinels? */
quorum = 1; /* the current sentinel. */
/* Count all the other sentinels. */
di = dictGetIterator(master->sentinels);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);

if (ri->flags & SRI_MASTER_DOWN) quorum++;
}
dictReleaseIterator(di);

If enough votes are in, set ODOWN:

    if (quorum >= master->quorum) odown = 1;
}

On entering ODOWN, sentinelEvent must be called to deliver the event to the log, Pub/Sub, and the user notification script, and the flag is then set locally as well.

/* Set the flag accordingly to the outcome. */
if (odown) {
if ((master->flags & SRI_O_DOWN) == 0) {
sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d",
quorum, master->quorum);
master->flags |= SRI_O_DOWN;
master->o_down_since_time = mstime();
}

When recovering from ODOWN, the reverse is done:

    } else {
if (master->flags & SRI_O_DOWN) {
sentinelEvent(LL_WARNING,"-odown",master,"%@");
master->flags &= ~SRI_O_DOWN;
}
}
}

Checking the quorum parameter

Current versions provide a ckquorum command to judge whether the configured quorum is achievable.
First, count the usable Sentinel nodes. voters is the number of sentinels recorded in the Master, plus ourselves. Then all Sentinels are scanned, and those in neither SDOWN nor ODOWN are counted as usable.

int sentinelIsQuorumReachable(sentinelRedisInstance *master, int *usableptr) {
dictIterator *di;
dictEntry *de;
int usable = 1; /* Number of usable Sentinels. Init to 1 to count myself. */
int result = SENTINEL_ISQR_OK;
int voters = dictSize(master->sentinels)+1; /* Known Sentinels + myself. */

di = dictGetIterator(master->sentinels);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);

if (ri->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;
usable++;
}
dictReleaseIterator(di);

If the required quorum exceeds usable, the quorum can never be reached; report an error.

if (usable < (int)master->quorum) result |= SENTINEL_ISQR_NOQUORUM;

If usable falls short of a majority, then even a decision that reaches the quorum carries no authority (the failover could not be authorized).

    if (usable < voters/2+1) result |= SENTINEL_ISQR_NOAUTH;
if (usableptr) *usableptr = usable;
return result;
}

Note that the check that the winner must obtain an absolute majority of voters lives in sentinelGetLeader.

sentinelStartFailoverIfNeeded

After the ODOWN judgment, it is time to decide whether to start a failover.
Before executing, three conditions are checked:

  1. Without ODOWN, no failover is needed
  2. If a failover is already in progress, return immediately as well
  3. If a failover just ran, don't start another one
    The default here derives from SENTINEL_DEFAULT_FAILOVER_TIMEOUT = (60*3*1000), i.e. 3 minutes.
    This is presumably to prevent overly frequent failovers, but if the Master has failed and we don't fail over, then what?

The code below covers the first two conditions:

int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
/* We can't failover if the master is not in O_DOWN state. */
if (!(master->flags & SRI_O_DOWN)) return 0;

/* Failover already in progress? */
if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0;

The third condition sets the "cooldown" at master->failover_timeout*2:

/* Last failover attempt started too little time ago? */
if (mstime() - master->failover_start_time <
master->failover_timeout*2)
{
if (master->failover_delay_logged != master->failover_start_time) {
time_t clock = (master->failover_start_time +
master->failover_timeout*2) / 1000;
char ctimebuf[26];

ctime_r(&clock,ctimebuf);
ctimebuf[24] = '\0'; /* Remove newline. */
master->failover_delay_logged = master->failover_start_time;
serverLog(LL_WARNING,
"Next failover delay: I will not start a failover before %s",
ctimebuf);
}
return 0;
}

After the three early returns comes the failover itself. Note that the Sentinel executing the failover is not necessarily the Sentinel that initiated it: a failover is always executed by the Leader among the Sentinels, and since initiating a failover triggers an election, the Leader may well have changed by then. This differs slightly from Raft, where a new Leader cannot directly commit log entries from an old Leader's term.

    sentinelStartFailover(master);
return 1;
}

Here is the function:

void sentinelStartFailover(sentinelRedisInstance *master) {
serverAssert(master->flags & SRI_MASTER);

master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;

First, the failover_state field is updated. Its possible values are:

  1. SENTINEL_FAILOVER_STATE_NONE: no failover in progress
  2. SENTINEL_FAILOVER_STATE_WAIT_START: waiting to start the failover
  3. SENTINEL_FAILOVER_STATE_SELECT_SLAVE: selecting a Slave as the new Master
  4. SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: sending SLAVEOF NO ONE to that Slave so it becomes a Master
  5. SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: waiting for that Slave to actually report the master role
  6. SENTINEL_FAILOVER_STATE_RECONF_SLAVES: sending SLAVEOF to the remaining Slaves so they replicate the new Master
  7. SENTINEL_FAILOVER_STATE_UPDATE_CONFIG: all Slaves have completed synchronization

Continuing through the source, the Master's failover_epoch is incremented, a concept similar to a term in Raft:

master->flags |= SRI_FAILOVER_IN_PROGRESS;
master->failover_epoch = ++sentinel.current_epoch;
sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
(unsigned long long) sentinel.current_epoch);
sentinelEvent(LL_WARNING,"+try-failover",master,"%@");

Here a randomized failover start time is chosen, similar to how a Raft Follower randomizes its timeout before becoming a Candidate:

    // #define SENTINEL_MAX_DESYNC 1000
master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
master->failover_state_change_time = mstime();
}

sentinelAskMasterStateToOtherSentinels

If this Sentinel considers the Master offline, it sends SENTINEL is-master-down-by-addr to the other Sentinels, trying to collect enough votes to mark the Master ODOWN and to start a failover.
One puzzle: by the time this function runs, sentinelStartFailoverIfNeeded has certainly already called sentinelStartFailover, so why call this function too? My understanding is that this is an extra, forced round of asking; the normal call is the one that comes after sentinelFailoverStateMachine.

#define SENTINEL_ASK_FORCED (1<<0)
void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
dictIterator *di;
dictEntry *de;

di = dictGetIterator(master->sentinels);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
char port[32];
int retval;

/* If the master state from other sentinel is too old, we clear it. */
if (elapsed > SENTINEL_ASK_PERIOD*5) {
ri->flags &= ~SRI_MASTER_DOWN;
sdsfree(ri->leader);
ri->leader = NULL;
}

Other Sentinels are asked only under these three conditions:

  1. The Master must carry the SRI_S_DOWN flag
  2. We must be connected to that Sentinel
  3. Unless SENTINEL_ASK_FORCED is set, we only ask ri again if no SENTINEL is-master-down reply has arrived from it within SENTINEL_ASK_PERIOD.
if ((master->flags & SRI_S_DOWN) == 0) continue;
if (ri->link->disconnected) continue;
if (!(flags & SENTINEL_ASK_FORCED) &&
mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
continue;

The ask itself is simply a SENTINEL is-master-down-by-addr command. If a failover has been started, we include our own sentinel.myid to run for failover Leader:

        /* Ask */
ll2string(port,sizeof(port),master->addr->port);
retval = redisAsyncCommand(ri->link->cc,
sentinelReceiveIsMasterDownReply, ri,
"%s is-master-down-by-addr %s %s %llu %s",
sentinelInstanceMapCommand(ri,"SENTINEL"),
master->addr->ip, port,
sentinel.current_epoch,
(master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
sentinel.myid : "*");
if (retval == C_OK) ri->link->pending_commands++;
}
dictReleaseIterator(di);
}

[Continued] Replying to sentinelAskMasterStateToOtherSentinels

The peer's handling of sentinelAskMasterStateToOtherSentinels lives directly in sentinelCommand.
First, recall the request format:

SENTINEL is-master-down-by-addr ip port current_epoch runid

Here ip:port is the address of the Master we want checked. Note that the command checks by address rather than by name; in theory different Sentinels may monitor different masters with the same name.
current-epoch is used to decide whether we may vote in the failover Leader election: each Sentinel gets one vote per epoch.
If runid is *, no vote is being requested from this Sentinel; otherwise it is set to the runid we want it to vote for.
Now the code:

sentinelRedisInstance *ri;
long long req_epoch;
uint64_t leader_epoch = 0;
char *leader = NULL;
long port;
int isdown = 0;

if (c->argc != 6) goto numargserr;
if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != C_OK ||
getLongLongFromObjectOrReply(c,c->argv[4],&req_epoch,NULL)
!= C_OK)
return;
ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
c->argv[2]->ptr,port,NULL);

/* It exists? Is actually a master? Is subjectively down? It's down.
* Note: if we are in tilt mode we always reply with "0". */
if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
(ri->flags & SRI_MASTER))
isdown = 1;

If the is-master-down-by-addr request carries a runid argument, it is a vote request, and sentinelVoteLeader is called to cast the vote:

/* Vote for the master (or fetch the previous vote) if the request
* includes a runid, otherwise the sender is not seeking for a vote. */
if (ri && ri->flags & SRI_MASTER && strcasecmp(c->argv[5]->ptr,"*")) {
leader = sentinelVoteLeader(ri,(uint64_t)req_epoch,
c->argv[5]->ptr,
&leader_epoch);
}

/* Reply with a three-elements multi-bulk reply:
* down state, leader, vote epoch. */
addReplyArrayLen(c,3);
addReply(c, isdown ? shared.cone : shared.czero);
addReplyBulkCString(c, leader ? leader : "*");
addReplyLongLong(c, (long long)leader_epoch);
if (leader) sdsfree(leader);

sentinelVoteLeader

The sentinelVoteLeader function corresponds to the Follower voting logic in Raft.
If this Sentinel has already voted in req_epoch, it returns the vote it cast before; if it has already voted in an epoch higher than req_epoch, it returns the vote cast in that higher epoch.
If it cannot vote for now, it returns NULL. Otherwise:

  1. Set master->leader to req_runid, which is also the function's return value
  2. Set master->leader_epoch and *leader_epoch to sentinel.current_epoch
char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch) {
    if (req_epoch > sentinel.current_epoch) {
        sentinel.current_epoch = req_epoch;
        sentinelFlushConfig();
        sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
            (unsigned long long) sentinel.current_epoch);
    }

Then, if the Master's recorded leader_epoch is stale (older than req_epoch) and this Sentinel's current_epoch does not exceed req_epoch, grant the vote:

    if (master->leader_epoch < req_epoch && sentinel.current_epoch <= req_epoch)
    {
        sdsfree(master->leader);
        master->leader = sdsnew(req_runid);
        master->leader_epoch = sentinel.current_epoch;
        sentinelFlushConfig();
        sentinelEvent(LL_WARNING,"+vote-for-leader",master,"%s %llu",
            master->leader, (unsigned long long) master->leader_epoch);
        /* If we did not vote for ourselves, set the master failover start
         * time to now, in order to force a delay before we can start a
         * failover for the same master. */
        if (strcasecmp(master->leader,sentinel.myid))
            master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
    }

    *leader_epoch = master->leader_epoch;
    return master->leader ? sdsnew(master->leader) : NULL;
}

[Continued] sentinelReceiveIsMasterDownReply

Let's see how a Sentinel processes the result of sentinelAskMasterStateToOtherSentinels, i.e. the vote reply.
The block below is sanity checking; you can skip ahead to the main logic.

/* Receive the SENTINEL is-master-down-by-addr reply, see the
 * sentinelAskMasterStateToOtherSentinels() function for more information. */
void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = privdata;
    instanceLink *link = c->data;
    redisReply *r;

    if (!reply || !link) return;
    link->pending_commands--;
    r = reply;
    /* Ignore every error or unexpected reply.
     * Note that if the command returns an error for any reason we'll
     * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
    if (r->type == REDIS_REPLY_ARRAY && r->elements == 3 &&
        r->element[0]->type == REDIS_REPLY_INTEGER &&
        r->element[1]->type == REDIS_REPLY_STRING &&
        r->element[2]->type == REDIS_REPLY_INTEGER)
    {

Then update ri with the vote result, recording r->element[1]->str into ri->leader.

        ri->last_master_down_reply_time = mstime();
        if (r->element[0]->integer == 1) {
            ri->flags |= SRI_MASTER_DOWN;
        } else {
            ri->flags &= ~SRI_MASTER_DOWN;
        }
        if (strcmp(r->element[1]->str,"*")) {
            /* If the runid in the reply is not "*" the Sentinel actually
             * replied with a vote. */
            sdsfree(ri->leader);
            if ((long long)ri->leader_epoch != r->element[2]->integer)
                serverLog(LL_WARNING,
                    "%s voted for %s %llu", ri->name,
                    r->element[1]->str,
                    (unsigned long long) r->element[2]->integer);
            ri->leader = sdsnew(r->element[1]->str);
            ri->leader_epoch = r->element[2]->integer;
        }
    }
}

sentinelFailoverStateMachine

This function handles all failover states except SENTINEL_FAILOVER_STATE_NONE and SENTINEL_FAILOVER_STATE_UPDATE_CONFIG (in effect, the first and the last).
Every state transition updates failover_state_change_time, which several places later use to drive the abort logic.

void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
    serverAssert(ri->flags & SRI_MASTER);

    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;

    switch(ri->failover_state) {

This function emits the following sentinelEvents:

  1. -failover-abort-not-elected
  2. +elected-leader and +failover-state-select-slave

Possible resulting states:

  1. SENTINEL_FAILOVER_STATE_NONE
  2. SENTINEL_FAILOVER_STATE_SELECT_SLAVE
        case SENTINEL_FAILOVER_STATE_WAIT_START:
            sentinelFailoverWaitStart(ri);
            break;

This function emits the following sentinelEvents:

  1. -failover-abort-no-good-slave
  2. +selected-slave and +failover-state-send-slaveof-noone

Possible resulting states:

  1. SENTINEL_FAILOVER_STATE_NONE
  2. SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE
        case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
            sentinelFailoverSelectSlave(ri);
            break;

This function emits the following sentinelEvents:

  1. -failover-abort-slave-timeout
  2. +failover-state-wait-promotion

Possible resulting states:

  1. SENTINEL_FAILOVER_STATE_NONE
  2. SENTINEL_FAILOVER_STATE_WAIT_PROMOTION
        case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
            sentinelFailoverSendSlaveOfNoOne(ri);
            break;

This function emits the following sentinelEvents:

  1. -failover-abort-slave-timeout

Possible resulting states:

  1. SENTINEL_FAILOVER_STATE_NONE
        case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
            sentinelFailoverWaitPromotion(ri);
            break;

This function emits the following sentinelEvents:
TODO
Possible resulting states:
TODO

        case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
            sentinelFailoverReconfNextSlave(ri);
            break;
    }
}

sentinelFailoverWaitStart

First check whether we are the failover leader for this epoch. As noted earlier, the Sentinel that initiates a failover is not necessarily the one that executes it. This step calls sentinelGetLeader, which may trigger an election.

/* ---------------- Failover state machine implementation ------------------- */
void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
    char *leader;
    int isleader;

    /* Check if we are the leader for the failover epoch. */
    leader = sentinelGetLeader(ri, ri->failover_epoch);
    isleader = leader && strcasecmp(leader,sentinel.myid) == 0;
    sdsfree(leader);

If I am not the leader, and this failover was not forced via the SENTINEL FAILOVER command, we cannot proceed with the failover.

    /* If I'm not the leader, and it is not a forced failover via
     * SENTINEL FAILOVER, then I can't continue with the failover. */
    if (!isleader && !(ri->flags & SRI_FORCE_FAILOVER)) {
        int election_timeout = SENTINEL_ELECTION_TIMEOUT;

        /* The election timeout is the MIN between SENTINEL_ELECTION_TIMEOUT
         * and the configured failover timeout. */

If more than election_timeout has passed, abort the entire failover.

        if (election_timeout > ri->failover_timeout)
            election_timeout = ri->failover_timeout;
        /* Abort the failover if I'm not the leader after some time. */
        if (mstime() - ri->failover_start_time > election_timeout) {
            sentinelEvent(LL_WARNING,"-failover-abort-not-elected",ri,"%@");
            sentinelAbortFailover(ri);
        }
        return;
    }

Otherwise, switch the state to SENTINEL_FAILOVER_STATE_SELECT_SLAVE.

    sentinelEvent(LL_WARNING,"+elected-leader",ri,"%@");
    if (sentinel.simfailure_flags & SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION)
        sentinelSimFailureCrash();
    ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
    ri->failover_state_change_time = mstime();
    sentinelEvent(LL_WARNING,"+failover-state-select-slave",ri,"%@");
}

[Continued] sentinelGetLeader

This function returns the leader for the given epoch.
It scans all the Sentinels attached to this Master and checks whether a leader exists for the epoch. To become leader, a candidate must receive votes from a majority of the voters; the voter set is the one in effect since the last SENTINEL RESET.
One question: when does the leader election actually start? Note that sentinelGetLeader is not called just once; it is invoked repeatedly from sentinelTimer. On the first call there are surely no votes yet and winner is NULL, so the Sentinel votes for itself, while sentinelAskMasterStateToOtherSentinels sends is-master-down-by-addr to the other Sentinels.

char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) {
    dict *counters;
    dictIterator *di;
    dictEntry *de;
    unsigned int voters = 0, voters_quorum;
    char *myvote;
    char *winner = NULL;
    uint64_t leader_epoch;
    uint64_t max_votes = 0;

First, assert that the Master is in ODOWN state or has a failover in progress:

    serverAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
    counters = dictCreate(&leaderVotesDictType,NULL);

The loop below tallies, for each Sentinel, the leader that Sentinel chose. At first glance this looks odd, like being a fence-sitter, but it is not: ri->leader records how that Sentinel actually voted, so the loop is really counting how many votes each candidate has received. We are not casting votes here, we are tallying them.

    voters = dictSize(master->sentinels)+1; /* All the other sentinels and me.*/

    /* Count other sentinels votes */
    di = dictGetIterator(master->sentinels);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        if (ri->leader != NULL && ri->leader_epoch == sentinel.current_epoch)
            sentinelLeaderIncr(counters,ri->leader);
    }
    dictReleaseIterator(di);

After the tally, check who wins. The winner must satisfy:

  1. an absolute majority of the voters
  2. at least master->quorum votes
    di = dictGetIterator(counters);
    while((de = dictNext(di)) != NULL) {
        uint64_t votes = dictGetUnsignedIntegerVal(de);

        if (votes > max_votes) {
            max_votes = votes;
            winner = dictGetKey(de);
        }
    }
    dictReleaseIterator(di);

Now account for our own vote. If we have not voted yet, vote for the winner; if there is no winner yet, vote for ourselves.

    if (winner)
        myvote = sentinelVoteLeader(master,epoch,winner,&leader_epoch);
    else
        myvote = sentinelVoteLeader(master,epoch,sentinel.myid,&leader_epoch);

The call above only casts the vote; it is sentinelLeaderIncr that increments the local count in counters, adding our own vote on top. If our vote pushes a candidate past the current maximum, the winner changes.

    if (myvote && leader_epoch == epoch) {
        uint64_t votes = sentinelLeaderIncr(counters,myvote);

        if (votes > max_votes) {
            max_votes = votes;
            winner = myvote;
        }
    }

Finally, check whether the winner's vote count satisfies both the majority quorum and master->quorum.

    voters_quorum = voters/2+1;
    if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
        winner = NULL;

    winner = winner ? sdsnew(winner) : NULL;
    sdsfree(myvote);
    dictRelease(counters);
    return winner;
}

sentinelFailoverSelectSlave

The main job of this function is to call sentinelSelectSlave, which picks one of the existing Slaves as the new Master and returns NULL if no Slave qualifies. Again there is a sentinelAbortFailover branch for the case where no Slave can be selected.

void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
    sentinelRedisInstance *slave = sentinelSelectSlave(ri);

The rest sets slave->flags and ri->promoted_slave:

    /* We don't handle the timeout in this state as the function aborts
     * the failover or go forward in the next state. */
    if (slave == NULL) {
        sentinelEvent(LL_WARNING,"-failover-abort-no-good-slave",ri,"%@");
        sentinelAbortFailover(ri);
    } else {
        sentinelEvent(LL_WARNING,"+selected-slave",slave,"%@");
        slave->flags |= SRI_PROMOTED;
        ri->promoted_slave = slave;
        ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
        ri->failover_state_change_time = mstime();
        sentinelEvent(LL_NOTICE,"+failover-state-send-slaveof-noone",
            slave, "%@");
    }
}
}

sentinelSelectSlave

First compute the maximum tolerable duration for which a Slave's replication link with the Master may have been down; Slaves exceeding it are excluded, which keeps the promoted data reasonably fresh.
down_after_period is the interval after which an instance is considered down, so max_master_down_time equals the time since the Master entered SDOWN plus ten times down_after_period. This formula looks like black magic, but the reasoning is as follows: from this Sentinel's point of view the Master is already down, so a Slave's link to the Master should normally not have been broken for longer than roughly down-after-period * 10. When the Master goes down, the Master-Slave links break too; as long as it was the Master that went down first, i.e. the disconnection was caused by the Master rather than the Slave, the link cannot have been down for very long. Still, this is only an auxiliary filter: in the end the replication offset is what decides which Slave is picked.

sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
    sentinelRedisInstance **instance =
        zmalloc(sizeof(instance[0])*dictSize(master->slaves));
    sentinelRedisInstance *selected = NULL;
    int instances = 0;
    dictIterator *di;
    dictEntry *de;
    mstime_t max_master_down_time = 0;

    if (master->flags & SRI_S_DOWN)
        max_master_down_time += mstime() - master->s_down_since_time;
    max_master_down_time += master->down_after_period * 10;

The loop below walks all the Slaves to build the candidate set instance[]. A candidate must satisfy:

  1. not SDOWN or ODOWN
  2. the link is not disconnected
  3. the link has not timed out
  4. slave->slave_priority is not 0 (this value comes from INFO and defaults to SENTINEL_DEFAULT_SLAVE_PRIORITY = 100)
  5. recent replies to PING (within 5x the period) and INFO (within 3x the period)
  6. the replication link with the Master has not been down longer than the bound computed above
    di = dictGetIterator(master->slaves);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);
        mstime_t info_validity_time;

        if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;
        if (slave->link->disconnected) continue;
        if (mstime() - slave->link->last_avail_time > SENTINEL_PING_PERIOD*5) continue;
        if (slave->slave_priority == 0) continue;

        /* If the master is in SDOWN state we get INFO for slaves every second.
         * Otherwise we get it with the usual period so we need to account for
         * a larger delay. */
        if (master->flags & SRI_S_DOWN)
            info_validity_time = SENTINEL_PING_PERIOD*5;
        else
            info_validity_time = SENTINEL_INFO_PERIOD*3;
        if (mstime() - slave->info_refresh > info_validity_time) continue;
        if (slave->master_link_down_time > max_master_down_time) continue;
        instance[instances++] = slave;
    }

Next, sort the candidate set and take the best one:

    dictReleaseIterator(di);
    if (instances) {
        qsort(instance,instances,sizeof(sentinelRedisInstance*),
            compareSlavesForPromotion);
        selected = instance[0];
    }
    zfree(instance);
    return selected;
}

compareSlavesForPromotion

The comparison keys, in order of priority:

  1. smaller slave_priority wins
  2. larger slave_repl_offset (the replica's replication offset) wins
  3. lexicographically smaller runid wins
/*
 * Basically if runid is the same, the slave that processed more commands
 * from the master is selected.
 *
 * The function returns the pointer to the selected slave, otherwise
 * NULL if no suitable slave was found.
 */
int compareSlavesForPromotion(const void *a, const void *b) {
    sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
                          **sb = (sentinelRedisInstance **)b;
    char *sa_runid, *sb_runid;

    if ((*sa)->slave_priority != (*sb)->slave_priority)
        return (*sa)->slave_priority - (*sb)->slave_priority;

    /* If priority is the same, select the slave with greater replication
     * offset (processed more data from the master). */
    if ((*sa)->slave_repl_offset > (*sb)->slave_repl_offset) {
        return -1; /* a < b */
    } else if ((*sa)->slave_repl_offset < (*sb)->slave_repl_offset) {
        return 1; /* a > b */
    }

Some older Slaves do not publish a runid, in which case it is NULL here; a NULL runid is treated as lexicographically greatest.

    sa_runid = (*sa)->runid;
    sb_runid = (*sb)->runid;
    if (sa_runid == NULL && sb_runid == NULL) return 0;
    else if (sa_runid == NULL) return 1;  /* a > b */
    else if (sb_runid == NULL) return -1; /* a < b */
    return strcasecmp(sa_runid, sb_runid);
}

sentinelFailoverSendSlaveOfNoOne

If link->disconnected, keep retrying until failover_timeout is exceeded, which triggers sentinelAbortFailover.
ri->promoted_slave was set in sentinelFailoverSelectSlave.

void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
    int retval;

    if (ri->promoted_slave->link->disconnected) {
        if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
            sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
            sentinelAbortFailover(ri);
        }
        return;
    }

Call sentinelSendSlaveOf with a NULL host, which makes ri->promoted_slave execute SLAVEOF NO ONE and become a Master.

    retval = sentinelSendSlaveOf(ri->promoted_slave,NULL,0);
    if (retval != C_OK) return;
    sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion",
        ri->promoted_slave,"%@");
    ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
    ri->failover_state_change_time = mstime();
}

sentinelSendSlaveOf

This function first sends SLAVEOF, then CONFIG REWRITE so that, when possible, the new configuration is flushed to the config file. "When possible" means the server supports config rewriting and was started from a config file.

/*
 * The command returns C_OK if the SLAVEOF command was accepted for
 * (later) delivery otherwise C_ERR. The command replies are just
 * discarded. */
int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) {
    char portstr[32];
    int retval;

    ll2string(portstr,sizeof(portstr),port);

If host is NULL, send SLAVEOF NO ONE, turning the instance into a Master.

    /* If host is NULL we send SLAVEOF NO ONE that will turn the instance
     * into a master. */
    if (host == NULL) {
        host = "NO";
        memcpy(portstr,"ONE",4);
    }

For safety we wrap everything in a transaction (an improvement over 3.0), containing:

  1. SLAVEOF
  2. the config rewrite
  3. disconnecting all clients (except the one sending the command), to trigger the ask-master-on-reconnection protocol

Note that CLIENT KILL TYPE <type> only exists since 2.8, but sending it to lower versions causes no problem: since CLIENT is a variadic command, the unknown form is not treated as an error, so the transaction does not fail.

We do not check the commands' return values; instead we watch the next INFO.

    retval = redisAsyncCommand(ri->link->cc,
        sentinelDiscardReplyCallback, ri, "%s",
        sentinelInstanceMapCommand(ri,"MULTI"));
    if (retval == C_ERR) return retval;
    ri->link->pending_commands++;

    retval = redisAsyncCommand(ri->link->cc,
        sentinelDiscardReplyCallback, ri, "%s %s %s",
        sentinelInstanceMapCommand(ri,"SLAVEOF"),
        host, portstr);
    if (retval == C_ERR) return retval;
    ri->link->pending_commands++;

    retval = redisAsyncCommand(ri->link->cc,
        sentinelDiscardReplyCallback, ri, "%s REWRITE",
        sentinelInstanceMapCommand(ri,"CONFIG"));
    if (retval == C_ERR) return retval;
    ri->link->pending_commands++;

    for (int type = 0; type < 2; type++) {
        retval = redisAsyncCommand(ri->link->cc,
            sentinelDiscardReplyCallback, ri, "%s KILL TYPE %s",
            sentinelInstanceMapCommand(ri,"CLIENT"),
            type == 0 ? "normal" : "pubsub");
        if (retval == C_ERR) return retval;
        ri->link->pending_commands++;
    }

    retval = redisAsyncCommand(ri->link->cc,
        sentinelDiscardReplyCallback, ri, "%s",
        sentinelInstanceMapCommand(ri,"EXEC"));
    if (retval == C_ERR) return retval;
    ri->link->pending_commands++;

    return C_OK;
}

Experiment

Our setup is as follows (a self-compiled Redis is recommended; otherwise the packaged Redis service restarts automatically and has to be stopped with /etc/init.d/redis-server stop):

Master localhost 6379
Slave1 localhost 6479
Slave2 localhost 6579

For Slave1:

# redis2.conf
replicaof 127.0.0.1 6379
port 6479
pidfile /var/run/redis_6479.pid

Run:

./src/redis-server redis2.conf &
redis-cli -h 127.0.0.1 -p 6479

Slave2 is set up the same way.
Now run INFO against the Master:

# Replication
role:master
connected_slaves:2
slave0:ip=127.0.0.1,port=6479,state=online,offset=27029,lag=1
slave1:ip=127.0.0.1,port=6579,state=online,offset=27559,lag=0
master_repl_offset:27559
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:2
repl_backlog_histlen:27558

For Sentinel2 we use the configuration below. Note that a relative path for the log file seems to result in no log output.

port 26479
logfile "/tmp/sentinel2.log"
daemonize yes
pidfile /var/run/redis-sentinel2.pid
sentinel monitor mymaster 127.0.0.1 6379 2

The other Sentinels follow the same recipe:

./src/redis-sentinel sentinel.conf

Once started, you can query it:

$ redis-cli -p 26379
127.0.0.1:26379> sentinel masters
1) 1) "name"
2) "mymaster"
3) "ip"
4) "127.0.0.1"
5) "port"
6) "6379"
7) "runid"
8) "183af86ac492235bf97739bcdad0353f1d4e61df"

You can also run INFO:

127.0.0.1:26379> info sentinel
# Sentinel
sentinel_masters:1
sentinel_tilt:0
sentinel_running_scripts:0
sentinel_scripts_queue_length:0
sentinel_simulate_failure_flags:0
master0:name=mymaster,status=ok,address=127.0.0.1:6379,slaves=2,sentinels=3

There seems to be no way, however, to discover the other Sentinels starting from the Master itself.
Note: if you hit a Duplicated master name. error, consider deleting the auto-generated section below from the conf file.

protected-mode no
user default on nopass ~* +@all
sentinel known-replica mymaster 127.0.0.1 6479
sentinel known-sentinel mymaster 127.0.0.1 26479 0833444de1d44b73ce6382af2ff0aeb21951b19a
sentinel known-sentinel mymaster 127.0.0.1 26379 2ddc47e7af3cebe8b1642127ed72002bb918550c
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel config-epoch mymaster 0
sentinel leader-epoch mymaster 0
sentinel known-replica mymaster 127.0.0.1 6579
sentinel known-replica mymaster 127.0.0.1 6479
sentinel known-sentinel mymaster 127.0.0.1 26479 0833444de1d44b73ce6382af2ff0aeb21951b19a
sentinel known-sentinel mymaster 127.0.0.1 26379 2ddc47e7af3cebe8b1642127ed72002bb918550c
sentinel current-epoch 0