[译] MapReduce: 大型集群中的简化数据处理

论文地址: https://pdos.csail.mit.edu/6.824/papers/mapreduce.pdf

摘要

    MapReduce 是一种处理模型结合实现了大型数据集的处理和生成. 用户指明处理键值对且产生中间值键值对的映射函数(map). 之后聚合函数(reduce function)将所有拥有相同key的中间产物的值合并, 许多现实生活中的任务都可以用这种模型进行表达, 就像论文中展示的那样.

    程序员使用这种方式进行编程, 将会自动的获得并发和在大型商业集群上运行的能力. 运行中的系统关注运行时输入数据的分区, 计划程序在不同机器上执行, 处理机器故障和处理机器之间的通信. 这允许程序员无需任何有关于并行执行和分布式系统的经验, 更加简单的利用大型分布式系统的资源.

    我们实现的MapReduce 运行在大型的商业机器上, 并具有高度的可扩展性: 一种典型的 MapReduce 计算处理TB级(terabytes)数据在数千台机器上. 程序员发现系统容易使用: 数百个 MapReduce 程序被实现, 每天在 Google 集群上执行超过一千个 MapReduce.

1. 介绍

    在过去的五年中, 作者与在谷歌的许多其他人已经实现了数百个处理大量原始数据的特殊运算. 例如: 爬取的文件, web请求日志, 等等. 计算各种衍生数据, 例如: 反向索引, Web文档图形的各种表示, 每个主机爬取的页面数量的总结, 给定一天内最频繁被查询的集合, 等等. 大多是这样的计算在概念上都很简单. 然而, 输入的数据常常很大, 并且为了能在合理的时间内完成, 计算常常被分发到成百上千的机器中执行. 问题的关键在于如何并行计算, 分发数据, 和处理错误, 并用大量的复杂代码掩盖这个问题.

    为了应对复杂性, 我们设计了一个新的抽象, 允许表达试图执行的简单计算, 但在库中隐藏并发执行, 错误容忍, 数据分发还有负债均衡的混乱细节. 我们的抽象受到 Lisp 中已存在的原语 mapreduce 和其他函数式语言的启发. 我们意识到大多数的计算涉及使用map操作输入的逻辑上的record, 为了计算得出中间键值对的产物. 然后对拥有相同key值得中间产物应用reduce操作, 为了更适当得组合派生数据. 我们使用用户自定义的特定 mapreduce 操作功能模型, 使的我们能够轻松并行化大型计算, 并且使用重新执行作为容错的主要机制.

    对于工作的最大贡献得益于简单有效的接口设计, 使自动化并行和分发大规模的计算成为可能. 并结合该接口的实现, 在大型商用 PC 集群上实现高性能.

    第二部分描述了程序的基本模型, 并给出了许多样例. 第三部分描述了对于集群的计算环境量身定制的 MapReduce 接口实现. 第四部分描述了我们发现有用的程序模型的几个改进, 第五部分有对各种任务性能的分析测量. 第六部分探讨了 Google 对于 MapReduce 的使用, 包含了我们使用它作为生产索引系统的重写基础的经验. 第七部分讨论了相关未来的工作.

2. 程序模型

    计算任务的输入是一个键值对的集合, 聚合的输出是一个键值对的集合. MapReduce 库的用户使用两个函数进行表达计算: Map 和 Reduce.

    用户编写的 Map 函数将接收键值对输入, 将产生中间值的键值对. MapReduce 库将相同 key 的 value 进行分组, 然后该组传输给 Reduce 函数.

    用户编写的 Reduce 函数,接收中间值拥有相同 key 的 value 组, 将这些值合并起来形成更小的组. 通常 Reduce 的调用只产生零个或者一个输出值. 中间值通过迭代器向用户的 Reduce 函数传递数据. 这使我们能够处理因为过大而不能放入内存的列表.

2.1 例子

    考虑一个统计大型文档集合中每个单词出现的次数问题, 用户可能写下如下伪码:

1
2
3
4
5
6
7
8
9
10
11
12
13
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");

reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));

    map 函数发出每个 word 和词出现的次数(在这个简单的样例中仅仅是 ‘1’), Reduce 函数将指定词的所有统计相加.

    此外, 用户还需要使用 输入输出的文件名可选的参数MapReduce Specification 对象进行配置. 之后用户可以调用 MapReduce 函数, 将配置对象一并传递. 用户的代码将会与 MapReduce 的库(使用C++实现)结合在一起. 附录A包含这个例子的完整程序文本.

2.2 类型

    尽管前面的伪码使用字符串作为输入输出, 从概念上讲, 用户提供的 map 和 reduce 函数需要有关联性:

1
2
map (k1,v1) → list(k2,v2)
reduce (k2,list(v2)) → list(v2)

即: 输入的类型和输出的类型是不同的模型, 中间值与输出是属于相同模型

我们C++的实现使用字符串作为输入/输出的类型, 用户将根据需要进行转换.

2.3 更多例子

