Redis Sentinel实现原理分析

Sentinel(哨兵)监控Redis集群中Master状态,是Redis 的高可用性解决方案。它监视一系列Master-Slave集群(和其他Sentinel)。当某个Master下线时,自动将该Master下的某个Slave升级为Master。

Sentinel的代码阅读难度相对较大,这是因为它是时钟驱动的一个状态机,所以在逻辑上有点不是很连贯。
本文使用的版本和Redis底层对象实现原理分析这篇文章是相同的。有一些基础知识也在这篇文章中列出。

本文中【接上】暗示这个逻辑其实不属于父标题,但在时序上是上个标题的后续环节。例如上一个标题是Request的产生逻辑,下一个就是对端对Request的处理逻辑,下一个就是对对端Response的处理逻辑。

本文中 Master 指的是 Redis 主从复制中的主节点。(Sentinel) Leader 指的是 FailOver 流程中选举出来的领头的 Sentinel。

Redis主从复制

详见文章

总览

Sentinel用法与常见命令

表示监控一个叫 mymaster 的服务器,其 IP:Port 为 127.0.0.1:6379,并且这个 Master 要 Failover 至少需要2个。但注意这里的2并不是下确界,除了这个约束,还需要有满足 majority 的限制条件。

1
sentinel monitor mymaster 127.0.0.1 6379 2

down-after-milliseconds表示经过这一段时间,如果服务器没有对 Sentinel 的 PING 进行回复,或者一直返回无效回复(注意,回复错误也是有效回复),那么就会被标记为SDOWN。

1
sentinel down-after-milliseconds mymaster 60000

failover-timeout 表示故障恢复的超时时间。

1
sentinel failover-timeout mymaster 180000

parallel-syncs 表示在执行故障转移时,最多可以有多少 Slave 同时对新 Master 进行同步,数字越小,完成故障转移所需的时间就越长。

1
sentinel parallel-syncs mymaster 1

常用结构/模块

在本章节中,主要介绍两个基础对象,即全局上下文 sentinelState 和当前 Sentinel 对其监视的每个对象的描述类 sentinelRedisInstance

sentinelRedisInstance

每个被监视的 Redis 实例都会创建一个 sentinelRedisInstance 结构,它具有flag分别标注了实例的状态。
可以看到

1
2
3
#define SRI_MASTER  (1<<0)
#define SRI_SLAVE (1<<1)
#define SRI_SENTINEL (1<<2)

SDOWN 表示主观下线,即这个节点被一个 Sentinel 认为下线。ODOWN 是客观下线,表示被 Sentinel 集群认为下线。
如果另一个 Sentinel 觉得某个 Master 节点 SDOWN 了,那么它会通知我,并且让我把这个 Master 节点也设置为 SDOWN 么?是的,可以查看 sentinelAskMasterStateToOtherSentinels 函数。

1
2
#define SRI_S_DOWN (1<<3)   /* Subjectively down (no quorum). */
#define SRI_O_DOWN (1<<4) /* Objectively down (confirmed by others). */

SRI_MASTER_DOWNSRI_S_DOWN 有什么区别呢?

  1. SRI_MASTER_DOWN 表示对方 Sentinel 关于 Master 有没有宕机的投票结果。用来统计生成 ODOWN 的结果。
  2. SRI_S_DOWN 表示我自己认为某个节点有没有 SDOWN,包括 Slave、Master、Sentinel。
    1
    2
    3
    4
    5
    #define SRI_MASTER_DOWN (1<<5) /* A Sentinel with this flag set thinks that
    its master is down. */
    #define SRI_FAILOVER_IN_PROGRESS (1<<6) /* Failover is in progress for
    this master. */
    #define SRI_PROMOTED (1<<7) /* Slave selected for promotion. */

下面三个是和 Reconf 有关的 Flag,我们在稍后进行考虑。

1
2
3
4
5
#define SRI_RECONF_SENT (1<<8)     /* SLAVEOF <newmaster> sent. */
#define SRI_RECONF_INPROG (1<<9) /* Slave synchronization in progress. */
#define SRI_RECONF_DONE (1<<10) /* Slave synchronized with new master. */
#define SRI_FORCE_FAILOVER (1<<11) /* Force failover with master up. */
#define SRI_SCRIPT_KILL_SENT (1<<12) /* SCRIPT KILL already sent on -BUSY */

sentinelRedisInstance相关参数解释

  1. runid
    runid 是类似 183af86ac492235bf97739bcdad0353f1d4e61df 的串,在后面的实验中可以看到。
    注意,如果需要获得当前 Sentinel 进程的 runid,通过 sentinel.myid 来访问。在 3.0 版本前,可以通过 server.runid 来访问。其中 server

    1
    extern struct redisServer server;

    sentinel.myidsentinelIsRunning中被getRandomHexChars函数设置,似乎新版本中,使用Sentinel独立的runid了。

  2. name
    ri->runidri->name是两个不同概念。
    对于Master节点来说,表示这个Sentinel中Master节点的名字,这个名字是在配置文件里面用SENTINEL monitor设置的。
    对于Slave和Sentinel节点来说,这个是自动设置的。在createSentinelRedisInstance中可以看到,Slave的nameip:port;从sentinelProcessHelloMessage可以看到,Sentinel的name是runid。
    mastersslavessentinels这三个dict的key是name

  3. parallel_syncs
    在failover时,可以同时对新的Master进行同步的Slave数量。通过下面命令进行设置。

    1
    SENTINEL parallel-syncs <master-name> <number>
  4. leader
    如果这个 Instance 是 Master,那么 leader 表示将负责对这个 Master 进行 FailOver 的 Sentinel 的 runid。
    如果这个 Instance 是 Sentinel,那么 leader 就是被选举出来的 Sentinel Leader 的 runid。
    对于非 SRI_MASTER_DOWN 情况,这个不生效。

  5. pending_commands
    in-flight的命令的数量。即发出还未收到回复。

  6. failover_state
    注意,还有个flags中的标签SRI_FAILOVER_IN_PROGRESS。这两个标签都只对Master的Instance是有意义的。

  7. addr
    表示Master的addr,是一个sentinelAddr *

  8. master
    我们需要区分ri->master和另外一个sentinel.masters
    首先,ri->master只在这个Instance是Slave的时候有效,表示这个Slave对应的Master;而sentinel.masters表示这个Sentinel监控的所有Master实例。
    我们需要记住一点,整个进程里面做事的第一人称“我”是这个Sentinel,因此只有在sentinel.masters这个全局单例中才会看到我们监视了哪些Master。

  9. master->failover_epochmaster->config_epochmaster->leader_epochsentinel.current_epoch
    我们需要区分这三者,以及sentinelState中的current_epoch的区别:

    1. master->failover_epoch
      表示这次FailOver开始时的Epoch。当某个Sentinel进程决定调用sentinelStartFailover开始FailOver时,它需要自增自己的current_epoch,并且同步给failover_epoch
    2. master->config_epoch
      sentinelProcessHelloMessage中根据比较自己的和远端的config_epoch的大小,进行配置更新。
    3. master->leader_epoch
      leader_epochleader是联合使用的,表示当前Sentinel认为的处理这个Master的FailOver过程的Leader,发起竞选的epoch。
    4. sentinel.current_epoch
      因为一个Sentinel可能处理若干个Master的FailOver过程,所以sentinel.current_epoch作为当前Sentinel的epoch。这有点类似于Raft里面Term的机制。
      这是Sentinel自己的属性,而failover_epoch作为当前被监视的Master的epoch,两个是无法一一对应的。所以说导致有这两个看上去若即若离的对象。
      sentinelProcessHelloMessage里面会更新。
  10. role_reported
    注意,我们通过flag & SRI_MASTER判断是不是Master。
    但是这个是在INFO里面上报的信息。
    如果role发生变化了,就需要更新role_reported_time
    如果说原来的role是SRI_SLAVE,还需要更新slave_conf_change_time

各种索引/遍历sentinelRedisInstance的函数梳理

Redis Sentinel提供了多个函数用来索引/遍历sentinelRedisInstance

  1. getSentinelRedisInstanceByAddrAndRunID
    根据ip:portrunid来寻找Instance,这个要三个都相等。但是如果指定了runid或者ipNULL,那么它也可以不参与比较。
    其原理是获得dict *instances的迭代器,然后遍历一次以比较。
  2. sentinelGetMasterByName
    这个是根据name找,所以直接是dictFind+dictGetValue
  3. sentinelRedisInstanceLookupSlave
    这个是根据Slave的ip:port,从Master的slaves里面找对应的Instance。
    看上去这类似getSentinelRedisInstanceByAddrAndRunID也要遍历,但由于Slave的特殊取名规则,所以事实上直接生成name,dictFind即可。

各种time梳理

Instance

  1. last_pub_time
  2. last_hello_time
    我们上一次从Pub/Sub听到这个Sentinel的Hello时间(所以只对SRI_SENTINEL有效)。
    在sentinelProcessHelloMessage里面更新。
  3. last_master_down_reply_time
    SENTINEL is-master-down的最后一次回复的时间。
  4. s_down_since_time/o_down_since_time
  5. down_after_period
  6. info_refresh
  7. role_reported_time
  8. slave_conf_change_time
  9. master_link_down_time
  10. slave_reconf_sent_time
  11. failover_state_change_time
    故障转移中状态变更的时间
  12. failover_start_time
    下一次FailOver开启时间,一般更新于:
    1. 当我们投了票:sentinelVoteLeader
    2. 当我们开启了FailOver:sentinelStartFailover
  13. failover_delay_logged

Link

  1. cc_conn_time/pc_conn_time
  2. pc_last_activity
  3. last_avail_time
    上次对端对我们发送的PING进行回复的时间,注意这个回复我们要认为是合法的才计算。
  4. act_ping_time
    根据注释,最后一个PING发出的时间,如果为0,表示刚收到一个PONG,还没有来得及发送PING,详见后文的介绍。
  5. last_ping_time
    根据注释,是我们上一次发送PING的时间。
    相比act_ping_time,它的作用是避免在一段时间内发送较多的PING。
  6. last_pong_time
  7. last_reconn_time

各种timeout梳理

  1. failover_timeout
    默认为3分钟的故障转移超时时间。
  2. election_timeout
    是默认值和failover_timeout之间的最小值。

sentinelState类和sentinel对象

sentinelState是一个全局单例,表示全局上下文,我们一般用sentinel.*来访问这个单例的字段。

1
2
3
4
struct sentinelState {
char myid[CONFIG_RUN_ID_SIZE+1]; /* This sentinel ID. */
uint64_t current_epoch; /* Current epoch. */
...

masters是从实例名到sentinelRedisInstance指针的映射。

1
2
3
4
5
6
7
8
...
dict *masters;
int tilt; /* Are we in TILT mode? */
int running_scripts; /* Number of scripts in execution right now. */
mstime_t tilt_start_time; /* When TITL started. */
mstime_t previous_time; /* Last time we ran the time handler. */
list *scripts_queue; /* Queue of user scripts to execute. */
...

如果给定announce_ip:announce_port,那么在gossip(实际上就是发送Hello消息)的时候,会用这个地址而不是基于anetSockName函数检测的本地地址。

1
2
3
4
...
char *announce_ip;
int announce_port;
...

用来支持SIMULATE-FAILURE命令。

1
2
3
4
5
...
unsigned long simfailure_flags; /* Failures simulation. */
int deny_scripts_reconfig; /* Allow SENTINEL SET ... to change script
paths at runtime? */
} sentinel;

【辨析】sentinelState和sentinelRedisInstance的区别

在阅读源码的过程中,有一个混淆点,也就是说具有SRI_SENTINEL标识的sentinelRedisInstancesentinelState有什么区别呢?
其实,sentinelState表示这个Sentinel进程的描述的Sentinel的上下文,而各个sentinelRedisInstance表示这个Sentinel进程监视的所有实例,为了监视这些实例,需要从本进程创建连接。而具有SRI_SENTINEL标识的sentinelRedisInstance就表示这个进程监视的Sentinel实例。

初始化

消息

主要解决下面的问题:

  1. Sentinel 如何交流
  2. Sentinel 如何发现某个 Master 的所有 Slave

    INFO消息

    这个消息是由哨兵节点发送给Master/Slave的,目的是:
  3. 发现Slave节点
  4. 确认主从关系

这个消息会用sentinelRefreshInstanceInfo这个大函数来处理。

PING消息

检查实例(Master/Slave/Sentinel)是否超时/返回错误,从而决定是否进入SDOWN状态。

PUB/SUB __sentinel__:hello 频道

现在,我们有了INFO来检测节点的复杂状态,PING来检测节点心跳。那么如何让所有检测当前Master的Sentinel来互相发现和交流和交流呢?
因此,有了这个用来广播Hello的频道__sentinel__:hello,其作用是:

  1. 广播自己(Sentinel)的存在
  2. 发送Master的当前配置给其他Sentinel用于同步
    【一个疑问】看sentinelReconnectInstance的代码,Sentinel并没有订阅__sentinel__:hello频道啊?Redis文档中却强调

    Every Sentinel is subscribed to the Pub/Sub channel __sentinel__:hello of every master and replica, looking for unknown sentinels. When new sentinels are detected, they are added as sentinels of this master. (注意,这里的replica应该是由于zz正确的关系改名的,其实就是Slave。)
    我在爆栈网提了个问题。 但其实这个原因是这样的,sentinelReconnectInstance的代码表示这个Sentinel没有订阅自己所监视的Sentinel的__sentinel__:hello频道,它是订阅的自己监视的Master/Slave的__sentinel__:hello频道

这个消息的格式是:

1
2
0           1             2              3             4           5         6           7
sentinel_ip,sentinel_port,sentinel_runid,current_epoch,master_name,master_ip,master_port,master_config_epoch

这个协议在sentinelSendHello中通过PUBLISH命令发送,每个Sentinel在sentinelProcessHelloMessage里面处理这些消息。

此外,注意sentinelEvent这类事件机制,也可能通过PUB/SUB的方式广播,但不属于hello频道了。客户端可以订阅Sentinel的其他消息,例如配置变更信息

1
+switch-master <master name> <oldip> <oldport> <newip> <newport>

连接

相比3.0版本,目前版本的Redis把连接相关的逻辑都放到了instanceLink里面。
Redis Sentinel会对每个Instance维护2个Hiredis连接ccpc,其中cc用于发送命令,而pc用来做Pub/Sub。
为什么不直接用一个连接呢?有一种解释是为了防止command连接断开时,丢失广播的消息

1
2
3
typedef struct instanceLink {
redisAsyncContext *cc; /* Hiredis context for commands. */
redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */

此外,如果我们用过redis-cli,就会发现,在键入PUBLISH命令之后,整个cli会被阻塞,从而无法再发送其他命令。但通过Hiredis客户端访问时,使用非Async命令,其实也不会阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include "server.h"
#include "hiredis.h"
#include "async.h"

#include <ctype.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <unistd.h>
#include <adapters/libev.h>

int main(){
redisContext * ctx = redisConnect("127.0.0.1", 6379);
redisReply * reply = redisCommand(ctx, "SUBSCRIBE hello");
printf("retval %d %d\n",reply->len,reply->elements);
for(int i = 0; i < reply->elements; i++){
printf("==> %s\n", reply->element[i]->str);
}
reply = redisCommand(ctx,"set a 1");
reply = redisCommand(ctx, "set a");
printf("retval 2 %d %d\n",reply->len,reply->elements);
redisFree(ctx);
}

通过Async指令,也不会

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void getCallback(redisAsyncContext *c, void *r, void *privdata) {
redisReply *reply = r;
if (reply == NULL) return;
printf("argv[%s]: %s\n", (char*)privdata, reply->str);
redisAsyncDisconnect(c);
}
int main () {
uv_loop_t* loop = uv_default_loop();
redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);
redisLibuvAttach(c,loop);
redisAsyncCommand(c, NULL, NULL, "subscribe");
redisAsyncCommand(c, NULL, NULL, "SET key a", argv[argc-1], strlen(argv[argc-1]));
uv_run(loop, UV_RUN_DEFAULT);
return 0;
}

提供Makefile如下,注意需要配置uv、ev和lua-devel

