Raft(A consensus algorithm)

参考链接

MIT-6.824课程里面给的raft论文是个扩展版本 <In Search of an Understandable Consensus Algorithm (Extended Version)>.

Notes

Labs

相比Paxos, Raft算法更强调可以理解性。Raft算法更符合工程师按照直觉涉及出来的算法:

  1. 我们要选一个leader出来,由这个leader决定要做什么事情
  2. 这个leader肯定是要多数servers选举出来的(quorum).
  3. 每个leader都有一个任期(term), 每次选举都会将term+1.
  4. leader election的选择标准则是根据两个指标决定的
    • 接受到的日志里面,谁接受到的最大term更大,谁就是leader
    • 如果最大term都一样的话,那么谁收到的日志数量更多,谁就是leader
  5. 日志是一个数组,维护着三个下标:
    • 接收下标 lastReceived. 收到一条日志就会+1, 但是有可能会回滚。
    • 提交下标 lastCommitted. leader会定时将自己确定commited的下标发送给followers, follower只需要更新到最大的下标就行。绝对不会回滚。
    • 执行下标 lastApplied. 这个下标相当于追随lastCommitted,只要committed的话就可以执行了。
  6. 如何决定某个log entry是否应该commmit
    • 在正常情况下,commit与否是由leader来判断的。只要leader确定有majority收到了某个log entry,那么就认为是commit了。
    • 但是如果期间出现leader election的话,新的leader上来会提交一个no-op的log entry. 如果确定这个log entry被commit的话,那么之前的log肯定也被commit了。
    • followers在接受log的时候,如果发现lastReceived和leader发送的log index对不上的话,可以要求补齐。

论文里面还提到了许多其他实现问题比如:

  1. 实现动态membership change
  2. 实现完整的状态(snapshot)传输
  3. 随机化选举超时时间来减少split votes的效果分析

Rafe FAQ

how does a server learn about newly elected leader?
  new leader sees yes votes from majority
  others see AppendEntries heart-beats with a higher term number
    i.e. from the new leader
  the heart-beats suppress any new election

what if old leader isn't aware a new leader is elected?
  perhaps old leader didn't see election messages
  perhaps old leader is in a minority network partition
  new leader means a majority of servers have incremented currentTerm
    so old leader (w/ old term) can't get majority for AppendEntries
    so old leader won't commit or execute any new log entries
    thus no split brain
    but a minority may accept old server's AppendEntries
      so logs may diverge at end of old term


Q: Does Raft sacrifice anything for simplicity?

A: Raft gives up some performance in return for clarity; for example:

- Every operation must be written to disk for persistence; performance
  probably requires batching many operations into each disk write.

- There can only usefully be a single AppendEntries in flight from the
  leader to each follower: followers reject out-of-order
  AppendEntries, and the sender's nextIndex[] mechanism requires
  one-at-a-time. A provision for pipelining many AppendEntries would
  be better.

- Servers may not be able to take much advantage of multi-core because
  operations must be executed one at a time (in log order).

下面一段文字是关于Raft和Paxos等共识算法在实际应用中的比较

Q: Is Raft used in real-world software, or do companies generally roll
their own flavor of Paxos (or use a different consensus protocol)?