这有些有意思的简单程序, 能够容易的使用 MapReduce 计算实现.

  • 分布式 Grep: 当一行符合匹配的规则时, 使用 map 函数进行发送, reduce 只要简单的将中间值复制到输出.
  • 统计 URL 的访问频率: map 函数处理 web 请求的日志并且输出 (URL, 1) . reduce 函数将相同 URL 的值进行相加, 并且发出 (URL, 合计的次数)
  • 网络连接溯源图: map 函数从为source的页面发现的每个连接(target)输出(target, source), reduce 关联所有相同的 target 并输出(target, list(source))
  • 每台主机的词向量: 一个词向量总结了一个列表(word, frequency)表示一个文档中最常出现的词, map 函数分析每个文档并输出了(hostname, term vector), ( hostname的地址由输入的文档 URL 解析而来) reduce 函数获取给定 host 的所有词向量, 丢弃不常使用的词组, 并且输出最后的(hostname, term vector).
  • 反向索引: map 函数对每个文档进行转换并输出(word, document ID), reduce 函数接受给定的 word 的所有 document ID 并输出 (word, list(document ID)). 这样就容易追踪到词的来源
  • 分布式排序: map 函数读取出 (key, record) 键值对, reduce 函数保持键值对. 这个计算取决与4.1的分区设施和4.2的属性排序.

3 实现

    MapReduce 接口使得根据不同环境有着不同实现成为可能. 例如, 一种实现可能适用于小型共享内存机器, 另一种适用于大型 NUMA 多处理器, 还有另一种适用于更大的联网机器集合.
    这个章节描述了在 Google 广泛使用的计算环境实现: 使用以太网交换机4连接的大集群的商业机器, 环境情况如下:

  1. 机器通常是运行 Linux 的双核心 x86 处理器, 每台机器有着 2-4G 的内存
  2. 使用商业网络硬件 — 通常在机器级别为 100 megabits/s 或 1 G/s, 但总体平分带宽的平均值要少得多
  3. 集群由数百或者数千机器组成, 因此错误成为家常便饭
  4. 存储由直接连接在单台机器上便宜的 IDE 硬盘提供. 内部开发的文件系统8用于管理存储这些磁盘上的数据. 文件系统使用副本的形式在不可靠的硬件上提供可靠性和可用性.
  5. 用户提交作业到调度系统. 每个作业由多个任务组成, 并由调度机器映射到一群可用的机器上执行.

3.1 执行概览

    通过自动将输入的数据分割成 M 个分片, 并分发到多台机器上进行 Map 调用. 输入的分片能够在不同的机器上并发的执行. 使用分片函数(例如: hash(key) mod R) 将中间值的键值对分割成 R 个部分, 并使用 Reduce 分发执行. R 的数量和分片函数将由用户进行指定.

例图1

    例图1 展示了我们实现的 MapReduce 的所有的流程. 当我们使用程序调用 MapReduce 函数时, 下列的行为将会发生(例图1中的编号对应下列的数字):

  1. 在用户程序中的 MapReduce 库, 通常会先将输入的文件分片成 M 个大小为 16 ~ 64 MB的部分(由用户的可选参数控制) 当启动时, 会将程序的副本复制到集群的不同机器中
  2. 其中一个副本程序是特殊的 - Master. Master 将会给剩下的 worker 分配任务. 将会有 M 个 Map 和 R 个 Reduce任务被安排, Master 将会给空闲的 worker 分配任务.
  3. 每个 worker 将会读取相应的输入分片文本, 使用用户定义的 Map 函数对输入数据进行转换. 期间这些数据将以缓存的形式存储在内存中
  4. 周期性的, 缓存中的键值对将会写入本地磁盘, 数据将会通过分区函数被分为 R 个部分, 文件所存储的位置将会告知 Master ,Master 将会把这些位置发给 Reduce
  5. 当 Reduce 的 worker 从 Master 获取了中间产物的存储地址, 将会通过远程调用(RPC)从 Map Worker 的本地磁盘中读取这些文件, 当 Reduce 读取完成所有需要的中间文件时. 会将数据根据Key的值进行排序分组. 之所以需要排序是因为通常有许多不同的 key 放置在同一个 Reduce 进行工作. 如果大小超出了内存大小, 那么将会使用外部的排序.
  6. Reduce worker 将会对排序后的中间数据按照唯一的key进行遍历, 将传递 key 和对应key的value集合到用户定义的 Reduce 函数, 函数输出的结果将被附加到最终该Reduce分区的输出文件
  7. 当所有的 Map 和 Reduce 任务都完成后, MapReduce 唤醒用户程序, 并返回用户code

    在成功之后, mapreduce 的执行结果存放在 R 个文件中(每个文件对应一个reduce任务, 文件名上将会指明所属的用户)通常来说用户无需合并这些成为一个文件, 通常将这些文件作为另外一个 MapReduce 的输入数据, 或者其他能够将这些文件分割成更多文件的分布式应用

3.2 Master 的数据结构

Master 对于每个 map 或者 reduce 任务保留少量的数据结构, 例如: 存储的状态(空闲 / 工作中 / 完成), 每个工作机器的身份(正在忙碌的机器)

    Master像是一种管道, 将 Map 产生的中间值的文件存放信息传输至Reduce 任务的 Worker 中.因此, 对于每个完成的 Map 任务, Master 需要将上传的生成的文件列表以及大小进行储存. 以便下发到逐渐启动的Reduce Worker 中.

3.3 错误容忍

自从 MapReduce 被设计成使用数百或者数千机器帮助处理大量数据, 但是也需要能优雅的对失败进行容忍.

Worker 失败

    Master 节点将会周期性的 ping 每个 Worker. 如果在一定的时间内没有得到 worker 的回应, master 将会认为这个 worker 已经失败. 任何的 worker 完成 Map 任务后会将自身的状态变更为空闲状态.这样就变得有资格接受调度. 同样, 任何 map / reduce 任务遇到失败, 也会将自身的状态变为空闲, 从而获得调度的资格.

