MapReduce: Simplified Data Processing on Large Clusters

Table of Contents

http://research.google.com/archive/mapreduce.html @ 2004

1 Abstract

2 Introduction

3 Programming Model

  • 用户指定map以及reduce函数,其中map函数处理每个key/value pair,而reduce函数处理同一个key对应的所有values.
  • The intermediate val-ues are supplied to the user’s reduce function via an iter-ator. This allows us to handle lists of values that are too large to fit in memory.(对于reduce阶段处理的values都是iterator提供的,这样可以不用担心values过多放不进入内存)
  • Distributed Sort: The map function extracts the key from each record, and emits a key, record pair. The reduce function emits all pairs unchanged. This compu-tation depends on the partitioning facilities described in Section 4.1 and the ordering properties described in Sec-tion 4.2. (关于分布式排序这个比较值得关注,map以及reduce阶段都没有做任何工作。所有的工作都是依赖framework完成的:partition指定到一个reducer上面,而reducer是按照key的顺序来处理的,用户可以指定key的比较函数。)

4 Implementation

  • Many different implementations of the MapReduce in-terface are possible. The right choice depends on the environment. For example, one implementation may be suitable for a small shared-memory machine, another for a large NUMA multi-processor, and yet another for an even larger collection of networked machines.(mapreduce可以有很多实现,这个取决于环境。)
  • This section describes an implementation targeted to the computing environment in wide use at Google: large clusters of commodity PCs connected together with switched Ethernet . In our environment:(下面是google的环境)
    • (1) Machines are typically dual-processor x86 processors running Linux, with 2-4 GB of memory per machine.(双核x86,2-4GB内存)
    • (2) Commodity networking hardware is used – typically either 100 megabits/second or 1 gigabit/second at the machine level, but averaging considerably less in over-all bisection bandwidth.(1Gbps的网卡,但是带宽依然有限)
    • (3) A cluster consists of hundreds or thousands of ma-chines, and therefore machine failures are common.(考虑机器故障)
    • (4) Storage is provided by inexpensive IDE disks at-tached directly to individual machines. A distributed file system developed in-house is used to manage the data stored on these disks. The file system uses replication to provide availability and reliability on top of unreliable hardware.(使用廉价IDE磁盘,但是应该没有使用磁盘阵列,而是使用gfs做soft RAID)
    • (5) Users submit jobs to a scheduling system. Each job consists of a set of tasks, and is mapped by the scheduler to a set of available machines within a cluster.(用户提交到scheduler system,scheduler来指定job在哪些机器运行,每个job包含很多task)

4.1 Execution Overview

  • The Map invocations are distributed across multiple machines by automatically partitioning the input data into a set of M splits. The input splits can be pro-cessed in parallel by different machines. (框架自动将输入数据拆分成为M份,每个mapper实例处理一份,分布在各个机器上面且可以同时运行。对于这个M没有办法指定)
  • Reduce invoca-tions are distributed by partitioning the intermediate key space into R pieces using a partitioning function (e.g.,hash(key) mod R). The number of partitions (R) and the partitioning function are specified by the user.(reducer的实例个数可以指定,分布在各个机器上面处理落到这个reducer上面所有的key/value pairs。其中partition函数用户可以指定)

mapreduce-execution-overview.png

