公开课课程网页:MIT6.824 Spring 2025

Lab1 MapReduce

mapreduce 是 google 提出的一种分布式模型,用于大规模的数据并行处理,核心思路就是 map 和 reduce 两个阶段,分别对应的是分片处理和汇总聚合。(简单来说就是拆分大问题,合并小问题)。

Papar

这是 MapReduce 的论文 rfeet.qrk,其中主要的思想就是下面这张图

img

实现方案

The design of a practical system for fault-tolerant virtual machines

主要的实现部分是在 src/mr中则介绍相关部分

coordinator.go

实现流程主要是

  1. MakeCoordinator() 对 master 结点进行相关初始化,另外这里我在这了解到 golang 的生命周期以及衍生的逃逸分析问题

  2. Register() 这部分是接收来自 worker 节点的信息,每当有新的 worker 节点来的时候进行注册,为其分配相关的 ID,方便后面进行任务的申请

  3. AssignTask() 此函数是接收 worker 的任务请求,需要注意的是此处包含几个问题:

    1. 首先需要保证节点是在 master 中的,防止后面 timeout 删除后导致的错误
    2. 保证 worker 是 Idle 的防止异常申请导致正常的任务被打断
    3. Map 与 Reduce 所需的请求不同,部分内容需要分别进行处理,比如 Map 需要传入文件名称,但是 Reduce 不需要

    另外,这个函数写完之后也觉得有几个问题,之后有时间进行修改:
    4. file,task直接进行切片,但是正常情况下 Files 信息需要进行保留,而且直接切片的效率可能也给更低(未经过验证)

  4. CompleteTask() 每个 worker 完成任务之后进行报告,方便进行下一次的任务分配。同时也是 Map 任务转为 Reduce 任务的函数节点

  5. 其他功能函数:

    1. HeartBeat() 心跳机制,保证 worker 可用
    2. MonitorTasks() 检测每个 worker 的状态,当 worker 超时的时候进行标记,然后进行删除
    3. Done() 结束标志,lab1 在 mrcoordinator.go 中调用,通过每秒检测返回值来判断是否完成任务

另外我学到一个新知识就是 golang 的生命周期

golang的生命周期指的是指:如果函数返回了相关的引用,那么GC不会立刻回收这部分的内存,而是会保留相关引用,因此如果在这里使用 go 启动协程是不会结束的。

示例代码如下,此段代码不会返回结束,因为 return &c 导致生命周期未结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
n := len(files)
c := Coordinator{
nMap: n,
LeftTask: n,
files: files,
nReduce: nReduce,
Workers: make(map[int]*Task),
Tasks: make([]int, 0),
TaskType: Map,
Worked: 0,
done: false,
}

for i := range c.nMap {
c.Tasks = append(c.Tasks, i+1)
}

// Your code here.
go c.MonitorTasks()

c.server()

return &c
}

worker.go

worker 的实现流程与 coordinator.go 的实现流程相互对应,主要是进行 RPC 调用。

相关的实现:

  1. Worker() 这是主要的调用函数,除了 WorkerRegister()WorkerHeartBeat() 函数,其他调用都在死循环中,完成从任务请求,任务执行,到任务结束向 master 汇报一整个流程然后进入到新的循环之中。WorkerHeartBeat() 需要使用协程来运行,同时需要注意,如果 worker 挂掉了,协程需要进行释放,否则会一直进行请求
  2. WorkerRegister() 无特别需要注意部分
  3. WorkerAssign() 无特别需要注意部分
  4. WorkerHeartBeat() 需要注意这里需要关注 worker 是否因为其他原因导致被 master 处理掉了,在处理掉的同时进行协程的关闭
  5. WorkerCompleted() 无特别需要注意部分
  6. Map 和 Reduce 函数结合论文以及提示中的内容来即可

Lab 2 Key/Value Server

这是后续的 shard kv 的一个初版,只是实现了简单的 kv 存储系统和一个简单的分布式锁。比较容易实现,主要是一些概念之类的东西。

Lab 3 Raft

Raft 是一种用于管理复制日志的共识算法。是基于 (multi-)Paxos ,目的是简化 Paxos,增加可理解性(将共识算法的关键要素分开, leader election, log replication, and safety)以及能适用于真实的系统设计

专业术语

这是我读论文的时候不认识的一些专有名词,顺手记下来几个

non-Byzantine & Byzantine

问题起源

拜占庭帝国想要进攻一个强大的敌人,为此派出了10支军队去包围这个敌人。这个敌人虽不比拜占庭帝国,但也足以抵御5支常规拜占庭军队的同时袭击。

基于一些原因,这10支军队不能集合在一起单点突破,必须在分开的包围状态下同时攻击。他们任一支军队单独进攻都毫无胜算,除非有至少6支军队同时袭击才能攻下敌国。他们分散在敌国的四周,依靠通信兵相互通信来协商进攻意向及进攻时间。

困扰这些将军的问题是,他们不确定他们中是否有叛徒,叛徒可能擅自变更进攻意向或者进攻时间。

在这种状态下,拜占庭将军们能否找到一种分布式的协议来让他们能够远程协商达成一致,从而赢取战斗?

img

问题实质

寻找到一个方法,使得系统在有错误节点,或者恶意节点的时候保证系统整体的一致性和正确性。(即非信任状态下的共识方法)

主要是需要保证以下两个特性

  • 一致性:所有非拜占庭的节点(即正确节点)需要有相同的输入(指令),然后给出相同的输出(即同时发起进攻)
  • 正确性:保证所有非拜占庭的节点都能接收到正确的指令。

分类

拜占庭容错系统

拜占庭容错系统要解决的正是分布式系统中存在恶意节点(即拜占庭节点)时,系统的一致性、正确性等问题

假设分布式系统拥有n台节点,并假设整个系统拜占庭节点不超过m台(n ≥ 3m + 1),拜占庭容错系统需要满足如下两个条件:

Note
为什么会是 n ≥ 3m + 1 ?

假设:
1. 总结点数为 n
2. 拜占庭节点数为 m
3. 诚实节点(非拜占庭节点数)为 h,h = n - m

条件:
1)拜占庭节点数必须少于诚实节点:
$n - m \geq m$ ==> $n \geq 2m$
2)允许拜占庭节点存在的情况下的一致性,由于大多数都是使用的投票机制,那么就是要保证诚实节点的数量要占大部分:
$n-m \geq 2m+1$ , so $n \geq 3m + 1$

  • 所有非拜占庭节点使用相同的输入信息,产生同样的结果。 在区块链系统中,可以理解为,随机数相同、区块算法相同、原账本相同的时候,计算结果相同。
  • 如果输入的信息正确,那么所有非拜占庭节点必须接收这个消息,并计算相应的结果。 在区块链系统中,可以理解为,非拜占庭节点需要对客户的请求进行计算并生成区块。