1
2
3
4
5
pubsub_test: pubsub_test.c $(REDIS_SERVER_NAME) $(REDIS_CHECK_AOF_NAME)
$(REDIS_LD) $(FINAL_CFLAGS) pubsub_test.c -o pubsub_test ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a -lpthread -luv -lev $(FINAL_LIBS)

p2ubsub_test: p2ubsub_test.c $(REDIS_SERVER_NAME) $(REDIS_CHECK_AOF_NAME)
$(REDIS_LD) $(FINAL_CFLAGS) p2ubsub_test.c -o p2ubsub_test ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a -lpthread -luv -lev $(FINAL_LIBS)

连接的概述

连接需要解决这三个问题:

  1. 如何连接到监视的Master?
    我们会创建到我们监视的Master的cc/pc连接。
    其中,cc连接表示Client连接,pc连接表示PubSub连接。
  2. 如何连接到监视的Slave?
    我们会创建到我们监视的Slave的cc/pc连接。
    可是,我们如何发现某个Master的新的Slave的呢?答案是通过监视的Master的INFO命令的返回来获取的,我们可以在源码中搜索"+slave"来追踪这个过程。
  3. 如何连接到监视的Sentinel?
    我们会创建到我们监视的Sentinel的cc连接,不会创建pc连接。
    答案是通过__sentinel__:hello来获取的。在sentinelReconnectInstance函数中会用SUBSRIBE订阅所有的Master/Slave的连接。这个命令的回调是sentinelReceiveHelloMessages,每当有消息过来,这个函数会被触发,从而来更新Sentinel。

连接的基础:Hiredis

Sentinel使用Hiredis中提供的Async系列方法进行通信。

1
2
3
4
// deps/hiredis/async.h

redisAsyncContext *redisAsyncConnectBind(const char *ip, int port, const char *source_addr);
int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...);

初始化

Link使用下面方式进行初始化

1
2
3
4
5
6
/* Create a not yet connected link object. */
instanceLink *createInstanceLink(void) {
instanceLink *link = zmalloc(sizeof(*link));

link->refcount = 1;
...

我们讨论下到底在哪些情况下disconnected会为1:

  1. link刚创建
  2. link被关闭:instanceLinkCloseConnection
  3. link出错:instanceLinkConnectionError
1
2
3
4
5
6
7
8
9
10
...
link->disconnected = 1;
link->pending_commands = 0;
link->cc = NULL;
link->pc = NULL;
link->cc_conn_time = 0;
link->pc_conn_time = 0;
link->last_reconn_time = 0;
link->pc_last_activity = 0;
...

act_ping_time设置为现在的时间,即使我们现在既没有建立连接,也没有ping。这个常用在我们没有连接成功时产生超时。

1
2
3
4
5
6
7
...
link->act_ping_time = mstime();
link->last_ping_time = 0;
link->last_avail_time = mstime();
link->last_pong_time = mstime();
return link;
}

解析地址

createSentinelAddr这个函数用来把hostname:port解析为sentinelAddr*,稍后将会设置到ri->addr上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/* Create a sentinelAddr object and return it on success.
* On error NULL is returned and errno is set to:
* ENOENT: Can't resolve the hostname.
* EINVAL: Invalid port number.
*/
sentinelAddr *createSentinelAddr(char *hostname, int port) {
char ip[NET_IP_STR_LEN];
sentinelAddr *sa;

if (port < 0 || port > 65535) {
errno = EINVAL;
return NULL;
}
if (anetResolve(NULL,hostname,ip,sizeof(ip)) == ANET_ERR) {
errno = ENOENT;
return NULL;
}
sa = zmalloc(sizeof(*sa));
sa->ip = sdsnew(ip);
sa->port = port;
return sa;
}

sentinelReconnectInstance

每一次sentinelHandleRedisInstance的事件都会调用sentinelReconnectInstance来尝试Reconnect(如果断线了的话)。
sentinelReconnectInstance函数中,在3.0版本中,检查如果对sentinelRedisInstance *ri尚未建立有网络连接,则调用redisAsyncConnect等函数建立网络连接。但现在进行了一些修改。
注意,只要ccpc之间有一个连接断掉了,那么ri->link->disconnected就是true了。
这里补充说明一下,在3.0版本中,断线是作为一个单独的flag即SRI_DISCONNECTED展示的。但在目前版本中,整个连接相关的都放在link里面了,因此ri->link->disconnected也在里面。

1
2
void sentinelReconnectInstance(sentinelRedisInstance *ri) {
if (ri->link->disconnected == 0) return;

port == 0是无效地址,那么你一开始在createSentinelAddr里面判断一下不就行了么?我想这个应该是因为port = 0表示内核为我们分配一个端口,在TCP/IP层面是有意义的,但是我们这里赋予了port = 0特殊的意义,在下面sentinelProcessHelloMessage的讲解中会进行说明。

1
2
3
...
if (ri->addr->port == 0) return; /* port == 0 means invalid address. */
...

我们对断线重连也要设置interval,为SENTINEL_PING_PERIOD

1
2
3
4
5
6
7
...
instanceLink *link = ri->link;
mstime_t now = mstime();

if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) return;
ri->link->last_reconn_time = now;
...

下面处理cc连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
...
/* Commands connection. */
if (link->cc == NULL) {
link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
if (!link->cc->err && server.tls_replication &&
(instanceLinkNegotiateTLS(link->cc) == C_ERR)) {
sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #Failed to initialize TLS");
instanceLinkCloseConnection(link,link->cc);
} else if (link->cc->err) {
sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
link->cc->errstr);
instanceLinkCloseConnection(link,link->cc);
} else {
link->pending_commands = 0;
link->cc_conn_time = mstime();
link->cc->data = link;
redisAeAttach(server.el,link->cc);
...

下面是两个回调函数,它们的作用都是在连接失败的时候调用instanceLinkConnectionError,将这个连接置为NULL,并且设置disconnected

1
2
3
4
5
6
7
8
9
10
11
12
13
...
redisAsyncSetConnectCallback(link->cc,
sentinelLinkEstablishedCallback);
redisAsyncSetDisconnectCallback(link->cc,
sentinelDisconnectCallback);
sentinelSendAuthIfNeeded(ri,link->cc);
sentinelSetClientName(ri,link->cc,"cmd");

/* Send a PING ASAP when reconnecting. */
sentinelSendPing(ri);
}
}
...

对于Master和Slave节点,我们处理pc连接。这里我有点疑问,Sentinel肯定需要连接PubSub的发送Hello消息和配置的啊,为啥这里不初始化pc,甚至Sentinel之间都不用pc呢?
首先,我们注意到在sentinelSendHello中,SENTINEL_HELLO_CHANNEL这个通道的消息是通过cc而不是pc发送的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
...
/* Pub / Sub */
if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
if (!link->pc->err && server.tls_replication &&
(instanceLinkNegotiateTLS(link->pc) == C_ERR)) {
sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #Failed to initialize TLS");
} else if (link->pc->err) {
sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
link->pc->errstr);
instanceLinkCloseConnection(link,link->pc);
} else {
int retval;

link->pc_conn_time = mstime();
link->pc->data = link;
redisAeAttach(server.el,link->pc);
redisAsyncSetConnectCallback(link->pc,
sentinelLinkEstablishedCallback);
redisAsyncSetDisconnectCallback(link->pc,
sentinelDisconnectCallback);
sentinelSendAuthIfNeeded(ri,link->pc);
sentinelSetClientName(ri,link->pc,"pubsub");
/* Now we subscribe to the Sentinels "Hello" channel. */
...

下面的命令发出一个SUBSCRIBE __sentinel__:hello指令,并且用sentinelReceiveHelloMessages作为回调。
真相水落石出,原来pc端口是专门用来接收Hello信息的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
...
retval = redisAsyncCommand(link->pc,
sentinelReceiveHelloMessages, ri, "%s %s",
sentinelInstanceMapCommand(ri,"SUBSCRIBE"),
SENTINEL_HELLO_CHANNEL);
if (retval != C_OK) {
/* If we can't subscribe, the Pub/Sub connection is useless
* and we can simply disconnect it and try again. */
instanceLinkCloseConnection(link,link->pc);
return;
}
}
}
/* Clear the disconnected status only if we have both the connections
* (or just the commands connection if this is a sentinel instance). */
if (link->cc && (ri->flags & SRI_SENTINEL || link->pc))
link->disconnected = 0;
}
...

连接断线和宕机有什么区别?

我们首先看看link->disconnected什么时候是1:

  1. 刚创建连接
  2. 关闭连接调用instanceLinkCloseConnection
    releaseInstanceLink
    sentinelUpdateSentinelAddressInAllMasters
    sentinelResetMaster
    sentinelReconnectInstance
    sentinelCheckSubjectivelyDown【?】
  3. instanceLinkConnectionError
    sentinelLinkEstablishedCallback
    sentinelDisconnectCallback

入口

Redis的时钟中断处理例程serverCronserver.hz为时间间隔触发。在这个函数中会异步处理很多逻辑,例如:

  1. 【databasesCron】主动expire key,当然也有在look up的时候lazy expire的,这个lazy逻辑我们介绍过
  2. Software watchdog:watchdogScheduleSignal
  3. 更新一些统计,例如cronUpdateMemoryStats
  4. 【databasesCron】调用dictRehashMilliseconds
  5. 触发BGSAVE/AOF
  6. 【clientsCron】客户端超时
  7. 【replicationCron】Replication相关

run_with_period表示每隔多少秒执行一次。

1
2
3
4
5
6
7
8
// server.h
#define run_with_period(_ms_) if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int j;
...
if (server.sentinel_mode) sentinelTimer();
}

sentinelTimer是Sentinel的主例程。

1
2
3
4
5
6
7
void sentinelTimer(void) {
sentinelCheckTiltCondition();
sentinelHandleDictOfRedisInstances(sentinel.masters);
sentinelRunPendingScripts();
sentinelCollectTerminatedScripts();
sentinelKillTimedoutScripts();
...

我们持续变更Redis的”timer interrupt”的频率,这样每个Sentinel的频率都是不同的。这种不确定性避免了Sentinel在同一个时间启动,同一时间发起投票,从而导致脑裂。当然,后面还会看到在设置failover_start_time时,给每个选举epoch也设置了不同的时长,这应该是也是避免此类问题的一个思路。
我们的CONFIG_DEFAULT_HZ默认是10,表示一秒钟触发10次。

1
2
3
...
server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ;
}

主体流程综述

这些流程的时间线可能是互相重叠的。

检测S/ODOWN流程

  1. 定期发送心跳
    发送是在sentinelSendPeriodicCommands中。
  2. 检查有没有SDOWN
    sentinelCheckSubjectivelyDown
    注意,SDOWN的检测对Master、Slave和Sentinel都起作用。
    一般SDOWN有几种情况:
    1. 连接超时
      取决于ri->link->act_ping_time距离现在的时间。
    2. 连接关闭
      取决于断线,即ri->link->disconnected后,ri->link->last_avail_time距离现在的时间。
    3. Master报告要变成Slave,但是在一定时间内没有完成这个过程
      也就是说ri->role_reported是Slave,但是ri->flags还是SRI_MASTER
  3. 如果是Master,检查有没有ODOWN
    在发现了SDOWN之后,Sentinel要做下面的事:
    1. 发起的Sentinel询问其他Sentinel节点它们有没有觉这个节点SDOWN了
      sentinelAskMasterStateToOtherSentinels
      实际上就是发送is-master-down-by-addr命令,但这里参数里面不带自己的runid,而是传入*
    2. 其他Sentinel节点回复消息(投票)
      sentinelCommand里面对sentinelAskMasterStateToOtherSentinels的回复。
    3. 发起的Sentinel处理投票消息
      sentinelReceiveIsMasterDownReply
    4. 发起的Sentinel统计票数
      sentinelHandleRedisInstance里面的第二个sentinelCheckObjectivelyDown
      如何判断是否ODOWN,实际上就是统计具有SRI_MASTER_DOWN标签的实例数量。
    【Q】如果是Slave/Sentinel,为什么不需要?我们判断ODOWN的目的是决定是否对一个节点进行FailOver操作。所以并不需要对Slave和Sentinel进行FailOver。例如,一个Slave挂掉了,对于Sentinel的充其量无非就是不选择这个节点来promote了。

FailOver流程

基于官方文档整理:

  1. 检查Master是否已经ODOWN
    同上面的流程
  2. 检查自己是否需要开启FailOver
    这一部分是在sentinelStartFailoverIfNeeded中实现的,这个函数被sentinelHandleRedisInstance调用。
    在这里需要检查master->failover_timeout*2的要求,对于下面的两种情况就不能发起选举
    1. 如果自己在这个时间内投过票
      【Q】这是在哪里检查?这个应该是sentinelVoteLeader里面会设置failover_start_time
    2. 如果自己在这个时间内发起过FailOver
  3. 自增current_epoch进行,并开始竞选。
    这一部分逻辑以sentinelStartFailover开始,它设置FailOver状态SRI_FAILOVER_IN_PROGRESS,从而开启FailOver状态机。
    之后FailOver状态机函数sentinelFailoverStateMachine会发挥作用。
    然后,发送带runid的is-master-down-by-addr,这要和ODOWN状态的is-master-down-by-addr进行区分。
    更新failover_start_time,这个和election_timeout一并用来控制选举超时时刻
  4. 竞选部分
    详见“选举流程”的说明。
  5. 如果竞选失败,则在master->failover_timeout*2之后,重新尝试竞选。
  6. 如果竞选成功,则
    1. 选出一个Slave,并将它升级为Master
      这个选出的Slave被存放到promoted_slave里面。
      详情见下面“选择合适的Slave”的说明。
      在选定Slave之后,FailOver状态切换为SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE
    2. 向被选中的Slave即promoted_slave发送SLAVEOF NO ONE,让它转变为Master
      在发送完SLAVEOF之后,FailOver状态会被切换到SENTINEL_FAILOVER_STATE_WAIT_PROMOTION
      然后,我们会在sentinelRefreshInstanceInfo中监听这个节点的变化,如果这个转换是成功的,会看到role的转换。那么就可以将状态切换到SENTINEL_FAILOVER_STATE_RECONF_SLAVES了。否则,在sentinelFailoverWaitPromotion中就会让这个FailOver过程失败掉。
    3. 通知所有Sentinel更新配置
      通过Pub/Sub,将更新后的配置传播给所有其他Sentinel,其他Sentinel对它们自己的配置进行更新。
      【Q】应该通过这个函数来处理sentinelForceHelloUpdateForMaster
    4. 通知所有Slave更新Master
      向已下线Master的其他Slave发送SLAVEOF host port命令,让它们去复制新的Master。
      【Q】应该是sentinelFailoverReconfNextSlave
    5. 当所有Slave都已经开始复制新的Master时,Leader终止这次FailOver操作。
      对应于sentinelFailoverSwitchToPromotedSlave。主要是重新设置mastermaster->slaves到新的地址。期间会调用到sentinelResetMaster,这个函数会重置failover_state

选举流程

选举流程是嵌套在FailOver流程中的,但是又自成体系,所以拿出来单独说:

  1. 检查自己是否可以竞选【同FailOver流程的相同步骤】
  2. 自增current_epoch进行,并开始竞选【同FailOver流程的相同步骤】
    在这里需要发送带runid的is-master-down-by-addr,表示请求其他Sentinel投票。
  3. 给自己投票
    这是在sentinelVoteLeader里面的。
    设置master->leadermaster->leader_epoch为自己。这里leader是在master下面的一个变量,这也暗指,Sentinel能同时处理多个Master的FailOver
  4. 其他Sentinel处理Candidate的is-master-down-by-addr
    在sentinelCommand的某个分支下面:
    1. 处理是否自己觉得SDOWN了。
    2. 如果是竞选消息,那么会调用sentinelVoteLeader来投票。
  5. Candidate在sentinelFailoverWaitStart中自旋,调用sentinelGetLeader统计票数,如果成功,会成为了Leader,进入下面FailOver的流程
    1. 如果在超时时间election_timeout内,Candidate没有获得超过quorum,自己的这次选举就失败了。
    2. 如果在超时时间内,没有一个Candidate获得更多的票数,那么等待master->failover_timeout*2后,Candidate增加epoch重新投票。由于failover_start_time在设置时会被随机增加一个值,所以导致选举超时的时间也是不定的。
  6. 与Raft协议不同,Leader并不会把自己成为Leader的消息发给其他Sentinel。
    其他Sentinel等待Leader从Slave选出Master后,检测到新的Master正常工作后,就会去掉客观下线的标识,从而不需要进入故障转移流程