整个工作逻辑是:

  • The MapReduce library in the user program first splits the input files into M pieces of typically 16 megabytes to 64 megabytes (MB) per piece (con-trollable by the user via an optional parameter). It then starts up many copies of the program on a clus-ter of machines.(根据输入文件大小,按照每个block 16-64M这个数值用户可以指定,来进行切分。这个数目就是mapper实例个数)
  • One of the copies of the program is special – the master. The rest are workers that are assigned work by the master. There are M map tasks and R reduce tasks to assign. The master picks idle workers and assigns each one a map task or a reduce task.(所有的都是worker,其中一个worker比较特殊成为master,工作是将mapper/reducer指派到worker上面去,并且监控mapper/reducer的运行情况)
  • A worker who is assigned a map task reads the contents of the corresponding input split. It parses key/value pairs out of the input data and passes each pair to the user-defined Map function. The interme-diate key/value pairs produced by the Map function are buffered in memory.(分配到mapper工作的worker读取对应的数据并且进行解析,将解析的结果丢给map函数处理。输出结果会缓存起来而不是立刻输出)
  • Periodically, the buffered pairs are written to local disk, partitioned into R regions by the partitioning function. The locations of these buffered pairs on the local disk are passed back to the master, who is responsible for forwarding these locations to the reduce workers.(mapper缓存的结果周期性地写到本地磁盘,并且是根据partition函数写成R份。mapper将磁盘地址通知给master,一方面是通知完成,另外一方面master可以通知reducer拉数据)
  • When a reduce worker is notified by the master about these locations, it uses remote procedure calls to read the buffered data from the local disks of the map workers. When a reduce worker has read all in-termediate data, it sorts it by the intermediate keys so that all occurrences of the same key are grouped together. The sorting is needed because typically many different keys map to the same reduce task. If the amount of intermediate data is too large to fit in memory, an external sort is used.(之后reducer会被master通知到mapper输出结果地址。reducer使用RPC将结果copy到本地。一旦reducer读取到所有的文件之后,就会进行排序。如果内存排序不行的话,那么就会使用外部排序)
  • The reduce worker iterates over the sorted interme-diate data and for each unique intermediate key en-countered, it passes the key and the corresponding set of intermediate values to the user’s Reduce func-tion. The output of the Reduce function is appended to a final output file for this reduce partition.(reducer将同一个key对应的values传递给指定的reduce函数)
  • When all map tasks and reduce tasks have been completed, the master wakes up the user program. At this point, the MapReduce call in the user pro-gram returns back to the user code.(当所有的task完成之后,master就会返回到user code)
  • After successful completion, the output of the mapre-duce execution is available in the R output files (one per reduce task, with file names as specified by the user). Typically, users do not need to combine these R output files into one file – they often pass these files as input to another MapReduce call, or use them from another dis-tributed application that is able to deal with input that is partitioned into multiple files.(输出结果有R份,我们并不需要进行merge,通常这些结果会作为下一轮mapreduce的输入)

4.2 Master Data Structures

master维护的数据结构之需要包括

  • 每个task的状态(idle/in-progress/completed)
  • 每个mapper的R份输出文件名称以及文件大小。对于一个mapper如果一旦有R份输出文件的话,那么就可以认为这个mapper运行完成。

4.3 Fault Tolerance

4.3.1 Worker Failure

  • 检测worker故障是通过周期性ping来完成的。
  • 如果检测到worker失败的话
    • 如果是mapper的话,那么无论如何都需要重新启动这个mapper任务(将状态修改为idle),因为mapper输出结果在本地文件
    • 如果是reducer的话,如果completed的话那么不需要重新执行,因为输出文件在gfs。否则也需要重启(修改为idle状态)
    • 如果是mapper重新计算的话,那么mapper也会同时reducer地址变化。如果reducer之前没有拉下数据的话,那么就在新的mapper机器上拉数据

4.3.2 Master Failure

对于master来说可以定时地做chkp来记录自己的信息,如果挂掉的话那么可以重启来恢复之前执行情况。但是考虑到job只有一个master,挂掉几率非常小,因为可以简单地abort job,通知用户重新执行。

4.3.3 Semantics in the Presence of Failures

当mapper完成时候会一次将所有的R个输出文件通知给master。因为mapper是写本地文件,所以如果有两个相同mapper运行且同时完成,master只取其中一份即可。对于reducer来说的话, 因为是将输出写到gfs上面,所以需要提供文件写原子操作。实现上可以让reducer写临时文件,然后使用提供的atomic rename操作重命名。

TOOD(dirlt):对于mapper和reducer存在non-deterministic operation的话,没有看懂所谓的weaker semantics是什么意思?

  • When the map and/or reduce operators are non-deterministic, we provide weaker but still reasonable se-mantics. In the presence of non-deterministic operators, the output of a particular reduce task R1 is equivalent to the output for R1 produced by a sequential execution of the non-deterministic program. However, the output for a different reduce task R2 may correspond to the output for R2 produced by a different sequential execution of the non-deterministic program.
  • Consider map task M and reduce tasks R1 and R2. Let e(Ri ) be the execution of Ri that committed (there is exactly one such execution). The weaker semantics arise because e(R1 ) may have read the output produced by one execution of M and e(R2) may have read the output produced by a different execution of M.