另外,拜占庭容错系统需要达成如下两个指标:

  • 安全性: 任何已经完成的请求都不会被更改,它可以在以后请求看到。在区块链系统中,可以理解为,已经生成的账本不可篡改,并且可以被节点随时查看。
  • 活性: 可以接受并且执行非拜占庭客户端的请求,不会被任何因素影响而导致非拜占庭客户端的请求不能执行。在区块链系统中,可以理解为,系统需要持续生成区块,为用户记账,这主要靠挖矿的激励机制来保证。

LSM(Log Structured Merge Tree)

是一种使用适用于 NoSQL 的一种储存结构。

LSM树的核心特点是利用顺序写来提高写性能,但因为分层(此处分层是指的分为内存和文件两部分)的设计会稍微降低读性能,但是通过牺牲小部分读性能换来高性能写,使得LSM树成为非常流行的存储结构。

img

  1. Memtable:MemTable是在 内存 中的数据结构,用于保存最近更新的数据,会按照 Key 有序地组织这些数据,LSM树对于具体如何组织有序地组织数据并没有明确的数据结构定义,例如Hbase使跳跃表来保证内存中key的有序。

  2. Immutable Memtable (Immutable : 不可改变的): 当 MemTable 达到一定大小后,会转化成Immutable MemTable。Immutable MemTable是将转MemTable变为SSTable的一种中间状态。写操作由新的 MemTable 处理,在转存过程中不阻塞数据更新操作。

  3. SSTable(Sorted String Table): 有序键值对 集合,是LSM树组在 磁盘 中的数据结构。为了加快SSTable的读取,可以通过建立key的索引以及布隆过滤器来加快key的查找。

    布隆过滤器:布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都比一般的算法要好的多,缺点是有一定的误识别率和删除困难。

    但是存在有一些问题,由于 LSM 存储都是日志化的,会记录所有的变化,因此会存在有 数据冗余 的情况,只有最新的 key 的数据才是有效的(不同于 innodb 这种 B+ tree 的引擎,B+ tree 都是进行原地修改,因此不存在 数据冗余 的情况 ) ———— 日志压缩 (log compact),论文 第七小节并且读取的时候需要从后往前进行查询 ———— 布隆过滤器

Log Compact

日志压缩主要是围绕这三个部分:

1)读放大:读取数据时实际读取的数据量大于真正的数据量。例如在LSM树中需要先在MemTable查看当前key是否存在,不存在继续从SSTable中寻找。

2)写放大:写入数据时实际写入的数据量大于真正的数据量。例如在LSM树中写入时可能触发Compact操作,导致实际写入的数据量远大于该key的数据量。

3)空间放大:数据实际占用的磁盘空间比数据的真正大小更多。上面提到的冗余存储,对于一个key来说,只有最新的那条记录是有效的,而之前的记录都是可以被清理回收的。

size-tiered compaction

主要核心是将 多个相同大小的 SSTable 文件合并为一个更大的文件,然后放入到下一层中,逐步形成一个层次结构。

每层限制SSTable为N,当每层SSTable达到N后,则触发Compact操作合并这些SSTable,并将合并后的结果写入到下一层成为一个更大的sstable。

但是这样会引发空间放大的问题,因为 key 的旧版本会在多个 SSTable 中共存一段时间,直到被合并清除。

leveled compaction

leveled策略也是采用分层的思想,每一层限制总文件的大小。

但是呢,leveled会将每一层切分成多个大小相近的SSTable。这些SSTable是这一层是 全局有序 的,意味着一个key在每一层至多只有1条记录,不存在冗余记录。

合并方式:如图,L1中至少选择一个文件,然后把它和跟 L2 有交集的部分进行融合,然后将文件放置在 L2 中。 L3 与 L4 重复类似的过程。并且这是可以进行 并行化的。
img

共识算法特点

  1. 它们在所有非拜占庭条件下确保安全(永远不会返回错误的结果),包括网络延迟、分区、数据包丢失、重复和重排序。

  2. 只要大多数服务器正常运行并能相互以及与客户端通信,它们就完全可用。因此,一个典型的五服务器集群可以容忍任何两个服务器的故障。服务器假定通过停止来失败;它们可以从稳定存储上的状态恢复,并重新加入集群。

Raft

特点

  1. 强领导者:Raft 采用了比其他共识算法更强的领导形式。例如,日志条目仅从领导者流向其他服务器。这简化了复制日志的管理,并使 Raft 更易于理解。
  2. 领导者选举:Raft 使用随机计时器来选举领导者。这只会在任何共识算法所需的心跳基础上添加少量机制,同时还能简单快速地解决冲突。
  3. 成员变更:Raft 用于更改集群中服务器集的机制采用一种新的联合共识方法,其中两种不同配置的多数在转换期间重叠。这允许集群在配置更改期间继续正常运行。

流程

img

Follower: 默认状态且为被动状态,其中只有Follower会接收来自客户端的请求,但是自己不会主动发送请求,只是对Candidate && Leader的请求做出响应。

