贴一个raft可视化网站,方便直观学习:Raft Consensus Algorithm

前置知识:

**CAP:**一个分布式系统最多只能同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance)这三项中的两项。

选择:

  • 当一套系统在发生分区故障后,客户端的任何请求都被卡死或者超时,但是,系统的每个节点总是会返回一致的数据,则这套系统就是 CP 系统,经典的比如 Zookeeper。

  • 如果一套系统发生分区故障后,客户端依然可以访问系统,但是获取的数据有的是新的数据,有的还是老数据,那么这套系统就是 AP 系统,经典的比如 Eureka。

raft笔记

相比于Paxos,一个易于理解的共识算法(共识即实现复制状态机:),并将其分解为以下三个子问题,主要有两钟RPC:Vote RPC和AppendEntries RPC

领导者选举

leader follow candicate

  1. 每个follow只有一张选票,先来先得原则
  2. follow什么时候会投票:requestVote rpc的任期号大于自己,否则requestVote rpc最新日志不比自己旧(日志索引+日志任期号 确定一个日志entry)
  3. 只有candicate的日志和大多数的follow一样新时,即包含之前任期提交过的日志才会能当选leader
  4. 什么时候当选leader,超过一半选票(为什么节点为奇数—在发生网络分区的时候…)
1
2
3
4
5
6
7
8
9
10
11
12
struct RequestVote
{
int term; // 当前任期号
int candidateId; // 自己的id
int lastLogIndex; // 自己最后一个日志索引
int lastLogTerm; // 自己最后一个日志的任期号
};
struct ResponseVote
{
int term; // 自己当前任期号
bool voteGranted; // 是否投票
};

这里有个细节:

当多个节点同时选举时就可能出现没有leader,raft引入随机超时选举机制来避免了活锁问题

日志复制

  • 提交:执行日志指令到状态机,只有日志复制到多数节点,并且是当期的日志才能提交
  • 一致性检查:在appendentries rpc中放入前一个日志索引和任期,follow找不到该日志则拒绝,leader发送前一个,直到相同。因此将通过一致性检查来确保状态一致:

当前日志任期不同时:

​ leader只会改变follow的日志中日志任期<=自己的,并且删除包含上一个日志任期到当前日志任期的之间的follow的日志

当前日志任期相同,索引不同时:

​ 补缺删增

其余情况在领导选举时便已经排除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct AppendEntriesRequest
{
int term; // 任期号
int leader; // 自己id
int preLogIndex; // 前一个日志索引
int preLogTerm; // 前一个日志任期号
repeated entries; //
int leaderCommit; // 最新已提交日志号
};
struct AppendEntriesResponse
{
int term; // 任期号
bool success; // 是否复制成功
};

安全性

  1. 选举限制:即leader选举中一定包含之前任期的所有被提交日志条,最后日志任期号大的日志更,否则日志索引更大的日志更
  2. 提交限制只有当前任期的日志才能提交,满足这个条件才会提交之前任期的日志;倘若leader当前任期为7,没有任期7的日志,提交了任期4的日志,大多数完成提交,而有一个恢复的包含任期5、6却不包含4的机器当选leader后就会删掉follower中已提交的4的日志,造成一致性失败
  3. 宕机处理:follower或candidate崩溃,发送给他们的requestVote和appendEntries和无限重复
  4. 时间限制:广播时间 << 选举超时时间 << 平均故障时间

其他

集群成员变更

使用更加简单的单节点集群成员变更,每次只变更一个节点,防止脑裂,这样新旧配置集群一定有重合

三个特征

strong leader leader election membership changes

日志压缩

使用快照,当日志占用空间太多,一个key只保留最新的一份value

解答:

raft通过集群和复制提高了服务的可用性,通过共识模块和日志实现一致性,在CAP中,存在网络分区时raft更坚定于一致性,牺牲可用性

raft并不适用于流量密集型服务,更适合一致性至关重要的低流量场景,他不是为高吞吐量,细粒度服务,更适合实现锁服务器,为更高级别的协议选举领导者、在分布式系统中复制关键配置数据等等,在一些极端情况下,发生网络分区节点不断超时选举,恢复时leader被迫成为follower再次选举,可见为了保持一致性raft牺牲了一些效率

Raft实现

分为四个模块实现,分别是领导选举日志复制持久化键值数据库

leader election