容易看出选举流程借鉴了Raft的思想,但其中还是有相当的区别:

  1. Sentinel将选举和“RSM/日志”分开了,“维护RSM/日志”的是Master/Slave,而进行选举的是Sentinel
    换句话说,Sentinel管的是别的集群的FailOver,而不是自己的
  2. 选举Sentinel Leader和选择合适的Slave是分开的逻辑
    1. Sentinel Leader选举只考虑epoch,并不需要在is-master-down-by-addr中附上自己要Select成为新Master的Slave。
    2. 在Leader选举过程中,有两个共识需要达成:某个Master是否SDOWN/ODOWN;选择哪个Leader。
    3. 当Sentinel Leader被选出来之后,由它独自处理FailOver流程,包含Select Slave部分。
  3. 一个Sentinel节点能够处理多个Master-Slave集群的FailOver,而Raft协议只能处理自己集群的FailOver。
    所以说,我们能看到epoch,比如leader_epochcurrent_epoch啥的。详见“sentinelRedisInstance”这一章节中,对四个epoch的辨析。
  4. 在Raft中,Candidate竞选失败后,会过随机的时间后重新竞选,这个在Redis中并没有发现。

选择合适的Slave

这个过程,是由选举出来的Leader单独负责的。

需要辨析的点

  1. master->leader_epochsentinel.current_epoch
    见之前“sentinelRedisInstance”这一章节中,对四个epoch的辨析。

Sentinel机制的风险

1
2
min-replicas-to-write 1
min-replicas-max-lag 10

例程详解

“主体流程综述”中已经概括了FailOver等流程的主要内容了,本章节对源码进行说明。
由于sentinelTimer基本上包括了Redis Sentinel的所有逻辑,所以我们单独开一个章节来讲。

sentinelCheckTiltCondition

判定是否要进入TILT模式。当程序发现两次sentinelTimer之间的时间差为负值或者大于SENTINEL_TILT_TRIGGER = 2000时,就会进入 TILT 模式。这通常意味着:

  1. Sentinel进程被阻塞:需要载入的数据(?)大、机器IO繁重、进程被信号停止
  2. 系统时钟出现了问题

对于这种情况,会设置tilt=1,此时这个sentinel就会罚站SENTINEL_TILT_PERIOD这么长时间。

sentinelHandleDictOfRedisInstances

这个其实是Sentinel的主要逻辑。它会遍历监视的所有节点(包括Master、Slave和其他Sentinel),首先执行sentinelHandleRedisInstance。如果发现这个节点是SRI_MASTER,则对它的所有Slave和Sentinel也调用这个函数。注意这个函数并不是一个DFS或者递归。事实上他最多就两层,唯一的嵌套就是对于每个Master,找到它所有的Slave和监视它的所有Sentinel。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void sentinelHandleDictOfRedisInstances(dict *instances) {
dictIterator *di;
dictEntry *de;
sentinelRedisInstance *switch_to_promoted = NULL;

/* There are a number of things we need to perform against every master. */
di = dictGetIterator(instances);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);

sentinelHandleRedisInstance(ri);
if (ri->flags & SRI_MASTER) {
sentinelHandleDictOfRedisInstances(ri->slaves);
sentinelHandleDictOfRedisInstances(ri->sentinels);
if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
switch_to_promoted = ri;
}
}
}
...

当一个Slave在SENTINEL_FAILOVER_STATE_UPDATE_CONFIG时,说明此时所有Slave已经完成同步,要进行配置更新。此时需要调用sentinelFailoverSwitchToPromotedSlave函数。这个是FailOver自动机的最后一步,我们将它迁移到后面进行讲解。

1
2
3
4
5
...
if (switch_to_promoted)
sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
dictReleaseIterator(di);
}

sentinelHandleRedisInstance 监控部分

1
2
3
4
5
6
7
/* Perform scheduled operations for the specified Redis instance. */
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
/* ========== MONITORING HALF ============ */
/* Every kind of instance */
sentinelReconnectInstance(ri);
sentinelSendPeriodicCommands(ri);
...

sentinelReconnectInstance

见上文连接部分

sentinelSendPeriodicCommands

sentinelSendPeriodicCommands负责向sentinelRedisInstance *ri发送命令。
这里的注释说link是一个instanceLink对象。这个对象的作用是为了节约hiredis连接建立的数量,如果有5个Sentinel监视100个Master,那么会创建5个连接而不是500个连接。这一段有点令人费解,因为就算是这5个Sentinel共享链接,那也要它们两两互联的5*4共同对外的100个连接啊?这个查看sentinelTryConnectionSharing会得到解答。TODO

1
2
3
4
5
6
7
8
9
10
11
/* Send periodic PING, INFO, and PUBLISH to the Hello channel to
* the specified master or slave instance. */
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
mstime_t now = mstime();
mstime_t info_period, ping_period;
int retval;

/* Return ASAP if we have already a PING or INFO already pending, or
* in the case the instance is not properly connected. */
if (ri->link->disconnected) return;
...

因为INFO/PING/PUBLISH都不是极端重要的指令,所以我们对pending_commands设定了一个上限SENTINEL_MAX_PENDING_COMMANDS。当然还有一个兜底的情况就是当命令太多了,连接就会断掉触发重连。

1
2
3
4
5
...
// SENTINEL_MAX_PENDING_COMMANDS = 100
if (ri->link->pending_commands >=
SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;
...

如果ri是一个Slave节点,并且它对应的Master节点出于ODOWN,或者FAILOVER状态,或者,那么我们应该加快发送INFO的频率到1000ms一次,否则就是SENTINEL_INFO_PERIOD = 10000ms,此时Slaves中会有被其他Sentinel或者sysadmin提升为Master的情况。此外,当master_link_down_time不等于0(应该是Slave从Master断开了,导致Replication过程中断),也要更频繁的INFO,从而有一个更加清楚的断连时间记录。

1
2
3
4
5
6
7
8
9
10
...
if ((ri->flags & SRI_SLAVE) &&
((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
(ri->master_link_down_time != 0)))
{
info_period = 1000;
} else {
info_period = SENTINEL_INFO_PERIOD;
}
...

下面设置PING指令的间隔。
down_after_period表示实例经过down_after_period之后会被认为SDOWN。
这个值等于master->down_after_period,如果自己是Master的话,就取SENTINEL_DEFAULT_DOWN_AFTER = 30000

1
2
3
4
5
6
7
8
...
/* We ping instances every time the last received pong is older than
* the configured 'down-after-milliseconds' time, but every second
* anyway if 'down-after-milliseconds' is greater than 1 second. */
// #define SENTINEL_PING_PERIOD 1000
ping_period = ri->down_after_period;
if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;
...

下面调用redisAsyncCommand对发送INFO,我们不需要向Sentinel发送INFO,这是因为我们不需要知道Sentinel的Slave和主从关系(其实也没有)。如果info_refresh为0表示我们从来没收到过INFO消息。

1
2
3
4
5
6
7
8
9
10
11
12
...
/* Send INFO to masters and slaves, not sentinels. */
if ((ri->flags & SRI_SENTINEL) == 0 &&
(ri->info_refresh == 0 ||
(now - ri->info_refresh) > info_period))
{
retval = redisAsyncCommand(ri->link->cc,
sentinelInfoReplyCallback, ri, "%s",
sentinelInstanceMapCommand(ri,"INFO"));
if (retval == C_OK) ri->link->pending_commands++;
}
...

下面是发送PING,我们同样也要PING。相比3.0版本,现在对于last_ping_time超过ping_period/2的情况也会发送PING了。这样的目的是什么呢?

1
2
3
4
5
6
7
...
/* Send PING to all the three kinds of instances. */
if ((now - ri->link->last_pong_time) > ping_period &&
(now - ri->link->last_ping_time) > ping_period/2) {
sentinelSendPing(ri);
}
...

下面通过PubSub发送一个Hello消息。这个消息是为了达成在“消息”章节中提到的两个目标。我们将在后面详细解读。

1
2
3
4
5
6
...
/* PUBLISH hello messages to all the three kinds of instances. */
if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
sentinelSendHello(ri);
}
}

【接上】sentinelInfoReplyCallback

INFO命令的回复会被注册到来处理

1
2
3
4
5
6
7
8
9
10
11
12
void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
sentinelRedisInstance *ri = privdata;
instanceLink *link = c->data;
redisReply *r;

if (!reply || !link) return;
link->pending_commands--;
r = reply;

if (r->type == REDIS_REPLY_STRING)
sentinelRefreshInstanceInfo(ri,r->str);
}

这个函数的主干实际上就是调用sentinelRefreshInstanceInfo,这个函数超级大,用来解析INFO返回的信息,我们不在这里全部列举出来。在sentinelFailoverWaitPromotion等地方,我们会将这个函数中涉及的相关部分拎出来讲。

sentinelSendHello

这个函数通过Pub/Sub发送Hello指令到ri,完成“消息”章节中提到的两个目标。消息的格式如下。

1
sentinel_ip,sentinel_port,sentinel_runid,current_epoch,master_name,master_ip,master_port,master_config_epoch

如果PUBLISH指令成功入队,那么就返回成功。
在3.0版本中,通过anetSockName直接获得ip,直接用server.port。但在这里会复杂一点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
int sentinelSendHello(sentinelRedisInstance *ri) {
char ip[NET_IP_STR_LEN];
char payload[NET_IP_STR_LEN+1024];
int retval;
char *announce_ip;
int announce_port;
sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master;
sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master);

if (ri->link->disconnected) return C_ERR;

/* Use the specified announce address if specified, otherwise try to
* obtain our own IP address. */
if (sentinel.announce_ip) {
announce_ip = sentinel.announce_ip;
} else {
if (anetSockName(ri->link->cc->c.fd,ip,sizeof(ip),NULL) == -1)
return C_ERR;
announce_ip = ip;
}
if (sentinel.announce_port) announce_port = sentinel.announce_port;
else if (server.tls_replication && server.tls_port) announce_port = server.tls_port;
else announce_port = server.port;
...

下面我们生成要广播的配置,其格式是

1
2
sentinel_ip,sentinel_port,sentinel_runid,current_epoch         ,master_name ,master_ip      ,master_port      ,master_config_epoch
announce_ip,announce_port,sentinel.myid ,sentinel.current_epoch,master->name,master_addr->ip,master_addr->port,master->config_epoch

下面就是执行一个PUBLISH命令。有趣的是这个PUBLISH是通过cc而不是pc发送的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
...
/* Format and send the Hello message. */
snprintf(payload,sizeof(payload),
"%s,%d,%s,%llu," /* Info about this sentinel. */
"%s,%s,%d,%llu", /* Info about current master. */
announce_ip, announce_port, sentinel.myid,
(unsigned long long) sentinel.current_epoch,
/* --- */
master->name,master_addr->ip,master_addr->port,
(unsigned long long) master->config_epoch);
retval = redisAsyncCommand(ri->link->cc,
sentinelPublishReplyCallback, ri, "%s %s %s",
sentinelInstanceMapCommand(ri,"PUBLISH"),
SENTINEL_HELLO_CHANNEL,payload);
if (retval != C_OK) return C_ERR;
ri->link->pending_commands++;
return C_OK;
}

【接上】sentinelReceiveHelloMessages

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/* This is our Pub/Sub callback for the Hello channel. It's useful in order
* to discover other sentinels attached at the same master. */
void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
sentinelRedisInstance *ri = privdata;
redisReply *r;
UNUSED(c);

if (!reply || !ri) return;
r = reply;

/* Update the last activity in the pubsub channel. Note that since we
* receive our messages as well this timestamp can be used to detect
* if the link is probably disconnected even if it seems otherwise. */
ri->link->pc_last_activity = mstime();

/* Sanity check in the reply we expect, so that the code that follows
* can avoid to check for details. */
if (r->type != REDIS_REPLY_ARRAY ||
r->elements != 3 ||
r->element[0]->type != REDIS_REPLY_STRING ||
r->element[1]->type != REDIS_REPLY_STRING ||
r->element[2]->type != REDIS_REPLY_STRING ||
strcmp(r->element[0]->str,"message") != 0) return;

/* We are not interested in meeting ourselves */
if (strstr(r->element[2]->str,sentinel.myid) != NULL) return;

sentinelProcessHelloMessage(r->element[2]->str, r->element[2]->len);
}

sentinelProcessHelloMessage

这个函数会处理包括新Sentinel发现、Sentinel地址变更、TODO之类的逻辑。
这个函数在Master/Slave实例中处理收到的Hello信息,或者sent directly to this sentinel via the (fake) PUBLISH command of Sentinel.
再次复习一下消息的格式

1
2
0           1             2              3             4           5         6           7
sentinel_ip,sentinel_port,sentinel_runid,current_epoch,master_name,master_ip,master_port,master_config_epoch

下面看具体函数,主要过程是:

  1. 找到监控的master
  2. 尝试加入新Sentinel
    1. 检查runid
    2. 检查ip:port
  3. 尝试更新current_epoch
