-
MapReduce
超大集群的简单数据处理
收件人:
发件人:
崮山路上走
9
遍
抄送
:
日期:
2005-08-05
关于:
MapReduce:
Simplified Data Processing on Large Clusters
Jeffrey Dean Sanjay Ghemawat
jeff@
,
sanjay
@
Google , Inc.
摘要
MapReduce
是一个编程模式,它是与处理
/
产生海量数据集的
实现相关。用户指定一个
map
函数,
通过这个
map
函数处理
key/va
lue
(键
/
值)
对,
并且产生一系列的中间
key/value
对,
并且使用
reduce
函数来合并所有的具有相同
key
值的中间键值对中的值部分
。
现实生活中的很多任务的实现都是基于
这个模式的,正如本文
稍后会讲述的那样。
使用这样的函数形式实现的程序可以自动
分布到一个由普通机器组成的超大几群上并发执行。
run-time
< br>系统会解决输入数据的分布细节,
跨越机器集群的程序执行调度,处理机器的失效
,并且管
理机器之间的通讯请求。这样的模式允许程序员可以不需要有什么并发处理或者
分布式系统的经验,
就可以处理超大的分布式系统得资源。
<
/p>
我们的
MapReduce
系统的实现运
行在一个由普通机器组成的大型集群上,并且有着很高的扩展性:
一个典型的
MapReduce
计算处理通常分布到上千台机器上来处理上
TB
的数据。程序员会发现这样
的系统很容易使用
:已经开发出来了上百个
MapReduce
程序,并且每天在
Google
的集群上有上千
个
MapReduce job
正在执行。
1
介绍
在过去的
5
年内,
的创造者和其他人实现了上百个用于特别计算目的的程序来出来海量的
原
始数据,比如蠕虫文档,
web
请求
log
,等等,用于计算出不同的数据,比如降序索引,不同的
图示
展示的
web
文档,蠕虫采集的每
个
host
的
page
数量摘要,给定日期内最常用的查询等等。绝大部
分计算都是概念上很简洁的。
不过,输入的数据通常是非常巨大的,
并且为了能在合理时间内
执行完
毕,其上的计算必须分布到上百个或者上千个计算机上去执行。如何并发计算,如
何分布数据,
如何
处理失败等等相关问题合并在一起就会导致原
本简单的计算掩埋在为了解决这些问题而引入的很复
杂的代码中。
因为这种复杂度,
我们设计了一种新的东西来让我们能够方
便处理这样的简单计算。
这些简单计算原
本很简单,但是由于考
虑到并发处理细节,容错细节,以及数据分布细节,负载均衡等等细节问题,
而导致代码
非常复杂。所以我们抽象这些公共的细节到一个
lib
中。这种
抽象是源自
Lisp
以及其他很
多面向
功能的语言的
map
和
reduce<
/p>
概念。
我们认识到大部分操作都和
map
操作相关,
这些
map
操
作都是运算在输入记录的每个逻辑
”
record
”
上,
并且
p>
map
操作为了产生一组中间的
key/v
alue
键值对
,
?
< br>
第
1
页
并且接着在所有相同
key
的中间结果上执行
reduce
操作,这样就可以合并适当的数据。我们得函数
模式是使用用户定义的
map
和
reduce
操作,
这样可以让我们并发执行大规模的运算,
并且使用重新
执行的方式作为容错的优先机制。
M
apReduce
的主要贡献在于提供了一个简单强大的接口,通过这个接口,可以把大
尺度的计算自动
的并发和分布执行。使用这个接口,可以通过普通
PC
的巨大集群,来达到极高的性能。
第二节讲述了基本的编程模式,
并且给出了一些例子。
第三
节讲述了一个面向我们基于集群的计算环
境的
MapReduc
e
的实现。第四节讲述了一些我们建议的精巧编程模式。第五节讲述了在不同任务下
p>
我们的
MapReduce
实现的性能比较
。第六节讲述了在
Google
中的
M
apReduce
应用以及尝试重写
了我们产品的索引系统。第
七节讲述了相关工作和未来的工作。
2
编程模式
我们的运算处理一组输入的
(
input
)键值对(
key/va
luepairs
)
,
并且产生一组输
出的(
output
)键值
对。
MapReduce
函数库德用户用两个函数来表达这样的计算:
p>
Map
和
Reduce
。
Map
函数,是用户自定义的
的函数,处理输入的键值对,并且产生一组中间的(
intermediate
)键值
对。
MapReduce
函数库稽核所有相同的中间键值键
I
的值,并且发送给
Reduce
函数进行处理。
Reduce
函数同样也是用户提供的,它处理中间键值
I
,以及这个中间键值相关的值集合。这个函数
合并
这些值,
最后形成一个相对较小的值集合。
通常一个单次
Reduce
执行会产生
0
< br>个或者
1
个输出
值。提供给
p>
Reduce
函数的中间值是通过一个
it
erator
来提供的。这就让我们可以处理超过内存容量
的值
列表。
2.1
例子
我们
考虑这样一个例子,
在很大的文档集合中通机每一个单词出现的次数。
< br>我们写出类似如下的伪代
码:
map(String key, String value):
reduce(String key,
I
terator values):
map
函
数检查每一个单词,
并且对每一个单词增加
1
< br>到其对应的计数器
(在这个例子里就是
’
1
’
)
.reduce
函数把特定单词的所有出现的次数进行合并。
此外,我们还要写代码来对
mapreduce specif
ication
对象进行赋值,设定输入和输出的文件名,以
及
设定一些参数。接着我们调用
MapReduce
函数,把这个
对象作为参数调用过去。我们把
MapReduce
函数库(<
/p>
C++
函数库)和我们的程序链接在一起。附件
< br>1
有完整的这个例子的代码。
// key: a word
// values: a
list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
//
key: document name
// value: document
contents
for each word w in value:
EmitIntermediate(w,
Emit(AsString(result));
?
第
2
页
2.2
类型
即使上边的例子是用字符串作为输入和输入出的,
从概念上讲,使用者提供的
map
和
reduce
函数有
着如下相关类型:
map (k1,v1)
?
list(k2,v2)
?
list(v2)
reduce
(k2,list(v2))
也
就是,
输入的键和值和输出的键值是属于不同的域的。进一步说,
中间的键值是和输出的键值属于
相同的域的。(比如
map<
/p>
的输出,就是作为
reduce
的输入)
。
我们的
C++
实现上,
把字符串作为用户定义函数的输入和输出,
由用户
代码来自己识别字符串到合适
的类型。
2.3
其他例子
这里有一些简单有趣的例子,都可以简单的通过
MapReduce
计算模型来展示:
分布式
Gre
p
:
如果
m
ap
函数检查输入行,满足条件的时候,
map
函数就把本行输出。
reduce
函数就是一个直通函
数,简单的把中间数据输出就可以了。
URL
访问频率统计:
map
函数处理
webpag
请求和应答
(
URL
,
1
)
的
log
。
Reduce
函数把所有
相同的
URL
的值合并,并且输出一个成对的(
URL
,总个数)。
逆向
Web-Link
图:
map
函数输出所有包含指向
target URL
的
source
网页,
用
(
p>
target,source
)
这样的结构
对输出。
Reduce
函数局和所有关联相同
< br>target URL
的
source
< br>列表,并且输出一个
(target,list(source))
这样的结构。
主机关键向量指标(
Term-Vector per
Hosts
)
:
关键词向量指标简而
言之就是在一个文档或者一
组文档中的重点次出现的频率,用
(
word,frequency)
表达。
map
函数计算每一个输入文档(主机名字
是从文档的
URL
取出的)
的关键词向量,
然后输出
p>
(
hostname,
关键词向量
(Term-Vector)
)
。
reduce
函数处理所有相同
host
的所有文档关键词向量。去掉不常用的关键词,并且输出最终的
(host
name
,
关键词向量
)
对。
逆序索引
:
map
函数分析每一个文档,并且产
生一个序列(
word
,
docume
ntID
)组。
reduce
函数处理
指定
word
的所有的序列组,并且对相关的
< br>document ID
进行排序,输出一个
(word
,list(document ID))
组。所有的输出组,组成一个简单的逆序索引
。通过这种方法可以很容易保
持关键词在文档库中的位置。
分布式排序:
map
函数从每条记录中抽取关键字
,
并且产生
(key,record)
对。
reduce
函数
原样输出所有的
关键字对。这个算法是与
4.1
节描述的分布式处理相关的,并
且排序是在
4.2
节描述
的。
3
实现
MapReduce
接口可以有很多种不同的实现。应当根据不同的环境
选择不同的实现。比如,一个实现
可以适用于小型的共享内存的机器,另一个实现可能是
基于大型
NUMA
多处理器系统,还可能有为
< br>大规模计算机集群的实现。
本届描述了
Google
广泛使用的计算环境:用交换机网络
[4
]
连接的,由普通
PC
构成的超大集群
。在
我们的环境里:
(
1
)
p>
每个节点通常是双
x86
处理器,运行
p>
Linux,
每台机器
2-4GB
内存。
?
第
3
页
(
2
)
p>
使用的网络设备都是常用的。
一般在节点上使用的是
100M/
或者千
M
网络,<
/p>
一般情况下都用
不到一半的网络带宽。
(
3
)
p>
一个
cluster
中常常有成百上千台机
器,所以,机器故障是家常便饭。
(
4
)
p>
存储时使用的便宜的
IDE
硬盘,
直接放在每一个机器上。
并且有一个分布式的文件系统来管
理这些分布在各个机器上的硬盘。文件系统通过复制的方法来在不可靠的硬件上保证可用性
和可靠性。
(
5
)
p>
用户向调度系统提交请求。
每一个请求都包含一组任务,
映射到这个计算机
cluster
里的一组
机器上执行。
3.1
执行概览
Map
操作通过把输入数据进行分区
(
partition
)
(比如分为<
/p>
M
块),
就可以分布到不同的机器上执行
了。输入块的拆成多块,可以并行在不同机器上执行。
Redu
ce
操作是通过对中间产生的
key
的
分布
来进行分布的,
中间产生的
key
可以根据某种分区函数进行分布
(比如
hash(key) mod R
)
,
分布成为
R
块。分区(
R
)的数量和分区函数都是由用户指定的。
图
1
p>
是我们实现的
MapReduce
操作的整
体数据流。当用户程序调用
MapReduce
函数,
就会引起如
下的操作(图一中的数字标示和下表的数字标示相同)。
p>
1
.
用户程
序中的
MapReduce
函数库首先把输入文件分成
M
块,每块大概
16M
到
64M
(可以通过
参数决定)。接着
在
cluster
的机器上执行处理程序。
?
第
4
页
2
.
这些分
排的执行程序中有一个程序比较特别,它是主控程序
master
。剩下的执行程序都是作为
master
分排工作的
worker
。总共有
M
个
map
任务和
R
< br>个
reduce
任务需要分排。
master
选择
空闲的
worker
并且分配这些
map
任务或者
reduce
任务
3
.
一个分
配了
map
任务的
worker
读取并处理相关的输入小块。他处理输入的数据,并且将分析出
的
p>
key/value
对传递给用户定义的
m
ap
函数。
map
函数产生的中间结果
key/value
对暂时缓冲到
内存
。
4
.
<
/p>
这些缓冲到内存的中间结果将被定时刷写到本地硬盘,
这些数据通
过分区函数分成
R
个区。
这些
中间结果在本地硬盘的位置信息将被发送回
master
,然后这个
master
负责把这些位置信息传送
p>
给
reduce
的
worker
。
5
.
当
p>
master
通知
reduce
的
worker
关于中间
key/value
对的位置时,他调用
remote
procedure
来
从
map wo
rker
的本地硬盘上读取缓冲的中间数据。
当
reduce
的
worker
读到了所有的中间数据,
他就使用中间
key
< br>进行排序,
这样可以使得相同
key
的值都在一起。
因为有许多不同
key
的
map
都对应相同的
reduc
e
任务,所以,排序是必须的。如果中间结果集太大了,那么就需要使用外
排序。
6
.
reduce worker<
/p>
根据每一个唯一中间
key
来遍历所有的
排序后的中间数据,并且把
key
和相关的
中间结果值集合传递给用户定义的
reduce
函数。
p>
reduce
函数的对于本
reduce<
/p>
区块的输出到一
个最终的输出文件。
7
.
当所有
的
map
任务和
reduce
任务都已经完成了的时候,
master
激活用
户程序。在这时候
MapReduce
返回用户程序的调用点。
当这些成功结束以后,
mapreduce
的执行数据存放在总计
R
p>
个输出文件中
(每个都是由
reduce<
/p>
任务
产生的,这些文件名是用户指定的)。通常,用户不需要合并
这
R
个输出文件到一个文件,他们通
常
把这些文件作为输入传递到另一个
MapReduce
调用,或
者用另一个分布式应用来处理这些文件,
并且这些分布式应用把这些文件看成为输入文件
由于分区(
partition
)成为的多个块文件。
3.2
Master
的数据结构
master
需要保存一定的数据结构。
对于每
一个
map
和
reduce
任务来说,
都需要保存它的状态
(
idle
,
in-progress
< br>或者
completed
),并且识别不同的
worker
机器(对于非
idel
的任务状态)。
master
是一个由
map
任务产生的中间区域文件位置信息到
reduce
任务的一个管道。因此,对于每
一个完成得
map
任务,
master
保存下来这个
map
任务产生的
R
中间区域文件信息的位置和大小。
对于这个
位置和大小信息是当接收到
map
任务完成得时候做的。这些信
息是增量推送到处于
in-progress
状态的
reduce
任务的
worker
上的。
3.3
容错考虑
由于
MapReduce
函数库是设计用于在成百上千台机器上
处理海量数据的,
所以这个函数库必须考虑
到机器故障的容错处
理。
Worker
失效的考虑
master
会定期
ping
每一个
worker
机器。
如果在
一定时间内没有
worker
机器的返回,
master
就认为
这个
work
er
失效了。所有这台
worker
完
成的
map
任务都被设置成为他们的初始
idel
状态,并且因
此可以被其他
worker
所调度执行。类似的,所有这个机器上正在处理的
map
任务或者
reduce
任务<
/p>
都被设置成为
idle
状态,可以被其他
worker
所重新执行。
?
第
5
页
在失
效机器上的已经完成的
map
任务还需要再次重新执行,这是因
为中间结果存放在这个失效的机
器上,所以导致中间结果无法访问。已经完成的
recude
任务无需再次执行,因为他们的结果已经保
存在全局的文件系统中了。
当
map
任务首先由
Aworker
执
行,
随后被
Bworker
执行的时候
(因为
A
失效了)
,
所有执行
reduce
任务的<
/p>
worker
都会被通知。所有还没有来得及从
< br>A
上读取数据的
worker
都
会从
B
上读取数据。
MapReduce
可以有效地支持到很大尺度的
wo
rker
失效的情况。比如,在一个
MapReduce
操作中,
在一个网络例行维护中,可能会导致每次大约有
80
台机器在几分钟之内不能访问。
MapRedu
ce
的
master
制式简单的把这些
不能访问的
worker
上的工作再执行一次,并且继续调度进
程,最后完成
MapReduce
的操作。
Master
失效
在
master
中,定期会设定
checkpoint
,写出
master
的数据结构。如果
master
任务失效了,可以从
上次最后一个
check
point
开始启动另一个
master
进程。不过,由于只有一个
master
在运行,所以
他如果失效就比较麻烦,因此我们当前的实现上,是如果
master
失效了,就终止
MapReduce
执
行。
客户端可以检测这种失效并且如果需要就重新尝试
MapR
educe
操作。
失效的处理设计
当用户提供的
map
和
reduce
函数对于他们的输入来说是确定性的函数,
我们的分布式的输出就应当
和在一个整个程序没有失败的连续执行相同。
我们依
靠对
map
和
reduce
任务的输出进行原子提交来完成这样的可靠性。每一个
in-progre
ss
任务
把输出写道一个私有的临时文件中。
< br>reduce
任务产生一个这样的文件,
map
任务产生
R
个这样的任
务(每一个对应一个
reduce
任务)
。当一个
map
任务完成的时候,
w
orker
发送一个消息给
master
,
并且这个消息中包含了这个
R
临时
文件的名字。如果
master
又收到一个已经完成的
map
任务的完
成消息,他就忽略这个消息。否
则,他就在
master
数据结构中记录这个
< br>R
文件。
当一个
reduce
任务完成的时候,
reduce wo
rker
自动把临时输出的文件名改为正式的输出文件。
如果<
/p>
再多台机器上有相同的
reduce
任务
执行,那么就会有多个针对最终输出文件的更名动作。我们依靠
文件系统提供的原子操作
’
改名字
’
,
来保证最终的文件系统状态中记录的是其中一个
reduce<
/p>
任务的输
出。
我们的绝大部分
map
和
reduce
操作都是确定性的,
实际上在语义角度,
这个
map
和
reduce
并发执
行和顺序执行市一样的,
这就使得程序员
很容易推测程序行为。当
map
和
re
duce
操作是非确定性的
时候,我们有稍弱的但是依旧是有道
理的错误处理机制。对于非确定性操作来说,特定
reduce
任务
R1
的输出,与,非确定性的顺序执行的程序对
R1
的输出是等价的。另外,另一个
reduce
任务
R2
的输出,是和另一个顺序执行
的非确定性程序对应的
R2
输出相关的。
考虑
map
任务
< br>M
和
reduce
任务
R1
,
R2
。
我们设定
e(Ri)
为已经提交的
Ri
执行(有且仅有一个这样
的执行)
。
当
e(R1)
处理得是<
/p>
M
的一次执行,
而
e(R2)
是处理
M
的另一次执行的
时候,
那么就会导致
稍弱的失效处理了。
3.4
存储位置
在我们的环境下,
网络带宽资源是相对缺乏的。
我们用尽量让输
入数据保存在构成集群机器的本地硬
盘上(通过
GFS
管理
[8]
)的方式来减少网络带宽的开销。<
/p>
GFS
把文件分成
64M
一块,并且每一块
都有几个拷贝(通常是
3
个拷贝),分布到不同的机器上。
MapReduce
的
master
有输入文件组的
位
置信息,并且尝试分派
map
任务在对应包含了相关输入数据块
的设备上执行。如果不能分配
map
?
第
6
页
任务到对应其输入数据的机器上执
行,他就尝试分配
map
任务到尽量靠近这个任务的输入数据库
的
机器上执行(比如,分配到一个和包含输入数据块在一个
sw
itch
网段的
worker
机器上执
行)。当在
一个足够大的
cluster
集群上运行大型
MapReduce
操作的时候,大部分输入
数据都是在本地机器读
取的,他们消耗比较少的网络带宽。
3.5
任务颗粒度
如果上边我们讲的,我们把
map<
/p>
阶段拆分到
M
小块,并且
reduce
阶段拆分到
R
小
块执行。在理想
状态下,
M
和
R
应当比
worker
机器数量要多得多。每一个
worker
机器都通过执行大量的
任务来提
高动态的负载均衡能力,并且能够加快故障恢复的速度:这个失效机器上执行的
大量
map
任务都可
以分布到所有其他
worker
机器上执行。
但是我们的实现中,
实际上对于
M
和
R
的取值有一定的限制,
因为
master
必须执行
O(M+R
)
次调度,
并且在内存中保存
O(M*
R)
个状态。
(对影响内存使用的因素还是比较小的:
O(M*R)
块状态,大概每
对
map
任务
/reduce
任
务
1
个字节就可以了)
进一步来说,用户通常会指定
R
的值,因为每一个<
/p>
reduce
任务最终都是一个独立的输出文件。在实
际中,我们倾向于调整
M
的值,使得每一个独立任
务都是处理大约
16M
到
64M
的输入数据(这样,
上面描写的本地优化策略会最有效)
,
另外,
我们使
R
比较小,
这样使得
R
占
用不多的
worker
机器。
我们通常
会用这样的比例来执行
MapReduce: M=200
,<
/p>
000
,
R=5
,
000
,使用
2
,
000
台
worker
机
器。
3.6
备用任务
通常情况下,
一个
MapReduce
的总执行时间会受到最后的几个
”
拖后腿
”
的任务影响:
在计算过程中,
会有
一个机器过了比正常执行时间长得多的时间还没有执行完
map
或者
reduce
任务,导致
MapR
educe
总任务不能按时完成。
出现拖后腿的情况有很多原因
。
比如:
一个机器的硬盘有点问题,
经
常需要反复读取纠错,
然后把读取输入数据的性能从
30M/s
降低到
1M/s
。
cluster
调度系统已经在
某台机器上调度了其他的任
务,所以因为
CPU/
内存
/
本地硬盘
/
网络带宽等竞争的关系,导致执行<
/p>
MapReduce
的代码性能比较慢。
我们最近出现的一个问题是机器的启动代码有问题,
导致关闭了
cpu
的
cache
:在这些机器上的
任务性能有上百倍的影响。
我们有一个通用的机制来减少拖后
腿的情况。当
MapReduce
操作接近完成的时候,
master
调度备
用进程来执行那些剩下的
in-progress
状态的任务。
无论当最初的任务还是
backup
任务执行完成的时
候,
都把这个任务标记成为已经完成。
我们调优
了这个机制,
通常只会占用多几个百分点的机器资源。
但是我们
发现这样做以后对于减少超大
MapReduce
操作的总处理
时间来说非常有效。例如,在
5.3
节描述的排序任务,在关闭
掉备用任务的情况下,要比有备用任务的情况下多花
44%
的时
间。
4
技巧
虽然简单写
map
和
reduce
函数实现基
本功能就已经对大部分需要都足够了,
我们还是开发了一些有
用
的扩展,这些在本节详细描述。
4.1
分区函数
MapReduce
的使用者通过指
定(
R
)来给出
reduce
任务
/
输出文件的数量。他们处理的数据在这
些
任务上通过对中间结果
key
得分区
函数来进行分区。缺省的分区函数时使用
hash
函数(例如<
/p>
hash(key)mod R
)。这一般就可以得到分散均匀的
分区。不过,在某些情况下,对
key
用其他的函
数进行分区可能更有用。比如,某些情况下
key
是
URL
,那么我们希望所有对单个
ho
st
的入口
URL
都保存在相同的输出
文件。为了支持类似的情况,
MapReduce
函数库可以让
用户提供一个特定的分
?
第
7
页
区函数。比如使用
hash(hostname(urlkey))mod R
作为分区函数,这
样可以让指向同一个
hostname
的
URL
分配到相同的输出文件中。
4.2
顺序保证
我们确保在给定的分区中,中间键值对
key/value
的处理顺序是根据
key
增量处理的。这样的顺序保
证可以很容易生成每一个分区有序的输出文件,
p>
这对于输出文件格式需要支持客户端的对
key
的随机
存取的时候就很有用,或者对输出数据集再作排序就很容易。
4.3
combiner
函数
在某些情况下,允许中间结果
key
重复会占据相
当的比重,并且用户定义的
reduce
函数满足结合律
和交换律。比如
2.1
节的一个统计单词出现
次数的例子。
由于
word
的频率趋势
符合
Zipf
分布(齐夫
分布),每
一个
map
任务都回产生成百上千的
<
the,1>
这样格式的记录。所有这些记录都通过网络发
送给
一个单个的
reduce
任务,
通过<
/p>
reduce
函数进行相加,
最后产生单
个数字。
我们允许用户指定一
个可选的组合函数
Combiner
函数,先在本地进行合并以下,然后再通过网络发送。
Combiner
函数在每一个
map
任务的机器上执行。通常这个
combin
er
函数的代码和
reduce
的代码
实现上都是一样的。
reduce
函数
和
combiner
函数唯一的不同就是
MapReduce
对于这两个函数的输
出处理上不同。
p>
对于
reduce
函数的输出是直接写到最
终的输出文件。
对于
combiner
函数来说,
输出
是写到中间文件,并且会被发送到
reduce
任务中去。
部分使用
combiner
函数可以显著提高某些类型的
MapReduce
操作。
附录
A
有这样的使用
combiner
< br>的例子。
4.4
输入和输出类型
< br>MapReduce
函数库提供了读取几种不同格式的输入的支持。例如,
”
text
”
模式下
,
每行输入都被看成
一个
key/va
lue
对:
key
是在文件的偏移量,
value
是行的内容。
另一个宠用格
式保存了根据
key
进行
排序
key/value
对的顺序。
每一个输入类型
的实现都知道如何把输入为了分别得
map
任务而进行有效
p>
分隔(比如,
text
模式下的分隔就是要
确保分隔的边界只能按照行来进行分隔)。用户可以通过简单
的提供
reader
接口来进行新的输入类型的支持。不过大部分用户都只用一小部分预先
定义的输入类
型。
reader
p>
函数不需要提供从文件读取数据。
例如,
我
们很容易定义一个
reader
函数从数据库读取数据,
或者从保存在内存中的数据结构中读取数据。
类似的,
我们提供了一组用于输出的类型,可以产生不同格式的数据,并且用户也可以
很简单的增加
新的输出类型。
4.5
边界效应
在某些情况下,
MapReduce
的使用上,
如果再
map
操作或者
red
uce
操作时,
增加辅助的输出文件,
会比较有用。
我们依靠程序来提供这样的边界原子操作。
通常应
用程序写一个临时文件并且用系统的
原子操作:改名字操作,来再这个文件写完的时候,
一次把这个文件改名改掉。
对于单个任务产生的多个输出文件
来说,我们没有提供其上的两阶段提交的原子操作支持。因此,对
于产生多个输出文件的
,对于跨文件有一致性要求的任务,都必须是确定性的任务。这个限制到现在
为止还没有
真正在实际中遇到过。
4.6
跳过损坏的记录
< br>某些情况下,
用户程序的代码会让
map
或者
reduce
函数在处理某些记录的时候
crash
掉。
这种情况
下
MapReduce
操作就不能完成。一般的做法是改掉<
/p>
bug
然后再执行,但是有时候这种先改掉
bug
?
第
8
页
的方式不太可行;也许是因为
p>
bug
是在第三方的
lib
里边,它的原代码不存在等等。并且,很多时候,
忽略一些记录不处理也是可以
接受的,比如,在一个大数据集上进行统计分析的时候,就可以忽略有
问题的少量记录。
我们提供了一种执行模式,在这种执行模式下,
MapReduce
会检测到哪些记录会
导致确定的
crash
,并且跳过这些记录不处理,使得整个处理能继续进行。
每一个
worker
处理进程都有一个
signal handler
,
可以捕获内存段异常和总
线错误。
在执行用户
map
或者
reduce
操作之前,
MapReduce
函数库通过全局变量保存记录序号。如果用户代码产生了这个
信
号,
signal handler
于是用
”
最后一口气
”
通过
UDP
包向
master
发
送上次处理的最后一条记录的序
号。当
master
看到在这个特定记录上,有不止一个失效的时候,他就标志着条记录需要被跳过,,
并且在下次重新执行相关的
Map
或者
Reduce
任务的时候跳过这条记录。
4.7
本地执行
因为实际执行操作时分布在系统中执行的,通常是在好几千台
计算机上执行得,并且是由
master
机
器进行动态调度的任务,
所以对
map
和
reduce
函数的调试就比较麻烦。
< br>为了能够让调试方便,
profiling
和小规模测试
,我们开发了一套
MapReduce
的本地实现,也就是说,
MapReduce
函数库在本地机
器
上顺序执行所有的
MapReduce
操作。用户可以控制执行
,这样计算可以限制到特定的
map
任务
上。用户可以通过设定特别的标志来执行他们的程序,
同时也可以很容易的使用调试和
测试工具
(比
如
gdb
)等等。
4.8
状态信息
master
内部有一个
HTTP
服务
器,并且可以输出状态报告。状态页提供了计算的进度报告,比如有
多少任务已经完成,
有多少任务正在处理,输入的字节数,中间数据的字节数,
输出的字节数,
处理
百分比,
等等。这些页面也包括了指向每个任务
输出的标准错误和输出的标准文件的连接。用户可以
根据这些数据来预测计算需要大约执
行多长时间,
是否需要为这个计算增加额外的计算资源。这些页
面也可以用来分析为何计算执行的会比预期的慢。
此外,
p>
最上层的状态页面也显示了哪些
worker
失效了,
以及他们失效的时候上面运行的
map
和
reduce
任务。这些信息对于调试用户代码中
的
bug
很有帮助。
4.9
计数器
MapReduce
函数库提供了用
于统计不同事件发生次数的计数器。比如,用户可能想统计所有已经索
引的
German
文档数量或者已经处理了多少单词的数量,等等。
为了使用这样的特性,用户代码创建一个叫做
co
unter
的对象,并且在
map
和<
/p>
reduce
函数中在适当
的时候增加<
/p>
counter
的值。例如:
Counter* uppercase;
uppercase = GetCounter(
map(String name, String contents):
for each word w
in contents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w,
?
第
9
页