# MIT6.824-2020 Lab1 MapReduce
# 前言
本文是本人学习MIT 6.824 Lab 1
的笔记,包含自己一些实现和理解。
Lab 1
的说明在 Lab1 Notes (opens new window),需要阅读论文MapReduce (opens new window)。请以这篇说明和论文为主,以本文为辅。在阅读本文之前,务必先通读这篇说明和论文。若实在无法阅读英文,再姑且直接阅读本文。
6.824
所有Lab
都使用Golang
为主要编程语言,若你不熟悉,可以快速看看Golang官方入门 (opens new window)。我只花了半小时左右学习Go
语言,就写出了这个Lab
,证明要让代码work
,不是特别困难。当然,要让代码优雅高效,充分学习Go
语言还是非常有必要的。
本文代码中各种定义和调用非常复杂,很难三言两语说清楚。如果都要一行一行解释,这篇文章篇幅就太太太长了。在本文中呈现代码时,代码中肯定有未解释过的部分,甚至可能占到代码的主要篇幅。请你不要担心,也不要太早地刨根问底,文章中呈现的代码块都只是起到解释说明作用,更多算伪代码,而不是真正运行的代码。
要获得整理好、可直接运行的代码,请看我的GitHub仓库 (opens new window)(最好不要)。
# 思路
# 任务总览
Lab 1
主要工作是实现一个分布式MapReduce
,将一个master
节点和worker
节点相互协作,并行执行任务的过程,此处任务为文本索引器。对于基础不够好的读者,也不必担心,我们可以参考单线程内容来了解其工作原理。
输入数据以文件形式进入系统。一些进程运行map
任务,拆分了原任务,产生了一些中间体,这些中间体可能以键值对形式存在。一些进程运行了reduce
任务,利用中间体产生最终输出。master
进程用于分配任务,调整各个worker
进程。
输入数据能够产生中间体,这说明原任务是可拆的,也就才有了写成分布式的可能性。若原问题不是可拆的,MapReduce
也就无从谈起。
中间体应均匀地分配给各个reduce
任务,每个reduce
任务整合这些中间体,令中间体个数减少,直至无法再减少,从中整合出最终结果。
# 实现流程
网络通信
为了实现master
进程和worker
进程间数据交互,必须通过进程间通信机制来实现,而我们使用的进程间通信方式是RPC
。因此需要定义双方的通信格式,来兼容不同的任务请求(map
或者reduce
)。lab
帮我们决定了使用 rpc
通信,同时 master
和 worker
通过 unix domain socket
作为通信方式,用 http
服务器提供 rpc
服务。
//
// start a thread that listens for RPCs from worker.go
//
func (m *Master) server() {
rpc.Register(m)
rpc.HandleHTTP()
sockname := masterSock()
os.Remove(sockname)
l, e := net.Listen("unix", sockname)
if e != nil {
log.Fatal("listen error:", e)
}
go http.Serve(l, nil)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
任务切分
我们程序的输入是n
个文件,那么根据 paper
,这里就有 n
个 map
任务,每个 map
任务读取一个文件中的内容并处理后输出。这里输出需要注意,需要将nReduce
大小,才能让后续的reduce
任务正常执行。
而在分布式设计中,我们使用hash 桶 的设计,将相应的 key
保证分在同一组文件中,保证了 reduce
编码的简单。具体为我们首先对 key 做 hash 编码。将 string
编码为 数字, 并对数字结果 取模 N 运算
, N
为传入的 nReduce
值。运算的结果代表了相应 kv
对应该放于第几个桶中,在这里就是归入哪一个输出文件。
消息设计
我们完成了上述设计之后,开始实际逻辑的构建,我进行了以下设计,master
和 worker
通过 rpc
消息交互来分配任务,任务完成之后 worker
通知 master
,并进行下一次任务分配。直到没有任务的时候,worker 退出。(如果没有任务的同时,所有任务都已完成 Master
也会退出)
在整个交互过程中,需要注意的是:master
扮演的任务的发布者,而worker
扮演的是任务的执行者,一些内容由master
来协调,因此,我们可以采用worker
退出设计,由worker
来提交并请求任务,而master
在来根据当前状态调整任务内容,该设计也让 master
状态管理代码逻辑更简洁。
type WorkArgs struct {
Workerid string
}
type WorkReply struct {
Isfinished bool
Taskid int
Filename string
MapReduce string
BucketNum int
}
type CommitArgs struct {
Workerid string
Taskid int
MapReduce string
}
type CommitReply struct {
IsOK bool
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
状态管理
众所周知,分布式一大问题就是时序。虽然状态是万恶之源,但现实世界就是现实世界,熵增不可改变,复杂度只能转移,我们尽量简化即可。
关于这个系统,我们需要管理
map
任务状态: 有多少个map
任务,完成了多少reduce
任务状态:有多少个reduce
任务,完成了多少。同时reduce
还有在map
任务都完成后进行worker
状态: 谁在干话,干了多久?
任务则设计三个状态:
TaskIdle
: 任务闲置,没人管ing o(╥﹏╥)oTaskWorking
: 任务工作中(你咋知道我在摸鱼?因为你超时了!)TaskCommit
: 任务完成!
const (
TaskIdle = iota
TaskWorking
TaskCommit
)
type Coordinator struct {
// Your definitions here.
files []string
nReduce int
mapTasks []int
reduceTasks []int
mapCount int
workerCommit map[string]int
allCommited bool
timeout time.Duration
mu sync.RWMutex
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
超时机制
由于master
无法可靠地区分崩溃的work
或存活但由于某种原因停止的work
,以及正在执行但太慢而无法使用的work
,因此需要设计一种重置机制,来对超过最大定时的任务进行重置或重新分配任务。
ctx, _ := context.WithTimeout(context.Background(), m.timeout)
go func() {
select {
case <-ctx.Done():
{
m.mu.Lock()
if m.workerCommit[args.Workerid] != TaskCommit && m.reduceTasks[k] != TaskCommit {
m.reduceTasks[k] = TaskIdle
log.Println("[Error]:", "worker:", args.Workerid, "reduce task:", k, "timeout")
}
m.mu.Unlock()
}
}
}()
2
3
4
5
6
7
8
9
10
11
12
13
14
# 常见问题
- 可能要对一些数据结构加锁
Mutex
- 成功的实现应至少通过脚本
test-mr.sh
测试