1
2
3
4
5
6
7
8
9
void sentinelProcessHelloMessage(char *hello, int hello_len) {
int numtokens, port, removed, master_port;
uint64_t current_epoch, master_config_epoch;
char **token = sdssplitlen(hello, hello_len, ",", 1, &numtokens);
sentinelRedisInstance *si, *master;

if (numtokens == 8) {
/* Obtain a reference to the master this hello message is about */
...

首先根据master_name找到我们Sentinel中**对应的Instance对象master**,如果找不到,就忽略这个请求。
【Q】 TODO所以我蛮好奇这个频道像一个大杂烩一样的,会不会造成处理的开销;抑或,我们可以对每一个Master建立一个单独的频道,因为Pub/Sub是允许pattern matching的。

1
2
3
4
5
6
7
8
...
master = sentinelGetMasterByName(token[4]);
if (!master) goto cleanup; /* Unknown master, skip the message. */

/* First, try to see if we already have this sentinel. */
port = atoi(token[1]);
master_port = atoi(token[6]);
...

定位到这个Hello消息涉及的Master节点后,我们尝试更新监视这个Master的Sentinel列表。于是,我们去找符合sentinel_ip:sentinel_portmaster_name的Sentinel节点si。这里si是一个sentinelRedisInstance,表示sentinel_ip:sentinel_port这个Sentinel节点,也就是我们消息的发送方。

1
2
3
4
5
6
...
si = getSentinelRedisInstanceByAddrAndRunID(
master->sentinels,token[0],port,token[2]);
current_epoch = strtoull(token[3],NULL,10);
master_config_epoch = strtoull(token[7],NULL,10);
...

如果没找到,说明这是一个新发现的Sentinel,我们需要加到master->sentinels里面。在加入之前,需要做一些额外处理。首先,要移除掉所有具有相同runid的Sentinel,也就是说,可能有runid相同的Sentinel,但是他们的ip:port不同。这是因为可能同一个Sentinel的地址变了,我们要runid为准来区分不同的Sentinel。
这个过程需要扫一遍master->sentinels

1
2
3
4
5
6
7
...
if (!si) {
/* If not, remove all the sentinels that have the same runid
* because there was an address change, and add the same Sentinel
* with the new address back. */
removed = removeMatchingSentinelFromMaster(master,token[2]);
...

removeMatchingSentinelFromMaster函数会返回removed表示实际上移除了多少个节点。

1
2
3
4
5
6
...
if (removed) {
sentinelEvent(LL_NOTICE,"+sentinel-address-switch",master,
"%@ ip %s port %d for %s", token[0],port,token[2]);
} else {
...

之前,我们检查了是否有runid相同的实例。然后,我们还要检查sentinels是否还有ip:port相同的Sentinel实例。

1
2
3
4
5
6
7
8
9
...
/* Check if there is another Sentinel with the same address this
* new one is reporting. What we do if this happens is to set its
* port to 0, to signal the address is invalid. We'll update it
* later if we get an HELLO message. */
sentinelRedisInstance *other =
getSentinelRedisInstanceByAddrAndRunID(
master->sentinels, token[0],port,NULL);
...

如果有的话,我们要把这个旧的实例的port改为0,从而表示这个地址是Invalid的。然后,我们要调用sentinelUpdateSentinelAddressInAllMastersother进行修改,这个修改的目的是把所有Master中other这个Sentinel的连接断开。

1
2
3
4
5
6
7
8
...
if (other) {
sentinelEvent(LL_NOTICE,"+sentinel-invalid-addr",other,"%@");
other->addr->port = 0; /* It means: invalid address. */
sentinelUpdateSentinelAddressInAllMasters(other);
}
}
...

新建一个runid的token[2]的Sentinel,并使用请求发来的sentinel_ip:sentinel_port作为地址。token[2]是传过来的runid,在createSentinelRedisInstance中会被设置成Sentinel的name
createSentinelRedisInstance函数会直接把si加到master->sentinel里面。

1
2
3
4
5
...
/* Add the new sentinel. */
si = createSentinelRedisInstance(token[2],SRI_SENTINEL,
token[0],port,master->quorum,master);
...

下面这个函数就是之前提到的500个连接复用到5个的函数。

1
2
3
4
5
6
7
8
9
...
if (si) {
if (!removed) sentinelEvent(LL_NOTICE,"+sentinel",si,"%@");
/* The runid is NULL after a new instance creation and
* for Sentinels we don't have a later chance to fill it,
* so do it now. */
si->runid = sdsnew(token[2]);
sentinelTryConnectionSharing(si);
...

下面又一次调用sentinelUpdateSentinelAddressInAllMasters,其目的是和上面的sentinelUpdateSentinelAddressInAllMasters是类似的。但是上面的调用是用来处理!removed的情况下,如果还有ip:port相同的实例的情况。而这个是处理remove的情况,也就是说已经找到一个runid相同的。

1
2
3
4
5
6
...
if (removed) sentinelUpdateSentinelAddressInAllMasters(si);
sentinelFlushConfig();
}
}
...

接着,我们需要尝试通过对端的current_epoch来更新自己的current_epoch

1
2
3
4
5
6
7
8
9
...
/* Update local current_epoch if received current_epoch is greater.*/
if (current_epoch > sentinel.current_epoch) {
sentinel.current_epoch = current_epoch;
sentinelFlushConfig();
sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
(unsigned long long) sentinel.current_epoch);
}
...

master_config_epoch字段,来源于其他Sentinel在发消息时传输的自己的master->config_epoch字段。通过比较远端的和自己的master->config_epoch,如果我们的较小,则需要更新Master的配置信息。

1
2
3
4
5
6
...
/* Update master info if received configuration is newer. */
if (si && master->config_epoch < master_config_epoch) {
// 我们同样将`config_epoch`同步到最新。
master->config_epoch = master_config_epoch;
...

如果我们发现对方传来的Master的地址token[5]:master_port和我们的地址master->addr是不同的,就TODO。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
...
if (master_port != master->addr->port ||
strcmp(master->addr->ip, token[5]))
{
sentinelAddr *old_addr;

sentinelEvent(LL_WARNING,"+config-update-from",si,"%@");
sentinelEvent(LL_WARNING,"+switch-master",
master,"%s %s %d %s %d",
master->name,
master->addr->ip, master->addr->port,
token[5], master_port);

old_addr = dupSentinelAddr(master->addr);
sentinelResetMasterAndChangeAddress(master, token[5], master_port);
sentinelCallClientReconfScript(master,
SENTINEL_OBSERVER,"start",
old_addr,master->addr);
releaseSentinelAddr(old_addr);
}
}

/* Update the state of the Sentinel. */
if (si) si->last_hello_time = mstime();
}

cleanup:
sdsfreesplitres(token,numtokens);
}
...

sentinelUpdateSentinelAddressInAllMasters

当我们发现某一个Sentinel在Hello信息中报告了一个新的地址,我们就需要在所有Master中更新这个Sentinel的上下文,并进行重连。返回有多少地址发生了重连。

1
2
3
4
5
6
7
8
9
int sentinelUpdateSentinelAddressInAllMasters(sentinelRedisInstance *ri) {
serverAssert(ri->flags & SRI_SENTINEL);
dictIterator *di;
dictEntry *de;
int reconfigured = 0;

di = dictGetIterator(sentinel.masters);
while((de = dictNext(di)) != NULL) {
...

这个用法看起来很奇怪,C语言里面的逗号表达式的含义是从左到右执行,并且以最后一个表达式的值作为返回值。我们展开来得到的是sentinelRedisInstance *master = ((de)->v.val), *match,可是我们还是不知道match是在哪里定义的。所

1
2
3
...
sentinelRedisInstance *master = dictGetVal(de), *match;
...

以我猜想它实际上是可以拆成下面这两行

1
2
sentinelRedisInstance *master = dictGetVal(de);
sentinelRedisInstance *match;

下面,我们查找这个Master中所有runid相等的Sentinel,并把它们关掉。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
...
match = getSentinelRedisInstanceByAddrAndRunID(master->sentinels,
NULL,0,ri->runid);
/* If there is no match, this master does not know about this
* Sentinel, try with the next one. */
if (match == NULL) continue;

/* Disconnect the old links if connected. */
if (match->link->cc != NULL)
instanceLinkCloseConnection(match->link,match->link->cc);
if (match->link->pc != NULL)
instanceLinkCloseConnection(match->link,match->link->pc);

if (match == ri) continue; /* Address already updated for it. */
...

释放原来的地址match->addr,复制新的地址ri->addr。这个地址包含addr和port。

1
2
3
4
5
6
7
8
9
10
11
12
        /* Update the address of the matching Sentinel by copying the address
* of the Sentinel object that received the address update. */
releaseSentinelAddr(match->addr);
match->addr = dupSentinelAddr(ri->addr);
reconfigured++;
}
dictReleaseIterator(di);
if (reconfigured)
sentinelEvent(LL_NOTICE,"+sentinel-address-update", ri,
"%@ %d additional matching instances", reconfigured);
return reconfigured;
}

sentinelTryConnectionSharing

这个函数尝试为传入的ri(它必须是一个Sentinel)分享已有的link,被分享的连接来自于我们这个Sentinel已有的对这个Master的连接。
有点难懂,我们先看下实现。这个函数会遍历所有的Master Instance,即sentinel.masters。一旦我们找到ri->master对应的实例,我们就复用这个实例的连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/*
* This function will attempt to share the instance link we already have
* for the same Sentinel in the context of a different master, with the
* instance we are passing as argument.
*
* This way multiple Sentinel objects that refer all to the same physical
* Sentinel instance but in the context of different masters will use
* a single connection, will send a single PING per second for failure
* detection and so forth.
*
* Return C_OK if a matching Sentinel was found in the context of a
* different master and sharing was performed. Otherwise C_ERR
* is returned. */
int sentinelTryConnectionSharing(sentinelRedisInstance *ri) {
serverAssert(ri->flags & SRI_SENTINEL);
dictIterator *di;
dictEntry *de;

if (ri->runid == NULL) return C_ERR; /* No way to identify it. */
if (ri->link->refcount > 1) return C_ERR; /* Already shared. */

di = dictGetIterator(sentinel.masters);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *master = dictGetVal(de), *match;
/* We want to share with the same physical Sentinel referenced
* in other masters, so skip our master. */
if (master == ri->master) continue;
match = getSentinelRedisInstanceByAddrAndRunID(master->sentinels,
NULL,0,ri->runid);
if (match == NULL) continue; /* No match. */
if (match == ri) continue; /* Should never happen but... safer. */

/* We identified a matching Sentinel, great! Let's free our link
* and use the one of the matching Sentinel. */
releaseInstanceLink(ri->link,NULL);
ri->link = match->link;
match->link->refcount++;
dictReleaseIterator(di);
return C_OK;
}
dictReleaseIterator(di);
return C_ERR;
}

【接上】sentinelPublishReplyCallback

这个命令的回调主要用来更新last_pub_time

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
sentinelRedisInstance *ri = privdata;
instanceLink *link = c->data;
redisReply *r;

if (!reply || !link) return;
link->pending_commands--;
r = reply;

/* Only update pub_time if we actually published our message. Otherwise
* we'll retry again in 100 milliseconds. */
if (r->type != REDIS_REPLY_ERROR)
ri->last_pub_time = mstime();
}

sentinelHandleRedisInstance 处理部分

首先是对tilt的处理。

1
2
3
4
5
6
7
    /* ============== ACTING HALF ============= */
if (sentinel.tilt) {
if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
sentinel.tilt = 0;
sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited");
}
...

下面检查是否是SDOWN状态

1
2
3
4
...
/* Every kind of instance */
sentinelCheckSubjectivelyDown(ri);
...

如果是Master的话,需要通过sentinelCheckObjectivelyDown查看是否进入的ODOWN状态,并设置对应的SRI_O_DOWN位。在这之后,就需要通过sentinelStartFailoverIfNeeded查看是否需要启动FailOver了。
我们需要注意,每一次的主循环,无论是不是在FailOver/SDOWN状态,都会对监视的所有Master节点执行sentinelCheckObjectivelyDownsentinelFailoverStateMachinesentinelAskMasterStateToOtherSentinels这几个函数,所以在这些函数里面会看到,他们都会去判断是否是FailOver/ODOWN/SDOWN。

1
2
3
4
5
...
/* Masters and slaves */
if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
/* Nothing so far. */
}

【Q】sentinelAskMasterStateToOtherSentinels会被以SENTINEL_ASK_FORCEDSENTINEL_NO_FLAGS这两个参数调用两次。可以发现,只要执行了sentinelStartFailoverIfNeeded并返回true,那sentinelStartFailover肯定已经被调用过了。这也就意味着此时已经ODOWN了,并且在FailOver了。为什么还要调用这个函数,问对方是不是is-master-down-by-addr呢?

  1. 调用A:sentinelStartFailoverIfNeeded返回值为true
    调用A的那一次是因为我们开启了FailOver,此时我们非常确定ODOWN了,所以加上了SENTINEL_ASK_FORCED
  2. 调用B:在sentinelFailoverStateMachine之后再调用一次
    调用B是routine work,这是必要的。比如说一个Sentinel发现SDOWN了,此时并不一定要开启FailOver,所以他需要通过这一个sentinelCheckObjectivelyDown,去询问其他Sentinel是不是这个节点DOWN掉了。收集到的SRI_MASTER_DOWN会被sentinelCheckObjectivelyDown用来判定是不是发生了ODOWN。
    如果集群状态正常的话,因为ODOWN条件不满足,在函数调用的开始就会被abort掉,我们不用担心副作用。
1
2
3
4
5
6
7
8
9
    /* Only masters */
if (ri->flags & SRI_MASTER) {
sentinelCheckObjectivelyDown(ri);
if (sentinelStartFailoverIfNeeded(ri))
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
sentinelFailoverStateMachine(ri);
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
}
}

sentinelCheckSubjectivelyDown

这个函数用来判断是不是SDOWN。我们判断Sentinel、Master和Slave是不是SDOWN。

首先是计算上次PING之后的未响应时间。这里比3.0的代码多了一个ri->link->disconnected的判断,这个是为什么呢?
last_avail_time在收到对PING的三个合法回复的时候会进行更新。
act_ping_time表示最后一个PING发出的时间,如果为0,表示刚收到一个PONG(见sentinelPingReplyCallback函数),并且还没有发出下一个PING。对应地,还有一个last_ping_time,这个不会在收到PONG的时候置0

1
2
3
4
5
6
7
8
void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
mstime_t elapsed = 0;

if (ri->link->act_ping_time)
elapsed = mstime() - ri->link->act_ping_time;
else if (ri->link->disconnected)
elapsed = mstime() - ri->link->last_avail_time;
...

如果检测到连接的活跃度(activity)很低,也就是说长时间没有反应,那么考虑断开,等待下一轮重连。这里重新连接是sentinelReconnectInstance控制的。
首先检查cc连接的活跃度,如果连接了至少SENTINEL_MIN_LINK_RECONNECT_PERIOD,并且仍然有一个pending了一半超时时间的PING,那么调用instanceLinkCloseConnection关闭连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
...
// #define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000
if (ri->link->cc &&
(mstime() - ri->link->cc_conn_time) >
SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
ri->link->act_ping_time != 0 && /* There is a pending ping... */
/* The pending ping is delayed, and we did not receive
* error replies as well. */
(mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) &&
(mstime() - ri->link->last_pong_time) > (ri->down_after_period/2))
{
instanceLinkCloseConnection(ri->link,ri->link->cc);
}
...

然后检查pc连接的活跃度,如果连接了超过SENTINEL_MIN_LINK_RECONNECT_PERIOD,并且在SENTINEL_PUBLISH_PERIOD * 3时间内没有再有活动。

1
2
3
4
5
6
7
8
9
10
...
// #define SENTINEL_PUBLISH_PERIOD 2000
if (ri->link->pc &&
(mstime() - ri->link->pc_conn_time) >
SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
(mstime() - ri->link->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
{
instanceLinkCloseConnection(ri->link,ri->link->pc);
}
...

下面就是SDOWN判断了,如果下面两个条件满足,设置SDOWN这个flag

  1. 对方Instance无应答
    这里的down_after_period即如下,默认是30秒。

    1
    sentinel down-after-milliseconds mymaster 60000
  2. 目前我们认为该Instance是Master
    但它报告它将成为Slave
    但经过down_after_period和两个SENTINEL_INFO_PERIOD之后,这个操作仍然没有完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
...
if (elapsed > ri->down_after_period ||
(ri->flags & SRI_MASTER &&
ri->role_reported == SRI_SLAVE &&
mstime() - ri->role_reported_time >
(ri->down_after_period+SENTINEL_INFO_PERIOD*2)))
{
/* Is subjectively down */
if ((ri->flags & SRI_S_DOWN) == 0) {
sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
ri->s_down_since_time = mstime();
ri->flags |= SRI_S_DOWN;
}
} else {
...

否则移除SDOWN状态。注意,经过搜寻源码,只有这个地方会移除SRI_S_DOWN这个标签。

1
2
3
4
5
6
7
8
...
/* Is subjectively up */
if (ri->flags & SRI_S_DOWN) {
sentinelEvent(LL_WARNING,"-sdown",ri,"%@");
ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
}
}
}

sentinelCheckObjectivelyDown

注意这里的ODOWN使用的是weak quorum,这里的weak体现在票数统计是在给定时间范围内,而不是在某个时间点上达成一致。这里可以了解一下strong quorum algorithm和流言协议(Gossip)。
下面这个循环遍历所有Sentinel,并且统计具有SRI_MASTER_DOWN的数量,是否达到master->quorum
【Q】容易看到,这里是直接算quorum了,那么是在哪里请求投票的呢?答案是sentinelAskMasterStateToOtherSentinels,后面会讲。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
dictIterator *di;
dictEntry *de;
unsigned int quorum = 0, odown = 0;

if (master->flags & SRI_S_DOWN) {
/* Is down for enough sentinels? */
quorum = 1; /* the current sentinel. */
/* Count all the other sentinels. */
di = dictGetIterator(master->sentinels);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);

if (ri->flags & SRI_MASTER_DOWN) quorum++;
}
dictReleaseIterator(di);
...

如果票数达标,就设置ODOWN。

1
2
3
4
...
if (quorum >= master->quorum) odown = 1;
}
...

对于ODOWN的情况,需要调用sentinelEvent事件发送到日志、PubSub,以及用户提醒脚本,然后在本地也要进行设置。

1
2
3
4
5
6
7
8
9
    /* Set the flag accordingly to the outcome. */
if (odown) {
if ((master->flags & SRI_O_DOWN) == 0) {
sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d",
quorum, master->quorum);
master->flags |= SRI_O_DOWN;
master->o_down_since_time = mstime();
}
...

对从ODOWN恢复的情况,要同理进行恢复。

1
2
3
4
5
6
7
8
...
} else {
if (master->flags & SRI_O_DOWN) {
sentinelEvent(LL_WARNING,"-odown",master,"%@");
master->flags &= ~SRI_O_DOWN;
}
}
}

检查quorum参数