完成的 Map 任务因为结果存储在机器的本地磁盘上, 如果机器变得不可访问时,就需要重新在另一台执行相同 Map 任务. 而 Reduce 因为结果存储在全局共享的文件系统, 所以即使运行 Reduce 的机器失败, 还是能读取之前的结果.

当一个 Map 任务被 A 执行, 之后又被 B 执行(因为 A 失败了), 所有正在进行的 reduce 任务将会收到重新执行的通知.还未从A节点读取数据的 reduce 将会从 B 进行读取数据.

MapReduce 对大规模的 worker 失败具有弹性. 例如: 在一次MapReduce 运行期间, 因为网络维护导致 80 台机器在几分钟内无法问问. MapReduce 只是简单地重新执行不能访问 worker 的工作, 并继续执行, 最终完成了操作.

Master Failure

    可以让 Master 周期的将上述的数据结构写入检查点. 如果主控失败, 可以从最近的检查点开始一个新副本. 但是鉴于只有一个 master , 失败的情况比较少见. 如果 Master 失败,当前的实现是终止 MapReduce 的计算. 客户端在观察到这种情况时重试 MapReduce 操作.

出现错误的语义

    当用户编写的 map 和 reduce 函数是其输入值的确定性函数时(幂等函数), 我们分布式实际产生的输出与整个程序无故障顺序执行产生的结果相同.

    我们通过 map 和 reduce 任务的原子提交来保证这个性质. 每个进行中的任务将输出写入临时的私有文件. 一个 Reduce 任务产生一个文件, 一个 Map 任务产生 R 个文件(每一个将会被单独的一个 Reduce 任务读取). 当 Map 任务完成时, 将会向 Master 发送一条含有 R 个临时文件名的消息. 如果 Master 已经收到过该 Map 任务的完成消息, 那么这条消息将被忽略. 否则将会在主数据结构中记录 R 文件的名称.

    当一个 Reduce 任务完成时. Reduce Worker 会将临时文件名重命名为最终的输出文件. 如果一个 reduce 在多台机器上执行, 将会执行多次重命名. 通过文件系统的提供的原子重命名操作保证最后的只含有一个文件是最终输出的数据.

    绝大部分的 map 和 reduce 运算符是确定的, 这种情况下, 我们的语义等同于顺序执行, 这使得程序的行为能够预测. 但在 map 和 reduce 运算符是不确定时, 提供较弱但是合理的语义. 例如: reduce 的结果 R1 等价于非确定性程序顺序执行产生的输出. 但是另一个 reduce 的结果 R2 可能因为执行顺序原因出现差异.
     考虑到 map 任务 M 和 reduce 任务 $R{1}$ 和 $R{2}$ . 使用 e($R{i}$) 定义 $R{i}$ 次执行(仅执行一次). 虚落的语义出现因为 e($R{1}$) 与 e($R{2}$) 在不同机器上的 M 中执行.

3.4 局限性

    网络带宽在我们的计算环境中是相对稀缺的资源. 我们将输入的数据(由GFS8管理)存储在组成集群机器的本地磁盘中来节省网络带宽. GFS 将文件划分成每块 64 MB的大小, 并且对每个块复制了几个副本(通常是3个副本)存储在不同的主机上. MapReduce 的 master 也将输入文件的位置信息考虑在内, 并尝试相应机器上调度 map 任务. 如果无法在本机进行读取, 那么将会尝试就近读取(寻找同一个网络交换机上的机器). 在集群中运行的大部分大型的 MapReduce 输入数据是本地读取的, 并不消耗网络带宽.

3.5 任务粒度

如上所述, 我们将 map 阶段划分为 M 个任务, 将 reduce 阶段划分为 R 个任务. 理想情况下, M 和 R 应该是超过了 worker 机器的数量. 不同的 worker 执行不同的任务能够增强动态平衡性, 并且也可以加快从故障恢复的速度: 通过将失败的任务分发到其他主机上, 使其完成

    在我们的实现中 M 和 R 的大小是有实际的边界的,从 master 需要制造O(M + R)个调度决定了需要内存中开辟 O(M * R) 的空间进行存储.(然而内存使用的常数因数很小: O(M ∗R) 部分由每个 map 任务/reduce 任务对状态存储大约需要一个字节的数据组成)

    此外, R 经常受到用户的限制, 因为每个 reduce 任务的输出最终都在一个单独的输出文件中. 在实践中, 我们倾向于选择 M 以便每个单独的任务大约是 16 MB 到 64 MB 的输入数据(这样上面描述的局部性优化是最有效的), 并且我们将 R 设为我们工作机器数量的小倍数 期待使用. 我们经常使用 2,000 台工作机器执行 M = 200, 000 和 R = 5, 000 的 MapReduce 计算.

3.6 备份任务

    延长 MapReduce 执行时间的一个常见原因是”落后者”: 一台机器需要非常长的时间去完成计算中的最后几个 map 或者 reduce. 落后者的出现可能有很多原因. 例如, 损坏的硬盘可能需要频繁的纠正错误, 使读取的速度从 30MB/s 降到 1MB/s. 集成调度系统可能已经在该机器上调度了其他的任务, 导致它执行 MapReduce时,因为发生 CPU / 内存 / 磁盘 或者 网络带宽 的竞争而变成更慢. 我们最近遇到的一个问题是机器初始化代码中的一个错误, 它导致处理器的缓存被禁用: 受影响的机器上的计算速度减缓了一百多倍.

    我们有一个通用的机制来缓解落后者的问题. 当 MapReduce 接近完成时, master 会将还在执行中的任务’备份’执行. 当主执行或者备份的任务执行完成时, 该任务将会被标记成已完成. 通过这样调整, 使总体使用的计算资源增加不超出几个百分点. 但是这样显著减少了完成大型 MapReduce 的操作时间. 例如, 当禁用备份任务机制时, 5.3 节中描述的排序程序需要额外44%的时间才能完成.