从上面的vote结构中,一个实现elect的raft需要这些成员

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
std::vector<std::shared_ptr<RaftRpcUtil>> m_peers;	// 每一个其他节点工具类,用来实现节点间通信
int m_me; // 我的id
int m_currenTerm; // 当前任期
int m_voteFor; // 我给谁投票
// 储存了快照中的最后一个日志的Index和Term
int m_lastSnapshotIncludeIndex;
int m_lastSnapshotIncludeTerm;
// 三种状态
enum Status
{
Follower,
Candidate,
Leader
};
Status m_status; // 自己的状态
// 选举超时
std::chrono::_V2::system_clock::time_point m_lastResetElectionTime;

有了这些基本信息就可以开始进行选举了,什么时候自己进行选举,当超过一定时间没有接收到leader发来的心跳包时,就会进入选举状态,这个时间从收到心跳包开始到结束是随机的,即在一段区间内生成随机超时时间,每个节点的超时时间都不一样,在electionTimeOutTicker中,判断suitableSleepTime是否还有剩余时间,有的话继续休眠,ticker触发的时间waketime肯定大于上次重置超时时间时的系统时间m_lastResetHearBeatTime,如果休眠结束后发现要比waketime大,说明这期间收到leader的心跳包了,否则超时进入选举状态,doElection,心跳的超时时间也是同理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
void Raft::electionTimeOutTicker()
{
while (true)
{
while (m_status == Leader)
{
usleep(HeartBeatTimeout);
}
std::chrono::duration<signed long int, std::nano> suitableSleepTime{};
std::chrono::system_clock::time_point wakeTime{};
{
m_mtx.lock();
wakeTime = now();
suitableSleepTime = getRandomizedElectionTimeout() + m_lastResetElectionTime - wakeTime;
m_mtx.unlock();
}

if (std::chrono::duration<double, std::milli>(suitableSleepTime).count() > 1)
{
// 获取当前时间点
auto start = std::chrono::steady_clock::now();

usleep(std::chrono::duration_cast<std::chrono::microseconds>(suitableSleepTime).count());
// std::this_thread::sleep_for(suitableSleepTime);

// 获取函数运行结束后的时间点
auto end = std::chrono::steady_clock::now();

// 计算时间差并输出结果(单位为毫秒)
std::chrono::duration<double, std::milli> duration = end - start;

// 使用ANSI控制序列将输出颜色修改为紫色
std::cout << "\033[1;35m electionTimeOutTicker();函数设置睡眠时间为: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(suitableSleepTime).count() << " 毫秒\033[0m"
<< std::endl;
std::cout << "\033[1;35m electionTimeOutTicker();函数实际睡眠时间为: " << duration.count() << " 毫秒\033[0m"
<< std::endl;
}

if (std::chrono::duration<double, std::milli>(m_lastResetElectionTime - wakeTime).count() > 0)
{
// 说明睡眠的这段时间有重置定时器,那么就没有超时,再次睡眠
continue;
}
doElection();
}
}

进入竞选状态,而doElection做的是开始向其他节点发送rpc请求,参数设置即RequestVote所需的四个参数,创建一个子线程来进行节点间通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
for (int i = 0; i < m_peers.size(); i++)
{
if (i == m_me) continue;
int lastLogIndex = -1, lastLogTerm = -1;
getLastLogIndexAndTerm(&lastLogIndex, &lastLogTerm); // 获取最后一个log的term和下标

std::shared_ptr<raftRpcProctoc::RequestVoteArgs> requestVoteArgs =
std::make_shared<raftRpcProctoc::RequestVoteArgs>();
requestVoteArgs->set_term(m_currentTerm);
requestVoteArgs->set_candidateid(m_me);
requestVoteArgs->set_lastlogindex(lastLogIndex);
requestVoteArgs->set_lastlogterm(lastLogTerm);
auto requestVoteReply = std::make_shared<raftRpcProctoc::RequestVoteReply>();
std::thread t(&Raft::sendRequestVote, this, i, requestVoteArgs, requestVoteReply,
votedNum); // 创建新线程并执行b函数,并传递参数
t.detach();
}

在子线程中执行sendRequestVote,利用rpc通信屏屏蔽通信细节,就好像在本地调用一样获取投票结果回应,然后进行判断

  1. 是否对方Term大,是则退出选举状态,重置状态
  2. 此时投票人应该已经修改Term和candidate一样,voteNum++
  3. 投票达到大多数,成为leader
  4. 开始进行日志复制
  5. 持久化操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
bool ok = m_peers[server]->RequestVote(args.get(), reply.get());
...
std::lock_guad<std::mutex> lokc(m_mtx);
if (reply->term() > m_currenTerm)
{
m_status = Follower;
m_currenTerm = reply->term();
m_voteFor = -1;
persisit();
return true;
}
*voteNum = *votedNum + 1;
if (*votedNum >= m_peers.size() / 2 + 1)
{
*votedNum = 0;
m_status = Leader;
int lastLogIndex = getLastLogIndex();
for (int i = 0; i < m_nextIndex.size(); i++)
{
m_nextIndex[i] = lastLogIndex + 1; // 有效下标从1开始,因此要+1
m_matchIndex[i] = 0; // 每换一个领导都是从0开始,见fig2
}
std::thread t(&Raft::doHeartBeat, this); // 马上向其他节点宣告自己就是leader
t.detach();

persist();
}

log replication

心跳机制跟日志复制一样,只是没有日志实体

实现日志复制我们主要需要这些成员信息,m_matchIndex和m_nextIndex在可视化中可以明显的看到是每个LogEntry右小角小黑点和箭头,所以在日志复制中这两个成员变量很关键,同时随通信进行,收到reply后m_nextIndex和m_matchIndex前进,如果要进行日志复制优化就是实现快速查找m_nextIndex

image-20241109210221949

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int m_currenTerm;		// 任期号
std::vector<raftRpcProctoc::LogEntry> m_logs; // 日志体
int m_commitIndex; // 最后一个提交索引
int m_lastApplied; // 最后一个

std::vector<int> m_nextIndex; // 每个节点需要进行日志复制的index
std::vector<int> m_matchIndex // 每个节点已经完成日志复制的index

std::shared_ptr<LockQueue<ApplyMsg>> applyChan; // client从这里取日志(2B),client与raft通信的接口
// ApplyMsgQueue chan ApplyMsg // raft内部使用的chan,applyChan是用于和服务层交互,最后好像没用上

// 心跳超时,用于leader
std::chrono::_V2::system_clock::time_point m_lastResetHearBeatTime;

// 储存了快照中的最后一个日志的Index和Term
int m_lastSnapshotIncludeIndex;
int m_lastSnapshotIncludeTerm;

上面当选leader后创建子线程调用doHeartBeat,同doElection一样,这里的参数稍微复杂,因为实现了日志压缩模块,即将一部分日志体压缩成快照了,需要一个appendNums,match为0,nextIndex为lastLogIndex的下一个

  1. 判断如果nextIndex<=快照,直接发送快照
  2. 否则根据快照index获取m_logs日志项
  3. 创建线程调用sendAppendEntries
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
auto appendNums = std::make_shared<int>(1); // 正确返回的节点的数量
for (int i = 0; i < m_peers.size(); ++i)
{
// 日志压缩加入后要判断是发送快照还是发送AE
if (m_nextIndex[i] <= m_lastSnapshotIncludeIndex)
{
std::thread t(&Raft::leaderSendSnapShot, this, i); // 创建新线程并执行b函数,并传递参数
t.detach();
continue;
}

int preLogIndex = -1;
int PreLogTerm = -1;
getPreLogInfo(i, &preLogIndex, &PreLogTerm);
std::shared_ptr<raft::raftRpcProctoc::AppendEntriesArgs> appendEntriesArgs =
std::make_shared<raftRpcProctoc::AppendEntriesArgs>();
appendEntriesArgs->set_term(m_currenTerm);
appendEntriesArgs->set_leaderid(pm_me);
appendEntriesArgs->set_prelogindex(preLogIndex);
appendEntriesArgs->set_prelogterm(PreLogTerm);
appendEntriesArgs->clear_entries();
appendEntriesArgs->set_leadercommit(m_commitIndex);
if (preLogIndex != m_lastSnapshotIncludeIndex)
{
for (int j = getSlicesIndexFromLogIndex(preLogIndex) + 1; j < m_logs.size(); ++j)
{
raftRpcProctoc::LogEntry *sendEntryPtr = appendEntriesArgs->add_entries();
*sendEntryPtr = m_logs[j];
}
}
else
{
for (const auto &item : m_logs)
{
raftRpcProctoc::LogEntry *sendEntryPtr = appendEntriesArgs->add_entries();
*sendEntryPtr = item;
}
}
int lastLogIndex = getLastLogIndex();
// leader对每个节点发送的日志长短不一,但是都保证从prevIndex发送直到最后
myAssert(appendEntriesArgs->prevlogindex() + appendEntriesArgs->entries_size() == lastLogIndex,
format("appendEntriesArgs.PrevLogIndex{%d}+len(appendEntriesArgs.Entries){%d} != lastLogIndex{%d}",
appendEntriesArgs->prevlogindex(), appendEntriesArgs->entries_size(), lastLogIndex));
// 构造返回值
const std::shared_ptr<raftRpcProctoc::AppendEntriesReply> appendEntriesReply =
std::make_shared<raftRpcProctoc::AppendEntriesReply>();
appendEntriesReply->set_appstate(Disconnected);

std::thread t(&Raft::sendAppendEntries, this, i, appendEntriesArgs, appendEntriesReply,
appendNums); // 创建新线程并执行b函数,并传递参数
t.detach();
}
m_lastResetHearBeatTime = now(); // leader发送心跳,就不是随机时间了

而后sendAppendEntries也是类似,远程调用,进行判断,这里的reply比上面原理中多出两个类型,用来快速调整nextIndex和标识节点网络状态,AppState没什么必要,收不到回复就一直重发就好了

int32 UpdateNextIndex=3;

int32 AppState=4;

  1. 还是首先判断对方Term,大则说明自己已经过期,回到follower
  2. 判断受否复制成功,否,则什么也不做
  3. 判断有无UpdateNextIndex,有,说明不匹配,更新nextIndex
  4. appendNums++,当大多数复制成功,则提交
  5. 当提交的日志属于currentTerm才可以更新m_commitIndex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
bool ok = m_peers[server]->AppendEntries((args.get(), reply.get()));
if (!reply->success())
{
if (reply->updatenextindex() != -100) m_nextIndex[server] = reply->updatenextindex(); // 失败是不更新mathIndex的
else
{
*appendNums = *appendNums + 1;
m_matchIndex[server] = std::max(m_matchIndex[server], args->prevlogindex() + args->entries_size());
m_nextIndex[server] = m_matchIndex[server] + 1;
int lastLogIndex = getLastLogIndex();

if (*appendNums >= 1 + m_peers.size() / 2)
*appendNums = 0;
if (args->entries_size() > 0 && args->entries(args->entries_size() - 1).logterm() == m_currentTerm)
m_commitIndex = std::max(m_commitIndex, args->prevlogindex() + args->entries_size());
}
}

重写rpc方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 重写rpc方法
void Raft::RequestVote(google::protobuf::RpcController *controller, const ::raftRpcProctoc::RequestVoteArgs *request,
::raftRpcProctoc::RequestVoteReply *response, ::google::protobuf::Closure *done)
{
RequestVote(request, response);
done->Run();
}

void Raft::AppendEntries(google::protobuf::RpcController *controller,
const ::raftRpcProctoc::AppendEntriesArgs *request,
::raftRpcProctoc::AppendEntriesReply *response, ::google::protobuf::Closure *done)
{
AppendEntries1(request, response);
done->Run();
}
void Raft::InstallSnapshot(google::protobuf::RpcController *controller,
const ::raftRpcProctoc::InstallSnapshotRequest *request,
::raftRpcProctoc::InstallSnapshotResponse *response, ::google::protobuf::Closure *done)
{
InstallSnapshot(request, response);

done->Run();
}

rpc方法调用

1
2
3
4
5
6
7
8
9
10
11
12
13
bool raftRpcUtil::AppendEntries(raftRpcProctoc::AppendEntriesArgs *args, raftRpcProctoc::AppendEntriesReply *response)
{
MprpcController controller;
stub_->AppendEntries(&controller, args, response, nullptr);
return !controller.Failed();
}

bool raftRpcUtil::RequestVote(raftRpcProctoc::RequestVoteArgs *args, raftRpcProctoc::RequestVoteReply *response)
{
MprpcController controller;
stub_->RequestVote(&controller, args, response, nullptr);
return !controller.Failed();
}

raft是一种共识算法,目的实现分布式一致性,为实现目的采用了强领导模型,因此构建raft的关键模块是随机的选举计时器

follower and candidate始终运行一个runElectionTimer协程,electionTimeout为150~300ms,这还使用10毫秒的ticker负责判断状态,以快速响应该节点的状态变化,同一时间可能运行多个runElectionTimer协程,通过term判断旧的退出

leader运行一个协程,每50ms执行leaderSendheartbeats