Hadoop - MapReduce的核心原理、执行流程、运行架构详解
一、基本介绍
1,什么是 MapReduce?
- MapReduce 是一种分布式计算框架,算是大数据行业的第一代离线数据计算引擎,可以稳定、可靠地并行处理 TB、PB 级别的海量数据,主要用于搜索领域。
- MapReduce 计算引擎的核心思想是,将计算逻辑抽象成 Map 和 Reduce 两个阶段进行处理。
2,MapReduce 的前世今生
(1)MapReduce 源于 Google 在 2004 年发表的论文 Simplified Data Processing on Large Clusters。
(2)MapReduce 属于 Hadoop 项目的核心组件,主要负责海量数据的分布式计算。
- 在 Hadoop 1.x 版本中,MapReduce 需要负责分布式数据计算和集群资源管理,这导致 MapReduce 比较臃肿,并且此时在 Hadoop 集群中只能运行 MapReduce 任务,无法运行其他类型的任务。
- 从 Hadoop 2.x 版本开始,官方将 MapReduce 的功能进行了拆分,并引入了 YARN。此时 MapReduce 只需要负责分布式数据计算,YARN 负责集群资源管理和分配。这样拆分之后,YARN 就成了一个公共的集群资源管理平台,在它上面不仅可以运行 MapReduce 任务,还可以运行其他类型的任务。
提示:由于 Hadoop 起步比较早,属于大数据的开拓者,引入 YARN 之后,它变成了一个平台提供者,这样可以更好地发展基于 Hadoop 的生态圈。后来兴起的 Spark 和 Flink 这些计算引擎都可以在 YARN 上执行,这就更加巩固了 Hadoop 在大数据生态圈中的地位。
二、MapReduce 的核心思想
1,分而治之
(1)MapReduce 是分布式运行的,由 Map 和 Reduce 两个阶段组成。
- Map 阶段是一个独立的程序,可以在多个节点同时运行,每个节点处理一部分数据。
- Reduce 阶段也是一个独立的程序,可以在一个或多个节点同时运行,每个节点处理一部分数据
注意:如果是全局聚合需求,则 Reduce 阶段只会在一个节点上运行。
以计算一摞扑克牌中黑桃的个数为例:
- 第一步:把这摞扑克牌分配给在座的所有玩家。
- 第二步:让每个玩家检查自己手中的扑克牌有多少张黑桃,然后把这个数字汇报给你。
- 第三步:你把所有玩家告诉你的数字加起来,得到最终的结果。
2,移动计算
(1)传统的计算方式是,把需要计算的数据通过网络传输到计算程序所在的节点上。如果需要计算的数据量比较大,则这种方式效率就比较低了,因为需要通过网络传输大量的数据,会受制于磁盘 I/O 和网络 I/O(网络 I/O 是最消耗时间的)。这种计算方式可以被称为移动数据。
(2)如果把计算程序移动到数据所在的节点,即计算程序和数据在同一个节点上,则可以节省网络 I/O。这种方式可以被称为移动计算。
提示:计算程序是很小的,一般也就几十 KB 或几百 KB,通过网络复制计算程序不会消耗多少时间,几乎可以忽略不计。
(4)但是计算程序只能计算当前节点上的数据,无法获取全局的结果,所以还需要有一个汇总程序,这样每个数据节点上计算的临时结果就可以通过汇总程序得到最终的结果。
三、MapReduce 的执行流程
1,执行原理
(1)下图是 MapReduce 详细的执行原理,其中左下角是一个 File(文件),这个文件表示输入数据源。文件下面是多个 Block,说明这个文件被切分成了多个 Block。文件上面是一些 Split,Split 表示 File 的切片,这里的切片是逻辑切分,不会对 Block 数据进行真正的切分,默认情况下 Split 的大小等于 Block 的大小。
注意:特殊情况下 Split 的大小会大于 Block 的大小。默认会先按照 Block 的大小将文件切分为 Split,当(文件的剩余大小/128MB) ≤ 1.1 时,会将剩余的内容划分到一个 Split 中,这主要是为了提高计算效率。
(2)MapReduce 任务在执行时,针对每个 Split 都会产生一个 Map Task。下图中一共产生了 5 个 Map Task。
(1)一个 1G 的文件,会产生多少个 map 任务?
- Block 块默认是 128M,所以 1G 的文件会产生 8 个 Block 块
- 默认情况下 InputSplit 的大小和 Block 块的大小一致,每一个 InputSplit 会产生一个 map 任务
- 所以:1024/128=8 个 map 任务
(2)1000 个文件,每个文件 100KB,会产生多少个 map 任务?
- 一个文件,不管再小,都会占用一个 block,所以这 1000 个小文件会产生 1000 个 Block
- 那最终会产生 1000 个 InputSplit,也就对应着会产生 1000 个 map 任务
(3)一个 140M 的文件,会产生多少个 map 任务?
- 根据前面的分析,140M 的文件会产生 2 个 Block,那是不是对应的就会产生 2 个 InputSplit 了?其实不是的。
- 注意:这个有点特殊,140M/128M=1.09375<1.1
- 所以,这个文件只会产生一个 InputSplit,也最终也就只会产生 1 个 map 任务。
(3)Map Task 计算的中间结果会通过 Shuffle 远程复制到 Reduce Task 中进行汇总计算。
(4)图中一共有 3 个 Reduce Task,每个 Reduce Task 负责处理一部分数据。3 个 Reduce Task 最终会在结果目录下产生 3 个文件:part-r-00000、part-r-00001 和 part-r-00002。
2,Map 阶段详解
(1)以统计 hello.txt 文件中每个单词出现的总次数为例。假设文件中有两行内容,单词之间使用空格分隔,文件内容如下:
hello you hello me
提示:<k1,v1> 表示键值对类型的数据。后面还会出现 <k2,v2> 和 <k3,v3>,分别代表数据的不同阶段。
- 所以,hello.txt 文件中的数据经过第一步处理之后的结果如下:
<0,hello you> <10,hello me>
提示:第 1 次执行此步骤会产生 <0,hello you>,第 2 次执行此步骤会产生 <10,hello me>。并不是执行一次就获取这两行结果,因为框架每次只会读取一行数据,这里只是把两次执行的最终结果一起列出来了。
(3)第二步:MapReduce 框架调用 Mapper 类中的 map() 函数。map() 函数的输入是 <k1,v1>,输出是 <k2,v2>。一个 Split 对应一个 Map Task,程序员需要自己覆盖 Mapper 类中的 map() 函数,实现具体的业务逻辑。
- 因为需要统计文件中每个单词出现的总次数,所以需要先把每一行内容中的单词切开,然后记录每个单词出现次数为 1,这个逻辑需要在 map() 函数中实现。
- 对于 <0,hello you>,执行 map() 函数中的逻辑之后结果为:
<hello,1> <you,1>
- 对于 <10,hello me>,执行 map() 函数中的逻辑之后结果为:
<hello,1> <me,1>
(4)第三步:MapReduce 框架对 map() 函数输出的 <k2,v2> 数据进行分区,不同分区中的 <k2,v2> 由不同的 Reduce Task 处理。默认只有 1 个分区,所以所有的数据都会被分到 1 个分区中,最后只产生一个 Reduce Task。
- 经过这个步骤之后,数据没什么变化。如果有多个分区,则需要将这些数据根据指定的分区规则分开。
<hello,1> <you,1> <hello,1> <me,1>
(5)第四步:MapReduce 框架将每个分区中的数据都按照 k2 进行排序和分组。分组表示把相同 k2 的 v2 分到一个组。
- 按照 k2 进行排序:
<hello,1> <hello,1> <me,1> <you,1>
- 按照 k2 进行分组:
<hello,{1,1}> <me,{1}> <you,{1}>
(6)第五步:在 Map 阶段,MapReduce 框架选择执行 Combiner 过程。Combiner 可以被翻译为“规约”。
- 在这个例子中,最终是要在 Reduce 阶段汇总每个单词出现的总次数,所以可以在 Map 阶段提前执行 Reduce 阶段的计算逻辑,即在 Map 阶段对单词出现的次数进行局部汇总,这样就可以减少 Map 阶段到 Reduce 阶段的数据传输量,这就是规约的好处。
提示:并不是所有场景都可以使用规约。对于求平均值之类的操作就不能使用规约了,否则最终计算的结果就不准确了。
(7)第六步:MapReduce 框架会把 Map Task 输出的 <k2,v2> 写入 Linux 系统的本地磁盘文件中。至此,整个 Map 阶段执行结束。
- 写入 Linux 系统本地磁盘文件的内容大致如下:
<hello,{1,1}> <me,{1}> <you,{1}>
3,Reduce 阶段详解
提示:MapReduce 程序是由 Map 和 Reduce 这两个阶段组成的,但是 Reduce 阶段并不是必需的。如果某个需求不需要最终的汇总聚合操作,则只需要对数据进行清洗处理,即数据经过 Map 阶段处理完就结束了,Map 阶段可以直接将结果数据输出到 HDFS 中。
(1)第一步:MapReduce 框架对多个 Map Task 的输出,按照不同的分区,通过网络复制到不同的 ReduceTask 中。这个过程被称为 Shuffle。
- 当前需求只涉及 1 个分区,所以把数据复制到 Reduce Task 之后不会发生变化。
<hello,{1,1}> <me,{1}> <you,{1}>
(2)第二步:MapReduce 框架对 Reduce Task 接收到的相同分区的 <k2,v2> 数据进行合并、排序和分组。Reduce Task 接收到的是多个 Map Task 的输出,所以需要对多个 Map Task 中相同分区的数据进行合并、排序和分组。
- 当前需求需求只涉及 1 个 Map Task、1 个分区,所以执行合并、排序和分组之后数据是不变的。
<hello,{1,1}> <me,{1}> <you,{1}>
(3)第 三 步 : MapReduce 框架调用 Reducer 类中的 reduce() 函数 。 reduce() 函数的输入是 <k2,{v2...}>,输出是 <k3,v3>。每个 <k2,{v2...}> 会调用一次 reduce() 函数,程序员需要覆盖 reduce() 函数实现具体的业务逻辑。
- 这里需要先在 reduce() 函数中实现最终的聚合计算逻辑,将相同 k2 的 {v2...} 累加求和,然后转换为 <k3,v3> 写出去。此需求中会调用 3 次 reduce() 函数,最终的结果如下所示:
<hello,2> <me,1> <you,1>
(4)第四步:MapReduce 框架把 Reduce Task 的输出结果保存到 HDFS 中。至此,整个 Reduce 阶段执行结束。
- 结果文件内容如下:
hello 2 me 1 you 1
4,Shuffle 过程详解
(1)shuffer 是一个网络拷贝的过程,是把 map 端产生的数据通过网络拷贝到 reduce 阶段进行统一聚合计算。
(2)通过上图可以看到 shuffle 的一些细节信息:
- 首先看 map 阶段,map 任务在执行的时候会把 k1,v1 转化为 k2,v2,这些数据会先临时存储到一个内存缓冲区中,这个内存缓冲区的大小默认是 100M(io.sort.mb 属性),当达到内存缓冲区大小的 80%(io.sort.spill.percent)也就是 80M 的时候,会把内存中的数据溢写到本地磁盘中(mapred.local.dir),一直到 map 把所有的数据都计算完,最后会把内存缓冲区中的数据一次性全部刷新到本地磁盘文件中。在上面这个图里面表示产生了 3 个临时文件,每个临时文件中有 3 个分区,这是由于 map 阶段中对数据做了分区,所以数据在存储的时候,在每个临时文件中也划分为了 3 块,最后需要对这些临时文件进行合并,合并为一个大文件,因为一个 map 任务最终只会产生一个文件,这个合并之后的文件也是有 3 个分区的。
- 这 3 个分区的数据会被 shuffle 线程分别拷贝到三个不同的 reduce 节点,图里面只显示了一个 reduce 节点,下面还有两个没有显示。不同 map 任务中的相同分区的数据会在同一个 reduce 节点进行合并,合并以后会执行 reduce 的功能,最终产生结果数据。
5,执行流程示例
(1)下面是通过单文件方式描述单词计数案例的执行流程:
(2)在单文件的执行流程中,有一些阶段数据的变化不是很清晰。下面通过多文件的方式进行分析。多文件肯定会有多个 Block,这样就会产生多个 Split,进而产生多个 Map Task。
- 下面示例使用自定义分区将数据分为两个分区,并且使用了可选的 Combiner(规约)在 Map 端提前对数据进行了局部聚合,这样可以减少 Shuffle 过程传输的数据量,提高任务的执行效率。
四、MapReduce 在 YARN 上的架构分析
1,基本介绍
在 Hadoop 3.x 中,MapReduce 是在 YARN 中执行的。MapReduce 在 YARN 上的运行架构,大致可以分为两个阶段:
- 第 1 个阶段:ResourceManager(实际是 ResourceManager 中的 ApplicationManager)启动 MR AppMaster 进程。MR AppMaster 是 MapReduce ApplicationMaster 的简写,主要负责管理 MapReduce 任务的生命周期。对于每一个 MapReduce 任务都会启动一个 AppMaster 进程。
- 第 2 个阶段:MR AppMaster 创建应用程序,申请资源,并且监控应用程序的运行过程。
2,详细执行流程
(1)用户通过客户端节点向集群提交任务,该任务首先会找到 ResourceManager 中的 ApplicationManager。
(2)ApplicationManager 在接收到任务后,会在集群中找一个 NodeManager,并在该 NodeManager 所在的节点上分配一个 Container(Container 是 YARN 动态分配的资源容器,包括一定的内存和 CPU),在这个 Container 中启动此任务对应的 MR AppMaster 进程,该进程用于进行任务的划分和任务的监控。
(3)MR AppMaster 在启动之后,会向 ResourceManager 中的 ApplicationManager 注册其信息,目的是与之通信。这样用户就可以通过 ResourceManager 查询作业的运行状态了。
(4)MR AppMaster 向 ResourceManager 中的 ResourceScheduler 申请计算任务所需要的资源。
(5)MR AppMaster 在申请到资源之后,会与对应的 NodeManager 通信,要求它们启动应用程序所需的任务(Map Task 和 Reduce Task)。
(6)各个 NodeManager 启动对应的 Container 来执行 Map Task 和 Reduce Task。
(7)各个任务(Map Task 和 Reduce Task)会向 MR AppMaster 汇报自己的执行进度和执行状况,以便让 MR AppMaster 随时掌握各个任务的运行状态,在某个任务出了问题之后重启执行该任务。在任务运行期间,用户可以通过 MR AppMaster 查询任务当前的运行状态。
(8)在任务执行完成之后,MR AppMaster 向 ApplicationManager 汇报,让 ApplicationManager 注销并关闭自己,释放并回收资源。