4 改进

    尽管简单编写的 map 和 reduce 函数能够满足大多数的需求, 但是我们发现一些扩展很有用. 接下来将介绍这些

4.1 分区函数

    MapReduce 的用户指定 reduce 任务输出的文件数量 R 时, 分区函数将会根据中间值产生的 key 进行分区. 默认的分区函数是使用哈希(例如 hash(key) mod R). 这样的结果让每个分片相当平衡. 在一些情况下, 需要与某些函数进行组合才能更加高效. 例如: 有些数据的输出 key 为 URL, 我们想要相同的主机的条目分配到同一个分区. 为了支持这种情况, 我们需要提供一个特别的分区函数, 例如使用 “hash(Hostname(urlkey)) mod” 这样就能将相同主机条码存入同一个分区.

4.2 顺序承诺

    当给指定一个分区时我们承诺中间值的处理顺序是按照key的递增顺序进行的, 这样的顺序执行承诺让生成的输出文件变得有序. 这样的输出在利用key进行随机访问时非常高效效, 或者是有序的输出对用户非常方便.

4.3 组合函数

    在一些情况下, 每个map 任务产生的中间key 存在明显的重复, 使用 reduce 进行关联合并. 就如之前举例的在章节2.1中单词统计. 由于词频偏向于 Zipf 分布, 所以每个map 将会产生成百上千的 (the, 1). 所有的这些记录, 将会被发到单独的 reduce 任务中去, 然后被相加统计成一个数字. 我们允许用户指定一个可选的Combiner 函数. 在通过网络发送之前对其进行合并.

    Combiner 函数将会在每台执行 map 任务的机器上执行. 通常 Combiner 和 reduce 函数几乎一致. 唯一的区别在于 MapReduce 库处理两者输出的方式. reduce 的输出会写入到最终输出的目录. 而 combiner 的输出将会写入到中间值目录并且将会发送到 reduce 任务.

    部分的合并显然加快了某些 MapReduce 操作的类, 附页 A 包含了使用 Combiner 的示例.

4.4 输入和输出的类型

    MapReduce 库支持以多种不同格式中读取数据的方法. 例如: “文本”模式将每行作为键值对: 键是文件中的偏移, 值是行的内容. 另一种常见支持的格式是以 key 的顺序, 并以键值对的方式进行存储. 每个输入类型的实现都知道如何有意义的将自己拆分成map 任务的输入. (例如, 文本模型中将拆分范围控制在一行中). 用户可以在 reader 接口中实现支持新的类型输入, 对于普通用户来说, 只要传入想要的处理的输入类型编号即可.

    Reader 不一定需要提供从文件中读取的数据. 例如, 也可以指定 Reader 从数据库中读取数据, 或者从映射到内存中的数据结构中读取.

    用相同的方式, 我们也可以指定输出的类型, 或者由用户进行自定义.

4.5 其他效益

    在一些情况下, 使用 MapReduce 的过程中发现, 在进行文件输出时, 同时输出些附属的文件是一件很方便的事情. 不过程序员需要保证相应函数的原子性和幂等性. 通常, 应用程序会写入一个临时文件, 并在该文件完全生成后,自动重命名该文件. 我们不支持两阶段提交单个任务产生的多个文件. 所以输出多个文件有一致性要求的 Map 任务应该是明确的. 这种现在实践中不是问题.

4.6 掠过错误的记录

    有些时候因为用户编写的bug 导致了 Map 和 Reduce 函数因为某些记录崩溃. 此类错误导致 MapReduce 操作无法完成. 通常的做法是修复错误, 但是有些时候错在三方库中. 此外, 有些时候忽略一些是可以接受的. 例如在进行统计分析一个大数据集. 我们提供了一个可选模式. 当遇到一个可能崩溃的记录时, 跳过它并继续执行.

    每个 worker 线程都会安装一个信号处理程序, 用于捕获分段违规和总线错误.在调用用户 Map 或 Reduce 操作之前, MapReduce 库将参数的序列号存储在全局变量中. 如果用户代码生成信号, 则信号处理程序将包含序列号的“last gasp”UDP 数据包发送到 MapReduce 主机. 当 master 在特定记录上看到多个故障时, 它表明在下一次重新执行相应的 Map 或 Reduce 任务时应该跳过该记录.

4.7 本地执行

    调试 Map 或者 Reduce 函数中的问题可能变得棘手, 因为实际的计算发生在分布式系统中, 通常位于数千台机器上, 工作分配决策由 master 动态做出.为了帮助促进调试、分析和小规模测试, 我们开发了 MapReduce 库的替代实现, 它在本地机器上按顺序执行 MapReduce 操作的所有工作. 向用户提供了控件, 以便计算可以仅限于特定的 Map 任务. 用户使用特殊标志调用他们的程序, 然后可以轻松使用他们认为有用的任何调试或测试工具(例如 gdb).