A: There are several real-world users of Raft: Docker
(https://docs.docker.com/engine/swarm/raft/), etcd (https://etcd.io),
and MongoDB. Other systems said to be using Raft include CockroachDB,
RethinkDB, and TiKV. Maybe you can find more starting at
http://raft.github.io/

On the other hand, my impression is that most state-machine
replication systems are based on the Multi-Paxos and Viewstamped
Replication protocols.

Q: What is Paxos? In what sense is Raft simpler?

A: There is a protocol called Paxos that allows a set of servers to
agree on a single value. While Paxos requires some thought to
understand, it is far simpler than Raft. Here's an easy-to-read paper
about Paxos:

  http://css.csail.mit.edu/6.824/2014/papers/paxos-simple.pdf

However, Paxos solves a much smaller problem than Raft. To build a
real-world replicated service, the replicas need to agree on an
indefinite sequence of values (the client commands), and they need
ways to efficiently recover when servers crash and restart or miss
messages. People have built such systems with Paxos as the starting
point; look up Google's Chubby and Paxos Made Live papers, and
ZooKeeper/ZAB. There is also a protocol called Viewstamped
Replication; it's a good design, and similar to Raft, but the paper
about it is hard to understand.

These real-world protocols are complex, and (before Raft) there was
not a good introductory paper describing how they work. The Raft
paper, in contrast, is relatively easy to read and fairly detailed.
That's a big contribution.

Whether the Raft protocol is inherently easier to understand than
something else is not clear. The issue is clouded by a lack of good
descriptions of other real-world protocols. In addition, Raft
sacrifices performance for clarity in a number of ways; that's fine
for a tutorial but not always desirable in a real-world protocol.


Q: How does Raft's performance compare to Paxos in real-world applications?

A: The fastest Paxos-derived protocols are probably faster than
Raft as described in the paper; have a look at ZAB/ZooKeeper and Paxos
Made Live. On the other hand, etcd3 (using Raft) claims to have
achieved better performance than zookeeper and many Paxos-based
implementations (https://www.youtube.com/watch?v=hQigKX0MxPw).

There are situations where Raft's leader is not so great. If the
datacenters containing replicas and clients are distant from each
other, people sometimes use agreement protocols derived from original
Paxos. The reason is that Paxos has no leader; any replica can start
an agreement; so clients can talk to the replica in their local
datacenter rather than having to talk to a leader in a distant
datacenter. ePaxos is an example.

关于Raft如何处理日志不一致的情况。

如果这里只考虑数据一致性问题的话,其实这里面选S2,S3都无所谓。S2,S3可以把自己的4,5都擦出掉,因为4,5都没有commit. 肯定不能将已经commit的日志回滚。

how can logs disagree after a crash?
  a leader crashes before sending last AppendEntries to all
    S1: 3
    S2: 3 3
    S3: 3 3
  worse: logs might have different commands in same entry!
    after a series of leader crashes, e.g.
        10 11 12 13  <- log entry #
    S1:  3
    S2:  3  3  4
    S3:  3  3  5

Raft forces agreement by having followers adopt new leader's log
  example:
  S3 is chosen as new leader for term 6
  S3 sends an AppendEntries with entry 13
     prevLogIndex=12
     prevLogTerm=5
  S2 replies false (AppendEntries step 2)
  S3 decrements nextIndex[S2] to 12
  S3 sends AppendEntries w/ entries 12+13, prevLogIndex=11, prevLogTerm=3
  S2 deletes its entry 12 (AppendEntries step 3)
  similar story for S1, but S3 has to back up one farther

为什么不选择日志最长的作为leader

why not elect the server with the longest log as leader?
  example:
    S1: 5 6 7
    S2: 5 8
    S3: 5 8
  first, could this scenario happen? how?
    S1 leader in term 6; crash+reboot; leader in term 7; crash and stay down
      both times it crashed after only appending to its own log
    Q: after S1 crashes in term 7, why won't S2/S3 choose 6 as next term?
    next term will be 8, since at least one of S2/S3 learned of 7 while voting
    S2 leader in term 8, only S2+S3 alive, then crash
  all peers reboot
  who should be next leader?
    S1 has longest log, but entry 8 could have committed !!!
    so new leader can only be one of S2 or S3
    i.e. the rule cannot be simply "longest log"

关于实现的一些细节问题:

Q: 如何让leader发现自己已经不在是leader了?

A: 我觉得可以记录和每个follower最近的心跳时间,这个心跳时间可以在发送日志或者是发送心跳成功之后更新。如果发现超过半数的follower的时间太长的话,那么自己就要drop leader

Q: 如何找到前面一个同步点?

A: 如果leader的prevLogIndex比自己的logs还要多的话,那么先选择自己的logs最后一条记录。leader prevLogTerm肯定比自己的要大。假设自己在prevLogIndex的term是A的话,那么向前找到term是A-1的记录就是同步点,但是不用小于自己的commitIndex

DPrintf("X%d: mismatch log entry. index = %v, leader term = %v, my term = %v",
    rf.me, req.PrevLogIndex, req.PrevLogTerm, rf.logs[idx].Term)
searchTerm := rf.logs[idx].Term - 1
rb := 0
for idx >= 0 && rf.logs[idx].Term > searchTerm && (idx+rf.baseLogIndex) > rf.commitIndex {
    idx -= 1
    rb += 1
}

或者是另外一种写法

DPrintf("X%d: mismatch log entry. index = %v, leader term = %v, my term = %v",
    rf.me, req.PrevLogIndex, req.PrevLogTerm, rf.logs[idx].Term)

searchTerm := rf.logs[idx].Term
rb := 0
for idx >= 0 && rf.logs[idx].Term == searchTerm && (idx+rf.baseLogIndex) > rf.commitIndex {
    idx -= 1
    rb += 1
}

Q: 如何确保获得有效的超过半数投票?

A: 首先reply.voteGranted要保证是true. 其次如果我们使用goroutine的话,在返回结果的时候,可能自己的term已经改变了。所以这个时候需要确保,自己的term和发起请求时候的term是相同的(否则可以认为这个投票并不是给自己的),才可以认为获得有效选票。

valid := true
rf.Lock()
// 如果修改了currentTerm的话,那么认为这轮就失败了
// 因为这里投票其实是投给req.Term
// 如果这里直接更新了currentTerm的话,那么就会出现两个leader.
if reply.Term > rf.currentTerm {
    rf.changeToFollower(reply.Term, "electLeaderResponse")
}
if req.Term != rf.currentTerm {
    valid = false
}
rf.Unlock()

// get majority votes
if valid && reply.VoteGranted {
    v := atomic.AddInt32(&votes, 1)
    if int(v) == (len(rf.peers)/2 + 1) {
        rf.changeToLeader()
    }
}

Q: 如何方便地打印RPC

A: 首先实现request/response的String()方法,然后可以在处理过程中间将某些过程记录在一个字符串上,然后使用golang的defer功能,在,在函数返回的时候一起打印

trace := strings.Builder{}

defer func() {
    DPrintf("X%d: AppendEntries:%v -> %v %s", rf.me, req, reply, trace.String())
}()

if req.Term < rf.currentTerm {
    trace.WriteString("[ignore lower term]")
    return
}

Q: 如何判断某个提交失败或者是成功

A: Lab2实现的Raft里面任何提交都是异步的。只要当时提交的是leader, 那么就认为是成功的,但实际上可能永远不会成功。

等待结果返回有两个办法,一个是关注提交的index是否被commit了,另外一个方法则是在提交中带上提交id(唯一).

关注提交的index需要注意一个问题就是,这个index可能会被重复提交。想象一下,从某个leader上在index=5上提交,之后 这个节点的logs被覆盖阶段,那么理论上还有再次在index=5上提交的可能。但是两次提交的term肯定是不同的,可以拿term来做区分。

Q: 实现LogCompaction有什么要注意的吗?

A: 我在Lab3中实现的log compaction策略非常简单,我觉得有两个关注点: 1. 什么时候触发compaction 2. 应该清除多少logs.

我的策略是在有提交的时候检查logs size来决定是否触发compaction。触发由应用层决定而不是raft层决定。

理论上只需要保留到最近一次提交的index就行。但是如果每次只是保留到最近提交的index, 如果在传输snapshot期间如果leader 再次出现compaction的话,那么follower还是跟不上leader. 所以为了保险还是需要存一定的余量。

func (kv *KVServer) logCompactionWorker() {
    if kv.maxraftstate == -1 {
        return
    }

    const COMPACTION_RATIO = 4
    const CHECK_INTERVAL = 20
    for {
        if kv.killed() {
            break
        }
        kv.Lock()
        size := kv.persister.RaftStateSize()
        if float64(size) < float64(kv.maxraftstate)*COMPACTION_RATIO {
            kv.moreLogsCond.Wait()
            kv.Unlock()
            continue
        }
        kv.Unlock()
        DPrintf("kv%d: make log compaction, current size = %d, threshold = %d", kv.me, size, kv.maxraftstate)
        kv.doLogCompaction()
        SleepMills(CHECK_INTERVAL)
    }
}

func (kv *KVServer) doLogCompaction() {
    // log compaction 和 install snapshot 过程要对应上
    // 这个过程先对kv加锁,在对rf加锁
    kv.Lock()
    defer kv.Unlock()
    snapshot := kv.encodeSnapshot()
    applyIndex := kv.lastApplyIndex
    // 向前保留几个log可能可以减少同步次数
    kv.rf.LogCompaction(snapshot, applyIndex-10)
}

Q: 实现Snapshot上有什么需要注意的吗?

A: Lab3简化了传输问题,snapshot数据一次RPC就可以完全传输完成。Snapshot在安装的时候,因为涉及到state, 所以也只能由单线程来执行, 而且再安装也说明它无法成为leader,无法为client服务,所以性能就不是太大的问题。

在实现Lab3的时候最tricky的地方就是,一旦安装了snapshot,需要马上创建一次snapshot。这个trick的原因,是因为snapshot并不是共享的。 不是说我安装了snapshot A之后,我自己创建的snapshot就变成了snapshot A,我自己的创建的snapshot还是之前的snapshot. 实际生产系统中 这不会是个问题,因为snapshot通常都是放在存储系统上的。

if op == "install" {
    DPrintf("kv%d: install snapshot. rpcId=%d", kv.me, msg.RpcId)
    data := msg.Command.([]byte)
    wait := msg.WaitChan
    kv.doInstallSnapshot(data)
    wait <- "ok"
    // 这里安装完成了snapshot之后
    // 最好在做一个snapshot. 不然如果这个时候重启的话
    // applyIndex会回滚到之前的状态,而这个状态没有办法接着继续
    kv.doLogCompaction()
}