lab1的任务是实现一个分布式的MapReduce程序,其中,Map函数和Reduce函数都已经写好,输入输出的文件名已经给出,并且课程也给出了在单个程序里模拟MapReduce任务的示例代码。
分布式程序的框架由有两组程序组成:
- 第一组的代码是
coordinator.go
,只有一个进程,这个进程负责把任务分发给工作节点; - 第二组的代码是
worker.go
,有多个进程,从调度节点接收任务,完成后输出到文件; - 第三组的代码是
rpc.go
,主要用于实现rpc调用的参数和返回值的数据结构。
设计这个小的分布式程序的关键可能在于两点:
- 捋清楚每一步的输入和输出是什么,在模拟程序中,输入和输出是当前程序里的数据结构,而在分布式环境中,输入输出可能是文件名;
- 捋清楚每个节点自己的工作模式,对于调度节点,它需要知道当前整体任务的进度,把没做完的任务放到任务队列里,整理做完的任务返回的结果,对于工作节点,它需要不断地请求任务,上传结果;
- 捋清楚每个节点之间的通信协议,调度节点和工作节点之间主要有“申请任务”和“上传结果”两个交互过程,可以设计两个RPC调用。
三阶段分析
Map阶段
首先,可以看一下已经给我们写好的Map函数的输入和输出分别是什么。1
mapf func(string, string) []KeyValue
- 第一个输入是文件名;
- 第二个输入是文件内容;
- 返回值是键值对组成的数组。
对于单个mapf
函数可以完成的任务,我把它叫做MapJob
,而多个MapJob
组成MapTask
,MapTask
一次性下发给工作节点,MapTask
的数量取决于Map任务的数量。
1 | type MapJob struct { |
接下来一个问题就是如何调度这些任务?
- 从工作节点的角度看,任务的调度分为两个动作:请求任务和提交结果;
- 工作节点以轮询的方式请求任务,完成任务后提交任务结果;
- 从调度节点的角度看,每个任务都需要追踪进度:
- 一开始,把所有任务放进待调度channel中;
- 每次有工作节点请求任务,从channel中取出任务,发送给工作节点;
- 后台起一个watchdog goroutine,一旦发现有任务超时,就把任务重新放到待调度channel中;
- 可能对同一个任务,有多个结果发送过来,这个时候可以采用最新的结果覆盖。
当调度节点发现所有任务都已经完成后,进入下一个阶段。
Partition阶段
进入Partition阶段后,所有的MapTask
都已经完成,调度节点需要整理所有的任务结果,按Reduce阶段的任务数进行划分。
- 首先,对所有的
Result
进行汇总,汇总的逻辑是先把所有Key
相同的字段进行拼接,每个拼接好的结果都是一个ReduceJob
; - 然后,对
Key
哈希取模后把ReduceJob
放进对应ID的ReduceTask
中。
Reduce阶段
进入Reduce阶段后,和之前的Map阶段的逻辑一样,对任务进行调度、追踪、重新下发。
预先为我们写好的Reduce函数长成这样:1
reducef func(string, []string) string
- 第一个输入是
Key
; - 第二个输入是
Key
在所有MapJob
中的结果拼接组成的结果; - 返回值是汇总后的记录。
仿照Map阶段的设计,可以对ReduceJob
和ReduceTask
设计如下:1
2
3
4
5
6
7
8
9
10
11
12type ReduceJob struct {
Key string
ValueList []string
Result string
}
type ReduceTask struct {
ID int
Jobs []ReduceJob
StartTime time.Time
Status string // scheduling | in-progress | done
}
机制设计
RPC交互设计
调度节点与工作节点之间以RPC的形式进行交互,按照之前捋的逻辑,调度节点和工作节点之间主要有两种交互逻辑:请求任务和上传结果。Lab1要求要考虑工作节点崩溃的情况,需要设计心跳包机制来监控节点是否奔溃,但我这里偷个懒,因为工作节点在轮询的时候已经相当于起到了告诉调度节点自身状态的作用。
请求任务ApplyTask
的参数设计如下:1
2
3
4
5
6
7
8type ApplyTaskArgs struct {
}
type ApplyTaskReply struct {
TaskType string
MTask MapTask
RTask ReduceTask
}
上传结果SubmitTask
的参数设计如下:1
2
3
4
5
6
7
8type SubmitTaskArgs struct {
TaskType string
MTask MapTask
RTask ReduceTask
}
type SubmitTaskReply struct {
}
调度节点数据结构设计
思考调度节点需要完成哪些功能?
- 管理所有的Map任务 -> 需要
[]MapTask
; - 管理所有的Reduce任务 -> 需要
[]ReduceTask
; - 管理所有的待调度的Map任务 -> 需要
chan MapTask
; - 管理所有的待调度的Reduce任务 -> 需要
chan ReduceTask
; - 记录整体运行的状态 -> 需要
status string
; - 保证并发安全,避免data race -> 需要
sync.Mutex
; - 记录Map任务和Reduce任务的数量 -> 需要
nMap int
和nReduce int
。1
2
3
4
5
6
7
8
9
10
11type Coordinator struct {
// Your definitions here.
MapNum int
ReduceNum int
MapTaskList []MapTask
ReduceTaskList []ReduceTask
SchedulingMapTask chan MapTask
SchedulingReduceTask chan ReduceTask
Status string // map | partition | reduce | done
Mu sync.Mutex
}
最终,程序通过了lab1的所有测试。