4.8 状态信息

    主服务器运行了一个内部的 HTTP 服务器并展示了一系列专题页面以供使用. 状态页面显示计算的进度, 例如已经完成了多少任务, 有多少正在进行中, 输入字节数, 中间数据字节数, 输出字节数, 处理速率等. 页面还将展示每个任务出现的错误数量, 和输出的文件连接. 用户可以使用这些数据来预测计算需要多长时间, 以及是否应该将更多资源添加到计算中. 这些页面还可用于确定计算何时比预期慢得多.

    此外, 顶级状态页面显示哪些 Worker 失败了, 以及失败时他们正在处理哪些 map 和 reduce 任务. 在尝试诊断用户代码中的错误时, 此信息很有用.

4.9 计数器

    MapReduce 库提供了计数功能去记录发生的各种各样的事件. 例如, 用户代码可能想要计算处理的总字数或索引的德语文档数等. 要使用此功能, 用户代码会创建一个命名的计数器对象, 然后在 Map 或 Reduce 函数中适当地递增计数器. 例如:

1
2
3
4
5
6
7
Counter* uppercase;
uppercase = GetCounter("uppercase");
map(String name, String contents):
for each word w in contents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w, "1");

    来自各个工作机器的统计值, 定期的发送到主服务器(随着 ping 一起发送). 主聚合计数器从已经成功的 Map 和 Reduce 任务中合计计数器的值, 并在 MapReduce 完成后, 返回给用户. 当前的计数器值也将显示在主节点的管理页面上, 以便人们能实时查看计算的进度. 主节点在合计所有计数器值时会消除重复执行任务的影响, 避免重复记录.
    一些计数器值由 MapReduce 库自动维护, 例如处理的输入键/值对的数量和产生的输出键/值对的数量.

    用户发现计数器功能对于检查 MapReduce 操作行为的完整性很有用. 例如, 在某些 MapReduce 操作中, 用户代码可能希望确保生成的键值对的数量完全等于处理的输入对的数量, 或者确保处理的德国文档的比例在文档总数的某个可容忍的范围内.

5 性能

    本章节, 我们测量了 MapReduce 在大型机器集群上运行的两个计算性能. 一个是在大约 1TB 的数据集中寻找符合模板的数据. 另一个是对大约 1TB 的数据进行排序. 这两个程序代表了 MapReduce 用户编写的真实程序的一部分: 一类是将数据从一种表现形式转换成另外一种表现形式, 另一类是从大型数据集中提取少量有趣的数据.

5.1 集群配置

    所有的程序都运行在一个由大约 1800 台机器组成的集群上执行. 每台机器有两个启用超线程的 2GHz 的 Intel Xeon 处理器, 4 GB 内存, 两个 160 GB IDE 磁盘, 和一个千兆以太网链路. 这些机器被安排在一个两级树形交换网络中, 在根部提供大约 100-200 Gbps 的聚合带宽. 所有机器都在同一个托管设施中, 因此任何一对机器之间的往返时间都小于一毫秒.
    在 4GB 内存中, 大约 1-1.5GB 被集群上运行的其他任务保留. 这些程序是在一个周末的下午执行的, 当时 CPU、磁盘和网络大多处于空闲状态

5.2 抓取

    Grep 程序扫描 $ 10^{10} $ 条 100 字节的记录, 搜索相对罕见的三字符模式(该模式出现在 92,337 条记录中). 输入被分成大约 64MB 的片段(M = 15000), 整个输出被放置在一个文件中(R = 1).

每秒数据传输率

    上图展示了, 随着程序的执行, 越来越多的机器被主节点分配了 Map 计算任务, 扫描的输入数据的速度逐渐上升, 当 worker 的数量达到 1764 个时, 速度也达到了顶峰: 30 GB/s. 随着所有 Map 任务逐渐的完成, 读取的速率开始下降并在大约80秒时达到0. 整个计算从开始到结束大约花费了150秒. 这包括大约一分钟的启动开销. 开销的主要原因是要将程序分发到所有的工作主机, 以及需要定位打开输入文件与 GFS 进行交互并获取局部优化所需信息的时间.

5.3 排序

    Sort 程序对 $ 10^{10} $ 条 100 字节的记录(大约 1TB)进行排序, 该程序仿照 TeraSort 基准测试 10.

    排序程序由不到50行的代码组成. Map 函数仅用了三行, 主要是取了每一行的前 10 字节作为排序的键, 将原始数据作为值. 组成中间值返回. 我们使用内置的 Identity 函数作为 Reduce 的操作. 此函数没有对输入输出的键值对进行修改. 排序的最终结果被写入一组2路复制的 GFS 文件(即, 程序的输出写了 2TB)

    与之前一样, 输入的数据被分成 64MB 的片段(M = 15000). 我们将排序后的输出划分为 4000 个文件 ( R = 4000 ). 分区函数使用 key 的初始字节去定位所放置的分区.

    我们对这个基准的分区函数有关于键的分布的内置知识.在一般的排序程序中, 我们会添加一个预处理的 MapReduce 操作, 该操作会收集一个键的样本, 并使用采样的键的分布来计算最终排序的分割点.

