6.824分布式课程讲稿翻译 - Lab1 MapReduce

概述

在这个实验中, 你将要实现一个MapReduce库作为Go编程的入门练习, 并用于之后实现一个容错的分布式系统. 首先你需要编写一个简单的MapReduce程序; 其次, 你需要实现一个Master用于向MapReduce Worker派发Task, 并处理Worker的错误. 这个库的接口定义及容错的实现可以仿照我们讨论过的MapReduce论文中的设计.

协作规则

除了我们给你的那部分代码, 其他所有你提交到6.824课程的代码都需要由你自己写完. 不允许抄袭他人的或往年的答案. 你可以与其他同学讨论这个作业, 但不能复制他人的代码. 至于为何采用这个规则, 是因为我们相信自己动手去设计实现每个Lab才能让你收获最大.

请不要向现在或将来上6.824课程的同学公开你的代码. github.com的repository默认是公开的, 所以请不要把你的代码放上去, 除非你把自己的repo设为私有的. 你会发现使用MIT的GitHub会很方便, 但要记得创建一个私有的repo.

运行环境

你将要使用Go语言实现这个实验(以及往后的实验). Go语言主页上有很多教程让你参考. 可能最方便的还是你在自己的电脑上安装Go来开发. 你也可以在Athena上使用Go. 我们会使用Go 1.7版本来对你们的实验评分, 由于我们不清楚其他版本会出现的问题, 因此你们也应该使用Go 1.7.

我们提供了一个MapReduce实现的部分代码, 支持分布式和非分布式操作. 你可以通过git(一个版本控制系统)获取初始的实验代码. 要详细了解Git可以学习Pro Git book, git用户手册, 或者如果你熟悉其他的版本控制系统的话, 你会发现这篇文章CS-oriented overview of git很有帮助.

如果你用Athena, 你需要用x86或x86_64的Athena机器, 也就是说uname -a会出现i386 GNU/Linux or i686 GNU/Linux or x86_64 GNU/Linux. 你可以通过ssh athena.dialup.mit.edu登录一台公共的i686 Athena主机. 在Athena上你需要先执行这些命令:

athena$ add git
athena$ setup ggo_v1.7

课程的git地址是git://g.csail.mit.edu/6.824-golabs-2017. 要在你的目录下使用这些文件, 你需要通过执行以下命令clone这个repository:

$ git clone git://g.csail.mit.edu/6.824-golabs-2017 6.824
$ cd 6.824
$ ls
Makefile src

Git允许你跟踪对代码的修改. 例如, 你想保存目前的进度, 可以通过执行以下命令commit这些修改:

$ git commit -am 'partial solution to lab 1'

我们给你的MapReduce实现有两种模式的操作: 串行的(seqential)和分布式的(distributed). 在第一种模式下, 每次执行一个Map或Reduce的Task: 首先执行完第一个Task, 再执行第二个, 第三个等等. 所有的Map Task执行完后, 再执行第一个Reduce Task, 接着执行第二个…. 这个模式虽然不快, 但是很方便调试(debugging).

而分布式的执行模式运行许多worker线程, 首先并行执行Map Task, 执行完再并行执行Reduce Task. 这种模式会快很多, 但也更难以实现和调试.

序言: 熟悉源码(Getting familiar with the source)

mapreduce包提供了一个简单的MapReduce库. 应用需要调用Distributed()(在master.go文件中)来启动一个job, 也可以调用Sequential()来串行执行以便调试.

代码按照以下流程执行一个job:

  1. 应用提供一系列的输入文件, 一个map函数, 一个reduce函数, 以及需要的Reduce Task的数量(nReduce).
  2. Master按照下述流程创建: 先启动一个RPC Server(见master_rpc), 然后等待worker来注册(register, 使用master.go中定义的Register()). 当这些Task变得可用(在第4, 5步)时, schedule()(schedule.go文件)决定如何将这些Task分配到worker上, 以及如何进行worker的失败处理.
  3. Master把每个输入文件对应一个Map Task, 并直接地(当使用Sequential())或通过调用Worker的DoTask RPC请求间接地, 对每个Map Task执行至少一次的doMap()(common_map.go文件). 每次doMap()的调用读取相应的文件, 对文件内容调用map函数, 并将结果以键值对的格式写入到nReduce个中间文件. 过程中doMap()需要对每个Key的结果哈希映射到某个的中间文件, 等待Reduce Task作后续处理. 所有Map Task结束后将会有nMap X nReduce个文件. 每个文件名包含一个前缀, Map Task号以及Reduce Task号. 假设有两个Map Task和三个Reduce Task, 那么Map Task将创建如下6个中间文件:
    mrtmp.xxx-0-0
    mrtmp.xxx-0-1
    mrtmp.xxx-0-2
    mrtmp.xxx-1-0
    mrtmp.xxx-1-1
    mrtmp.xxx-1-2
    
    每个Worker必须能够读其他Worker写的文件以及输入文件. 实际部署环境中使用如GFS等的分布式文件系统, 来保证即便Worker运行在不同的机器上都能够访问这些文件. 但在实验中你只需要在同一台机器上运行, 并使用本地文件系统.
  4. 下一步Master需要对每个reduce task至少调用一次doReduce()(common_reduce.go文件). 与doMap()类似, 它可以直接运行或通过某个Worker进行. 第r个Reduce Task的doReduce()调用会收集每个Map Task输出的第r个中间文件, 并对文件中出现的每个Key调用reduce函数. Reduce Task产生nReduce个结果文件.
  5. Master调用mr.merge()(master_splitmerge.go文件), 这个函数合并前一步产生的nReduce个文件到一个单独的文件中.
  6. Master向每个Worker发送一个Shutdown RPC调用, 然后关闭自己的RPC Server.