目前版本提供一个ckquorum命令用来判断Quorum的数量是否合法。
首先统计有多少能够使用的Sentinel节点。voters表示从Master中记录的sentinels数量,加上自己。然后遍历所有的Sentinel,统计出没有SDOWN和ODOWN的节点的数量为usable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
int sentinelIsQuorumReachable(sentinelRedisInstance *master, int *usableptr) {
dictIterator *di;
dictEntry *de;
int usable = 1; /* Number of usable Sentinels. Init to 1 to count myself. */
int result = SENTINEL_ISQR_OK;
int voters = dictSize(master->sentinels)+1; /* Known Sentinels + myself. */

di = dictGetIterator(master->sentinels);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);

if (ri->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;
usable++;
}
dictReleaseIterator(di);
...

如果需要的quorum数量大于usable,说明quorum永远达不到,返回错误。

1
2
3
...
if (usable < (int)master->quorum) result |= SENTINEL_ISQR_NOQUORUM;
...

如果usable没有达到多数,那么就说明这个决议即使达成也是无效力的。

1
2
3
4
5
...
if (usable < voters/2+1) result |= SENTINEL_ISQR_NOAUTH;
if (usableptr) *usableptr = usable;
return result;
}

注意,对于quorum至少要占voters绝对多数的控制在sentinelGetLeader下面。

sentinelStartFailoverIfNeeded

在判断完ODOWN之后,就需要判断是否要启动故障转移(FailOver)过程。
在执行前,需要判断是否满足三个条件。

  1. 再次检查是否ODOWN
  2. 如果这个Master在FailOver过程中,我们没必要再重启一个FailOver
  3. 如果刚FailOver完,也不进行FailOver
    这个时间是master->failover_timeout*2,其中master->failover_timeout默认值是SENTINEL_DEFAULT_FAILOVER_TIMEOUT =(60*3*1000),是3分钟。
    【Q】这点也许是为了防止过于频繁的进行FailOver操作,但是既然故障了,又不FailOver那怎么办呢?这里的意思应该是此时这个Sentinel已收到了其他Sentinel的那票,说明其他Sentinel正在FailOver,所以要等待这么长时间再FailOver。这个有点类似于Raft里面的Election Timeout机制。

下面查看代码,这对应了前两点条件

1
2
3
4
5
6
7
int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
/* We can't failover if the master is not in O_DOWN state. */
if (!(master->flags & SRI_O_DOWN)) return 0;

/* Failover already in progress? */
if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0;
...

如上面所讲,第三个条件将“CD时间”设置在了master->failover_timeout*2,也就是说,要满足failover_start_time之后这么久时间才能开启FailOver。
那么failover_start_time是在哪里被设置的呢?可以参考上文的说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
...
/* Last failover attempt started too little time ago? */
if (mstime() - master->failover_start_time <
master->failover_timeout*2)
{
if (master->failover_delay_logged != master->failover_start_time) {
time_t clock = (master->failover_start_time +
master->failover_timeout*2) / 1000;
char ctimebuf[26];

ctime_r(&clock,ctimebuf);
ctimebuf[24] = '\0'; /* Remove newline. */
master->failover_delay_logged = master->failover_start_time;
serverLog(LL_WARNING,
"Next failover delay: I will not start a failover before %s",
ctimebuf);
}
return 0;
...
}

在三个return之后,到了FailOver的执行过程。需要注意的是,执行FailOver的Sentinel不一定是发起FailOver的Sentinel,这是因为FailOver总是由Sentinel里面的Leader执行的。但当FailOver发起之后会导致选举,所以可能到时候Leader也换掉了。这个和Raft中有点不一样,Raft中新Leader是不能提交老Leader的日志条目的。

1
2
3
4
...
sentinelStartFailover(master);
return 1;
}

下面查看函数sentinelStartFailover,这个函数被执行的时候我们可以assert现在这个Master是ODOWN了。

1
2
3
4
5
void sentinelStartFailover(sentinelRedisInstance *master) {
serverAssert(master->flags & SRI_MASTER);

master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
...

首先,会更新failover_state字段,我们来看看这个字段的取值都是什么:

  1. SENTINEL_FAILOVER_STATE_NONE:没有在FailOver
  2. SENTINEL_FAILOVER_STATE_WAIT_START:等待开始FailOver
  3. SENTINEL_FAILOVER_STATE_SELECT_SLAVE:选择一个Slave作为新Master,即promoted_slave
  4. SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:向该Slave发送SLAVEOF指令以成为Master
  5. SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:向剩余Slave发送SLAVEOF指令,让它们跟随新Master
  6. SENTINEL_FAILOVER_STATE_RECONF_SLAVES:让剩余Slave开始复制新Master
  7. SENTINEL_FAILOVER_STATE_UPDATE_CONFIG:此时所有Slave已经完成同步,需要进行配置更新。
    这里的配置更新指的是当前Sentinel节点上的mastermaster->slaves等的配置。

下面继续查看源码,会自增sentinel.current_epoch并赋值给Master的failover_epoch,这个类似于Raft中任期的概念。

1
2
3
4
5
6
7
...
master->flags |= SRI_FAILOVER_IN_PROGRESS;
master->failover_epoch = ++sentinel.current_epoch;
sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
(unsigned long long) sentinel.current_epoch);
sentinelEvent(LL_WARNING,"+try-failover",master,"%@");
...

接着,我们要更新被FailOver的Master节点的failover_start_time。这里随机一个FailOver开始时间,类似Raft中的(Follower变成Candidate的?TODO)随机超时时间。

1
2
3
4
5
...
// #define SENTINEL_MAX_DESYNC 1000
master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
master->failover_state_change_time = mstime();
}

sentinelAskMasterStateToOtherSentinels

这个函数有下面功能:

  1. 遍历所有Sentinel
    1. 清理过期状态
    2. 判断是否要询问这个Sentinel有关ODOWN的信息。
    3. 发送不带自己runid的is-master-down-by-addr消息,目的是确认是否ODOWN
    4. 发送带自己runid的is-master-down-by-addr消息,目的是竞选

如果这个Sentinel认为Master已下线(即SDOWN),会向其他Sentinel发送SENTINEL is-master-down-by-addr命令,尝试获得足够的票数,将Master标记为ODOWN状态,并开启FailOver。
这个函数会被以SENTINEL_ASK_FORCEDSENTINEL_NO_FLAGS这两个参数调用两次,原因在上文解释过了。

1
2
3
4
5
#define SENTINEL_ASK_FORCED (1<<0)
void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
dictIterator *di;
dictEntry *de;
...

我们遍历所有的Sentinel。

1
2
3
4
5
6
7
8
...
di = dictGetIterator(master->sentinels);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
char port[32];
int retval;
...

如果在5秒内都没有收到这个SentinelSENTINEL is-master-down-by-addr的回复,那么就清理这个Sentinel的投票状态,因为这个状态可能过期了。
清理过程是清除SRI_MASTER_DOWN标记,并且将ri->leader清空。这里再提一下,Sentinel的ri->leader只有在有SRI_MASTER_DOWN标记时有效力,指向自己投票的Sentinel Leader。不要误以为始终指向当前的Leader,这个和Raft不一样,Sentinel只有在FailOver时候才会选举。
【Q】TODO 容易看到,这里处理了Sentinel超时的情况,但我们在前面看到,Sentinel节点也会判断其他Sentinel的SDOWN情况,为啥这里不用SDOWN来判断呢?

1
2
3
4
5
6
7
8
...
// #define SENTINEL_ASK_PERIOD 1000
if (elapsed > SENTINEL_ASK_PERIOD*5) {
ri->flags &= ~SRI_MASTER_DOWN;
sdsfree(ri->leader);
ri->leader = NULL;
}
...

只有在满足下面三种情况下才询问这个Sentinel

  1. Master必须具有SRI_S_DOWN这个flag。
    显然,如果我自己都不觉得SDOWN,那为啥还要问是不是ODOWN呢?
    【Q】我们有必要在这里重复检查一下么?
  2. 必须能够连接到这个Sentinel。
    【Q】我们知道,如果认为SRI_S_DOWN了,那么也要关闭连接。而SDOWN对Master、Slave和Sentinal都检查的。为什么这里还需要检查呢?
    我们仔细看sentinelCheckSubjectivelyDown的实现,其实里面不仅处理了SDOWN的情况。断开连接实际上对应了包括连接超时和不活跃之类的情况。
  3. 除非SENTINEL_ASK_FORCED,否则我们要在SENTINEL_ASK_PERIOD时间内都没有收到riSENTINEL is-master-down的回复,我们才会再次询问。
1
2
3
4
5
6
7
...
if ((master->flags & SRI_S_DOWN) == 0) continue;
if (ri->link->disconnected) continue;
if (!(flags & SENTINEL_ASK_FORCED) &&
mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
continue;
...

下面是具体询问流程,实际上就是发送一个SENTINEL is-master-down-by-addr指令。**注意,redisAsyncCommand中指定了处理回复的回调函数为sentinelReceiveIsMasterDownReply**,方便我们跟踪流程。
**我们会发送自己的sentinel.current_epoch**,对应到对端拿到的req_epoch

1
2
3
4
5
6
7
8
9
10
...
/* Ask */
ll2string(port,sizeof(port),master->addr->port);
retval = redisAsyncCommand(ri->link->cc,
sentinelReceiveIsMasterDownReply, ri,
"%s is-master-down-by-addr %s %s %llu %s",
sentinelInstanceMapCommand(ri,"SENTINEL"),
master->addr->ip, port,
sentinel.current_epoch,
...

如果启动了FailOver,那么把自己的sentinel.myid发出去,竞选FailOver Leader,否则就发送*,表示我们仅仅去监测有没有ODOWN。

1
2
3
4
5
6
7
...
(master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
sentinel.myid : "*");
if (retval == C_OK) ri->link->pending_commands++;
}
dictReleaseIterator(di);
}

【接上】对sentinelAskMasterStateToOtherSentinels的回复

对端对sentinelAskMasterStateToOtherSentinels发来的is-master-down-by-addr消息的处理过程直接在sentinelCommand里面,主要是两点内容:

  1. 首先返回这个Sentinel有没有觉得SDOWN。
  2. 如果这是一个竞选请求,调用sentinelVoteLeader来计算是否投票。

首先复习一下这个请求的格式。

1
SENTINEL is-master-down-by-addr ip port current_epoch runid

其中ip:port是我们要检查的Master的地址。Note that the command will not check by name but just by master, in theory different Sentinels may monitor differnet masters with the same name.
current-epoch需要被用来判断我们是否能够对选举FailOver Leader进行投票。每个Sentinel每个epoch只能投一票。
如果runid*,表示我们并没有向这个Sentinel请求投票,否则设置为我们想要它投票的runid

下面看具体代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 在sentinelCommand函数中

sentinelRedisInstance *ri;
long long req_epoch;
uint64_t leader_epoch = 0;
char *leader = NULL;
long port;
int isdown = 0;

if (c->argc != 6) goto numargserr;
if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != C_OK ||
getLongLongFromObjectOrReply(c,c->argv[4],&req_epoch,NULL)
!= C_OK)
return;
ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
c->argv[2]->ptr,port,NULL);

首先是返回有没有觉得SDOWN。如果没有处于tilt状态,那么满足下面的情况下,我们返回1

  1. 如果我们检查到节点SDOWN了
  2. 并且这个节点还是Master
1
2
3
4
5
/* It exists? Is actually a master? Is subjectively down? It's down.
* Note: if we are in tilt mode we always reply with "0". */
if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
(ri->flags & SRI_MASTER))
isdown = 1;

如果is-master-down-by-addr这个请求带了runid参数,说明这是一个带竞选的信息,此时会调用sentinelVoteLeader来Vote。

1
2
3
4
5
6
7
/* Vote for the master (or fetch the previous vote) if the request
* includes a runid, otherwise the sender is not seeking for a vote. */
if (ri && ri->flags & SRI_MASTER && strcasecmp(c->argv[5]->ptr,"*")) {
leader = sentinelVoteLeader(ri,(uint64_t)req_epoch,
c->argv[5]->ptr,
&leader_epoch);
}

回复内容包含三点。
这个shared实际上是在server.c中定义的struct sharedObjectsStruct shared;,可以理解成一种共享小对象。

1
2
3
4
5
6
7
/* Reply with a three-elements multi-bulk reply:
* down state, leader, vote epoch. */
addReplyArrayLen(c,3);
addReply(c, isdown ? shared.cone : shared.czero);
addReplyBulkCString(c, leader ? leader : "*");
addReplyLongLong(c, (long long)leader_epoch);
if (leader) sdsfree(leader);

sentinelVoteLeader

sentinelVoteLeader函数对应了Raft中Follower投票的逻辑。这个函数返回一个char*,表示这个节点投票支持的,执行master这个节点FailOver过程的Leader,即master->leader。容易看出,Sentinel能同时处理多个Master的FailOver

这个函数的调用方有两个:

  1. 当对端收到带竞选的is-master-down-by-addr信息时
  2. 当自己开启竞选时
    这个发生在后面的sentinelGetLeader

这个函数的参数是req_epochreq_runid,表示处理req_runid这一台Sentinel在req_epoch投票请求。此外,leader_epoch是用来返回的,表示我们投出来的master->leader是哪个epoch的。
这里的req_epoch的取值规则如下:

  1. 如果是对方发来的is-master-down-by-addr,那么值是对方的sentinel.current_epoch
  2. 如果是自己的请求,那么是ri->failover_epoch,这里的ri是我们正在处理的Master节点。

投票逻辑如下:

  1. 如果自己已经投了
    1. 如果是一个来自较新的epoch的投票请求
      需要满足:
      1. 他发过来的req_epoch严格大于我自己的master->leader_epoch,这里leader_epoch表示我刚才vote的Leader是哪个epoch的。
        这个是对Master的epoch的判定。
      2. 他发过来的req_epoch大于等于我自己的sentinel.current_epoch
        这个是对Sentinel的epoch的判定。
    2. 否则
      返回自己原来投的票。
      这通常发生在,有多个Sentinel同时竞选,而我已经给比req_epoch更高的epoch投过票了,那么返回在这个更高的epoch投的票。
  2. 如果暂时不能投票(TODO 对应什么情况呢?)就返回NULL
  3. 如果还没有投:
    1. 设置master->leaderreq_runid,也是函数的返回值
    2. 设置master->leader_epochleader_epoch的值为sentinel.current_epoch
1
2
3
4
5
6
7
8
9
10
11
12
13
/* Vote for the sentinel with 'req_runid' or return the old vote if already
* voted for the specified 'req_epoch' or one greater.
*
* If a vote is not available returns NULL, otherwise return the Sentinel
* runid and populate the leader_epoch with the epoch of the vote. */
char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch) {
if (req_epoch > sentinel.current_epoch) {
sentinel.current_epoch = req_epoch;
sentinelFlushConfig();
sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
(unsigned long long) sentinel.current_epoch);
}
...

如果,当前Sentinel节点记录的Master的epoch是陈旧的,并且Sentinel节点自己的current_epoch不领先于req_epoch,那么说明这是一个来自于较新的epoch的投票请求,我们应该投票。

1
2
3
4
5
6
7
8
9
10
...
if (master->leader_epoch < req_epoch && sentinel.current_epoch <= req_epoch)
{
sdsfree(master->leader);
master->leader = sdsnew(req_runid);
master->leader_epoch = sentinel.current_epoch;
sentinelFlushConfig();
sentinelEvent(LL_WARNING,"+vote-for-leader",master,"%s %llu",
master->leader, (unsigned long long) master->leader_epoch);
...

下面,如果我们没有投票给自己,就需要设置**failover_start_time**。这里的myid指的是自己的runid,在3.0版本,是用的server.runid,可以参考前文的说明。
failover_start_time为当前时间,再加上随机值的偏移。这在前面提到过,作用是为了实现类似于Raft中的随机超时时间。

1
2
3
4
5
6
7
8
...
if (strcasecmp(master->leader,sentinel.myid))
master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
}

*leader_epoch = master->leader_epoch;
return master->leader ? sdsnew(master->leader) : NULL;
}

【接上】sentinelReceiveIsMasterDownReply