不同执行配置对于速率的影响

    图中(a) 显示了排序程序正常执行的进度. 左上图显示了读取输入的速率. 速率在 13 GB/s 左右达到峰值, 并且很快就结束了, 因为所有 Map 任务都在 200 秒之前完成. 输入速率小于 Grep 是因为排序映射任务花费大约一半的时间和 I/O 带宽将中间输出写入本地磁盘. Grep 的相应中间输出的大小可以忽略不计.

    左中图展示了数据通过网络从 Map 任务发送到 Reduce 任务的速率, 一旦第一个 Map 任务完成, 这种拖拽就开始了. 图中的第一个驼峰是针对第一批大约 1700 个 Reduce 任务(整个 MapReduce 分配了大约 1700 台机器, 每台机器一次最多执行一个 reduce 任务) 大约在300秒后, 其中一些 Reduce 任务已经处理了第一批数据, 我们开始为剩余的 Reduce 开始拖拽数据. 所有的拖拽大概是在开始运行后 600 时完成的.

    左下图显示了 Reduce 任务将输出写入最终输出文件的速率. 在第一个拖拽文件周期结束和写入周期开始之间存在延迟, 这是因为机器在忙于对中间数据进行排序. 写入将会以 2-4 GB/s 的速度持续一段时间. 所有的写入大概会在850秒后完成. 包括启动开销, 整个的计算需要891秒. 这与 TeraSort 基准测试的最佳结果相似18

    需要注意的几件事: 由于局部性优化, 输入数据高于数据拖拽的速度和输出速率, 因为大多数数据是从本地磁盘读取绕过了带宽受限的网络. 数据的拖拽速率高于输出率, 因为输出的阶段会写入两个已排序的数据副本(出于可靠性和可用性的原因, 我们制作了两个输出副本) 我们编写两个副本, 因为这是我们的底层文件系统提供的可靠性和可用性机制. 如果底层文件系统使用纠删码14 而不是复制, 则写入数据的网络带宽需求将会降低

5.4 备份任务的效果

    图中(b) 我们展示了禁用备份任务后执行排序程序的情况. 大部分流程与(a)中的一致, 只是程序的结尾被拖的很长, 几乎没有写入发生. 960秒后, 除了5个 Reduce 任务之外的所有任务都已完成. 然而, 这些最后几个落后者直到300秒后才完成,总耗时1283秒. 相比启用备份执行增加了44%的时间

5.5 机器故障

    图中(c)中, 我们在启动后的前几分钟内故意杀死了 1746 个 worker 中的200个. 底层集群调度程序立刻重启这些机器上的工作进程(因为只有进程被杀死, 机器仍然是正常进行的)

    当程序被杀死时, 在输入率的图中的表示是负值, 因为之前完成的 Map 消失了,并需要重新计算. 重新执行 Map 相对较快. 整个计算在 933 秒内完成, 包括启动开销(仅比正常的执行多了 5% 的时间)

6 经验

    我们在 2003 年 2 月编写了 MapReduce 库的第一个版本, 并在 2003 年 8 月对其进行了重大改进, 包括局部性优化、跨工作机执行任务的动态负载平衡等.从那时起, 我们一直很愉快 惊讶于 MapReduce 库对我们所处理的各种问题的广泛适用性. 它已在 Google 内的广泛领域中使用, 包括:

  • 大规模机器学习问题,
  • Google 新闻和 Froogle 产品的聚类问题,
  • 提取用于生成流行查询报告的数据(例如 Google Zeitgeist),
  • 为新的实验和产品提取网页属性(例如, 从大量网页语料库中提取地理位置以进行本地化搜索), 以及
  • 大规模图计算.

    上图展示了随着时间的推移, 在代码仓库中 MapReduce 程序的数量增长情况. 从 2003 年初的 0 个到 2004 年 9 月下旬接近 900 个独立程序. MapReduce 之所以这么成功. 是因为使简单的程序能够在超过一千台的机器上高效执行, 大大加快了开发和原型制作周期. 此外, 它允许没有分布式和并行系统经验的程序员轻松利用大量资源.


    在每个作业结束时, MapReduce 库记录有关作业使用的计算资源的统计信息. 在上图中, 我们展示了 2004 年 8 月在 Google 运行的 MapReduce 作业子集的一些统计数据.

6.1 大规模索引

    迄今为止, 我们对 MapReduce 最重要的用途之一是完全重写生产索引系统, 该系统生成用于 Google Web 搜索服务的数据结构. 索引系统将我们的爬虫系统检索到的大量文档作为输入, 存储为一组 GFS 文件. 这些文档的原始内容超过 20 TB.
索引的处理过程是 5 至 10 个的 MapReduce 按照顺序执行.使用 MapReduce(而不是先前版本的索引系统中的临时分布式传递)提供了几个好处:

  • 索引代码更简单、更小、更容易理解, 因为处理容错、分布和并行化的代码隐藏在 MapReduce 库中. 例如, 当使用 MapReduce 表示时, 一个计算阶段的大小从大约 3800 行 C++ 代码下降到大约 700 行.
  • MapReduce 库的性能足够好, 我们可以将概念上不相关的计算分开, 而不是将它们混合在一起以避免额外的数据传递. 这使得更改索引过程变得容易. 例如, 在我们的旧索引系统中需要几个月才能进行的一项更改只需要几天时间就可以在新系统中实施.
  • 索引过程变得更容易操作, 因为大部分由机器故障、机器缓慢和网络中断引起的问题都由 MapReduce 库自动处理, 无需操作员干预. 此外, 通过向索引集群添加新机器很容易提高索引过程的性能.