注: 这门课接下来的练习中, 你将要自己实现/修改doMap, doReduceschedule. 这些函数分别在common_map.go, common_reduce.go, 以及 schedule.go中. 同时你还需要在../main/wc.go中实现Map和Reduce函数.

你不需要修改其他文件, 但阅读这些文件或许能帮助你更好地理解其他函数是如何在整个系统架构中运行的.

Part 1: MapReduce 输入输出

我们提供的这个MapReduce的实现缺少了某些部分. 在你编写第一组Map/Reduce函数前, 你需要完成MapReduce的串行(sequential)版本的实现. 需要注意的是, 我们给你的代码缺少两个关键部分: 将一个Map Task的输出分开的函数, 以及收集所有Reduce Task输出的函数. 这些Task分别在common_map.go中的doMap()common_reduce.go中的doReduce()函数中实现. 这些文件中的注释会你指明正确的方向.

为了让你判断是否正确地实现了doMap()doReduce(), 我们提供了一组Go单元测试集来验证你实现的正确性. 这些测试写在test_test.go文件中. 要对你已经实现的串行版本进行测试, 需要执行:

$ cd 6.824
$ export "GOPATH=$PWD"  # go needs $GOPATH to be set to the project's working directory
$ cd "$GOPATH/src/mapreduce"
$ go test -run Sequential
ok      mapreduce    2.694s

如果我们在自己的机器上执行你的程序时, 你的程序通过了Sequential测试(输出入上述命令行), 那么你这部分可以得满分.

如果命令行输出中测试项旁边没有出现ok的字样, 表示你的实现可能有一些bug. 如果需要更详细的输出, 可以在common.go中设置debugEnabled = true并在上面测试命令中加上-v, 你将会得到如下更详细的输出:

$ env "GOPATH=$PWD/../../" go test -v -run Sequential
=== RUN   TestSequentialSingle
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
master: Map/Reduce task completed
--- PASS: TestSequentialSingle (1.34s)
=== RUN   TestSequentialMany
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
Merge: read mrtmp.test-res-1
Merge: read mrtmp.test-res-2
master: Map/Reduce task completed
--- PASS: TestSequentialMany (1.33s)
PASS
ok      mapreduce    2.672s

Part 2: Single-worker 单词统计(Word Count)

现在你将实现单词统计——一个简单的MapReduce用例. 在main/wc.go文件中, 你会发现空的mapF()reduceF()函数, 你的工作就是补充代码使得wc.go统计输入中每个单词的出现次数. 一个单词是连续的字母序列, 正如unicode.IsLetter的定义.

在src/main/目录下有一些文件名形如pg-*.txt的输入文件, 都是从Project Gutenberg下载的. 下面介绍如何用这些文件作为输入运行wc:

$ cd 6.824
$ export "GOPATH=$PWD"
$ cd "$GOPATH/src/main"
$ go run wc.go master sequential pg-*.txt
# command-line-arguments
./wc.go:14: missing return at end of function
./wc.go:21: missing return at end of function

由于mapF()reduceF()不完整导致了编译失败.

回忆一下MapReduce论文的第二章节(Section 2). 你的mapF()reduceF()函数会和论文2.1小节的有少许不同. 你的mapF()需要同时接收文件名和文件内容; 函数需要把文本内容分成单词, 并返回一个Go slice类型的mapreduce.KeyValue. 虽然你可以自定义在mapF()的输出中key和value的形式, 但在单词统计中只有把单词当做key才有意义. 你的reduceF()函数将会对mapF()产生的每组key和value被调用一次, 函数需要返回一个字符串表示这个key出现的总次数.

