@SovietPower
2022-05-17T22:06:10.000000Z
字数 8314
阅读 967
Study
schedule:https://pdos.csail.mit.edu/6.824/schedule.html
中文视频:https://www.bilibili.com/video/BV1x7411M7Sf
Lab2参考:
实验介绍:
https://pdos.csail.mit.edu/6.824/labs/lab-raft.html
实现建议:
https://pdos.csail.mit.edu/6.824/labs/raft-structure.txt
https://thesquareplanet.com/blog/students-guide-to-raft/
关于锁:
https://pdos.csail.mit.edu/6.824/labs/raft-locking.txt
可能可参考:
https://zhuanlan.zhihu.com/p/264448558
https://zhuanlan.zhihu.com/p/476644274
https://zhuanlan.zhihu.com/p/388523882
https://github.com/Toma62299781/MIT-6.824-spring2021
介绍:
https://zhuanlan.zhihu.com/p/32052223
https://blog.csdn.net/qq_40378034/article/details/117404484
论文:https://pdos.csail.mit.edu/6.824/papers/raft-extended.pdf
论文翻译:https://zhuanlan.zhihu.com/p/343560811
currentTerm++
(所以孤立节点的Term
会很大)。代码可以用原始代码,也可用Lab1完成后的代码。要修改的内容在raft
文件夹中。
用deadlock
的Mutex
代替sync
,可在运行时进行死锁检测。
只给一两个int变量用的锁,可以换成atomic
。
测试不过时,可以看看test-test.go
的逻辑。
在src/raft
文件夹下:
部分测试
go test -run [测试部分] [-race]
测试部分如:2A 2B TestReElection2A...
全部测试
go test
多次测试
可以用go-test-many:
.\go-test-many.sh <测试次数> <并行数> <测试部分>
2A要完成选举,以及实现使用AppendEntries
的心跳。
Add the Figure 2 state for leader election to the Raft struct in raft.go. You'll also need to define a struct to hold information about each log entry.
首先根据论文的Figure 2完善Raft
的定义,然后定义LogEntry
。
在Make
中初始化Raft
,然后把GetState()
写完。
It's easiest to use time.Sleep() with a small constant argument to
drive the periodic checks. Don't use time.Ticker and time.Timer;
they are tricky to use correctly.
对于每个server,起一个线程,根据其当前的状态,执行三种可能的死循环(状态改变则退出):一种用来发送心跳(leader);一种定期检查是否有一段时间未收到心跳(follower),可能要进行状态转换;一种发起选举,等待结果(candidate),若有结果则改变状态,若超时则重新选举。
三个循环每次循环均要Sleep
某段时间。
但是这样写,到后面会不太方便:
// mainloop
func (rf *Raft) mainloop() {
for !rf.killed() {
switch rf.getState() {
case Follower:
rf.followerLoop()
break
case Candidate:
rf.candidateLoop()
break
case Leader:
rf.leaderLoop()
break
default:
println("!! Unknown state:", rf.state)
}
}
}
func (rf *Raft) followerLoop() {
rf.resetHBTimeoutAt()
for rf.getState() == Follower {
time.Sleep(HBCheckPeriod)
if rf.checkHBTimeout() {
if !rf.killed() {
rf.stateMu.Lock()
rf.becomeCandidate()
rf.stateMu.Unlock()
}
return
}
}
}
func (rf *Raft) candidateLoop() {
// become candidate just now.
for rf.getState() == Candidate {
time.Sleep(ElectionTimeoutPeriod)
rf.startElection()
}
}
Follower做的事也可以看做:等待心跳超时,超时后尝试成为Candidate、发起选举,如果等待期间或选举时收到heartbeat(或term更高的请求选票),则放弃,重新等待超时后再次尝试。
也就是将三个Loop写在一起,用"一段时间内是否收到heartbeat"替换超时时间。
因为是一个mainLoop、异步投票,可能要sleep一个完整的心跳超时时间,mainLoop才会退出candidateLoop、执行leaderLoop并发送心跳。
所以为了方便,在成为leader后就起一个线程,死循环发送心跳(直到不是leader)。但mainLoop要一直无事可做。
You must pick election timeouts (and thus heartbeat intervals) that are short enough that it's very likely that an election will complete in less than five seconds even if it requires multiple rounds.
The paper's Section 5.2 mentions election timeouts in the range of 150 to 300 milliseconds. Such a range only makes sense if the leader sends heartbeats considerably more often than once per 150 milliseconds. Because the tester limits you to 10 heartbeats per second, you will have to use an election timeout larger than the paper's 150 to 300 milliseconds, but not too large, because then you may fail to elect a leader within five seconds.
时间暂时设为:
leader发送心跳间隔100ms,心跳超时开始选举时间(election timeout)为600~1000ms,选举超时时间也即心跳超时时间。
根据Figure 2,完善RequestVote RPC
、AppendEntries RPC
及其相关结构体。
基本跟着流程写就好了。
注意:
atomic.AddInt64(), atomic.LoadInt64(), atomic.StoreInt64()
。int
和int32, int64
均不算同一类型,本机应该是用64。AppendEntries
也是异步的。AppendEntries
RPC Handler中,要检查如果当前不是Follower
(即是Candidate
),转为Follower
。votedFor
为-1:在每次被迫变为Follower时(当前是Candidate且收到term更高的请求);在第一次(实现没法,可以在每次)收到有效心跳时(标志着当前进行完一次选举)重设。logs
初始长度为1,logs[0]
为无效的空日志,所有server的logs[0]
都相同。call
调用RPC的地方不能加锁,因为call
不会立刻返回(如果无法通信,则等待超时),等待多个call
会很久。根据锁的种类和常用地方的不同,使用了多个锁,而不是只用一个rf.mu
:
{
state State
stateMu deadlock.Mutex // Lock to protect shared access to this peer's state. A mutex mainly for status
// persistent state on all servers
currentTerm int
votedFor int
logs []LogEntry
perStateMu deadlock.Mutex // A mutex mainly for persistent state(currentTerm, votedFor and logs)
// Be careful that stateMu MUST lock before perStateMu and unlock after perStateMu if used to avoid deadlock.
// volatile state on leaders
nextIndex []int // index of the next log entry to send to tha server (initialized to leader last log index+1)
matchIndex []int // index of highest log entry known to be replicated on server (initialized to 0, increases monotonically)
leaderMu deadlock.Mutex // used for nextIndex, matchIndex
voteMu deadlock.Mutex // prevent tryElection while processing RequestVote() and vice versa
// voteMu MUST lock before mu, perStateMu and unlock after mu, perStateMu if used to avoid deadlock.
}
这样会比较非常麻烦,但应该性能更好?(一般都是靠rf.mu
锁整个结构体)
但是这样在2B有其它的问题:
尝试开始选举tryElection()
、正在给他人投票RequestVote()
,这两个之间是原子性操作,不能重合(因为在RequestVote()
结束前不能确定是否会投出票、重置超时时间),需要用锁voteMu
包住整个函数。
然后会有死锁(不太明白,已经保证了锁的先后顺序),虽然有deadlock检测,但还是很难查,所以还是放弃了,只用一个锁。
在之前的基础上,需要:
Start()
。election restriction
,即figure 2中的选举规则、投票时如何判定谁更新。这个在前面实现了。因为介绍很少,可以看看test_test.go
的测试:
TestBasicAgree2B
中用到三个config.go
函数:
make_config()
:初始化。
nCommitted()
:计算某个index的log被多少server提交了。会检查该log内容是否正确。
one()
:让leader发送一条日志,并检查其它服务器是否能同步commit。具体:枚举所有server,调用其rf.Start(cmd)
。若找到leader(Start()
返回true,并提交、向其它server同步日志cmd),则不断检查该日志cmd是否已被足够数量(expectedServers
)的服务器提交。若2s内未满足,则测试失败(若one
的参数retry
为true,则允许尝试寻找下一个可能的leader,不会立刻失败,但要在10s内完成)。
所以可知:TestBasicAgree2B
运行3个server,依次发送3个log,每次检查log是否被所有服务器提交(且log内容不能变,index要正确)。
由one()
可知,Start()
的参数为要提交的指令,返回值分别为:当前指令提交后的index(若为leader)(即lastLogIndex+1
)、当前term、当前server是否是leader。
func (rf *Raft) Start(command interface{}) (int, int, bool) {
// Your code here (2B).
rf.perStateMu.Lock()
defer rf.perStateMu.Unlock()
index := -1
term := rf.currentTerm
isLeader := rf.getState() == Leader
if isLeader {
// new log
index = rf.lastLogIndex + 1
rf.logs = append(rf.logs, LogEntry{index, rf.currentTerm, command})
rf.updateLastLog()
}
return index, term, isLeader
}
通过Start()
传递命令后:
下次心跳时,leader将新命令异步传给follower。leader需要检查传输结果,当大多数传输成功时,进行commit,更新commitIndex
。
在下一次心跳时,leader将新的LeaderCommit
传给follower,使follower也进行commit。
所以一个命令需要至少两次心跳,才可保证其被大多数server commit。
根据Figure 2的Rules for leader,继续完善“检查传输结果”部分:
- If last log index≥nextIndex for a follower: send AppendEntries RPC with log entries starting at nextIndex
- If successful: update nextIndex and matchIndex for follower
- If AppendEntries fails because of log inconsistency: decrement nextIndex and retry
- If there exists an N such that N > commitIndex, a majority of matchIndex[i]≥N, and log[N].term == currentTerm: set commitIndex = N
求a majority of matchIndex[i]≥N
的N
,可以给matchIndex
排序(如从小到大),取较小的中位数matchIndex[(n+1)/2]
即为N
。
但要注意me是最新的,但保证matchIndex[me]=0
上式就是正确的,,让它排到temp[0]
。可在初始化时将matchIndex[me]
设为math.MaxInt
,让它排到temp[0]
// If there exists an N such that N > commitIndex, a majority of matchIndex[i]≥N, and log[N].term == currentTerm: set commitIndex = N
temp := rf.matchIndex
sort.Ints(temp)
N := temp[(len(temp)-1)/2]
if N < len(rf.logs) && rf.logs[N].Term == rf.getCurrentTerm() {
rf.commit(N)
}
还有非leader的commit:
If commitIndex > lastApplied: increment lastApplied, apply log[lastApplied] to state machine.
Send each newly committed entry on applyCh(defined in Make) on each peer.
两个都可在每次修改commitIndex
时进行,即放到rf.commit()
中。
// Update CommitIndex and commit.
func (rf *Raft) commit(index int) {
rf.commitIndex = max(rf.commitIndex, index)
// If commitIndex > lastApplied: increment lastApplied, apply log[lastApplied] to state machine.
for rf.lastApplied < rf.commitIndex {
rf.lastApplied++
log := &rf.logs[rf.lastApplied]
rf.applyCh <- ApplyMsg{
CommandValid: true,
Command: log.Command,
CommandIndex: log.Index,
}
}
}
一个冲突问题:
sendAppendEntries()
的这条语句:
ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
会读取args.Entries
。
而handler AppendEntries()
中的这句:
rf.logs = append(rf.logs[:conflictID], args.Entries[i:]...)
会写args.Entries
,导致race?
开始想的是在AppendEntriesArgs
加锁,但是Mutex的实现里有小写的结构体成员,不能用做RPC参数。。用atomic int32
又是忙等影响效率。
所以得copy一个??
temp := make([]LogEntry, len(args.Entries))
copy(temp, args.Entries)
...
rf.logs = append(rf.logs, temp[i:]...)
// Wrong: append(rf.logs, args.Entries[i:]...)