7 相关系统

    许多系统提供了受限的编程模型, 并使用这些限制来自动并行计算. 例如, 可以使用并行前缀计算 [6, 9, 13] 在 N 个处理器上以 log N 时间在 N 个元素数组的所有前缀上计算关联函数. 基于我们在大型现实世界计算方面的经验, MapReduce 可以被认为是对其中一些模型的简化和提炼. 更重要的是, 我们提供了可扩展到数千个处理器的容错实现. 相比之下, 大多数系统仅在较小的规模上实现并行处理, 并将处理机器故障的细节留给程序员.

    Bulk Synchronous Programming 17 和一些 MPI 原语 11 提供了更高级别的抽象,使程序员更容易编写并行程序。 这些系统和 MapReduce 之间的一个关键区别是 MapReduce 利用受限编程模型自动并行化用户程序并提供透明的容错

    我们的局部性优化从活动磁盘 12 15 等技术中汲取灵感,其中计算被推入靠近本地磁盘的处理元素中,以减少通过 I/O 子系统或网络发送的数据量。 我们在直接连接少量磁盘的商用处理器上运行,而不是直接在磁盘控制器处理器上运行,但一般方法是相似的。

    我们的备份任务机制类似于 Charlotte System 3 中采用的急切调度机制。 简单急切调度的缺点之一是,如果给定任务导致重复失败,则整个计算无法完成。 我们对不符合的记录进行跳过的机制修复了此问题的一些实例。

    MapReduce 实现依赖于内部集群管理系统,该系统负责在大量共享机器上分发和运行用户任务。 虽然不是本文的重点,但集群管理系统在精神上与 Condor 16 等其他系统相似。

    作为 MapReduce 库一部分的排序工具在操作上类似于 NOW-Sort 1。 源机器(地图工作者)对要排序的数据进行分区并将其发送给 R reduce 工作者之一。 每个reduce worker在本地(如果可能的话在内存中)对其数据进行排序。 当然,NOW-Sort 没有用户可定义的 Map 和 Reduce 函数,这些函数使我们的库广泛适用。

    River 2 提供了一种编程模型,其中进程通过分布式队列发送数据来相互通信。与 MapReduce 一样,River 系统试图提供良好的平均案例性能,即使存在由异构硬件或系统扰动引入的不均匀性。 River 通过仔细调度磁盘和网络传输以实现平衡的完成时间来实现这一点。MapReduce 有不同的方法。 通过限制编程模型,MapReduce 框架能够将问题划分为大量细粒度的任务。这些任务在可用的 Worker 上动态调度,以便更快的 Worker 处理更多任务。 受限的编程模型还允许我们在作业接近尾声时安排任务的冗余执行,这大大减少了在存在不均匀性(例如缓慢或卡住的 Worker )的情况下的完成时间。

    BAD-FS 5 具有与 MapReduce 非常不同的编程模型,并且与 MapReduce 不同,它的目标是跨广域网执行作业。 然而,有两个基本的相似之处。 (1) 两个系统都使用冗余执行来从故障导致的数据丢失中恢复。 (2) 两者都使用局部感知调度来减少通过拥塞的网络链路发送的数据量。

    TACC [7] 是一个旨在简化高可用性网络服务构建的系统。 与 MapReduce 一样,它依赖于重新执行作为实现容错的机制

8 结论

    MapReduce 编程模型已在 Google 成功用于许多不同的目标。 我们将这一成功归因于几个原因。 首先,该模型易于使用,即使对于没有并行和分布式系统经验的程序员也是如此,因为它隐藏了并行化、容错、局部优化和负载平衡的细节。 其次,各种各样的问题都可以很容易地表达为 MapReduce 计算。 例如,MapReduce 用于为 Google 的生产网络搜索服务生成数据,用于排序、数据挖掘、机器学习和许多其他系统。 第三,我们开发了 MapReduce 的实现,可以扩展到包含数千台机器的大型机器集群。 该实现有效地利用了这些机器资源,因此适用于 Google 遇到的许多大型计算问题.

    我们从这项工作中学到了几件事。 首先,限制编程模型使得并行化和分布计算变得容易,并使这种计算容错。 其次,网络带宽是一种稀缺资源。 因此,我们系统中的一些优化旨在减少通过网络发送的数据量:局部性优化允许我们从本地磁盘读取数据,并将中间数据的单个副本写入本地磁盘可以节省网络带宽。 第三,冗余执行可以用来减少慢机器的影响,处理机器故障和数据丢失。

致谢

    Josh Levenberg has been instrumental in revising and extending the user-level MapReduce API with a number of new features based on his experience with using MapReduce and other people’s suggestions for enhancements. MapReduce reads its input from and writes its output to the Google File System 8. We would like to thank Mohit Aron, Howard Gobioff, Markus Gutschke, David Kramer, Shun-Tak Leung, and Josh Redstone for their work in developing GFS. We would also like to thank Percy Liang and Olcan Sercinoglu for their work in developing the cluster management system used by MapReduce. Mike Burrows, Wilson Hsieh, Josh Levenberg, Sharon Perl, Rob Pike, and Debby Wallach provided helpful comments on earlier drafts of this paper. The anonymous OSDI reviewers, and our shepherd, Eric Brewer, provided many useful suggestions of areas where the paper could be improved. Finally, we thank all the users of MapReduce within Google’s engineering organization for providing helpful feedback, suggestions, and bug reports.

引用

[1] Andrea C. Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, David E. Culler, Joseph M. Hellerstein, and David A. Patterson. High-performance sorting on networks of workstations. In Proceedings of the 1997 ACM SIGMOD International Conference on Management of Data, Tucson, Arizona, May 1997.

