贴一个raft可视化网站,方便直观学习:Raft Consensus Algorithm
前置知识: **CAP:**一个分布式系统最多只能同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance)这三项中的两项。
选择:
raft笔记 相比于Paxos,一个易于理解的共识算法(共识即实现复制状态机:),并将其分解为以下三个子问题,主要有两钟RPC:Vote RPC和AppendEntries RPC
领导者选举 leader follow candicate
每个follow只有一张选票,先来先得原则
follow什么时候会投票:requestVote rpc的任期号大于自己,否则requestVote rpc最新日志不比自己旧(日志索引+日志任期号 确定一个日志entry)
只有candicate的日志和大多数的follow一样新时,即包含之前任期提交过的日志 才会能当选leader
什么时候当选leader,超过一半选票(为什么节点为奇数—在发生网络分区的时候…)
1 2 3 4 5 6 7 8 9 10 11 12 struct RequestVote { int term; int candidateId; int lastLogIndex; int lastLogTerm; }; struct ResponseVote { int term; bool voteGranted; };
这里有个细节:
当多个节点同时选举时就可能出现没有leader,raft引入随机超时选举机制来避免了活锁问题
日志复制
提交:执行日志指令到状态机,只有日志复制到多数节点 ,并且是当期 的日志才能提交
一致性检查 :在appendentries rpc中放入前一个日志索引和任期,follow找不到该日志则拒绝,leader发送前一个,直到相同。因此将通过一致性检查来确保状态一致:
当前日志任期不同时:
leader只会改变follow的日志中日志任期<=自己的,并且删除包含上一个日志任期到当前日志任期 的之间的follow 的日志
当前日志任期相同,索引不同时:
补缺删增
其余情况在领导选举时便已经排除
1 2 3 4 5 6 7 8 9 10 11 12 13 14 struct AppendEntriesRequest { int term; int leader; int preLogIndex; int preLogTerm; repeated entries; int leaderCommit; }; struct AppendEntriesResponse { int term; bool success; };
安全性
选举限制: 即leader选举中一定包含之前任期的所有被提交日志条,最后日志任期号大的日志更新 ,否则日志索引更大的日志更新
提交限制 只有当前任期的日志才能提交,满足这个条件才会提交之前任期的日志;倘若leader当前任期为7,没有任期7的日志,提交了任期4的日志,大多数完成提交,而有一个恢复的包含任期5、6却不包含4的机器当选leader后就会删掉follower中已提交的4的日志,造成一致性失败
宕机处理: follower或candidate崩溃,发送给他们的requestVote和appendEntries和无限重复
时间限制: 广播时间 << 选举超时时间 << 平均故障时间
其他 集群成员变更 使用更加简单的单节点集群成员变更,每次只变更一个节点,防止脑裂,这样新旧配置集群一定有重合
三个特征 strong leader leader election membership changes
日志压缩 使用快照,当日志占用空间太多,一个key只保留最新的一份value
解答: raft通过集群和复制提高了服务的可用性,通过共识模块和日志实现一致性,在CAP中,存在网络分区时raft更坚定于一致性,牺牲可用性
raft并不适用于流量密集型服务,更适合一致性至关重要的低流量场景,他不是为高吞吐量,细粒度服务,更适合实现锁服务器,为更高级别的协议选举领导者、在分布式系统中复制关键配置数据等等,在一些极端情况下,发生网络分区节点不断超时选举,恢复时leader被迫成为follower再次选举,可见为了保持一致性raft牺牲了一些效率
Raft实现 分为四个模块实现,分别是领导选举 ,日志复制 ,持久化 和键值数据库
leader election 从上面的vote结构中,一个实现elect的raft需要这些成员
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 std::vector<std::shared_ptr<RaftRpcUtil>> m_peers; int m_me; int m_currenTerm; int m_voteFor; int m_lastSnapshotIncludeIndex;int m_lastSnapshotIncludeTerm;enum Status { Follower, Candidate, Leader }; Status m_status; std::chrono::_V2::system_clock::time_point m_lastResetElectionTime;
有了这些基本信息就可以开始进行选举了,什么时候自己进行选举,当超过一定时间没有接收到leader发来的心跳包时,就会进入选举状态,这个时间从收到心跳包开始到结束是随机的,即在一段区间内生成随机超时时间,每个节点的超时时间都不一样,在electionTimeOutTicker中,判断suitableSleepTime是否还有剩余时间,有的话继续休眠,ticker触发的时间waketime肯定大于上次重置超时时间时的系统时间m_lastResetHearBeatTime,如果休眠结束后发现要比waketime大,说明这期间收到leader的心跳包了,否则超时进入选举状态,doElection,心跳的超时时间也是同理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 void Raft::electionTimeOutTicker () { while (true ) { while (m_status == Leader) { usleep (HeartBeatTimeout); } std::chrono::duration<signed long int , std::nano> suitableSleepTime{}; std::chrono::system_clock::time_point wakeTime{}; { m_mtx.lock (); wakeTime = now (); suitableSleepTime = getRandomizedElectionTimeout () + m_lastResetElectionTime - wakeTime; m_mtx.unlock (); } if (std::chrono::duration <double , std::milli>(suitableSleepTime).count () > 1 ) { auto start = std::chrono::steady_clock::now (); usleep (std::chrono::duration_cast <std::chrono::microseconds>(suitableSleepTime).count ()); auto end = std::chrono::steady_clock::now (); std::chrono::duration<double , std::milli> duration = end - start; std::cout << "\033[1;35m electionTimeOutTicker();函数设置睡眠时间为: " << std::chrono::duration_cast <std::chrono::milliseconds>(suitableSleepTime).count () << " 毫秒\033[0m" << std::endl; std::cout << "\033[1;35m electionTimeOutTicker();函数实际睡眠时间为: " << duration.count () << " 毫秒\033[0m" << std::endl; } if (std::chrono::duration <double , std::milli>(m_lastResetElectionTime - wakeTime).count () > 0 ) { continue ; } doElection (); } }
进入竞选状态,而doElection做的是开始向其他节点发送rpc请求,参数设置即RequestVote所需的四个参数,创建一个子线程来进行节点间通信
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 for (int i = 0 ; i < m_peers.size (); i++){ if (i == m_me) continue ; int lastLogIndex = -1 , lastLogTerm = -1 ; getLastLogIndexAndTerm (&lastLogIndex, &lastLogTerm); std::shared_ptr<raftRpcProctoc::RequestVoteArgs> requestVoteArgs = std::make_shared <raftRpcProctoc::RequestVoteArgs>(); requestVoteArgs->set_term (m_currentTerm); requestVoteArgs->set_candidateid (m_me); requestVoteArgs->set_lastlogindex (lastLogIndex); requestVoteArgs->set_lastlogterm (lastLogTerm); auto requestVoteReply = std::make_shared <raftRpcProctoc::RequestVoteReply>(); std::thread t (&Raft::sendRequestVote, this , i, requestVoteArgs, requestVoteReply, votedNum) ; t.detach (); }
在子线程中执行sendRequestVote,利用rpc通信屏屏蔽通信细节,就好像在本地调用一样获取投票结果回应,然后进行判断
是否对方Term大,是则退出选举状态,重置状态
此时投票人应该已经修改Term和candidate一样,voteNum++
投票达到大多数,成为leader
开始进行日志复制
持久化操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 bool ok = m_peers[server]->RequestVote (args.get (), reply.get ());... std::lock_guad<std::mutex> lokc (m_mtx) ;if (reply->term () > m_currenTerm){ m_status = Follower; m_currenTerm = reply->term (); m_voteFor = -1 ; persisit (); return true ; } *voteNum = *votedNum + 1 ; if (*votedNum >= m_peers.size () / 2 + 1 ){ *votedNum = 0 ; m_status = Leader; int lastLogIndex = getLastLogIndex (); for (int i = 0 ; i < m_nextIndex.size (); i++) { m_nextIndex[i] = lastLogIndex + 1 ; m_matchIndex[i] = 0 ; } std::thread t (&Raft::doHeartBeat, this ) ; t.detach (); persist (); }
log replication 心跳机制跟日志复制一样,只是没有日志实体
实现日志复制我们主要需要这些成员信息,m_matchIndex和m_nextIndex在可视化中可以明显的看到是每个LogEntry右小角小黑点 和箭头,所以在日志复制中这两个成员变量很关键 ,同时随通信进行,收到reply后m_nextIndex和m_matchIndex前进,如果要进行日志复制优化就是实现快速查找m_nextIndex
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 int m_currenTerm; std::vector<raftRpcProctoc::LogEntry> m_logs; int m_commitIndex; int m_lastApplied; std::vector<int > m_nextIndex; std::vector<int > m_matchIndex std::shared_ptr<LockQueue<ApplyMsg>> applyChan; std::chrono::_V2::system_clock::time_point m_lastResetHearBeatTime; int m_lastSnapshotIncludeIndex;int m_lastSnapshotIncludeTerm;
上面当选leader后创建子线程调用doHeartBeat,同doElection一样,这里的参数稍微复杂,因为实现了日志压缩模块,即将一部分日志体压缩成快照了,需要一个appendNums,match为0,nextIndex为lastLogIndex的下一个
判断如果nextIndex<=快照,直接发送快照
否则根据快照index获取m_logs日志项
创建线程调用sendAppendEntries
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 auto appendNums = std::make_shared <int >(1 ); for (int i = 0 ; i < m_peers.size (); ++i){ if (m_nextIndex[i] <= m_lastSnapshotIncludeIndex) { std::thread t (&Raft::leaderSendSnapShot, this , i) ; t.detach (); continue ; } int preLogIndex = -1 ; int PreLogTerm = -1 ; getPreLogInfo (i, &preLogIndex, &PreLogTerm); std::shared_ptr<raft::raftRpcProctoc::AppendEntriesArgs> appendEntriesArgs = std::make_shared <raftRpcProctoc::AppendEntriesArgs>(); appendEntriesArgs->set_term (m_currenTerm); appendEntriesArgs->set_leaderid (pm_me); appendEntriesArgs->set_prelogindex (preLogIndex); appendEntriesArgs->set_prelogterm (PreLogTerm); appendEntriesArgs->clear_entries (); appendEntriesArgs->set_leadercommit (m_commitIndex); if (preLogIndex != m_lastSnapshotIncludeIndex) { for (int j = getSlicesIndexFromLogIndex (preLogIndex) + 1 ; j < m_logs.size (); ++j) { raftRpcProctoc::LogEntry *sendEntryPtr = appendEntriesArgs->add_entries (); *sendEntryPtr = m_logs[j]; } } else { for (const auto &item : m_logs) { raftRpcProctoc::LogEntry *sendEntryPtr = appendEntriesArgs->add_entries (); *sendEntryPtr = item; } } int lastLogIndex = getLastLogIndex (); myAssert (appendEntriesArgs->prevlogindex () + appendEntriesArgs->entries_size () == lastLogIndex, format("appendEntriesArgs.PrevLogIndex{%d}+len(appendEntriesArgs.Entries){%d} != lastLogIndex{%d}" , appendEntriesArgs->prevlogindex (), appendEntriesArgs->entries_size (), lastLogIndex)); const std::shared_ptr<raftRpcProctoc::AppendEntriesReply> appendEntriesReply = std::make_shared <raftRpcProctoc::AppendEntriesReply>(); appendEntriesReply->set_appstate (Disconnected); std::thread t (&Raft::sendAppendEntries, this , i, appendEntriesArgs, appendEntriesReply, appendNums) ; t.detach (); } m_lastResetHearBeatTime = now ();
而后sendAppendEntries也是类似,远程调用,进行判断,这里的reply比上面原理中多出两个类型,用来快速调整nextIndex和标识节点网络状态,AppState没什么必要,收不到回复就一直重发就好了
int32 UpdateNextIndex=3;
int32 AppState=4;
还是首先判断对方Term,大则说明自己已经过期,回到follower
判断受否复制成功,否,则什么也不做
判断有无UpdateNextIndex,有,说明不匹配,更新nextIndex
appendNums++,当大多数复制成功,则提交
当提交的日志属于currentTerm才可以更新m_commitIndex
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 bool ok = m_peers[server]->AppendEntries ((args.get (), reply.get ()));if (!reply->success ()){ if (reply->updatenextindex () != -100 ) m_nextIndex[server] = reply->updatenextindex (); else { *appendNums = *appendNums + 1 ; m_matchIndex[server] = std::max (m_matchIndex[server], args->prevlogindex () + args->entries_size ()); m_nextIndex[server] = m_matchIndex[server] + 1 ; int lastLogIndex = getLastLogIndex (); if (*appendNums >= 1 + m_peers.size () / 2 ) *appendNums = 0 ; if (args->entries_size () > 0 && args->entries (args->entries_size () - 1 ).logterm () == m_currentTerm) m_commitIndex = std::max (m_commitIndex, args->prevlogindex () + args->entries_size ()); } }
重写rpc方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 void Raft::RequestVote (google::protobuf::RpcController *controller, const ::raftRpcProctoc::RequestVoteArgs *request, ::raftRpcProctoc::RequestVoteReply *response, ::google::protobuf::Closure *done) { RequestVote (request, response); done->Run (); } void Raft::AppendEntries (google::protobuf::RpcController *controller, const ::raftRpcProctoc::AppendEntriesArgs *request, ::raftRpcProctoc::AppendEntriesReply *response, ::google::protobuf::Closure *done) { AppendEntries1 (request, response); done->Run (); } void Raft::InstallSnapshot (google::protobuf::RpcController *controller, const ::raftRpcProctoc::InstallSnapshotRequest *request, ::raftRpcProctoc::InstallSnapshotResponse *response, ::google::protobuf::Closure *done) { InstallSnapshot (request, response); done->Run (); }
rpc方法调用
1 2 3 4 5 6 7 8 9 10 11 12 13 bool raftRpcUtil::AppendEntries (raftRpcProctoc::AppendEntriesArgs *args, raftRpcProctoc::AppendEntriesReply *response) { MprpcController controller; stub_->AppendEntries (&controller, args, response, nullptr ); return !controller.Failed (); } bool raftRpcUtil::RequestVote (raftRpcProctoc::RequestVoteArgs *args, raftRpcProctoc::RequestVoteReply *response) { MprpcController controller; stub_->RequestVote (&controller, args, response, nullptr ); return !controller.Failed (); }
raft是一种共识算法,目的实现分布式一致性,为实现目的采用了强领导模型,因此构建raft的关键模块是随机的选举计时器
follower and candidate始终运行一个runElectionTimer协程,electionTimeout为150~300ms,这还使用10毫秒的ticker负责判断状态,以快速响应该节点的状态变化,同一时间可能运行多个runElectionTimer协程,通过term判断旧的退出
leader运行一个协程,每50ms执行leaderSendheartbeats