我们看看Sentinel如何处理sentinelAskMasterStateToOtherSentinels的返回结果,即投票结果。
下面是一些Sanity检查,可以跳过直接看主干。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/* Receive the SENTINEL is-master-down-by-addr reply, see the
* sentinelAskMasterStateToOtherSentinels() function for more information. */
void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
sentinelRedisInstance *ri = privdata;
instanceLink *link = c->data;
redisReply *r;

if (!reply || !link) return;
link->pending_commands--;
r = reply;
/* Ignore every error or unexpected reply.
* Note that if the command returns an error for any reason we'll
* end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
if (r->type == REDIS_REPLY_ARRAY && r->elements == 3 &&
r->element[0]->type == REDIS_REPLY_INTEGER &&
r->element[1]->type == REDIS_REPLY_STRING &&
r->element[2]->type == REDIS_REPLY_INTEGER)
{
...

首先看第一个参数element[0],表示对Master是否宕机的Reply。

1
2
3
4
5
6
7
8
...
ri->last_master_down_reply_time = mstime();
if (r->element[0]->integer == 1) {
ri->flags |= SRI_MASTER_DOWN;
} else {
ri->flags &= ~SRI_MASTER_DOWN;
}
...

第二个参数element[1],如果不是"*",表示对方投票了,此时更新ri的投票结果r->element[1]->strri->leader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
...
if (strcmp(r->element[1]->str,"*")) {
/* If the runid in the reply is not "*" the Sentinel actually
* replied with a vote. */
sdsfree(ri->leader);
if ((long long)ri->leader_epoch != r->element[2]->integer)
serverLog(LL_WARNING,
"%s voted for %s %llu", ri->name,
r->element[1]->str,
(unsigned long long) r->element[2]->integer);
ri->leader = sdsnew(r->element[1]->str);
ri->leader_epoch = r->element[2]->integer;
}
}
}

sentinelFailoverStateMachine

这个函数是FailOver状态机的入口,每次时钟循环都会调用一次。他负责处理除了SENTINEL_FAILOVER_STATE_NONESENTINEL_FAILOVER_STATE_UPDATE_CONFIG(实际上是一头一尾)的所有命令。
每一次状态转移都需要更新failover_state_change_time,这个时间会在一些地方被用来处理Abort逻辑

这个函数需要在SRI_FAILOVER_IN_PROGRESS下才会运行,这个位是由sentinelStartFailover设置的。
主要内容是一个dispatch的逻辑,根据当前FailOver状态机的状态ri->failover_state去执行对应的函数。

我们容易看出,状态机状态为X的时候,表示前一个状态对应的行为已经完成,但是当前状态的行为并不一定完成。也就是状态X表示正在做X,而不是X已经做完了,这应该是状态机设计的时候一个比较通用的做法吧。

1
2
3
4
5
6
7
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
serverAssert(ri->flags & SRI_MASTER);

if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;

switch(ri->failover_state) {
...

这个函数发送sentinalSignal

  1. -failover-abort-not-elected
  2. +elected-leader+failover-state-select-slave

可能到达状态:

  1. SENTINEL_FAILOVER_STATE_NONE
  2. SENTINEL_FAILOVER_STATE_SELECT_SLAVE
1
2
3
4
5
...
case SENTINEL_FAILOVER_STATE_WAIT_START:
sentinelFailoverWaitStart(ri);
break;
...

这个函数发送sentinalSignal

  1. -failover-abort-no-good-slave
  2. +selected-slave+failover-state-send-slaveof-noone

可能到达状态:

  1. SENTINEL_FAILOVER_STATE_NONE
  2. SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE
1
2
3
4
5
...
case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
sentinelFailoverSelectSlave(ri);
break;
...

这个函数发送sentinalSignal

  1. -failover-abort-slave-timeout
  2. +failover-state-wait-promotion

可能到达状态:

  1. SENTINEL_FAILOVER_STATE_NONE
  2. SENTINEL_FAILOVER_STATE_WAIT_PROMOTION
1
2
3
4
5
...
case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
sentinelFailoverSendSlaveOfNoOne(ri);
break;
...

这个函数发送sentinalSignal

  1. -failover-abort-slave-timeout

可能到达状态:

  1. SENTINEL_FAILOVER_STATE_NONE
1
2
3
4
5
...
case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
sentinelFailoverWaitPromotion(ri);
break;
...

这个函数发送sentinalSignal

  1. 自己
    1. -slave-reconf-sent-timeout
    2. +slave-reconf-sent
  2. sentinelFailoverDetectEnd
    1. +failover-end-for-timeout
    2. +failover-end
    3. +slave-reconf-sent-be

可能到达状态:

  1. SENTINEL_FAILOVER_STATE_UPDATE_CONFIG
1
2
3
4
5
6
...
case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
sentinelFailoverReconfNextSlave(ri);
break;
}
}

sentinelFailoverWaitStart

首先检查自己是不是这个epoch的FailOver的Leader,这个通常伴随着选举的过程,在选举过程中,Candidate会在这边自旋,并调用sentinelGetLeader统计票数。TODO 加一个Demo。
注意之前提到过,FailOver的发起者不一定是FailOver的执行者。这个过程涉及调用sentinelGetLeader函数,可能会触发选举。
这里的ri是我们正在处理的master节点所对应的Instance。

1
2
3
4
5
6
7
8
9
10
/* ---------------- Failover state machine implementation ------------------- */
void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
char *leader;
int isleader;

/* Check if we are the leader for the failover epoch. */
leader = sentinelGetLeader(ri, ri->failover_epoch);
isleader = leader && strcasecmp(leader,sentinel.myid) == 0;
sdsfree(leader);
...

如果我自己不是Leader,并且这个FailOver不是由SENTINEL FAILOVER命令触发的强制FailOver,那么就不进行下面的FailOver流程。这个逻辑很简单,Raft中发起AppendEntries也都是由Leader来发起的。

1
2
3
4
5
6
7
...
if (!isleader && !(ri->flags & SRI_FORCE_FAILOVER)) {
int election_timeout = SENTINEL_ELECTION_TIMEOUT;

/* The election timeout is the MIN between SENTINEL_ELECTION_TIMEOUT
* and the configured failover timeout. */
...

如果超过了election_timeout,我们会直接Abort整个FailOver过程。

1
2
3
4
5
6
7
8
9
10
11
...
if (election_timeout > ri->failover_timeout)
election_timeout = ri->failover_timeout;
/* Abort the failover if I'm not the leader after some time. */
if (mstime() - ri->failover_start_time > election_timeout) {
sentinelEvent(LL_WARNING,"-failover-abort-not-elected",ri,"%@");
sentinelAbortFailover(ri);
}
return;
}
...

否则切换状态到SENTINEL_FAILOVER_STATE_SELECT_SLAVE,也就是说从Slave中挑选一个新的Master。

1
2
3
4
5
6
7
8
...
sentinelEvent(LL_WARNING,"+elected-leader",ri,"%@");
if (sentinel.simfailure_flags & SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION)
sentinelSimFailureCrash();
ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
ri->failover_state_change_time = mstime();
sentinelEvent(LL_WARNING,"+failover-state-select-slave",ri,"%@");
}

【接上】sentinelGetLeader

这个函数用来返回给定的epoch对应的Leader。这里的epoch实际上是ri->failover_epoch,也就是当前进程需要FailOver的Master的failover_epoch
【Q】为什么不用sentinel.current_epoch呢?我认为这是因为同一个Sentinel可能处理若干个Master的FailOver过程,所以sentinel.current_epochri->failover_epoch不能一一对应。
扫描这个Master下面的所有Sentinel,检查对于epoch是否存在一个Leader。为了成为Leader,我们需要有majority的节点都投票给这个Leader,参与投票的节点由上次SENTINEL RESET的指定。
一个问题是,Leader选举是在什么时候开始的呢?我们需要注意的是sentinelGetLeader这个函数并不是只会触发一遍,而是在整个sentinelTimer上都会进行调用。在第一次调用的时候,肯定是一票都没有,winnerNULL,此时就给自己投一票,同时函数sentinelAskMasterStateToOtherSentinels会发送is-master-down-by-addr给另外一方。

1
2
3
4
5
6
7
8
9
10
char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) {
dict *counters;
dictIterator *di;
dictEntry *de;
unsigned int voters = 0, voters_quorum;
char *myvote;
char *winner = NULL;
uint64_t leader_epoch;
uint64_t max_votes = 0;
...

首先,断言一下Master是ODOWN状态,并且FailOver在进行中

1
2
3
4
...
serverAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
counters = dictCreate(&leaderVotesDictType,NULL);
...

下面这个循环,对于每一个Sentinel选出的Leader,我们都投上一票。看起来这个很奇怪,感觉和墙头草一样?其实不然,这里“投上一票”的表述并不准确。ri->leader实际上记录了这个Sentinel的投票结果,因此我认为这个循环实际上是记录了每一个Sentinel到底得了多少票。所以不是“投上一票”,而是记上一票。

1
2
3
4
5
6
7
8
9
10
11
12
...
voters = dictSize(master->sentinels)+1; /* All the other sentinels and me.*/

/* Count other sentinels votes */
di = dictGetIterator(master->sentinels);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
if (ri->leader != NULL && ri->leader_epoch == sentinel.current_epoch)
sentinelLeaderIncr(counters,ri->leader);
}
dictReleaseIterator(di);
...

在遍历统计完投票情况后,检查谁胜出,必须满足下面的条件的,才会成为winner

  1. 绝对多数
  2. 必须大于master->quorum
1
2
3
4
5
6
7
8
9
10
11
12
...
di = dictGetIterator(counters);
while((de = dictNext(di)) != NULL) {
uint64_t votes = dictGetUnsignedIntegerVal(de);

if (votes > max_votes) {
max_votes = votes;
winner = dictGetKey(de);
}
}
dictReleaseIterator(di);
...

现在需要再统计一下自己的票数。如果自己没投票,那么就投给winner;如果没有winner,就投给自己sentinel.myid
【Q】如果作为竞选的发起方,这里是走哪条路径的呢?我觉得根据上面的讨论,因为一开始协议刚发出去,一票都没有,肯定还没有决出winner,所以还是投自己一票的。

1
2
3
4
5
6
...
if (winner)
myvote = sentinelVoteLeader(master,epoch,winner,&leader_epoch);
else
myvote = sentinelVoteLeader(master,epoch,sentinel.myid,&leader_epoch);
...

上面只是发送Vote消息,sentinelLeaderIncr函数是自增counters的计数,这个发生在本地,把自己的一票也加上去。如果投票之后的票数比最大票数要大,那么更换Leader。

1
2
3
4
5
6
7
8
9
10
...
if (myvote && leader_epoch == epoch) {
uint64_t votes = sentinelLeaderIncr(counters,myvote);

if (votes > max_votes) {
max_votes = votes;
winner = myvote;
}
}
...

下面检查winner的票数是否满足quorum和master->quorum

1
2
3
4
5
6
7
8
9
10
...
voters_quorum = voters/2+1;
if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
winner = NULL;

winner = winner ? sdsnew(winner) : NULL;
sdsfree(myvote);
dictRelease(counters);
return winner;
}

sentinelFailoverSelectSlave

这个函数用来选择需要promote成Master的Slave。主要逻辑是调用sentinelSelectSlave,然后根据他的返回值,去进行一些周边的处理,例如状态机的切换。
从原来所有的Slave中,挑选一个作为新的Master,如果没有合格的新Master,那么返回NULL。这里同样有个sentinelAbortFailover的分支来处理选不到的情况。

1
2
3
void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
sentinelRedisInstance *slave = sentinelSelectSlave(ri);
...

下面就是设置slave->flags,设置ri->promoted_slave

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
...
/* We don't handle the timeout in this state as the function aborts
* the failover or go forward in the next state. */
if (slave == NULL) {
sentinelEvent(LL_WARNING,"-failover-abort-no-good-slave",ri,"%@");
sentinelAbortFailover(ri);
} else {
sentinelEvent(LL_WARNING,"+selected-slave",slave,"%@");
slave->flags |= SRI_PROMOTED;
ri->promoted_slave = slave;
ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
ri->failover_state_change_time = mstime();
sentinelEvent(LL_NOTICE,"+failover-state-send-slaveof-noone",
slave, "%@");
}
}
sentinelSelectSlave

首先是计算能接受的Slave和Master之间的Replication断连的最大时长,超过这个时间的Slave我们不考虑,从而保证数据库的时效性。
down_after_period表示在这段时间后会被认为短连了,所以max_master_down_time等于Master的SDOWN到现在的时间,加上十倍的down_after_period。这个计算方式很神奇。因为从当前Sentinel来看,Master已经处于下线状态,所以正常来说,Slave与Master之间的连接断开时间不应该超过down-after-period * 10。这听上去有点像黑魔法,不过这个判断的原理是这样的:当Master下线之后,Master和Slave的连接就会断开,但只要先下线的是Master而不是Slave,即连接断开是由Master而不是Slave造成的,那么Master和Slave之间的连接断开时间就不会太长。不过这只是一个辅助手段,因为最终我们都会使用复制偏移量来挑选Slave。

1
2
3
4
5
6
7
8
9
10
11
12
13
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
sentinelRedisInstance **instance =
zmalloc(sizeof(instance[0])*dictSize(master->slaves));
sentinelRedisInstance *selected = NULL;
int instances = 0;
dictIterator *di;
dictEntry *de;
mstime_t max_master_down_time = 0;

if (master->flags & SRI_S_DOWN)
max_master_down_time += mstime() - master->s_down_since_time;
max_master_down_time += master->down_after_period * 10;
...

下面开始遍历所有的Slave来生成候选集instance,需要满足下面的条件:

  1. 不能SDOWN或者ODOWN
  2. 不能短连
  3. Link不能超时
  4. slave->slave_priority不能为0。这个值是从INFO得到的,默认为SENTINEL_DEFAULT_SLAVE_PRIORITY = 100
  5. 对PING(5xPERIOD)/INFO(3xPERIOD)的回复不能超时
  6. 和Master的Replication断连的时长不能超过上面的计算值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
...
di = dictGetIterator(master->slaves);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);
mstime_t info_validity_time;

if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;
if (slave->link->disconnected) continue;
if (mstime() - slave->link->last_avail_time > SENTINEL_PING_PERIOD*5) continue;
if (slave->slave_priority == 0) continue;

/* If the master is in SDOWN state we get INFO for slaves every second.
* Otherwise we get it with the usual period so we need to account for
* a larger delay. */
if (master->flags & SRI_S_DOWN)
info_validity_time = SENTINEL_PING_PERIOD*5;
else
info_validity_time = SENTINEL_INFO_PERIOD*3;
if (mstime() - slave->info_refresh > info_validity_time) continue;
if (slave->master_link_down_time > max_master_down_time) continue;
instance[instances++] = slave;
}
...

下面,我们对候选集进行排序,然后选到最优的

1
2
3
4
5
6
7
8
9
10
...
dictReleaseIterator(di);
if (instances) {
qsort(instance,instances,sizeof(sentinelRedisInstance*),
compareSlavesForPromotion);
selected = instance[0];
}
zfree(instance);
return selected;
}
compareSlavesForPromotion

主要比较优先级分为下面几个:

  1. slave_priority越小的
  2. slave_repl_offset(replication offset)越大的,这表示从服务器的偏移量
  3. runid字母序越小的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/* 
*
* Basically if runid is the same, the slave that processed more commands
* from the master is selected.
*
* The function returns the pointer to the selected slave, otherwise
* NULL if no suitable slave was found.
*/
int compareSlavesForPromotion(const void *a, const void *b) {
sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
**sb = (sentinelRedisInstance **)b;
char *sa_runid, *sb_runid;

if ((*sa)->slave_priority != (*sb)->slave_priority)
return (*sa)->slave_priority - (*sb)->slave_priority;

/* If priority is the same, select the slave with greater replication
* offset (processed more data from the master). */
if ((*sa)->slave_repl_offset > (*sb)->slave_repl_offset) {
return -1; /* a < b */
} else if ((*sa)->slave_repl_offset < (*sb)->slave_repl_offset) {
return 1; /* a > b */
}
...

因为一下老的slave没有publishrunid,所以这里将其设置为NULL,默认NULL的字母序最大

1
2
3
4
5
6
7
8
...
sa_runid = (*sa)->runid;
sb_runid = (*sb)->runid;
if (sa_runid == NULL && sb_runid == NULL) return 0;
else if (sa_runid == NULL) return 1; /* a > b */
else if (sb_runid == NULL) return -1; /* a < b */
return strcasecmp(sa_runid, sb_runid);
}