[2] Remzi H. Arpaci-Dusseau, Eric Anderson, Noah Treuhaft, David E. Culler, Joseph M. Hellerstein, David Patterson, and Kathy Yelick. Cluster I/O with River: Making the fast case common. In Proceedings of the Sixth Workshop on Input/Output in Parallel and Distributed Systems (IOPADS ’99), pages 10–22, Atlanta, Georgia, May 1999.

[3] Arash Baratloo, Mehmet Karaul, Zvi Kedem, and Peter Wyckoff. Charlotte: Metacomputing on the web. In Proceedings of the 9th International Conference on Parallel and Distributed Computing Systems, 1996.

[4] Luiz A. Barroso, Jeffrey Dean, and Urs H¨olzle. Web search for a planet: The Google cluster architecture. IEEE Micro, 23(2):22–28, April 2003.

[5] John Bent, Douglas Thain, Andrea C.Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, and Miron Livny. Explicit control in a batch-aware distributed file system. In Proceedings of the 1st USENIX Symposium on Networked Systems Design and Implementation NSDI, March 2004.

[6] Guy E. Blelloch. Scans as primitive parallel operations. IEEE Transactions on Computers, C-38(11), November 1989.

[7] Armando Fox, Steven D. Gribble, Yatin Chawathe, Eric A. Brewer, and Paul Gauthier. Cluster-based scalable network services. In Proceedings of the 16th ACM Symposium on Operating System Principles, pages 78– 91, Saint-Malo, France, 1997.

[8] Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung. The Google file system. In 19th Symposium on Operating Systems Principles, pages 29–43, Lake George, New York, 2003.

[9] S. Gorlatch. Systematic efficient parallelization of scan and other list homomorphisms. In L. Bouge, P. Fraigniaud, A. Mignotte, and Y. Robert, editors, Euro-Par’96. Parallel Processing, Lecture Notes in Computer Science 1124, pages 401–408. Springer-Verlag, 1996.

[10] Jim Gray. Sort benchmark home page. http://research.microsoft.com/barc/SortBenchmark/.

[11] William Gropp, Ewing Lusk, and Anthony Skjellum. Using MPI: Portable Parallel Programming with the Message-Passing Interface. MIT Press, Cambridge, MA, 1999.

[12] L. Huston, R. Sukthankar, R. Wickremesinghe, M. Satyanarayanan, G. R. Ganger, E. Riedel, and A. Ailamaki. Diamond: A storage architecture for early discard in interactive search. In Proceedings of the 2004 USENIX File and Storage Technologies FAST Conference, April 2004.

[13] Richard E. Ladner and Michael J. Fischer. Parallel prefix computation. Journal of the ACM, 27(4):831–838, 1980.

[14] Michael O. Rabin. Efficient dispersal of information for security, load balancing and fault tolerance. Journal of the ACM, 36(2):335–348, 1989.

[15] Erik Riedel, Christos Faloutsos, Garth A. Gibson, and David Nagle. Active disks for large-scale data processing. IEEE Computer, pages 68–74, June 2001.

[16] Douglas Thain, Todd Tannenbaum, and Miron Livny. Distributed computing in practice: The Condor experience. Concurrency and Computation: Practice and Experience, 2004.

[17] L. G. Valiant. A bridging model for parallel computation. Communications of the ACM, 33(8):103–111, 1997.

[18] Jim Wyllie. Spsort: How to sort a terabyte quickly. http://alme1.almaden.ibm.com/cs/spsort.pdf.

附录A 统计词频程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#include "mapreduce/mapreduce.h"
// User’s map function
class WordCounter : public Mapper {
public:
virtual void Map(const MapInput& input) {
const string& text = input.value();
const int n = text.size();
for (int i = 0; i < n; ) {
// Skip past leading whitespace
while ((i < n) && isspace(text[i]))
i++;
// Find word end
int start = i;
while ((i < n) && !isspace(text[i]))
i++;
if (start < i)
Emit(text.substr(start,i-start),"1");
}
}
};
REGISTER_MAPPER(WordCounter);
// User’s reduce function
class Adder : public Reducer {
virtual void Reduce(ReduceInput* input) {
// Iterate over all entries with the
// same key and add the values
int64 value = 0;
while (!input->done()) {
value += StringToInt(input->value());
input->NextValue();
}
// Emit sum for input->key()
Emit(IntToString(value));
}
};
REGISTER_REDUCER(Adder);
int main(int argc, char** argv) {
ParseCommandLineFlags(argc, argv);
MapReduceSpecification spec;
// Store list of input files into "spec"
for (int i = 1; i < argc; i++) {
MapReduceInput* input = spec.add_input();
input->set_format("text");
input->set_filepattern(argv[i]);
input->set_mapper_class("WordCounter");
}
// Specify the output files:
// /gfs/test/freq-00000-of-00100
// /gfs/test/freq-00001-of-00100
// ...
MapReduceOutput* out = spec.output();
out->set_filebase("/gfs/test/freq");
out->set_num_tasks(100);
out->set_format("text");
out->set_reducer_class("Adder");
// Optional: do partial sums within map
// tasks to save network bandwidth
out->set_combiner_class("Adder");
// Tuning parameters: use at most 2000
// machines and 100 MB of memory per task
spec.set_machines(2000);
spec.set_map_megabytes(100);
spec.set_reduce_megabytes(100);
// Now run it
MapReduceResult result;
if (!MapReduce(spec, &result)) abort();
// Done: ’result’ structure contains info
// about counters, time taken, number of
// machines used, etc.
return 0;
}

Author: Sean
Link: https://blog.whileaway.io/posts/e44cfcd2/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.