@SovietPower
2022-05-18T22:58:43.000000Z
字数 12766
阅读 1433
Study
schedule:https://pdos.csail.mit.edu/6.824/schedule.html
中文视频:https://www.bilibili.com/video/BV1x7411M7Sf
视频翻译:https://www.zhihu.com/column/c_1273718607160393728
(MapReduce论文翻译:https://zhuanlan.zhihu.com/p/122571315)
一个教程:https://zhuanlan.zhihu.com/p/260470258
Lab1参考:https://zhuanlan.zhihu.com/p/425093684
MapReduce
一个Map或Reduce任务,称作task;所有task,称作job。
一个worker通常执行多个task(运行多个进程)。
Reduce Worker通过RPC处理Map Worker输出的数据,然后发送最终结果。总是在本地运行,所以通信代价不高。
Map
处理每个键值对,根据key划分为r部分,分别存到r个mr-X-Y文件中。
Reduce
Worker进行Reduce前,要进行shuffle(按key划分),然后对于每一个key和相应的value集合,执行Reduce。
shuffle
将每个Map Worker生成的数据(包含多个不同的key)按照key重新分组(以便能将数据转移到Reduce Worker)。可以在Map中将键值对按key排序,提高此处的效率。
需要在Linux下进行。windows上可以安装WSL使用Ubuntu环境(默认为WSL2)。
https://docs.microsoft.com/zh-cn/windows/wsl/install
(随便可以安装Windows Terminal:https://github.com/microsoft/terminal/releases)
WSL配置,并使用VSCode:
https://docs.microsoft.com/zh-cn/windows/wsl/setup/environment#set-up-your-linux-username-and-password
然后在wsl上安装go:sudo apt install golang-go
(项目不能用windows上的go编译,因为-buildmode=plugin not supported on windows/amd64
)。
获取代码:
git clone git://g.csail.mit.edu/6.824-golabs-2020 6.824lab
-race
启用并发访问冲突检测。race detector通过在程序运行时探测所有的内存访问,可监控对共享变量的非同步访问,在冲突时警告。
只有实际运行时,该工具才可能检测到问题。
一般只在开发测试时启用,在正式环境中会影响性能。
https://pdos.csail.mit.edu/6.824/labs/lab-mr.html 中有要求及提示。
代码中提供了非分布式实现的mrsequential.go
,使用
go build -buildmode=plugin ../mrapps/wc.go
go run mrsequential.go wc.so pg-*.txt
即可运行,并得到标准结果mr-out-0
。
该文件的代码都可以作为参考。
其中第一句是将map(), reduce()
通过go的插件包(.so
文件)动态装载。
如果修改了mr/
中的内容,运行前都需要重新编译wc.so
:go build -buildmode=plugin ../mrapps/wc.go
。
main/
中的mrmaster.go, mrworker.go
启动进程,加载map(), reduce()
动态库,并调用mr/
下的master, worker
。
我们要修改的就是mr/
下的master, worker
。
以下均main目录下。
自测
重新编译wc.go
:go build -race -buildmode=plugin ../mrapps/wc.go
运行master:
$ rm mr-out*
$ go run -race mrmaster.go pg-*.txt
(pg-*.txt
是要传给Map Worker的、分割后的输入文件)
在不同终端运行多个worker:go run -race mrworker.go wc.so
,
最终测试
直接运行./test-mr.sh
。它会重新编译文件并运行测试。
在文件中使用RACE=-race
打开冲突检测。
注
master需运行若干Map Worker(每个执行一个或多个Map Task),每个Worker生成给定的nReduce
个mr-X-Y
输出文件(X
为Map Task编号,Y
为Reduce Task编号)。
然后,master需运行nReduce
个Reduce Worker(每个执行一个Reduce Task),并生成nReduce
个mr-out-*
输出文件。
测试脚本test-mr.sh
会将这些输出文件mr-out-*
按key排序、合并成一个文件mr-wc-all
,并与mr-correct-wc.txt
进行比较。
通过cat mr-out-* | sort | more
可查看排序后的输出。
类似rpc.Register: method "Done" has 1 input parameters; needs exactly three
的错误可以忽略。
Ignore these messages; registering the coordinator as an RPC server checks if all its methods are suitable for RPCs (have 3 inputs); we know that Done is not called via RPC.
类似dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
的错误应该也可以忽略。
代码中已经基本实现了 利用RPC实现Worker与Master的通信。
在Worker中声明CallExample()
。
进行测试,即可看到worker通过RPC调用了master的Example()
,传入了ExampleArgs
,并接受了master的输出ExampleReply
即100
。
先简单实现Map任务,然后补充细节,最后比着实现Reduce任务。
Task
修改ExampleReply
为要传递的Task
(若一个Worker执行多个Task,可多次请求Task
)。
Task
为需要下面元素的struct:
int
。[]string
。Map Worker需要若干pg-*.txt
;Reduce Worker需要Map生成的w*r
个中间文件,用mr-X-Y
命名(X
为Map Task编号,Y
为Reduce Task编号)。
// task
type Task struct {
TaskType TaskType
TaskID int // 用于生成对应ID的文件
NReduce int // 用于取模
InputFiles []string
}
// type of task
type TaskType int
const (
MapTask = iota
ReduceTask
Wait // no available task for now
Kill // all tasks done
)
Master
通过go的channel可方便地存储可用的task,在需要时取出task传给Worker。
所以Master为如下struct:
type Master struct {
MapTaskChannel chan *Task
ReduceTaskChannel chan *Task
NMap int // number of Map tasks
NReduce int // number of Reduce tasks
NTask int // number of current tasks
Status MasterStatus
}
// status of master
type MasterStatus int
const (
MapPhase = iota
ReducePhase
AllDone
)
然后实现创建Master的函数func MakeMaster(files []string, nReduce int) *Master {}
。
生成Map Task
Master先生成需要的Map Task,并存到Master的chan中:
// The start of Map phase. Generate Map Tasks and store them in MapTaskChannel
func (m *Master) MakeMapTask(files []string) {
for _, v := range files {
id := m.GetTaskID()
task := Task{
TaskType: MapTask,
TaskID: id,
InputFiles: []string{v},
}
m.MapTaskChannel <- &task
}
println("finish MakeMapTask")
}
// Get an ID for a new task
func (m *Master) GetTaskID() int {
m.NTask++
return m.NTask - 1
}
分发Task
类似master.go
中的Example()
:
// When Worker is started or finishes its task, it calls DistributeTask() to get a new task.
// When there is no task available, Master sends a Wait task to tell Worker to wait for a few seconds before calling again.
// When everything is finished, Master sends a Kill task and the Worker then finish.
func (m *Master) DistributeTask(args *Request, reply *Task) error {
fmt.Println("DistributeTask")
switch m.Status {
case MapPhase:
if len(m.MapTaskChannel) > 0 {
*reply = *<-m.MapTaskChannel
// reply = <-m.MapTaskChannel // 错。reply为传值引用,修改reply不会影响worker处的reply
} else {
reply.TaskType = Wait
}
case ReducePhase:
if len(m.ReduceTaskChannel) > 0 {
*reply = *<-m.ReduceTaskChannel
} else {
reply.TaskType = Wait
}
case AllDone:
reply.TaskType = Kill
}
return nil
}
然后在worker.go
中的Worker()
调用CallDistributeTask()
(同CallExample()
),给出参数并获取task。
检查
测试一遍,3个worker可分别输出获得的task:
Get Task: &{0 0 10 [pg-being_ernest.txt]}
Get Task: &{0 1 10 [pg-dorian_gray.txt]}
Get Task: &{0 2 10 [pg-frankenstein.txt]}
在Worker()
中调用DoMapTask()
,然后实现DoMapTask()
:
首先与mrsequential.go
相同,读入task中的文件,并将mapf
处理后的结果存入中间值intermediate
。
然后将intermediate
中的键值对,根据key划分为r部分(用ihash(Key)%r
),分别存到r个mr-X-Y
文件中。
存储用 json.Encoder 编码。为方便此处处理,先将属于同一部分的键值对分别存储,再对每部分键值对一起编码、存储到指定文件。
注意,论文中Worker执行完Map后需对数据排序,在数据量大的情况下会使Reduce处的排序更快。但此处不需要。
func DoMapTask(mapf func(string, string) []KeyValue, task *Task) {
// read each input file, pass it to Mapf and accumulate the intermediate Mapf output.
filename := task.InputFiles[0]
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", filename)
}
file.Close()
intermediate := mapf(filename, string(content))
// divide the output into r parts and store them in mr-X-Y individually
r := task.NReduce
prefix := "mr-" + strconv.Itoa(task.TaskID) + "-"
// divide them in advance to encode them by json.Encoder conveniently
// the KVs with the same key need to be encoded and stored together
dividedKV := make([][]KeyValue, r)
for _, kv := range intermediate {
hs := ihash(kv.Key) % r
dividedKV[hs] = append(dividedKV[hs], kv)
}
for i := 0; i < r; i++ {
oname := prefix + strconv.Itoa(i)
ofile, _ := os.Create(oname)
enc := json.NewEncoder(ofile)
for _, kv := range dividedKV[i] {
enc.Encode(kv)
}
ofile.Close()
}
fmt.Println("finish DoMapTask")
}
测试
运行1个Worker,会发现有10个输出文件mr-0-0, mr-0-1, ..., mr-0-9
,且这些文件包含的 key 都不相同。
设计task信息结构
Master需要记录每个task的起始时间,以便在task超时时重新分发该task。
Master还需记录每个task的状态(未分配,已分配,已完成)。
task是传给Worker的结构,没必要在task中加这些信息,所以定义TaskInfo
结构表示这些信息(也需记录指向的Task):
// info of task
type TaskInfo struct {
TaskStatus TaskStatus
ExpiredAt time.Time
Task *Task
}
// status of task
type TaskStatus int
const (
TaskWaiting = iota
TaskRunning
TaskFinished
)
Master需维护TaskInfoMap map[int]*TaskInfo
来存储所有task对应的TaskInfo
(以 TaskID 为key)。
注意,要在MakeMapTask
和MakeReduceTask
中初始化map。
type Master struct {
MapTaskChannel chan *Task
ReduceTaskChannel chan *Task
NMap int // number of Map tasks
NReduce int // number of Reduce tasks
NTask int // number of current tasks
Status MasterStatus
TaskInfoMap map[int]*TaskInfo
}
记录并更新task信息
在MakeMapTask()
中生成task后,更新master的TaskInfoMap
。
{
...
taskInfo := TaskInfo{
TaskStatus: TaskWaiting,
Task: &task,
}
m.NewTaskInfo(&taskInfo)
}
// Store a taskInfo in Master.TaskInfoMap.
func (m *Master) NewTaskInfo(taskInfo *TaskInfo) {
id := taskInfo.Task.TaskID
value, _ := m.TaskInfoMap[id]
if value != nil {
fmt.Println("TaskInfo conflicted:", id, value, taskInfo)
} else {
m.TaskInfoMap[id] = taskInfo
}
}
在DistributeTask()
分发task后,更新TaskInfo的过期时间。
{
...
switch m.Status {
case MapPhase:
if len(m.MapTaskChannel) > 0 {
*reply = *<-m.MapTaskChannel
// reply = <-m.MapTaskChannel // 错。reply为传值引用,修改reply不会影响worker处的reply
if !m.UpdateTaskInfo(reply.TaskID) {
fmt.Println("No such TaskInfo or Task", reply.TaskID, "runs again.")
}
} else {
reply.TaskType = Wait
}
case ReducePhase:
if len(m.ReduceTaskChannel) > 0 {
*reply = *<-m.ReduceTaskChannel
if !m.UpdateTaskInfo(reply.TaskID) {
fmt.Println("No such TaskInfo or Task", reply.TaskID, "runs again.")
}
} else {
reply.TaskType = Wait
}
case Done:
reply.TaskType = Kill
}
}
// Update TaskStatus and ExpiredAt.
func (m *Master) UpdateTaskInfo(taskID int) bool {
taskInfo, ok := m.GetTaskInfo(taskID)
if !ok || taskInfo.TaskStatus != TaskWaiting {
return false
}
taskInfo.TaskStatus = TaskRunning
taskInfo.ExpiredAt = time.Now().Add(ExpireTime)
return true
}
task超时检测及处理
用协程周期运行一个周期性超时检测函数,遇到超时任务则将其放入队列等待重新分配。应该不需要通知正在执行的worker?因为使用生成临时文件(随机名字),再改文件名的方式,不会冲突。
// main/mrmaster.go calls TimeoutHandler() periodically to detect time-outs and redistribute these tasks.
func (m *Master) TimeoutHandler() {
for {
time.Sleep(WaitPeriod)
if m.Status == AllDone {
return
}
now := time.Now()
for _, v := range m.TaskInfoMap {
if v.TaskStatus == TaskRunning && v.ExpiredAt.Before(now) {
v.TaskStatus = TaskWaiting
switch v.Task.TaskType {
case MapTask:
m.MapTaskChannel <- v.Task
break
case ReduceTask:
m.ReduceTaskChannel <- v.Task
break
}
}
}
}
}
生成Reduce Task
该函数在所有Map Task完成后执行。
// The start of Reduce phase. Generate Reduce Tasks and store them in ReduceTaskChannel.
func (m *Master) MakeReduceTask() {
for i := 0; i < m.NReduce; i++ {
id := m.GetTaskID()
task := Task{
TaskType: ReduceTask,
TaskID: id,
InputFiles: m.GetReduceInputFiles(i),
}
taskInfo := TaskInfo{
TaskStatus: TaskWaiting,
Task: &task,
}
m.NewTaskInfo(&taskInfo)
m.ReduceTaskChannel <- &task
fmt.Println("Generate Reduce Task:", task)
}
fmt.Println("finish MakeReduceTask")
}
// Generate the file names that the reduce worker needs (mr-*-y).
func (m *Master) GetReduceInputFiles(rid int) []string {
var s []string
suffix := "-" + strconv.Itoa(rid)
for i := 0; i < m.NMap; i++ {
s = append(s, "mr-"+strconv.Itoa(i)+suffix) // mr-*-rid
// todo: need to validate the validity of the file
}
return s
}
实现DoReduceWork()
DoReduceWork()
需要从Map
生成的若干个文件中读入,然后对这些文件中的每一个key,将所有的value保存为一个集合。对于每一个key和相应的value集合,执行reducef
。
按照mrsequential
实现即可,只是需要先从多个文件中读入获取intermediate
,并使用临时文件输出。
func DoReduceTask(reducef func(string, []string) []KeyValue, task *Task) {
// Read each input file and get the intermediate (all key/value pairs). Need to decode them from json.
var tempKV KeyValue
var intermediate []KeyValue
for _, filename := range task.InputFiles {
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
dec := json.NewDecoder(file)
for {
if dec.Decode(&tempKV) != nil {
break
}
intermediate = append(intermediate, tempKV)
}
file.Close()
}
sort.Sort(ByKey(intermediate))
...
}
修改Work()使Worker按照任务类型执行不同函数
// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
done := false
for !done {
// use RPC to the master and get the task
task := CallDistributeTask()
fmt.Println("Get Task:", task)
// worker implementation here
switch task.TaskType {
case MapTask:
DoMapTask(mapf, task)
CallTaskDone(task)
case ReduceTask:
DoReduceTask(reducef, task)
CallTaskDone(task)
case Wait:
time.Sleep(TaskWaitPeriod)
case Kill:
done = true
fmt.Println("A worker finished.")
default:
fmt.Println("Worker(): Invalid TaskType:", task.TaskType)
}
// Request another task.
}
}
worker在任务完成时调用CallTaskDone(task)
,从而调用master的TaskDone()
,表示一个任务完成。
在Master
中加一个NUnfinishedTask int // number of unfinished tasks
,然后在TaskDone
中减少这个值,用来检测当前阶段所有task是否完成。
// A task is finished.
func (m *Master) TaskDone(task *Task, reply *string) error {
taskStatus_mutex.Lock()
defer taskStatus_mutex.Unlock()
info, ok := m.GetTaskInfo(task.TaskID)
if !ok {
fmt.Println("Invalid TaskID:", task.TaskID)
} else if info.TaskStatus != TaskFinished {
// Be aware that the TaskStatus of an undone task may be TaskWaiting or TaskRunning.
m.NUnfinishedTask--
info.TaskStatus = TaskFinished
fmt.Println("Task", task.TaskID, "finished.")
if m.CurrentPhaseDone() {
m.NextPhase()
}
} else {
fmt.Println("Task", task.TaskID, "got a timeout and finished again.")
}
return nil
}
每当一个task完成时(TaskDone()
被调用),检查Master是否已完成当前阶段,然后进行下一阶段(MapPhase->ReducePhase
,或ReducePhase->Done
)。
// Whether all the tasks are finished or not.
func (m *Master) CurrentPhaseDone() bool {
return m.NUnfinishedTask == 0
}
// Change status from MapPhase to ReducePhase, or from ReducePhase to All Done.
func (m *Master) NextPhase() {
switch m.Status {
case MapPhase:
m.Status = ReducePhase
m.MakeReduceTask()
case ReducePhase:
m.Status = AllDone
}
}
到这所有基本功能应该已经完成了。
-race
会在运行时检测可能的冲突。
简单自测可发现,master
在对m.Status
修改时出现了冲突访问。
所以对所有修改/访问m.Status
的函数需要加锁。
但是只有在NextPhase()
中会对该值有写操作,且该函数一共也只执行两次,其它都是对m.Status
的读。所以用read_count
+两个互斥锁限制写,允许同步读。
var mutex sync.Mutex // a mutex for m.Status
var rc_mutex sync.Mutex // a mutex for read_count
var read_count int // a read_count for m.Status
// Lock
func ReaderLock() {
rc_mutex.Lock()
if read_count++; read_count == 1 {
mutex.Lock()
}
rc_mutex.Unlock()
}
// Unlock
func ReaderUnlock() {
rc_mutex.Lock()
if read_count--; read_count == 0 {
mutex.Unlock()
}
rc_mutex.Unlock()
}
taskInfo.TaskStatus
也会出现冲突。
主要在UpdateTaskInfo()
、TaskDone()
和TimeoutHandler()
中冲突。因为和m.Status
的情况不太一样,所以直接再加一个锁。
var taskStatus_mutex sync.Mutex // a mutex for TaskStatus
上面的实现中有个小问题:DoMapTask()
也需使用输出临时文件,并在完成时重命名的方式。
任务完成、检查是否进入下一阶段的判定是:一个任务的状态变为Finished
。但在这时这个任务可能已因超时被重复执行,但在重复执行完成前,该任务可能导致整个阶段已完成。此时重复执行的Map Worker仍在更新文件,会导致Reduce阶段使用的输入文件有问题。
所以在Map阶段的DoMapTask()
也使用临时文件输出、然后更名即可。
gxb@GXB:/mnt/e/GitHub/6.824lab/src/main$ ./test-mr.sh
*** Starting wc test.
--- wc test: PASS
*** Starting indexer test.
--- indexer test: PASS
*** Starting map parallelism test.
--- map parallelism test: PASS
*** Starting reduce parallelism test.
--- reduce parallelism test: PASS
*** Starting crash test.
--- crash test: PASS
*** PASSED ALL TESTS
完成。
Lab1总体不是很难,想清楚就好,要写的比较多。