提示:

  • 要想更好地了解Go中的string, 可以阅读Go Blog on strings
  • 你可以使用strings.FieldsFunc函数将一个string分成多个子串
  • strconv包能方便地将string转成integer类型

你可以通过以下命令测试你的答案:

$ cd "$GOPATH/src/main"
$ time go run wc.go master sequential pg-*.txt
master: Starting Map/Reduce task wcseq
Merge: read mrtmp.wcseq-res-0
Merge: read mrtmp.wcseq-res-1
Merge: read mrtmp.wcseq-res-2
master: Map/Reduce task completed
14.59user 3.78system 0:14.81elapsed

输出结果将在文件mrtmp.wcseq中. 如果以下命令产生的结果如下所示则表明你的实现是正确的:

$ sort -n -k2 mrtmp.wcseq | tail -10
he: 34077
was: 37044
that: 37495
I: 44502
in: 46092
a: 60558
to: 74357
of: 79727
and: 93990
the: 154024

你通过下面的命令删除所有的输出文件和中间文件:

$ rm mrtmp.*

为了使测试更简单, 可以运行:

$ bash ./test-wc.sh

之后就能知道你的答案是否正确了.

如果我们在自己的机器上执行你的程序时, 你的MapReduce单词统计输出 (在之前实现的串行(Sequential)执行模式下)与正确输出相同, 那么你这部分可以得满分.

Part 3: 分发MapReduce Task

你现在的实现一次只能运行一个Map或Reduce的Task. MapReduce最大的卖点之一就是它能自动将串行代码并行化, 不需要开发者做额外的工作. 在实验的这个部分中, 你需要完成一个MapReduce的新版本, 需要能把任务分配给并行的,或运行在多核上的多个worker线程. 虽然不像现实的MapReduce集群那样运行在多台不同的机器上, 但你的实现会用到RPC来模拟分布式的计算.

mapreduce/master.go中的代码完成了管理MapReduce任务的大部分工作. 我们还在mapreduce/worker.go中提供了一个worker线程需要的所有代码, 以及在mapreduce/common_rpc.go中提供了处理RPC的代码.

你的任务是在mapreduce/schedule.go中实现schedule()函数. 在一个MapReduce任务中master调用两次schedule(), 一次用于Map阶段, 一次用于Reduce阶段. schedule的工作是向可用的Worker分发Task. 一般情况下, Task的个数要远多于Worker的数量, 因此schedule()需要给每个Worker一系列的Task, 每次分发一个. schedule()需要等待所有Task执行完成后再返回.

schedule()通过读取registerChan参数来知道Worker的集合. 这个channel对每个worker产生一个string, 表示这个Worker的RPC地址. 某些Worker可能在schedule()被调用之前就存在了, 有些可能是在schedule()运行时启动的, 所有这些Worker都会在registerChan上出现. schedule()需要使用所有Worker, 包括该函数运行时启动的那部分.

schedule()通过向Worker发送一个Worker.DoTask的RPC来让Worker执行一个Task. RPC的参数被定义在mapreduce/common_rpc.go文件中的DoTaskArgs结构体中. 其中File成员只用于Map Task, 表示要读入的文件名; schedule()可以在mapFiles参数中找到这些文件名.

使用mapreduce/common_rpc.go中的call()函数可以向Worker发送一个RPC. 第一个参数是从registerChan中读到的Worker地址, 第二个参数应为"Worker.DoTask", 第三个参数是DoTaskArgs结构体, 最后一个参数应为nil.

你在Part 3的答案只能包含对schedule.go的修改. 如果你在debug的过程中修改了其他文件, 请在提交之前恢复这些文件原有的内容再测试.

要测试答案的话, 应该用和Part 1同样的Go测试集, 但记得用-run TestBasic代替-run Sequential, 这样就会执行分布式的测试用例(不包含Worker错误), 而不是之前运行过的串行用例:

$ go test -run TestBasic

如果我们在自己的机器上执行你的程序时, 你的程序通过了test_test.go(上面这段代码的测试)中TestBasic的测试用例, 那么你这部分可以得满分.


提示:

  • Go RPC包的文档: RPC package
  • schedule()需要并行地向Worker发送RPC请求, 这样Worker才能并发地执行Task. 你会发现go statement这样的语法可以方便地达到这个目的, 见Concurrency in Go.
  • schedule()需要等待一个Worker完成时才能给它派发另一个Task. 你会发现Go的channel类型)对于实现这个功能很有帮助.
  • 可能会用到sync.WaitGroup
  • 追踪Bug最简单的方式就是插入一些输出状态的语句(可以调用common.godebug()). 将所有的输出收集到一个文件里可以用go test -run TestBasic > out, 然后思考这个输出和你的预期是否一致. 最后这一步思考是很重要的.
  • 要检查你的代码是否有竞争条件, 可以用go test -race -run TestBasic > out对你的测试运行Go的race detector