sentinelFailoverSendSlaveOfNoOne

这个函数负责对要提升的节点发送SLAVEOF NO ONE,让它提升为Master。
如果link->disconnected,那么不断重试直到超时failover_timeout,触发sentinelAbortFailover
ri->promoted_slave是在sentinelFailoverSelectSlave被设置的。

1
2
3
4
5
6
7
8
9
10
11
void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
int retval;

if (ri->promoted_slave->link->disconnected) {
if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
sentinelAbortFailover(ri);
}
return;
}
...

调用sentinelSendSlaveOf,传入NULL表示让这个ri->promoted_slave执行SLAVEOF NO ONE为Master。

1
2
3
4
5
6
7
8
...
retval = sentinelSendSlaveOf(ri->promoted_slave,NULL,0);
if (retval != C_OK) return;
sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion",
ri->promoted_slave,"%@");
ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
ri->failover_state_change_time = mstime();
}
sentinelSendSlaveOf

这个命令先发送SLAVEOF。再发送CONFIG REWRITE,从而在可能的情况下将配置刷到配置上。可能的情况指Redis支持写Config,并且这个服务器是从Config文件启动的。

1
2
3
4
5
6
7
8
9
10
11
/* 
* The command returns C_OK if the SLAVEOF command was accepted for
* (later) delivery otherwise C_ERR. The command replies are just
* discarded. */
int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) {
char portstr[32];
int retval;

ll2string(portstr,sizeof(portstr),port);

...

如果hostNULL,发送SLAVEOF NO ONE,让这台机器变为Master。

1
2
3
4
5
6
7
8
...
/* If host is NULL we send SLAVEOF NO ONE that will turn the instance
* into a master. */
if (host == NULL) {
host = "NO";
memcpy(portstr,"ONE",4);
}
...

出于安全因素,我们要开启一个事务,这个也是相比3.0升级的地方。包含:

  1. SLAVEOF
  2. 重新配置
  3. Disconnect all clients (but this one sending the commnad),从而触发ask-master-on-reconnection协议(?)
    注意CLIENT KILL TYPE <type>这个指令从2.8才开始有的。不过往低版本的发并没有为题,因为CLIENT是一个variadic command,所以不会被认为是错误,所以事务不会失败。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
...
retval = redisAsyncCommand(ri->link->cc,
sentinelDiscardReplyCallback, ri, "%s",
sentinelInstanceMapCommand(ri,"MULTI"));
if (retval == C_ERR) return retval;
ri->link->pending_commands++;

retval = redisAsyncCommand(ri->link->cc,
sentinelDiscardReplyCallback, ri, "%s %s %s",
sentinelInstanceMapCommand(ri,"SLAVEOF"),
host, portstr);
if (retval == C_ERR) return retval;
ri->link->pending_commands++;

retval = redisAsyncCommand(ri->link->cc,
sentinelDiscardReplyCallback, ri, "%s REWRITE",
sentinelInstanceMapCommand(ri,"CONFIG"));
if (retval == C_ERR) return retval;
ri->link->pending_commands++;

for (int type = 0; type < 2; type++) {
retval = redisAsyncCommand(ri->link->cc,
sentinelDiscardReplyCallback, ri, "%s KILL TYPE %s",
sentinelInstanceMapCommand(ri,"CLIENT"),
type == 0 ? "normal" : "pubsub");
if (retval == C_ERR) return retval;
ri->link->pending_commands++;
}

retval = redisAsyncCommand(ri->link->cc,
sentinelDiscardReplyCallback, ri, "%s",
sentinelInstanceMapCommand(ri,"EXEC"));
...

我们不通过命令的返回值来检查实际执行效果,而是观察下一个INFO。

1
2
3
4
5
6
...
if (retval == C_ERR) return retval;
ri->link->pending_commands++;

return C_OK;
}

sentinelFailoverWaitPromotion

这个函数只负责处理失败的情况。对于成功的情况,我们是在监视INFO返回值的sentinelRefreshInstanceInfo函数里面看的。

1
2
3
4
5
6
7
8
9
10
/* We actually wait for promotion indirectly checking with INFO when the
* slave turns into a master. */
void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
/* Just handle the timeout. Switching to the next state is handled
* by the function parsing the INFO command of the promoted slave. */
if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
sentinelAbortFailover(ri);
}
}

sentinelRefreshInstanceInfo 角色变化部分

函数sentinelRefreshInstanceInfo作用是解析INFO命令的返回值,通过它来更新节点从Slave到Master的变化。
这个函数很长,我们只节选需要的部分来进行讲解。
下面的代码,可以解析出这个节点认为自己的角色,是Master还是Slave。

1
2
3
4
5
6
7
// sentinelRefreshInstanceInfo
void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
...
/* role:<role> */
if (sdslen(l) >= 11 && !memcmp(l,"role:master",11)) role = SRI_MASTER;
else if (sdslen(l) >= 10 && !memcmp(l,"role:slave",10)) role = SRI_SLAVE;
···

跳过后面一堆代码,看到下面这个if。现在ri在我的视野里是Slave,但是它却宣告自己是Master了。

1
2
3
4
···
/* Handle slave -> master role switch. */
if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
...

这是怎么回事呢?如果ri就是我们钦定的要成为Master的节点,并且我们Master确实在FailOver,并且我们的FailOver的状态确实是SENTINEL_FAILOVER_STATE_WAIT_PROMOTION,这就说明我们对promoted_slave的提升动作成功了,于是我们就可以进入下一步流程,即切换到状态为SENTINEL_FAILOVER_STATE_RECONF_SLAVES。其实这些检查就是我们确定了Slave已经被Reconf为Master了,对应的失败情况,就是sentinelFailoverWaitPromotion中看到的。
我们要将Master的config_epoch设置为Master的failover_epoch,这一步的原因是我们将config_epoch同步为我们赢得选举胜利从而进行FailOver的epoch,这样如果没有一个更大的epoch的话,就会强迫其他Sentinel更新Config。

1
2
3
4
5
6
7
8
...
if ((ri->flags & SRI_PROMOTED) &&
(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
(ri->master->failover_state ==
SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
{
ri->master->config_epoch = ri->master->failover_epoch;
...

下面更新failover_state

1
2
3
4
5
6
7
8
9
10
11
12
13
...
ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
ri->master->failover_state_change_time = mstime();
sentinelFlushConfig();
sentinelEvent(LL_WARNING,"+promoted-slave",ri,"%@");
if (sentinel.simfailure_flags &
SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION)
sentinelSimFailureCrash();
sentinelEvent(LL_WARNING,"+failover-state-reconf-slaves",
ri->master,"%@");
sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER,
"start",ri->master->addr,ri->addr);
...

sentinelForceHelloUpdateForMaster这个函数会强制发送一个”Hello”信息到所有关联到ri->master的Redis和Sentinel Instance。
从技术上来说,这个并不需要,因为我们每隔SENTINEL_PUBLISH_PERIOD都会向所有Instance发送一个消息。但是当一个Sentinel更新了配置之后,尽快发送总是好的。
注意虽然Sentinel我们没有设置pc,但是它实际上是通过ccsentinelSendHello的。

1
2
3
...
sentinelForceHelloUpdateForMaster(ri->master);
...

否则,一个Slave意外地变成了Master,此时如果我们的主服务器看起来正常,并且riwait_time时间内没有SDOWN或者ODOWN(?),我们要强制它变回Slave。这个通常是因为重启,或者之前下线的Master突然上线了。
但在这之前,我们需要等待一段时间,让它可以接收新配置(指什么呢?)。
对于这种情况,兜底选项是啥呢?如果啥都没有,那么这边就会超时,导致上面sentinelFailoverWaitPromotion失败。但什么情况下会有这个分支呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
...
} else {
mstime_t wait_time = SENTINEL_PUBLISH_PERIOD*4;

if (!(ri->flags & SRI_PROMOTED) &&
sentinelMasterLooksSane(ri->master) &&
sentinelRedisInstanceNoDownFor(ri,wait_time) &&
mstime() - ri->role_reported_time > wait_time)
{
int retval = sentinelSendSlaveOf(ri,
ri->master->addr->ip,
ri->master->addr->port);
if (retval == C_OK)
sentinelEvent(LL_NOTICE,"+convert-to-slave",ri,"%@");
}
}
}

看一下服务器正常的条件:

  1. 它在我们眼里是Master
  2. 它自己也说自己是Master
  3. 它还没有SDOWN或者ODOWN
1
2
3
4
5
6
7
int sentinelMasterLooksSane(sentinelRedisInstance *master) {
return
master->flags & SRI_MASTER &&
master->role_reported == SRI_MASTER &&
(master->flags & (SRI_S_DOWN|SRI_O_DOWN)) == 0 &&
(mstime() - master->info_refresh) < SENTINEL_INFO_PERIOD*2;
}

sentinelFailoverReconfNextSlave

这个函数调用来自于FailOver状态机的SENTINEL_FAILOVER_STATE_RECONF_SLAVES这个状态。它的主要功能是所有剩余的似乎还没有更新配置的Slave发送SLAVE OF指令,也就是让他们去跟随新的Master,即promoted_slave
我们对指令的发送进行限流,同时只能存在master->parallel_syncs条in-flight的指令。
这里的“似乎没有更新配置”指的是具有以下这个条件slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG),其实就表示已经在更新过程中,但是没更新完。

1
2
3
4
5
6
7
/* Send SLAVE OF <new master address> to all the remaining slaves that
* still don't appear to have the configuration updated. */
void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
dictIterator *di;
dictEntry *de;
int in_progress = 0;
...

下面这个循环统计已经在更新过程中的Slave数量到in_progress里面。

1
2
3
4
5
6
7
8
9
10
...
di = dictGetIterator(master->slaves);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);

if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
in_progress++;
}
dictReleaseIterator(di);
...

下面,我们又遍历一次循环,在这个循环里面,如果in_progress的数量还小于设定的master->parallel_syncs,我们将尝试对新的Slave发送SLAVE OF指令。
我感觉每一次Tick都跑这个遍历,开销是不是有点大呢?其实不然,因为这个循环还要做其他事情,反倒是上一个循环也许可以优化下,比如说搞一个全局的in_progress

1
2
3
4
5
6
7
8
9
10
11
...
di = dictGetIterator(master->slaves);
while(in_progress < master->parallel_syncs &&
(de = dictNext(di)) != NULL)
{
sentinelRedisInstance *slave = dictGetVal(de);
int retval;

// 跳过已经完成配置的Slave
if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
...

如果对这个Slave已经发送了SLAVE OF,但是这个Slave还是停留在这个状态,我们就认为它已经被Reconf了。这个处理正确么?注释的解释是Sentinel能够在稍后检测出这个Slave被错误地配置了,并且进行修复。
【Q】这个过程在哪里呢?

1
2
3
4
5
6
7
8
9
10
...
if ((slave->flags & SRI_RECONF_SENT) &&
(mstime() - slave->slave_reconf_sent_time) >
SENTINEL_SLAVE_RECONF_TIMEOUT)
{
sentinelEvent(LL_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
slave->flags &= ~SRI_RECONF_SENT;
slave->flags |= SRI_RECONF_DONE;
}
...

对于过程中,或者断连了的节点,我们跳过。

1
2
3
4
5
6
...
/* Nothing to do for instances that are disconnected or already
* in RECONF_SENT state. */
if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)) continue;
if (slave->link->disconnected) continue;
...

对于剩下来的情况,我们发送命令。

1
2
3
4
5
6
7
8
9
10
11
12
13
...
/* Send SLAVEOF <new master>. */
retval = sentinelSendSlaveOf(slave,
master->promoted_slave->addr->ip,
master->promoted_slave->addr->port);
if (retval == C_OK) {
slave->flags |= SRI_RECONF_SENT;
slave->slave_reconf_sent_time = mstime();
sentinelEvent(LL_NOTICE,"+slave-reconf-sent",slave,"%@");
in_progress++;
}
}
dictReleaseIterator(di);

下面的sentinelFailoverDetectEnd检查如果所有的Slave都被配置了,则切换到SENTINEL_FAILOVER_STATE_UPDATE_CONFIG

1
2
3
    /* Check if all the slaves are reconfigured and handle timeout. */
sentinelFailoverDetectEnd(master);
}
【Q】和sentinelFailoverSwitchToPromotedSlave的区别在哪里?

详见后面对sentinelFailoverSwitchToPromotedSlave的介绍。

  1. 阶段
    sentinelFailoverSwitchToPromotedSlave的处理阶段是SENTINEL_FAILOVER_STATE_UPDATE_CONFIG,而这个函数是SENTINEL_FAILOVER_STATE_RECONF_SLAVES

【Q】SRI_RECONF_SENT和SRI_RECONF_INPROG的区别是什么?

我们可以看到SRI_RECONF_有三个子状态SRI_RECONF_SENTSRI_RECONF_INPROGSRI_RECONF_DONE,是从左到右递进的。
在我们调用sentinelSendSlaveOf后,会设置SRI_RECONF_SENT
在我们在sentinelRefreshInstanceInfo中检测到这个Slave指向的Master的地址等于新Master即ri->master->promoted_slave的地址时,会取消SRI_RECONF_SENT,设置SRI_RECONF_INPROG

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// sentinelRefreshInstanceInfo
...
/* Detect if the slave that is in the process of being reconfigured
* changed state. */
if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
(ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
{
/* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
if ((ri->flags & SRI_RECONF_SENT) &&
ri->slave_master_host &&
strcmp(ri->slave_master_host,
ri->master->promoted_slave->addr->ip) == 0 &&
ri->slave_master_port == ri->master->promoted_slave->addr->port)
{
ri->flags &= ~SRI_RECONF_SENT;
ri->flags |= SRI_RECONF_INPROG;
sentinelEvent(LL_NOTICE,"+slave-reconf-inprog",ri,"%@");
}

因为此时状态已经是SRI_RECONF_INPROG,所以当Slave到Master(肯定是新的而不是旧的)的连接是SENTINEL_MASTER_LINK_STATUS_UP时,切换到SRI_RECONF_DONE
这个是通过INFO消息中的master_link_status:字段来获得的。

1
2
3
4
5
6
7
8
9
10
        /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
if ((ri->flags & SRI_RECONF_INPROG) &&
ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
{
ri->flags &= ~SRI_RECONF_INPROG;
ri->flags |= SRI_RECONF_DONE;
sentinelEvent(LL_NOTICE,"+slave-reconf-done",ri,"%@");
}
}
...

当然,后面我们还会看到因为超时等原因设置为SRI_RECONF_DONE等flag的。

sentinelFailoverDetectEnd

这个函数用来判断是否这个Master的所有Slave的同步都已经完成,此时,master还表示当前Fail的Master。
下面的两个条件,主要是:

  1. 检查是否有合法的master->promoted_slave
    主要检查是不是SDOWN,不过为啥不检查是否ODOWN呢?
  2. 具有Reconf完成的flag,或者SDOWN
    【Q】为什么Slave发生SDOWN是可以接受的呢?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
int not_reconfigured = 0, timeout = 0;
dictIterator *di;
dictEntry *de;
mstime_t elapsed = mstime() - master->failover_state_change_time;

/* We can't consider failover finished if the promoted slave is
* not reachable. */
if (master->promoted_slave == NULL ||
master->promoted_slave->flags & SRI_S_DOWN) return;

/* The failover terminates once all the reachable slaves are properly
* configured. */
di = dictGetIterator(master->slaves);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);

if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
if (slave->flags & SRI_S_DOWN) continue;
not_reconfigured++;
}
dictReleaseIterator(di);
...

下面是日经的超时检测,在前面的状态机中超时就经常会导致sentinelAbortFailover。但是我们会忽略未完成的Slave,这个可能也是Sentinel可以在后面进行修复吧。

1
2
3
4
5
6
7
8
...
/* Force end of failover on timeout. */
if (elapsed > master->failover_timeout) {
not_reconfigured = 0;
timeout = 1;
sentinelEvent(LL_WARNING,"+failover-end-for-timeout",master,"%@");
}
...