Leader: 处理所有客户端请求(如果客户端联系FollowerFollower会将它重定向到Leader

CadidateLeader 备选

系统架构

论文中相关的架构如下,简洁的描述了 raft 一致性算法的规则。本质上是对状态机

img

基本原则

安全性论证

首先证明方法是通过反证法进行证明的。

img

——对图的理解如下:
1. 当 S1 commit一个日志条目之后,那么一定是有大部分的 server 接收到该日志条目
2. 并且,如果 S5 想要成为 leader 那么,一定需要大部分的 follower 投票给 S5

但是,正是如此,那么一定会有一个 server 在图中为 S3 会接收到 T commit 的日志条目,并且给 S5 投票,那么就不符合我们之前的选举规则。投票给了比自己更旧的 cadidate

论文假设了两个 leader, T 和 U,其中 $term_T$ < $term_U$。其中 $leader_T$ 在任期之间 $commit$ 了一个日志条目,图中显示的是 AE 。并且 $leader_U$ 是没有储存该条目的。

论证步骤如下:

  1. 在leaderU选举时,一定没有那个被提交的日志条目(Leader从来不会删除或者覆盖日志条目)。——这是基于假设给定的,并且只有当 leaderfollower 出现任期不匹配的时候才会删除不匹配的日志,leader 是不会删除日志或者覆盖日志条目的

  2. $leader_T$ 在集群的大多数Server上复制了这个条目。因此,至少有一个Server(投票者)既接收了来自leaderT的日志条目,又投票给leader_U,如图-9所示。这个投票者是产生矛盾的关键。

  3. 投票者必须在给 leader_U 投票之前接收来自leader_T的已提交日志条目;否则它会拒绝来自leader_TAppendEntries请求(它的当前任期会比T要大)。——这里说明的是如果没有在 T 的任期加入 commit 的日志的话,那么之后就会拒绝相关的 rpc 请求

  4. 投票者会在它给 leader_U 投票时存储那个条目,因为任何中间的 Leader 都保有该条目(基于假设),Leader从来不会移除这个条目,并且Follower也只会在和Leader冲突时才会移除日志条目。

  5. 投票者给leader_U 投票了,所以 leader_U 的日志必须和投票者的一样新。这就导致了一个矛盾。——我理解的是,对于每一个在 T 崩溃周的中间的 leader 都要保存 T 所 commit 的日志条目,但是 leader 只有在冲突的时候才会删除日志条目,否则则会一直保存这已有的数据。但是呢,根据 voter 规则, voter 只会投给和自己相近的,或者更新的,否则就会拒绝投票。 根据假设,Leader_U 是不包含相关条目的,因此会产生矛盾

  6. 首先,如果投票者和leader_U 最后一条日志条目的任期编号相同,那么 leader_U 的日志一定和投票者的一样长,因此它的日志包含全部投票者的日志条目。这是矛盾的,因为在假设中投票者和 leader_U 包含的已提交条目是不同的。

  7. 除此之外,leader_U 的最后一条日志的任期编号一定比投票者的大。另外,它也比T要大,因为投票者的最后一条日志条目的任期编号最小也要是 T(它包含了所有任期T提交的日志条目)。创建leader_U 最后一条日志条目的上一任Leader必须包含已经提交的日志条目(基于假设)。那么,根据日志匹配原则(Log Matching),leader_U也一定包含那条提交的日志条目,这也是矛盾的。—— 其实就是说 leader_U 肯定不是凭空产生的,他也是基于之前的 follower 而来,因此更具 Log Matching Rules ,那么 leader_U 一定是包含 leader_T 的所 commit 的所有信息

  8. 这时就完成了矛盾推导。因此,所有比任期T大的Leader一定包含所有在任期T提交的日志条目。

  9. 日志匹配原则(Log Matching)保证了未来的Leader也会包含被间接提交的日志条目,就像图-8中(d)时刻索引为2的条目。
    img

实现

lab3A leader election

先贴个结果,lab 3B 报的错是 fail reach 感觉是并发测试开多了导致的,后续也没有太大的影响也就没管了。
img

原理

初始状态

初始化状态每一个 server 都是 follower,等待超时。

选举状态

当某些服务器在初始状态超时之后就会进入到 cadidate 状态

成为 cadidate 需要注意的是

  1. 任期必须先自加 1
  2. 向其他的服务器节点发送请求投票的 RPCs 来拉票,直到以下事件发生才能停止。
    1. 自己赢得选举,(先来先到,符合日志匹配原则,成为 leader 之后立刻发送心跳信息阻止选举,但是会比较任期,任期大的才有资格留下来)
    2. 其他服务器成为 leader
    3. 一段时间之后没有任何一个获胜的人(等待 cadidate 超时,重新开始选举)

实现过程中需要注意的相关事项或者问题大部分都可以在 Students’ Guide to Raft :: Jon Gjengset中找到答案,十分推荐去阅读一遍。

任期或者状态的更新
  1. 如果在投票的过程中,给其他的 server 发送相关 RPC 请求(RequestVote or AppendEntries )的时候需要对 reply 所返回的任期进行一个判断,如果 cadidate 的任期低于 follower, 或者说有其他比该任期高的 server 需要将 cadidate 或者 leader 重置为 follower,并且需要更新时间。

    1
    2
    3
    4
    5
    6
    7
    8
    // if reply term is higher than currentTerm, convert to Follower
    if reply.Term > currentTerm {
    DPrintf("[term_error] vote RPC for peer %d: reply term %d higher than currentTerm %d, converting to Follower\n", peer, reply.Term, currentTerm)
    rf.currentTerm = reply.Term
    rf.lastPing = time.Now()
    rf.identity = Follower
    rf.isleader = false
    return
  2. 如果出现任期比当前 leader 大的那么需要立刻转变为 follower,因为此时他已经是过时的 leader,数据不是最新的(可能经历过网络分区后重新加入集群)现在不具备向其他的 server 发送心跳的权力。

  3. 如果出现了 leader(任期最新,数据最新),当前所有的 cadidate 那么需要立刻停止选举。

2,3 情况的实现方法相似,我是通过 Context 包来控制 Goroutine 的。贴一个心跳的伪代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
    heartbeatCtx, heartbeatCancel := context.WithCancel(context.Background())
    defer heartbeatCancel()

for condition {
if rf.identity != Leader {
return
}
RPCRequest()
if condition {
    ...
    heartbeatCancel()
    ...
    }
}
时间的更新

时间的更新关系到选举的进行,这是十分重要的,如果时间更新没注意十分容易出现活锁(即 leader快速变化)的情况,具体的更新情况有三种(来自 [stduents guide](Students’ Guide to Raft :: Jon Gjengset))

  • 从当前领导者收到 AppendEntries RPC(即如果 AppendEntries 参数中的任期已过时,您不应重置计时器)
  • 正在启动选举
  • 授予另一个对等节点投票权
Vote 的变化

起初我在写的时候一直在想,如何重置 vote 的变化,是不是每个新的任期出现就需要更新一遍初始状态,但是其实实现方法很简单,只需要和任期相互绑定即可,实现方法就是对参数中的任期进行判断。

如果 args 中的任期大于现在 raft 节点的任期,那么就说明此时又开始了一轮新的选举,那么只需要简单的重置一下此时的 termvote,那么就可以很简单的实现了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
if args.Term < rf.CurrentTerm {
reply.Term = rf.CurrentTerm
reply.VoteGranted = false
return
}

// start a new election, reset votedFor and votes
if args.Term > rf.CurrentTerm {
rf.CurrentTerm = args.Term
reply.Term = rf.CurrentTerm
rf.identity = Follower
rf.isleader = false
rf.VoteFor = -1
rf.votes = 0
rf.persist()
}
心跳时间选择

感觉很玄学,有可能会影响测试,这是我最后的心跳时间选择。

1
2
3
4
5
// election
ms := 150 + (rand.Int63() % 450)

// HeartBeat
ms := 100

lab3B log

日志的发送与快速回退 AppendEntries

之前写 3A 的时候把 heartbeat 单独写出来的,后面发现不合理,重新改了一版,将 heartbeatappend_entries 合并在一起了(正确方式就是应该合并的)

另外,在 append_entries 中,加入了一些优化机制,主要是参考的论文中的这么一段文字:

If desired, the protocol can be optimized to reduce the number of rejected AppendEntries RPCs by having the follower include the term of the conflicting entry and the first index it stores for that term when rejecting an AppendEntries request; with this information, the leader can decrement nextIndex to bypass all conflicting entries in that term, requiring only one AppendEntries RPC per term with conflicting entries instead of one RPC per entry, though in practice this optimization may be unnecessary due to the infrequency of failures and the unlikely occurrence of many inconsistent entries.

参考这个思想在论文的基础上加入了两个变量,来保证 leader 能够知道 follower 需要哪一段日志,相关结构体定义如下

1
2
3
4
5
6
type AppendEntriesReply struct {
    Term    int  // currentTerm, for leader to update itself
    Success bool // true if follower contained entry matching prevLogIndex and prevLogTerm
    ConflictTerm  int // term of conflicting entry
    ConflictIndex int // index of conflicting entry
}

还需要注意的情况如下:

  • 如果 follower 长度不足,那么直接返回 follower 的 log 长度即可,然后 ConfilctTerm 返回 -1
  • 如果 follower 长度足够,那么返回 rf.log[args.PrevLogIndex].Term, 然后删除该任期的所有 log ,但是大于 follower 的 commitIndex,我认为这样可以减少发送一些 log

以上的优化机制是怎么运行的呢?简单的来举个例子:

server/loglen 1 2 3 4 5 6 7
S1 1 1 2 2 2 2
S2 1 1 2 2 2 2
S3 1 1 1 1 1

如果 首先 S3 为 leader , 然后在 2 的时候被 partition。

然后 S1 成为 leader, 和 S2 相互通信,如果此时 S3 rejoin,那么就会删除到 partition 之前的状态,即

server/loglen 1 2 3 4 5 6 7
S1 1 1 2 2 2 2
S2 1 1 2 2 2 2
S3 1 1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
if reply.Success {
rf.matchIndex[peer] = max(args.PrevLogIndex+len(entries), rf.matchIndex[peer])
if rf.nextIndex[peer] != rf.matchIndex[peer]+1 {
DPrintf("[append_success] %v -> %v, nextIndex %v, matchIndex %v, commitIndex %v\n", rf.me, peer, rf.nextIndex[peer], rf.matchIndex[peer], rf.commitIndex)
}
rf.nextIndex[peer] = rf.matchIndex[peer] + 1
} else {
if reply.ConflictTerm != -1 {
index := -1
for i := reply.ConflictIndex - 1; i >= 0; i-- {
if rf.Log[i].Term == reply.ConflictTerm {
index = i + 1
break
}
}
if index == -1 {
rf.nextIndex[peer] = reply.ConflictIndex
} else {
rf.nextIndex[peer] = index
}
} else {
rf.nextIndex[peer] = reply.ConflictIndex
}
}

提交日志 Commit

为了加快 commit 速度,从当前 commitIndex 判断到 log 的长度为止,然后是额外开启一个 goroutine 进行判断,而不是串行,防止日志过长导致 commitIndex 迟迟不更新而拖延 apply 的运行

应用日志 Apply

这里有个坑就是要防止 channel 长时间持有锁,主要情况就是,默认情况下,发送和接收会一直阻塞着,直到另一方准备好。这种方式可以用来在gororutine中进行同步,而不必使用显示的锁或者条件变量。(无缓冲 channel 的特点

如官方的例子中x, y := <-c, <-c这句会一直等待计算结果发送到channel中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import "fmt"
func sum(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
c <- sum // send sum to c
}
func main() {
s := []int{7, 2, 8, -9, 4, 0}
c := make(chan int)
go sum(s[:len(s)/2], c)
go sum(s[len(s)/2:], c)
x, y := <-c, <-c // receive from c
fmt.Println(x, y, x+y)
}

apply 是一个 condition lock,好像是在 student guide 里面里面有提到,这个锁和互斥锁的最大区别就是这个锁是包含一把锁的,在你初始化的时候就知道了,然后最大的问题应该就是使用个锁的 Wait() 时候要提前持有互斥锁,并且在执行 Wait() 的时候它会自动的释放锁,即

1
2
3
4
5
rf.mu.Lock()
for !condition {
rf.applyCond.Wait()
// 如果运行在这个时候,锁是被自动释放掉了,不需要你再度释放
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
func (rf *Raft) apply() {
applyMsgs := make([]raftapi.ApplyMsg, MAXBUFFERSIZE)
// producer
go func() {
for !rf.killed() {
select {
case <-time.After(10 * time.Millisecond):
case <-rf.commitCh:
}

rf.mu.Lock()
logLen := rf.getLogLen()
for applyIndex := rf.lastApplied + 1; applyIndex < logLen && applyIndex <= rf.commitIndex && applyIndex > rf.LastIncludedIndex && applyIndex-rf.lastApplied < MAXBUFFERSIZE; applyIndex++ {
command := rf.getLogCommand(applyIndex)
msg := raftapi.ApplyMsg{
CommandValid: true,
Command: command,
CommandIndex: applyIndex,
SnapshotValid: false,
}
applyMsgs[applyIndex%MAXBUFFERSIZE] = msg
}
rf.applyCond.Broadcast()
rf.mu.Unlock()
}
}()

go func() {
defer close(rf.applyCh)
for !rf.killed() {
rf.mu.Lock()
for rf.lastApplied >= rf.commitIndex && len(rf.snapshotQueue) == 0 && !rf.killed() {
rf.applyCond.Wait()
}
// 优先处理 snapshot
if len(rf.snapshotQueue) > 0 {
snapshot := rf.snapshotQueue[len(rf.snapshotQueue)-1]
// rf.snapshotQueue = nil
rf.snapshotQueue = rf.snapshotQueue[:0]
rf.lastApplied = snapshot.SnapshotIndex
DPrintf("[apply_snapshot] %v / %v -> lastapplied %v , lastIncluded %v", rf.me, rf.commitIndex, rf.lastApplied, rf.LastIncludedIndex)
rf.mu.Unlock()

rf.applyCh <- snapshot
continue
}

// 处理 log apply
nextIndex := rf.lastApplied + 1
msg := applyMsgs[nextIndex%MAXBUFFERSIZE]
if msg.CommandValid && msg.CommandIndex == nextIndex {
rf.lastApplied = nextIndex
DPrintf("[apply_msg] %v / %v -> index %v (%v), lastIncluded %v", rf.me, rf.commitIndex, nextIndex, msg.CommandIndex, rf.LastIncludedIndex)
rf.mu.Unlock()
select {
case rf.applyCh <- msg:
case <-rf.killedCh:
DPrintf("%v close channel", rf.me)
return
}

} else {
rf.mu.Unlock()
}
}
}()
}

lab 3C Persist

本次测试存在 unreliable network,这一部分的主要内容我认为应该是处理 丢包,乱序,过时问题。

persistreadPersist 函数主要是参考案例即可,然后注意编码时需要进行大写,然后在每次修改rf.CurrentTermrf.VoteForrf.Log的时候持久化即可,难度不大。

丢包问题

这个问题这是最简单的,只需要重新发送即可,比如说(其中不确定 select 是否有效,能跑就没动了):

1
2
3
4
5
6
7
8
9
 for !rf.sendRequestVote(peer, &args, &reply) && !rf.killed() {
// 我不知道这个 select 是否有用
select {
case <-ctx.Done():
return
default:
}
time.Sleep(10 * time.Millisecond)
}

延迟问题(乱序问题)

延迟问题有两个部分,分别是延迟接收和发送。也就是说需要在 leader 和 follower 处同时进行处理来避免这个问题。

  1. 延迟发送相同任期的数据:这一部分需要对 Log 进行一个检查,检查发送的 Log 是否已经包含了这部分,如果包含有这部分,那么就不应该修改 Log,举例来说:
index 1 2 3 4 5
S1 1 2 2 3 3

现在是最初的 Log,然后发送了一个 entries 任期为 3,但是由于网络问题,导致这个网络包一直还未到达。

然后现在,又发送了三个 entries,任期依然为 3,那么就会很顺利的加入到 Log 中,日志变化如下

index 1 2 3 4 5 6 7 8
S1 1 2 2 3 3 3 3 3

但是现在问题来了,那一个任期为 3 的 entries 此时也到了,你会发现他会很轻松的到达添加日志的地方,如果你没有做相关的防范,此时就会发生一个问题,你会发现日志会在 6 之后被截断

index 1 2 3 4 5 6 7 8
S1 1 2 2 3 3 3 - -

解决方法也很简单,那就是从 args 中的 pervLogIndex 开始检查,存在则不加入,不存在则加入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 4. Append new entries to the log
// After ensuring no conflicts, we append the new entries from the leader's log to the follower's log.
newIndex, i := args.PrevLogIndex+1, 0
for ; i < len(args.Entries); i++ {
if newIndex+i >= rf.getLogLen() || rf.getLogTerm(newIndex+i) != args.Entries[i].Term {
break
}
}

if i < len(args.Entries) {
// 3. delete entries from log if an existing entry conflicts with new ones
rf.Log = append(rf.Log[:newIndex+i-rf.LastIncludedIndex-1], args.Entries[i:]...)
rf.persist()
}

// 5. update commitIndex, if leader's commitIndex is greater than follower's commitIndex, update follower's commitIndex
if args.LeaderCommit > rf.commitIndex {
prev_commitIndex := rf.commitIndex
rf.commitIndex = min(args.LeaderCommit, rf.getLogLen()-1)
if rf.commitIndex != args.LeaderCommit {
DPrintf("[append_commit] %v -> %v (t%v, id %v), entries %v, commitIndex %v / %v\n", args.LeaderId, rf.me, args.Term, rf.identity, len(args.Entries), rf.commitIndex, prev_commitIndex)
}
}
reply.Success = true
  1. 延迟发送不同任期的数据:这一部分包括两种情况,一种是被网络分区之后重新加入到原来的网络分区,这种论文中有做处理,就不做说明了。然后另外的情况就是说网络分区中重新发生选举,任期增加,但是 Leader 不变。(其实也不是什么大问题)

数据损坏问题

在 reliable network 中,只要相关的 rpc 请求返回 true,那么数据就是正常的,但是实际测试中出现了这样的一个包,也就是说,他正常的请求到了rpc,但是确出现了 term 为 0 的情况,有时会引起莫名其妙的 bug。

1
2
3
4
5
6
reply {
term :0
success: false
conflictIndex: 1
conflictTerm: -1
}

解决方法就是直接丢弃这部分数据即可

1
2
3
if args.Term != rf.CurrentTerm || reply.Term == 0 {
return
}

坑(error)

  1. failed to reach agreement
1
Fatal: one(8778102686503907240) failed to reach agreement

我猜测应该是选举超时选的太小了,选举占用大量时间,调大选举时间后问题明显缓解。

  1. 这个问题一般都是 nextIndex 和 matchIndex 没有正确更新导致的,所以一般都要去看 nextIndex, matchIndex 的更新方式是否正确,需要认真检查 append 部分
1
panic: runtime error: index out of range [88] with length 1 
  1. apply error,我出现这个问题是由于 goroutine 问题,因为 persist 并不是真正的结束 raft 实体,而是将所有的数据保存到 rf.persister 实体中,然后重新加载,但是这样原本阻塞的的 channel 还在运行,还在继续发送,也就是说这样会提前的发送 readpersist 之前的 commitIndex

log中会出现类似的 ,这就是

1
2
3
4
[commit].... commitIndex 11
[readpersist] .....
// 但是这里实际应该是从 1 开始
[apply] 11

todo

测试还有 bug ,log 如下,查不动了,先这样留着做彩蛋。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2025/04/22 12:03:52 [election] 4 becomes Leader with 3 votes, term: 23, log length 11
2025/04/22 12:03:52 [start] 4 index 11, term 23, command 867928842208374459
2025/04/22 12:03:52 [append_success] 4 -> 3, nextIndex 10, matchIndex 11, commitIndex 0
2025/04/22 12:03:52 [append_success] 4 -> 2, nextIndex 10, matchIndex 11, commitIndex 0
2025/04/22 12:03:52 [commit] 4 -> index 11, log[N].Term 23, currentTerm 23, commitIndex 11, command {867928842208374459 23 false}
2025/04/22 12:03:52 [apply] 4 / 11 -> index 1 / 12, command 4052737179978994964, log [{<nil> 0 true} {4052737179978994964 1 false} {633407934433877521 3 false} {3503516105229632134 5 false} {7546022265086759426 6 false} {2892773973269466003 9 false} {573625300185315643 10 false} {8524430882984323996 15 false} {1555779226060489955 16 false} {3075377638808233825 17 false} {6415635232418777474 18 false} {867928842208374459 23 false}]
2025/04/22 12:03:52 [apply] 4 / 11 -> index 2 / 12, command 633407934433877521, log [{<nil> 0 true} {4052737179978994964 1 false} {633407934433877521 3 false} {3503516105229632134 5 false} {7546022265086759426 6 false} {2892773973269466003 9 false} {573625300185315643 10 false} {8524430882984323996 15 false} {1555779226060489955 16 false} {3075377638808233825 17 false} {6415635232418777474 18 false} {867928842208374459 23 false}]
2025/04/22 12:03:52 [apply] 4 / 11 -> index 3 / 12, command 3503516105229632134, log [{<nil> 0 true} {4052737179978994964 1 false} {633407934433877521 3 false} {3503516105229632134 5 false} {7546022265086759426 6 false} {2892773973269466003 9 false} {573625300185315643 10 false} {8524430882984323996 15 false} {1555779226060489955 16 false} {3075377638808233825 17 false} {6415635232418777474 18 false} {867928842208374459 23 false}]
2025/04/22 12:03:52 [apply] 4 / 11 -> index 4 / 12, command 7546022265086759426, log [{<nil> 0 true} {4052737179978994964 1 false} {633407934433877521 3 false} {3503516105229632134 5 false} {7546022265086759426 6 false} {2892773973269466003 9 false} {573625300185315643 10 false} {8524430882984323996 15 false} {1555779226060489955 16 false} {3075377638808233825 17 false} {6415635232418777474 18 false} {867928842208374459 23 false}]
2025/04/22 12:03:52 [apply] 4 / 11 -> index 5 / 12, command 2892773973269466003, log [{<nil> 0 true} {4052737179978994964 1 false} {633407934433877521 3 false} {3503516105229632134 5 false} {7546022265086759426 6 false} {2892773973269466003 9 false} {573625300185315643 10 false} {8524430882984323996 15 false} {1555779226060489955 16 false} {3075377638808233825 17 false} {6415635232418777474 18 false} {867928842208374459 23 false}]
2025/04/22 12:03:52 [apply] 2 / 11 -> index 1 / 12, command 4052737179978994964, log [{<nil> 0 true} {4052737179978994964 1 false} {633407934433877521 3 false} {3503516105229632134 5 false} {7546022265086759426 6 false} {2892773973269466003 9 false} {573625300185315643 10 false} {8524430882984323996 15 false} {1555779226060489955 16 false} {3075377638808233825 17 false} {6415635232418777474 18 false} {867928842208374459 23 false}]
2025/04/22 12:03:52 [apply] 2 / 11 -> index 2 / 12, command 633407934433877521, log [{<nil> 0 true} {4052737179978994964 1 false} {633407934433877521 3 false} {3503516105229632134 5 false} {7546022265086759426 6 false} {2892773973269466003 9 false} {573625300185315643 10 false} {8524430882984323996 15 false} {1555779226060489955 16 false} {3075377638808233825 17 false} {6415635232418777474 18 false} {867928842208374459 23 false}]
2025/04/22 12:03:52 [apply] 2 / 11 -> index 3 / 12, command 3503516105229632134, log [{<nil> 0 true} {4052737179978994964 1 false} {633407934433877521 3 false} {3503516105229632134 5 false} {7546022265086759426 6 false} {2892773973269466003 9 false} {573625300185315643 10 false} {8524430882984323996 15 false} {1555779226060489955 16 false} {3075377638808233825 17 false} {6415635232418777474 18 false} {867928842208374459 23 false}]
2025/04/22 12:03:52 [apply] 2 / 11 -> index 4 / 12, command 7546022265086759426, log [{<nil> 0 true} {4052737179978994964 1 false} {633407934433877521 3 false} {3503516105229632134 5 false} {7546022265086759426 6 false} {2892773973269466003 9 false} {573625300185315643 10 false} {8524430882984323996 15 false} {1555779226060489955 16 false} {3075377638808233825 17 false} {6415635232418777474 18 false} {867928842208374459 23 false}]
2025/04/22 12:03:52 [apply] 2 / 11 -> index 5 / 12, command 2892773973269466003, log [{<nil> 0 true} {4052737179978994964 1 false} {633407934433877521 3 false} {3503516105229632134 5 false} {7546022265086759426 6 false} {2892773973269466003 9 false} {573625300185315643 10 false} {8524430882984323996 15 false} {1555779226060489955 16 false} {3075377638808233825 17 false} {6415635232418777474 18 false} {867928842208374459 23 false}]
2025/04/22 12:03:52 [apply] 2 / 11 -> index 6 / 12, command 573625300185315643, log [{<nil> 0 true} {4052737179978994964 1 false} {633407934433877521 3 false} {3503516105229632134 5 false} {7546022265086759426 6 false} {2892773973269466003 9 false} {573625300185315643 10 false} {8524430882984323996 15 false} {1555779226060489955 16 false} {3075377638808233825 17 false} {6415635232418777474 18 false} {867928842208374459 23 false}]
2025/04/22 12:03:52 [apply] 2 / 11 -> index 7 / 12, command 8524430882984323996, log [{<nil> 0 true} {4052737179978994964 1 false} {633407934433877521 3 false} {3503516105229632134 5 false} {7546022265086759426 6 false} {2892773973269466003 9 false} {573625300185315643 10 false} {8524430882984323996 15 false} {1555779226060489955 16 false} {3075377638808233825 17 false} {6415635232418777474 18 false} {867928842208374459 23 false}]
2025/04/22 12:03:52 [apply] 2 / 11 -> index 8 / 12, command 1555779226060489955, log [{<nil> 0 true} {4052737179978994964 1 false} {633407934433877521 3 false} {3503516105229632134 5 false} {7546022265086759426 6 false} {2892773973269466003 9 false} {573625300185315643 10 false} {8524430882984323996 15 false} {1555779226060489955 16 false} {3075377638808233825 17 false} {6415635232418777474 18 false} {867928842208374459 23 false}]
2025/04/22 12:03:52 [apply] 2 / 11 -> index 9 / 12, command 3075377638808233825, log [{<nil> 0 true} {4052737179978994964 1 false} {633407934433877521 3 false} {3503516105229632134 5 false} {7546022265086759426 6 false} {2892773973269466003 9 false} {573625300185315643 10 false} {8524430882984323996 15 false} {1555779226060489955 16 false} {3075377638808233825 17 false} {6415635232418777474 18 false} {867928842208374459 23 false}]
2025/04/22 12:03:52 [apply] 2 / 11 -> index 10 / 12, command 6415635232418777474, log [{<nil> 0 true} {4052737179978994964 1 false} {633407934433877521 3 false} {3503516105229632134 5 false} {7546022265086759426 6 false} {2892773973269466003 9 false} {573625300185315643 10 false} {8524430882984323996 15 false} {1555779226060489955 16 false} {3075377638808233825 17 false} {6415635232418777474 18 false} {867928842208374459 23 false}]
2025/04/22 12:03:52 [apply] 2 / 11 -> index 11 / 12, command 867928842208374459, log [{<nil> 0 true} {4052737179978994964 1 false} {633407934433877521 3 false} {3503516105229632134 5 false} {7546022265086759426 6 false} {2892773973269466003 9 false} {573625300185315643 10 false} {8524430882984323996 15 false} {1555779226060489955 16 false} {3075377638808233825 17 false} {6415635232418777474 18 false} {867928842208374459 23 false}]
2025/04/22 12:03:52 [readpersist] 4 -> term 23, votefor 4, log [{<nil> 0 true} {4052737179978994964 1 false} {633407934433877521 3 false} {3503516105229632134 5 false} {7546022265086759426 6 false} {2892773973269466003 9 false} {573625300185315643 10 false} {8524430882984323996 15 false} {1555779226060489955 16 false} {3075377638808233825 17 false} {6415635232418777474 18 false} {867928842208374459 23 false}], commitIndex 0, applyIndex 0
info: wrote visualization to /tmp/porcupine-3674006959.html
2025/04/22 12:03:52 apply error: server 4 apply out of order 5

至于为什么出现这个问题,我猜测大概是因为 rf.applyCh 的问题,为什么?
看下面日志:

1
2
3
4
5
6
7
8
9
10
2025/05/15 23:19:41 [apply_msg] 2 / 65 -> index 41 (41), lastIncluded -1
2025/05/15 23:19:41 [apply_msg] 2 / 65 -> index 42 (42), lastIncluded -
2025/05/15 23:19:41 server 2 apply order 41, command 3329250121487263777
2025/05/15 23:19:41 server 2 apply order 42, command 1348157186345728959
2025/05/15 23:19:41 [apply_msg] 2 / 65 -> index 43 (43), lastIncluded -1
2025/05/15 23:19:41 [apply_msg] 2 / 65 -> index 44 (44), lastIncluded -1
2025/05/15 23:19:41 [readpersist] 2 -> term 72, votefor 2, lastIncludedIndex -1 / commitIndex 0 / lastApplied 0, lastIncludedTerm -1, log 66
2025/05/15 23:19:41 server 2 apply order 43, command 4634105287464626619
info: wrote visualization to /tmp/porcupine-2290684499.html
2025/05/15 23:19:41 apply error: server 2 apply out of order 43

你发现在 readpersist 之后,server 端为什么又接收到43了,按照正常顺序来说,在 readpersist 之后,应该是从 1 开始了。我猜测应该是阻塞导致的,至于解决办法,我是去修改了测试代码,不知道是不是 bug,修改之后就美美的过 1w 次了。(给课题组发了邮件也没回我,估计被当垃圾邮件处理了)

lab4D snapshot

Snapshot 最大的问题大概就是去重构了,由于之前没有意识,没对 log 预留接口,导致再 snapshot 之后日志截断就歇菜了,不过处理方法非常简单,只需要将所有 log 操作进行封装即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (rf *Raft) getLogLen() int {
    return len(rf.Log) + rf.LastIncludedIndex + 1
}

func (rf *Raft) getLogTerm(index int) int {
    if index <= rf.LastIncludedIndex {
        return rf.LastIncludedTerm
    }
    return rf.Log[rf.getLogIndex(index)].Term
}

func (rf *Raft) getLogCommand(index int) interface{} {
    return rf.Log[rf.getLogIndex(index)].Command
}

func (rf *Raft) getLogIndex(index int) int {
    return index - rf.LastIncludedIndex - 1
}

然后 snapshot 的操作就是增加两个参数,一个是 lastIncludedIndex, lastIncludedTerm. 需要对这两个进行持久化。然后读取

然后在发送心跳的时候将 lastIncludedIndex 与 nextIndex 进行判断,小于那么就直接发送 InstallSnapshot, 反之则不处理

InstallSnapshot 直接按照论文写即可,然后再结合 hint,丢弃 offset, done。实现起来非常简单。

需要注意的是,你要考虑这个情况,当你 installsnapshot 之后,需要向上层发送消息,那么你在发送 channel 的时候需要丢弃一些数据,回退一些数据。

比如说,现在已经 installsnapshot 19, 那么你向上传递的数据就必须要大于 19。

总结

简单来说就是实现了 Raft 的四个过程,我写了接近一个月,写的很崩溃,查问题几乎耗了我半个多月的时间,但是不得不说这对我的代码能力提升很大,以及解决问题的能力也有不少提高,感觉有挑战才会有收获。

lab4 Fault-tolerant Key Value Service

整体结构如下,另外可能需要修改一下 raft 中的 apply 部分,保证能及时的关闭 applyCh,然后让 rsm 部分及时的响应。
img

4A replicated state machine (RSM)

优化 raft 之前。(Raft 做了两版,这是第二版)
img

这是优化之后的,出现了一点问题,不确定是不是当时测试 CPU 跑满的问题导致一些函数来不及调度导致的。我寻思问题也不算大,我就不管了。这是两次测试之后的。
img
![[./MIT6_824/Pasted%20image%2020250521115112.png]]

代码量在优化之后其实很少,但是之前的 raft 有点问题,在 kill 之后没有关闭 rf.applych 在 apply 结束关闭就行。需要注意关闭 channel 的原则:

  1. 只有发送方(Sender)才能关闭 channel
  2. 关闭的 channel 只能关闭一次
  3. 关闭 channel 是通知机制,不是必需操作
  4. 当有多个接收者时,更适合使用关闭通道来广播退出
  5. 通道一般在创建者处关闭

起初,我的想法是维护一个 channel map,定义如下

1
2
3
4
5
6
7
type message struct {
    access   bool
    response any
    err      rpc.Err
}

pendingOp:    map[int]chan message

这样在低并发下可以正常通过测试,但是在使用脚本测试的时候会出现问题(已经解决,我认为就是因为高压测试导致的问题

channel 的花销肯定是大于正常结构体的,并且还需要关闭 channel, 并且这些 channel 是一次性的,这样 GC 的压力会很大,最初我是使用的 150 个进程同时测试,里面的测试大多数都是至少调用 1000 次,也就是说至少会创建一千次的 channel,并且初始化。由于不熟悉GC,害怕它过慢引发一系列问题 (可以尝试协程池,但我没有实现,后面优化我也把 channel 删除了)

新的定义就是

1
2
3
4
5
6
7
type message struct {
    access   bool
    response any
    err      rpc.Err
}

pendingOp map[int]message

其实,我写在这里还有个疑惑,就是这样是否能真正的保证线性一致性,因为我是轮询进行返回的,那么如果出现一个延迟就会导致乱序的情况,而且本身map就是无序的,那么这样乱序返回是否会有影响?

这个问题大概让我想了一天,其实本质上就是线性一致性没有理解透。

其实答案是否定的,由上述定义用通俗的话来说就是,你只要保证每个指令逐个向前执行就可以了,不用管谁先来,谁后来。因此 map 是可以承担重任的,并且后面也可以很容易确定 leader 操作(只要看 key 是否存在 map 中即可)。

线性化是对单个对象上单次操作的保证。它为单个操作集(通常是读取和写入)在单个对象(例如分布式寄存器或数据项)上的行为提供实时(即,墙钟时间)保证。

如果使用队列那就非常麻烦:

  1. 如果 rsm 重启,那么持久化的内容就是重新 apply,那么你就需要判断是否需要接收
  2. 要保证只能是 leader 接收,因为如果所有的 server 都接收,一是浪费资源,二是队列的 pop 只能是在 submit 中的,如果 old leader carsh,那么新 leader 接管就会被之前未使用的 op 卡住。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// Submit a command to Raft, and wait for it to be committed.  It
// should return ErrWrongLeader if client should find new leader and
// try again.
func (rsm *RSM) Submit(req any) (rpc.Err, any) {
    // Submit creates an Op structure to run a command through Raft;
    // for example: op := Op{Me: rsm.me, Id: id, Req: req}, where req
    // is the argument to Submit and id is a unique id for the op.
    // your code here
    pprevTerm, isleader := rsm.rf.GetState()
    if !isleader {
        return rpc.ErrWrongLeader, nil
    }
    op := Op{
        Me:  rsm.me,
        Req: req,
    }
    // lock lock lock !!!
    // 这里必须要锁住,否则会出现已经 apply,但是 index 部分还没有加入到 map 中。
    rsm.mu.Lock()
    index, PrevTerm, isleader := rsm.rf.Start(op)
    if !isleader || pprevTerm != PrevTerm {
        rsm.mu.Unlock()
        return rpc.ErrWrongLeader, nil
    }
    rsm.pendingOp[index] = message{}
    rsm.mu.Unlock()
    defer func() {
        rsm.mu.Lock()
        delete(rsm.pendingOp, index)
        rsm.mu.Unlock()
    }()
    // 等待 reader 将处理好的数据发送到 map 中
    for {
        time.Sleep(5 * time.Millisecond)
        rsm.mu.Lock()
        msg, ok := rsm.pendingOp[index]
        rsm.mu.Unlock()
        if Term, isleader := rsm.rf.GetState(); !isleader || PrevTerm != Term {
            return rpc.ErrWrongLeader, nil
        }
        if atomic.LoadInt32(&rsm.dead) == 1 || !ok {
            rsm.mu.Lock()
            log.Printf("%v index %v exit map %v", rsm.me, index, rsm.pendingOp)
            rsm.pendingOp = make(map[int]message)
            rsm.mu.Unlock()
            return rpc.ErrShutDown, nil
        }
        rsm.mu.Lock()
        if msg.access && ok {
            log.Printf("[submit] %v -> index %v", rsm.me, index)
            rsm.mu.Unlock()
            return msg.err, msg.response
        }
        rsm.mu.Unlock()
    }
}

reader 实现就是照着 raft1/server.go 改的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (rsm *RSM) reader() {
    for m := range rsm.applyCh {
        if m.CommandValid {
            rsm.mu.Lock()
            cmd := m.Command.(Op)
            response := rsm.sm.DoOp(cmd.Req)
            if _, ok := rsm.pendingOp[m.CommandIndex]; ok {
                rsm.pendingOp[m.CommandIndex] = message{
                    access:   true,
                    err:      rpc.OK,
                    response: response,
                }
            }
            log.Printf("[reader] %v -> index %v, response %v, persist len %v / %v", rsm.me, m.CommandIndex, response, rsm.rf.PersistBytes(), rsm.maxraftstate)
            if rsm.maxraftstate != -1 && rsm.rf.PersistBytes() >= rsm.maxraftstate {
                snapshot := rsm.sm.Snapshot()
                rsm.rf.Snapshot(m.CommandIndex, snapshot)
            }
            rsm.mu.Unlock()
        } else if m.SnapshotValid {
            rsm.sm.Restore(m.Snapshot)
        } else {
            // ignore other conditions
        }
    }
    atomic.StoreInt32(&rsm.dead, 1)
    log.Printf("%v done", rsm.me)
}

Lab4B Key/value service without snapshots

img
img
img
到这里也算是完全把这个流程搞完了。
img

lab4C Key/value service with snapshots

img

img

client

这部分和 lab2 中的差不了多少,主要是增加对 leader 查找就可以用了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// Get fetches the current value and version for a key.  It returns
// ErrNoKey if the key does not exist. It keeps trying forever in the
// face of all other errors.
//
// You can send an RPC to server i with code like this:
// ok := ck.clnt.Call(ck.servers[i], "KVServer.Get", &args, &reply)
//
// The types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's
// arguments. Additionally, reply must be passed as a pointer.
func (ck *Clerk) Get(key string) (string, rpc.Tversion, rpc.Err) {
    // You will have to modify this function.
    reply := rpc.GetReply{}
    start := time.Now()
    defer func() {
        log.Printf("[client] %v receive key %v, value %v, version %v, seq %v, time %v", ck.clerkId, key, reply.Value, reply.Version, ck.seq, time.Since(start))
    }()
    atomic.AddInt64(&ck.seq, 1)
    args := rpc.GetArgs{Key: key, ClientId: ck.clerkId, SeqNum: ck.seq}
    log.Printf("[client] %v get key %v", ck.clerkId, key)
    srv := ck.leaderId
    for {
        reply = rpc.GetReply{}
        retry := 0
        for !ck.clnt.Call(ck.servers[srv], "KVServer.Get", &args, &reply) {
        // 这里面应该是没啥用的,留着就留着把
            if retry > MAXRETRYNUM {
                break
            }
            retry++
        }
        log.Printf("id %v get reply err %v, srv %v", ck.clerkId, reply.Err, srv)
        if reply.Err == rpc.OK || reply.Err == rpc.ErrNoKey {
            ck.leaderId = srv
            return reply.Value, reply.Version, reply.Err
        }
        time.Sleep(100 * time.Millisecond)
        srv = (srv + 1) % len(ck.servers)
    }
}

Put 同理。

sever

这里面的相较于 lab2 的主要工作是加入了两个变量保证幂等性

1
2
    lastApplyied map[int]int64
    lastReplies  map[int]any
  1. lastapplied 是为了保证每个 client 都是使用的最新的,防止由于网络延迟或者其他原因导致旧的消息被再次执行。
  2. lastReplies 保存了最新的 Put 结果,由于 Get 天生不破坏幂等性,可以不做处理。

关于 snapshot 只需要保存以下三个变量即可。

1
2
3
    db           map[string]string
    lastApplyied map[int]int64
    lastReplies  map[int]any

另外,进行 snapshot 的时候需要实现进行注册,即在 StartKVServer 的时候加入类似语句:

1
2
    labgob.Register(rpc.GetReply{})
    labgob.Register(rpc.PutReply{})

img

Lab 5A: Moving shards

img
这里标注的是 hard,但是我觉得最难的地方是在于怎么 shard 是什么,怎么去维护 shard 机制。以及这些文件,函数有什么作用,应该怎么调用。

这里说明:

  1. grps:指的是 shardgrp 中的 server
  2. grpc:指的是 shardgrp 中的 client
  3. clerk:指的是实际用户调用的部分

另外,Lab 5A 其实并没有完全的写完,由于有其它的事情一直耽搁了,后面有空把他补全。

初始化

首先,不考虑 shard 的移动,而是想怎么让 grps 获得 shard 信息。怎么让他知道,它是负责哪一块的 shard。

这里我采取的是被动接收, grpc 只管接收来自 clerk 和 controller 的调用。当出现 put 信息的时候进行判断,如果是 version=0 那么就说明这是需要负责的 shard,将相关信息注册到 shardstatemap 中,否则直接返回 wrongGroup。

1
2
3
4
5
6
7
8
9
state, ok := kv.ShardStateMap[shardID]
if !ok {
if args.Version == 0 {
kv.ShardStateMap[shardID] = shardrpc.WORKING
} else {
response.Err = rpc.ErrWrongGroup
return response
}
}

FreezeShard

将该 shard 所有相关的信息全部打包,并且将该 shard 标记为 freeze 状态,然后返回。

InstallShard

将向对应的 shard 发送给指定的 grpc,标记为 install 状态,然后将相关信息解码,将相关信息添加到相关数据结构即可。

DeleteShard

将所有与该 shard 的信息全部删除,并且将 shardstate 状态一并删除。

Updateconfig

这是我自己添加的函数,目的是为了统一更新 grpc 中的 config