注: 我们给你的代码在一个单独的Unix进程中执行若干个Worker线程, 并可以利用同一台机器上的多个核(core). 要使这些Worker运行在多台机器上并通过网络通信, 可能需要做一些修改. RPC需要通过TCP而不是Unix-domain socket, 还需要一个能在所有机器上启动Worker进程的方法, 并且所有机器需要通过某种网络文件系统(NFS)来共享存储.

Part 4: 处理Worker错误

这个部分中你需要让Master处理失败的Worker. 这个工作在MapReduce中相对简单, 因为Worker不保存状态. 如果一个Worker失败了, Master向Worker发送的任何RPC调用都会失败(例如由于超时). 因此, 如果Master到Worker的RPC失败了, Master需要将分配给这个Worker的任务分配给另一个Worker.

一次RPC调用失败并不意味着Worker没有执行这次Task, Worker可能执行了Task但丢失了回复(lost reply), 或者WOrker可能仍在执行中单Master的RPC超时了. 因此, 有可能发生两个Worker收到同一个Task并执行产生输出的情况. 对于同一个输入, 要求两次调用map或reduce函数需产生同样的输出(也就是说map和reduce函数必须是”functional”的), 因此就算不确定后续的处理读入的是哪一个Worker产生的输出, 也不会造成不一致性(inconsistencies). 此外, MapReduce框架保证了map和reduce函数产生输出是原子操作: 输出文件要么不存在, 若存在的话就一定包含一次map或reduce函数执行的完整输出(实验代码没有实现这个功能, 但取而代之的是Worker只会在执行一个Task结束之后才失败, 因此不会有多个Task并发执行的情况发生).

注: 你不需要处理Master的失败. 使Master自动容错远比Worker容错更困难, 因为Master有状态, 在Master失败后需要恢复状态以便回到之前的操作. 后面的许多实验正是要挑战这个问题.

你的实现需要通过test_test.go中两个剩下的测试用例. 第一个用例测试一个WOrker的是啊比, 而第二个测试多个Worker的多个失败. 测试用例周期性地启动新的Worker, 使Master用来继续处理, 但这些Worker会在处理一些Task之后失败. 用以下命令执行这些测试:

$ go test -run Failure

如果我们在自己的机器上执行你的程序时, 你的程序通过了所有包含Worker错误的测试用例(就是上面的命令执行的测试), 那么你这部分可以得满分.

你在Part 4提交的答案只应该包含对schedule.go文件的修改. 如果你在debug的过程中修改了其他文件, 请在提交之前恢复这些文件原有的内容再测试.

Part 5: 生成反向索引(选做, 附加分)

这是一个挑战性的选做练习, 你需要构造Map和Reduce函数来生成一个反向索引.

反向索引在计算机科学中被广泛使用, 特别是用于文档搜索. 粗略地说, 反向索引就是在大量数据中建立一个从每个数据的特定字段(interesting facts), 到该数据实际存储位置的映射. 例如在搜索中, 反向索引可能就是一个从关键词到包含这些关键词的文档的一个映射.

我们已经创建了一个wc.go文件的副本:main/ii.go. 你需要在main/ii.go中修改mapFreduceF使得它们共同产生一个反向索引. 运行ii.go应输出一系列元组, 每个一行, 如下所示:

$ go run ii.go master sequential pg-*.txt
$ head -n5 mrtmp.iiseq
A: 16 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
ABC: 2 pg-les_miserables.txt,pg-war_and_peace.txt
ABOUT: 2 pg-moby_dick.txt,pg-tom_sawyer.txt
ABRAHAM: 1 pg-dracula.txt
ABSOLUTE: 1 pg-les_miserables.txt

上面列的可能不够清晰, 其标准格式如下:

word: #documents documents sorted and separated by commas

要从这部分拿到满分, 你需要通过bash ./test-ii.sh的测试, 命令和输出如下:

$ sort -k1,1 mrtmp.iiseq | sort -snk2,2 mrtmp.iiseq | grep -v '16' | tail -10
women: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
won: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
wonderful: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
words: 15 pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
worked: 15 pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
worse: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
wounded: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
yes: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
younger: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
yours: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt

运行所有测试

你可以通过脚本src/main/test-mr.sh来执行所有测试. 如果答案正确的话, 你的输出应该如下所示:

$ bash ./test-mr.sh
==> Part I
ok      mapreduce    3.053s

==> Part II
Passed test

==> Part III
ok      mapreduce    1.851s

==> Part IV
ok      mapreduce    10.650s

==> Part V (challenge)
Passed test

作业提交方式