如果此时,发现都Reconf了,那么就更新状态为SENTINEL_FAILOVER_STATE_UPDATE_CONFIG

1
2
3
4
5
6
7
...
if (not_reconfigured == 0) {
sentinelEvent(LL_WARNING,"+failover-end",master,"%@");
master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
master->failover_state_change_time = mstime();
}
...

但是,如果超时的话,就需要再发送一次?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
    /* If I'm the leader it is a good idea to send a best effort SLAVEOF
* command to all the slaves still not reconfigured to replicate with
* the new master. */
if (timeout) {
dictIterator *di;
dictEntry *de;

di = dictGetIterator(master->slaves);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);
int retval;

if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE|SRI_RECONF_SENT)) continue;
if (slave->link->disconnected) continue;

retval = sentinelSendSlaveOf(slave,
master->promoted_slave->addr->ip,
master->promoted_slave->addr->port);
if (retval == C_OK) {
sentinelEvent(LL_NOTICE,"+slave-reconf-sent-be",slave,"%@");
slave->flags |= SRI_RECONF_SENT;
}
}
dictReleaseIterator(di);
}
}

sentinelFailoverSwitchToPromotedSlave

【Q】和sentinelFailoverReconfNextSlave的关系是什么呢?见对sentinelFailoverReconfNextSlave的论述。

这个函数主要负责配置更新。什么是配置呢?

于是将旧的Master即master移出表,并把新的Slave即master->promoted_slave加进去,其中master->promoted_slave指向被提升为新Master的Slave。

1
2
3
4
/* This function is called when the slave is in
* SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need
* to remove it from the master table and add the promoted slave instead. */
void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {

这里的ref表示最新的Master。

1
2
3
4
5
6
7
8
9
    sentinelRedisInstance *ref = master->promoted_slave ?
master->promoted_slave : master;

sentinelEvent(LL_WARNING,"+switch-master",master,"%s %s %d %s %d",
master->name, master->addr->ip, master->addr->port,
ref->addr->ip, ref->addr->port);

sentinelResetMasterAndChangeAddress(master,ref->addr->ip,ref->addr->port);
}

sentinelResetMasterAndChangeAddress

sentinelFailoverSwitchToPromotedSlave实际上是FailOver自动机的最后一步。
主要逻辑是sentinelResetMasterAndChangeAddress,这个函数用来设置Master的ip:port,但保持Master名字不变,并最终调用sentinelResetMastersentinelResetMaster函数会重置master这个sentinelRedisInstance对象。与此同时,failover_state这个状态也会被清空成SENTINEL_FAILOVER_STATE_NONE
这个函数通常用来处理+switch-master消息,检索源码,我们确实看到所有+switch-mastersentinelEvent后面都会有这个函数。
这个函数返回错误,当ip:port不能被resolve。

1
2
3
4
5
6
int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
sentinelAddr *oldaddr, *newaddr;
sentinelAddr **slaves = NULL;
int numslaves = 0, j;
dictIterator *di;
dictEntry *de;

newaddr是最新的Master的地址。

1
2
newaddr = createSentinelAddr(ip,port);
if (newaddr == NULL) return C_ERR;

我们遍历原来Master的所有Slave,如果这些Slave的指向的Master地址slave->addr还没有更新,我们就将它加入slaves列表中,等待后续更新。

1
2
3
4
5
6
7
8
9
10
11
12
/* Make a list of slaves to add back after the reset.
* Don't include the one having the address we are switching to. */
di = dictGetIterator(master->slaves);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);

if (sentinelAddrIsEqual(slave->addr,newaddr)) continue;
slaves = zrealloc(slaves,sizeof(sentinelAddr*)*(numslaves+1));
slaves[numslaves++] = createSentinelAddr(slave->addr->ip,
slave->addr->port);
}
dictReleaseIterator(di);

如果Master的地址变了,我们就要把老Master放到slaves列表中。

1
2
3
4
5
6
7
8
/* If we are switching to a different address, include the old address
* as a slave as well, so that we'll be able to sense / reconfigure
* the old master. */
if (!sentinelAddrIsEqual(newaddr,master->addr)) {
slaves = zrealloc(slaves,sizeof(sentinelAddr*)*(numslaves+1));
slaves[numslaves++] = createSentinelAddr(master->addr->ip,
master->addr->port);
}

下面,我们重置Master指针指向的对象,并且将slaves列表中的对象加到这个新的Master中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
    /* Reset and switch address. */
sentinelResetMaster(master,SENTINEL_RESET_NO_SENTINELS);
oldaddr = master->addr;
master->addr = newaddr;
master->o_down_since_time = 0;
master->s_down_since_time = 0;

/* Add slaves back. */
for (j = 0; j < numslaves; j++) {
sentinelRedisInstance *slave;

slave = createSentinelRedisInstance(NULL,SRI_SLAVE,slaves[j]->ip,
slaves[j]->port, master->quorum, master);
releaseSentinelAddr(slaves[j]);
if (slave) sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
}
zfree(slaves);

/* Release the old address at the end so we are safe even if the function
* gets the master->addr->ip and master->addr->port as arguments. */
releaseSentinelAddr(oldaddr);
sentinelFlushConfig();
return C_OK;
}

实验

配置

我们的配置如下(推荐用自己编译的Redis,否则Redis服务会自动重启,需要用/etc/init.d/redis-server stop关闭)

1
2
3
Master localhost 6379
Slave1 localhost 6479
Slave2 localhost 6579

Slave

对于Slave1

1
2
3
4
# redis2.conf
replicaof 127.0.0.1 6379
port 6479
pidfile /var/run/redis_6479.pid

运行

1
2
./src/redis-server redis2.conf &
redis-cli -h 127.0.0.1 -p 6479

对于Slave2同理
INFO一下主节点

1
2
3
4
5
6
7
8
9
10
# Replication
role:master
connected_slaves:2
slave0:ip=127.0.0.1,port=6479,state=online,offset=27029,lag=1
slave1:ip=127.0.0.1,port=6579,state=online,offset=27559,lag=0
master_repl_offset:27559
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:2
repl_backlog_histlen:27558

Sentinel

对于Sentinel2,我们配置如下。注意log文件配置相对路径似乎会打印不了log

1
2
3
4
5
port 26479
logfile "/tmp/sentinel2.log"
daemonize yes
pidfile /var/run/redis-sentinel2.pid
sentinel monitor mymaster 127.0.0.1 6379 2

其他如法炮制

1
./src/redis-sentinel sentinel.conf

启动后可以,注意我们可以看到runid,他是一个十六进制的串183af86ac492235bf97739bcdad0353f1d4e61df

1
2
3
4
5
6
7
8
9
10
$ redis-cli -p 26379
127.0.0.1:26379> sentinel masters
1) 1) "name"
2) "mymaster"
3) "ip"
4) "127.0.0.1"
5) "port"
6) "6379"
7) "runid"
8) "183af86ac492235bf97739bcdad0353f1d4e61df"

还可以INFO一下

1
2
3
4
5
6
7
8
127.0.0.1:26379> info sentinel
# Sentinel
sentinel_masters:1
sentinel_tilt:0
sentinel_running_scripts:0
sentinel_scripts_queue_length:0
sentinel_simulate_failure_flags:0
master0:name=mymaster,status=ok,address=127.0.0.1:6379,slaves=2,sentinels=3

但似乎没有办法从Master找到其他的Sentinel
注意,如果有Duplicated master name.的错误,可以考虑删除掉conf下面这段自动生成的代码。这是因为sentinelFlushConfig函数会触发对conf的回写。

1
2
3
4
5
6
7
8
9
10
11
12
13
protected-mode no
user default on nopass ~* +@all
sentinel known-replica mymaster 127.0.0.1 6479
sentinel known-sentinel mymaster 127.0.0.1 26479 0833444de1d44b73ce6382af2ff0aeb21951b19a
sentinel known-sentinel mymaster 127.0.0.1 26379 2ddc47e7af3cebe8b1642127ed72002bb918550c
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel config-epoch mymaster 0
sentinel leader-epoch mymaster 0
sentinel known-replica mymaster 127.0.0.1 6579
sentinel known-replica mymaster 127.0.0.1 6479
sentinel known-sentinel mymaster 127.0.0.1 26479 0833444de1d44b73ce6382af2ff0aeb21951b19a
sentinel known-sentinel mymaster 127.0.0.1 26379 2ddc47e7af3cebe8b1642127ed72002bb918550c
sentinel current-epoch 0

测试正常FailOver

现在我们Kill掉Master

Sentinel1

观察Sentinel1,它被选举为Leader

1
2
3
4
5
10467:X 22 Oct 2020 04:27:19.771 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
10467:X 22 Oct 2020 04:27:19.771 # Redis version=999.999.999, bits=64, commit=00000000, modified=0, pid=10467, just started
10467:X 22 Oct 2020 04:27:19.771 # Configuration loaded
10468:X 22 Oct 2020 04:27:19.772 * Increased maximum number of open files to 10032 (it was originally set to 1024).
10468:X 22 Oct 2020 04:27:19.772 * Running mode=sentinel, port=26379.

调用sentinelIsRunning生成myid。

1
2
3
4
10468:X 22 Oct 2020 04:27:19.773 # Sentinel ID is 2ddc47e7af3cebe8b1642127ed72002bb918550c
10468:X 22 Oct 2020 04:27:19.773 # +monitor master mymaster 127.0.0.1 6379 quorum 2
10468:X 22 Oct 2020 04:29:33.291 # +sdown master mymaster 127.0.0.1 6379
10468:X 22 Oct 2020 04:29:33.353 # +odown master mymaster 127.0.0.1 6379 #quorum 2/2

调用sentinelStartFailover

1
2
10468:X 22 Oct 2020 04:29:33.353 # +new-epoch 1
10468:X 22 Oct 2020 04:29:33.353 # +try-failover master mymaster 127.0.0.1 6379

通过sentinelVoteLeader投了自己一票,这也会产生一条消息。

1
10468:X 22 Oct 2020 04:29:33.355 # +vote-for-leader 2ddc47e7af3cebe8b1642127ed72002bb918550c 1

sentinelReceiveIsMasterDownReply回调中收到了其他两个Sentinel的投票。

1
2
10468:X 22 Oct 2020 04:29:33.358 # 0833444de1d44b73ce6382af2ff0aeb21951b19a voted for 2ddc47e7af3cebe8b1642127ed72002bb918550c 1
10468:X 22 Oct 2020 04:29:33.359 # 568af73c42a5dbdeb4dfcdfc7545a63226b8d626 voted for 2ddc47e7af3cebe8b1642127ed72002bb918550c 1

sentinelFailoverWaitStartsentinelGetLeader中的轮询结束了,进入下面的failover-state-select-slave

1
2
10468:X 22 Oct 2020 04:29:33.426 # +elected-leader master mymaster 127.0.0.1 6379
10468:X 22 Oct 2020 04:29:33.426 # +failover-state-select-slave master mymaster 127.0.0.1 6379

选择了6579这个Slave

1
2
3
4
5
6
7
8
9
10
11
12
13
10468:X 22 Oct 2020 04:29:33.489 # +selected-slave slave 127.0.0.1:6579 127.0.0.1 6579 @ mymaster 127.0.0.1 6379
10468:X 22 Oct 2020 04:29:33.489 * +failover-state-send-slaveof-noone slave 127.0.0.1:6579 127.0.0.1 6579 @ mymaster 127.0.0.1 6379
10468:X 22 Oct 2020 04:29:33.548 * +failover-state-wait-promotion slave 127.0.0.1:6579 127.0.0.1 6579 @ mymaster 127.0.0.1 6379
10468:X 22 Oct 2020 04:29:34.235 # +promoted-slave slave 127.0.0.1:6579 127.0.0.1 6579 @ mymaster 127.0.0.1 6379
10468:X 22 Oct 2020 04:29:34.235 # +failover-state-reconf-slaves master mymaster 127.0.0.1 6379
10468:X 22 Oct 2020 04:29:34.291 * +slave-reconf-sent slave 127.0.0.1:6479 127.0.0.1 6479 @ mymaster 127.0.0.1 6379
10468:X 22 Oct 2020 04:29:34.485 # -odown master mymaster 127.0.0.1 6379
10468:X 22 Oct 2020 04:29:35.326 * +slave-reconf-inprog slave 127.0.0.1:6479 127.0.0.1 6479 @ mymaster 127.0.0.1 6379
10468:X 22 Oct 2020 04:29:35.326 * +slave-reconf-done slave 127.0.0.1:6479 127.0.0.1 6479 @ mymaster 127.0.0.1 6379
10468:X 22 Oct 2020 04:29:35.389 # +failover-end master mymaster 127.0.0.1 6379
10468:X 22 Oct 2020 04:29:35.389 # +switch-master mymaster 127.0.0.1 6379 127.0.0.1 6579
10468:X 22 Oct 2020 04:29:35.389 * +slave slave 127.0.0.1:6479 127.0.0.1 6479 @ mymaster 127.0.0.1 6579
10468:X 22 Oct 2020 04:29:35.389 * +slave slave 127.0.0.1:6379 127.0.0.1 6379 @ mymaster 127.0.0.1 6579

Sentinel1

观察Sentinel2,它是一个Follower

1
2
3
4
5
6
7
8
10473:X 22 Oct 2020 04:27:22.770 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
10473:X 22 Oct 2020 04:27:22.770 # Redis version=999.999.999, bits=64, commit=00000000, modified=0, pid=10473, just started
10473:X 22 Oct 2020 04:27:22.770 # Configuration loaded
10474:X 22 Oct 2020 04:27:22.771 * Increased maximum number of open files to 10032 (it was originally set to 1024).
10474:X 22 Oct 2020 04:27:22.772 * Running mode=sentinel, port=26479.
10474:X 22 Oct 2020 04:27:22.772 # Sentinel ID is 0833444de1d44b73ce6382af2ff0aeb21951b19a
10474:X 22 Oct 2020 04:27:22.772 # +monitor master mymaster 127.0.0.1 6379 quorum 2
10474:X 22 Oct 2020 04:29:33.357 # +new-epoch 1

投票给了Sentinel1

1
10474:X 22 Oct 2020 04:29:33.358 # +vote-for-leader 2ddc47e7af3cebe8b1642127ed72002bb918550c 1

即使已经票,Follower还是会执行sentinelCheckSubjectivelyDownsentinelCheckObjectivelyDown

1
2
10474:X 22 Oct 2020 04:29:33.395 # +sdown master mymaster 127.0.0.1 6379
10474:X 22 Oct 2020 04:29:33.457 # +odown master mymaster 127.0.0.1 6379 #quorum 3/2

sentinelStartFailoverIfNeeded函数中,发现已经有人在FailOver了(这是因为自己刚投完票),所以不会开启。

1
10474:X 22 Oct 2020 04:29:33.457 # Next failover delay: I will not start a failover before Thu Oct 22 04:35:33 2020

在PubSub中获得最新配置,处理函数sentinelProcessHelloMessage

1
10474:X 22 Oct 2020 04:29:34.291 # +config-update-from sentinel 2ddc47e7af3cebe8b1642127ed72002bb918550c 127.0.0.1 26379 @ mymaster 127.0.0.1 6379
1
2
3
10474:X 22 Oct 2020 04:29:34.291 # +switch-master mymaster 127.0.0.1 6379 127.0.0.1 6579
10474:X 22 Oct 2020 04:29:34.291 * +slave slave 127.0.0.1:6479 127.0.0.1 6479 @ mymaster 127.0.0.1 6579
10474:X 22 Oct 2020 04:29:34.291 * +slave slave 127.0.0.1:6379 127.0.0.1 6379 @ mymaster 127.0.0.1 6579

FailOver完毕的配置

观察最后的配置

1
2
3
4
5
6
7
# Sentinel
sentinel_masters:1
sentinel_tilt:0
sentinel_running_scripts:0
sentinel_scripts_queue_length:0
sentinel_simulate_failure_flags:0
master0:name=mymaster,status=ok,address=127.0.0.1:6579,slaves=2,sentinels=3

在当前版本下,还可以借助sentinel simfailure_flags命令模拟Crash。

Reference

  1. https://cloud.tencent.com/developer/article/1021467
  2. https://wenfh2020.com/2020/06/15/redis-sentinel-master-down/
    这个讲解比较完整