# MIT6.824-2020 Lab1 MapReduce

# 前言

本文是本人学习MIT 6.824 Lab 1的笔记,包含自己一些实现和理解。

Lab 1的说明在 Lab1 Notes (opens new window),需要阅读论文MapReduce (opens new window)。请以这篇说明和论文为主,以本文为辅。在阅读本文之前,务必先通读这篇说明和论文。若实在无法阅读英文,再姑且直接阅读本文。

6.824所有Lab都使用Golang为主要编程语言,若你不熟悉,可以快速看看Golang官方入门 (opens new window)。我只花了半小时左右学习Go语言,就写出了这个Lab,证明要让代码work,不是特别困难。当然,要让代码优雅高效,充分学习Go语言还是非常有必要的。

本文代码中各种定义和调用非常复杂,很难三言两语说清楚。如果都要一行一行解释,这篇文章篇幅就太太太长了。在本文中呈现代码时,代码中肯定有未解释过的部分,甚至可能占到代码的主要篇幅。请你不要担心,也不要太早地刨根问底,文章中呈现的代码块都只是起到解释说明作用,更多算伪代码,而不是真正运行的代码。

要获得整理好、可直接运行的代码,请看我的GitHub仓库 (opens new window)(最好不要)。

# 思路

# 任务总览

Lab 1主要工作是实现一个分布式MapReduce,将一个master节点和worker节点相互协作,并行执行任务的过程,此处任务为文本索引器。对于基础不够好的读者,也不必担心,我们可以参考单线程内容来了解其工作原理。

输入数据以文件形式进入系统。一些进程运行map任务,拆分了原任务,产生了一些中间体,这些中间体可能以键值对形式存在。一些进程运行了reduce任务,利用中间体产生最终输出master进程用于分配任务,调整各个worker进程。

输入数据能够产生中间体,这说明原任务是可拆的,也就才有了写成分布式的可能性。若原问题不是可拆的,MapReduce也就无从谈起。

中间体应均匀地分配给各个reduce任务,每个reduce任务整合这些中间体,令中间体个数减少,直至无法再减少,从中整合出最终结果。

# 实现流程

网络通信

为了实现master进程和worker进程间数据交互,必须通过进程间通信机制来实现,而我们使用的进程间通信方式是RPC。因此需要定义双方的通信格式,来兼容不同的任务请求(map或者reduce)。lab 帮我们决定了使用 rpc 通信,同时 masterworker 通过 unix domain socket 作为通信方式,用 http 服务器提供 rpc 服务。

//
// start a thread that listens for RPCs from worker.go
//
func (m *Master) server() {
	rpc.Register(m)
	rpc.HandleHTTP()
	sockname := masterSock()
	os.Remove(sockname)
	l, e := net.Listen("unix", sockname)
	if e != nil {
		log.Fatal("listen error:", e)
	}
	go http.Serve(l, nil)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

任务切分

我们程序的输入是n个文件,那么根据 paper,这里就有 nmap 任务,每个 map 任务读取一个文件中的内容并处理后输出。这里输出需要注意,需要将nReduce大小,才能让后续的reduce任务正常执行。

而在分布式设计中,我们使用hash 桶 的设计,将相应的 key 保证分在同一组文件中,保证了 reduce 编码的简单。具体为我们首先对 key 做 hash 编码。将 string 编码为 数字, 并对数字结果 取模 N 运算, N 为传入的 nReduce 值。运算的结果代表了相应 kv 对应该放于第几个桶中,在这里就是归入哪一个输出文件。

消息设计

我们完成了上述设计之后,开始实际逻辑的构建,我进行了以下设计,masterworker 通过 rpc 消息交互来分配任务,任务完成之后 worker 通知 master,并进行下一次任务分配。直到没有任务的时候,worker 退出。(如果没有任务的同时,所有任务都已完成 Master 也会退出)

在整个交互过程中,需要注意的是:master扮演的任务的发布者,而worker扮演的是任务的执行者,一些内容由master来协调,因此,我们可以采用worker退出设计,由worker来提交并请求任务,而master在来根据当前状态调整任务内容,该设计也让 master 状态管理代码逻辑更简洁。

type WorkArgs struct {
   Workerid string
}
type WorkReply struct {
   Isfinished bool
   Taskid int
   Filename string
   MapReduce string
   BucketNum int
}
type CommitArgs struct {
   Workerid string
   Taskid int
   MapReduce string
}
type CommitReply struct {
   IsOK bool
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

状态管理

众所周知,分布式一大问题就是时序。虽然状态是万恶之源,但现实世界就是现实世界,熵增不可改变,复杂度只能转移,我们尽量简化即可。

关于这个系统,我们需要管理

  1. map 任务状态: 有多少个 map 任务,完成了多少
  2. reduce 任务状态:有多少个 reduce 任务,完成了多少。同时 reduce 还有在 map 任务都完成后进行
  3. worker 状态: 谁在干话,干了多久?

任务则设计三个状态:

  1. TaskIdle: 任务闲置,没人管ing o(╥﹏╥)o
  2. TaskWorking: 任务工作中(你咋知道我在摸鱼?因为你超时了!)
  3. TaskCommit: 任务完成!
const (
   TaskIdle = iota
   TaskWorking
   TaskCommit
)
type Coordinator struct {
   // Your definitions here.
   files   []string
   nReduce int
   mapTasks    []int
   reduceTasks []int
   mapCount int
   workerCommit map[string]int
   allCommited  bool
   timeout time.Duration
   mu sync.RWMutex
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

超时机制

由于master 无法可靠地区分崩溃的work或存活但由于某种原因停止的work,以及正在执行但太慢而无法使用的work,因此需要设计一种重置机制,来对超过最大定时的任务进行重置或重新分配任务。

ctx, _ := context.WithTimeout(context.Background(), m.timeout)
go func() {
    select {
        case <-ctx.Done():
        {
            m.mu.Lock()
            if m.workerCommit[args.Workerid] != TaskCommit && m.reduceTasks[k] != TaskCommit {
                m.reduceTasks[k] = TaskIdle
                log.Println("[Error]:", "worker:", args.Workerid, "reduce task:", k, "timeout")
            }
            m.mu.Unlock()
        }
    }
}()
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 常见问题

  • 可能要对一些数据结构加锁Mutex
  • 成功的实现应至少通过脚本test-mr.sh测试