4.4 Locality

master在选择mapper启动位置的话,会优先考虑将mapper启动到离input data近的机器上面(如果是本地的话就可以节省网络带宽)。

4.5 Task Granularity

4.6 Backup Tasks

  • One of the common causes that lengthens the total time taken for a MapReduce operation is a “straggler”: a ma-chine that takes an unusually long time to complete one of the last few map or reduce tasks in the computation.(straggler表示有某一个机器花去非常多的时间完成了最后一个mapper或者是reducer任务,使得整个任务运行时间延长)
  • Stragglers can arise for a whole host of reasons. For ex-ample, (造成straggler原因非常多)
    • a machine with a bad disk may experience fre- quent correctable errors that slow its read performance from 30 MB/s to 1 MB/s. (磁盘故障)
    • The cluster scheduling sys-tem may have scheduled other tasks on the machine, causing it to execute the MapReduce code more slowly due to competition for CPU, memory, local disk, or net-work bandwidth.(并且如果过多的任务在这个机器上的话,那么因为CPU,mem,network的竞争使用会变得更慢)
    • A recent problem we experienced was a bug in machine initialization code that caused proces-sor caches to be disabled: computations on affected ma-chines slowed down by over a factor of one hundred.(最近遇到的鼓掌机器代码使得CPU cache失效,使得运行速度降低1/100)
  • When a MapReduce operation is close to completion, the master schedules backup executions of the remaining in-progress tasks. The task is marked as completed whenever either the primary or the backup execution completes(解决这个问题就是当task快完成的时候,启动一个backup task同时运行,看哪个首先完成) We have tuned this mechanism so that it typically increases the computational resources used by the operation by no more than a few percent.(调节这个机制使得计算资源并不会浪费太多)
  • As an exam-ple, the sort program described in Section 5.3 takes 44% longer to complete when the backup task mechanism is disabled.(最近的例子就是发现如果将backup停止的话,那么执行时间延长44%)

5 Refinements

5.1 Partitioning Function

5.2 Ordering Guarantees

  • We guarantee that within a given partition, the interme-diate key/value pairs are processed in increasing key or-der. (确保产生的key/value pairs都是按照key进行排序的)

5.3 Combiner Function

combiner可以使得在mapper本地就进行一些reducer操作。通常这些操作可以使得输出减少很多,这样reducer可以节省带宽。

  • The Combiner function is executed on each machine that performs a map task.
  • Typically the same code is used to implement both the combiner and the reduce func-tions. (combiner和reducer使用相同的reduce代码)
  • The only difference between a reduce function and a combiner function is how the MapReduce library han-dles the output of the function. The output of a reduce function is written to the final output file. The output of a combiner function is written to an intermediate file that will be sent to a reduce task.(唯一差别在于combiner写入本地文件,而reducer写入gfs)

这个多谢邓业强同学的提醒。我们假设reduce API是这样的

int reduce(Key key, Iterator<Value> values);

对于这个API有两种解释:

  1. 每次调用reduce的话,这个key对应的所有values都已经存在
  2. 每次调用reduce并不一定保证key对应的所有values存在,只有连续调用reduce之后才能够全部读完。

这个问题值得思考一下:)

5.4 Input and Output Types

允许用户自定义reader/writer

5.5 Side-effects

如果程序有side-effect的话,那么需要保证这个side-effect是原子并且是幂等的。

5.6 Skipping Bad Records

实现方法非常巧妙:

  • 针对每条记录,都将记录的seqnum记录在一个全局变量里面
  • Each worker process installs a signal handler that catches segmentation violations and bus errors.(安装segfault的sighandler)
  • 如果出现错误的话,那么在sighandler里面将这个seqnum作为UDP packet发送给master
  • 如果master连续收到两次相同seqnum的话,那么就会告诉worker忽略seqnum这条记录。

5.7 Local Execution

方便调试以及测试。

5.8 Status Information

提供http接口返回当前master状态,信息包括下面这些:

  • The sta-tus pages show the progress of the computation, such as (计算进度)

    • how many tasks have been completed, (当前多少个任务完成)
    • how many are in progress, (多少任务正在运行)
    • bytes of input, (输入多少个字节)
    • bytes of intermediate data, (中间数据多少字节)
    • bytes of output, (输出多少字节)
    • processing rates, etc.(处理速率)
    • The pages also contain links to the standard error and standard output files gen-erated by each task.(标准输出和错误)
    • The user can use this data to pre-dict how long the computation will take, and whether or not more resources should be added to the computation.These pages can also be used to figure out when the com-putation is much slower than expected. (用户可以分析出计算大概需要多长时间完成,是否需要添加新的资源,以及找出计算慢的原因)
    • In addition, the top-level status page shows
      • which workers have failed, and (哪些worker失败)
      • which map and reduce tasks they were processing when they failed. (为什么失败)
      • This informa-tion is useful when attempting to diagnose bugs in the user code.

5.9 Counters

  • The counter values from individual worker machines are periodically propagated to the master (piggybacked on the ping response).(关于这些counter信息都是在ping response时候捎带回去给master的)
  • When aggre-gating counter values, the master eliminates the effects of duplicate executions of the same map or reduce task to avoid double counting. (Duplicate executions can arise from our use of backup tasks and from re-execution of tasks due to failures.)(在进行这些counter聚合的时候需要考虑重复计数,比如backup execution就可能造成重复计数)
  • Some counter values are automatically maintained by the MapReduce library, such as the number of in-put key/value pairs processed and the number of output key/value pairs produced.(有一些计数是mr自带的,比如输入和输出多少个kv pairs)

6 Performance

7 Experience

  • We wrote the first version of the MapReduce library in February of 2003, and made significant enhancements to it in August of 2003, including the locality optimization, dynamic load balancing of task execution across worker machines, etc. (03年二月份完成,03年8月份将本地优化,动态负载均衡加入)
  • It has been used across a wide range of domains within Google, including:
    • large-scale machine learning problems,(机器学习)
    • clustering problems for the Google News and Froogle products,
    • extraction of data used to produce reports of popular queries (e.g. Google Zeitgeist),
    • extraction of properties of web pages for new exper-iments and products (e.g. extraction of geographi-cal locations from a large corpus of web pages for localized search), and
    • large-scale graph computations.(大规模图计算)

mapreduce-instances-overtime.png

在2004.9月份达到了900个mapreduce程序。下面是2004.8里面部分mapreduce程序运行数据。

mapreduce-jobs-run.png

改写了线上索引程序,处理文档大小在20TB(也不是很多啊?),大约使用了5~10个mapreduce程序。带来效果是这样的:

  • 代码少了很多,比如很多错误恢复代码。(通信代码应该没有太大节省,因为google库本身就应该比较完善)。其中一个计算过程的代码从3800掉至700行。
  • 可以更加注重逻辑,因为计算过程被分离出来了。原来改造index系统需要几个月的时间,现在几天就可以完成。
  • 整个过程控制更加简单,因为错误恢复,slow machine带来的影响,以及网络抖动都被framework处理了而不需要人工操作。

8 Related Work

9 Conclusions

We have learned several things from this work.(从中得到的经验):

  • First, restricting the programming model makes it easy to par-allelize and distribute computations and to make such computations fault-tolerant. (限制编程模型能够使得并行化以及错误容忍处理更加简单)
  • Second, network bandwidth is a scarce resource. A number of optimizations in our system are therefore targeted at reducing the amount of data sent across the network: the locality optimization al-lows us to read data from local disks, and writing a single copy of the intermediate data to local disk saves network bandwidth. (带宽是比较稀缺的资源,比如mapper考虑从local读取,并且写入本地磁盘,同时reduce也是拉到本地磁盘做排序)
  • Third, redundant execution can be used to reduce the impact of slow machines, and to handle ma-chine failures and data loss.(冗余执行可以减少慢机器带来的影响,且能够用来处理机器故障和数据丢失)
comments powered by Disqus