
1.mapreduce特点:
1:良好的扩展性
2:高容错性
3:不擅长DAG(有向图)计算
多个应用程序存在依赖关系,后一个应用程序的输入为前一个应用程序的输出,在这种情况下,Mapreduce并不是不能做,而是使用后每个Mapreduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常低下
2.MapReduce进程
1. MRAppMaster:负责整个程序过程调度及状态协调
2. MapTask:负责map阶段整个数据处理流程
3. ReduceTask:负责reduce阶段整个数据处理流程
3. MR框架原理

4.mr整体流程

计算流程是:输入分片 —> map阶段 —> combiner阶段(可选) —> shuffle阶段 —> reduce阶段 —> 数据输出
4.1 inputformat数据输入

4.2 输入文件指定
对于文件的应用,提供四类静态方法提供job的输入路径,用来指定文件的读取路径
- public static void addInputPath(Job job,Path path) throws IOException
- public static void addInputPaths(Job job, String commaSeparatedPaths) throws IOException
- public static void setInputPaths(Job job, Path… inputPaths) throws IOException
- public static void setInputPaths(Job job,String commaSeparatedPaths) throws IOException
4.2.1FileInputFormat切片机制☆
- 简单按照文件的内容长度进行切片
- 切片大小,默认等于Block大小
- 切片时不考虑数据集整体,而是逐个针对每个文件进行单独的切片切片大小,默认等于Block大小
切片前数据 | 切片后数据 |
file1.txt 320M file2.txt 10M | file1.txt.split1– 0~128 file1.txt.split2– 128~256 file1.txt.split3– 256~320 file2.txt.split1– 0~10M |
源码中的切片大小设置
Math.max(minSize, Math.min(maxSize,blockSize)); mapreduce.input.fileinputformat.split.minsize=1 默认值1 mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue 默认为Long.MAXValue |
因此默认情况下,切片大小等于= block
切片大小设置:
- maxsize(切片最大值):参数如果调得比blockSize小,则会让切片变小,而且就等于配置的这个参数的值
- minsize(切片最小值):参数调的比blockSize大,则可以让切片变得比bolckSize还大
4.2.2TextInputFormate切片机制
MapReduce默认使用的切片机制。在切片时不考虑数据集整体,也就是不考虑输入路径下其他文件,而是逐个对每一个文件单独切片
#文本内容 abc def ghi hello world Sadge Sadge Sadge #键值对内容 < 0, abc def ghi > < 13, hello world > < 26, Sadge Sadge Sadge> |
4.2.3 KeyValue切片机制
每一行均为一条记录,被分隔符分割为key,value。可以通过在驱动类中设置conf.set(KeyValueLineRecordReaderKEY_ VALUE_ SEPERATOR, “\t”);来设定分隔符。默认分隔符是tab (\t)
以下是一个示例,输入是一个包含4条记录的分片。 其中—->表示一个(水平方向的)制表符。
line1—->Rich learning form line2—->Intelligent learning engine line3—->Learning more convenient line4—->From the real demand for more close to the enterprise |
切片后结果
(1ine1,Rich learning form) (1ine2,Intelligent learning engine) (line3,Learning more convenient) (line4,From the real demand for more close to the enterprise) |
4.2.4 NLineInputFormat切片机制
NlineInputFormat,代表每个map进程处理的InputSplit不再按Block块去划分,而是按NlineInputFormat指定的行数N来划分。即输入文件的总行数/N=切片数,如果不整除,切片数=/商+ 1。
Rich learning form Intelligent learning engine Learning more convenient From the real demand for more close to the enteprise |
如果N是2,则每个输入分片包含两行,开启2个MapTask
第一个Map任务,执行首两行
(0,Rich learning form) (19,Inelligent 1eaming engine) |
第二个Map,执行后两行
(47,Learning more convenient) (72,From the real demand for more close to the enterprise) |
4.2.5 CombinTextInputFormat切片机制
默认情况下TextInputformat对任务的切片机制是按文件规划切片,不管文件多小,都会有一个单独的切片,都会交给一个maptask,如果有大量的小文件,就会产生大量的maptask,处理效率及其低下
CombineTextInputFormat:用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个 MapTask 处理
设置方法:
// 如果不设置InputFormat,它默认用的是TextInputFormat.classjob.setInputFormatClass(CombineTextInputFormat.class); //虚拟存储切片最大值设置4mCombineTextInputFormat.setMaxInputSplitSize(job, 4194304); |
假设 setMaxInputSplitSize 值为 4M,有如下四个文件
a.txt 1.7M
b.txt 5.1M
c.txt 3.4M
d.txt 6.8M
1)虚拟存储过程
1.1)将输入目录下所有文件大小,依次和设置的 setMaxInputSplitSize 值比较,如果不大于设置的最大值,逻辑上划分一个块。
1.2)如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块,当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。
1.7M < 4M 划分一块
5.1M > 4M 但是小于 2*4M 划分二块:块1=2.55M,块2=2.55M
3.4M < 4M 划分一块
6.8M > 4M 但是小于 2*4M 划分二块:块1=3.4M,块2=3.4M
最终存储的文件:
1.7M
2.55M,2.55M
3.4M
3.4M,3.4M
2)切片过程
2.1)判断虚拟存储的文件大小是否大于 setlMaxIputSplitSize 值,大于等于则单独形成一个切片。
2.2)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
最终会形成3个切片:
(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M

2.3 MapTask工作机制

整体流程:Read阶段—> Map阶段—> Collect阶段—> spill阶段—-> Combine阶段
2.4 Shuffle工作机制
Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle

2.4.1. shuffle的Map端流程
整体流程:collect阶段—> Map阶段—> Collect阶段—> spill阶段—-> Combine阶段

sort
先把Kvbuffer中的数据按照partition值和key两个关键字升序排序,移动的只是索引数据,排序结果是Kvmeta中数据按照partition为单位聚集在一起,同一partition内的按照key有序。
环形缓冲区spill溢出写入到磁盘的数据存储如下形式

merge
Map任务如果输出数据量很大,可能会进行好几次Spill,out文件和Index文件会产生很多,分布在不同的磁盘上。最后把这些文件进行合并的merge。
从所有的本地目录上扫描得到Index文件,然后把索引信息存储在一个列表里。然后为merge过程创建一个叫file.out的文件和一个叫file.out.Index的文件用来存储最终的输出和索引。一个partition一个partition的进行合并输出。然后对这个partition对应的所有的segment进行合并,目标是合并成一个segment。当这个partition对应很多个segment时,会分批地进行合并

Partition分区
在进行MapReduce计算时,有时候须要把最终的输入数据分到不同的文件中,这时候,就须要用到分区。
通过job.setNumReduceTasks(x)设置多个分区,Partition的默认实现是根据key的hashCode对ReduceTask个数取模得到的,用户没法控制哪个key存储到哪个分区
如果不设置ReduceTask个数,它默认分区个数为1个
2.4.2 shuffle的Reduce端流程
1)Copy
Reduce任务通过HTTP向各个Map任务拖取它所需要的数据
每个节点都会启动一个常驻的HTTP server,其中一项服务就是响应Reduce拖取Map数据
内存存放:如果在内存中能放得下这次数据的话就直接把数据写到内存中,Reduce要向每个Map去拖取数据,在内存中每个Map对应一块数据,当内存中存储的Map数据占用空间达到一定程度的时候,开始启动内存中merge,把内存中的数据merge输出到磁盘上一个文件中
磁盘存放:如果在内存中不能放得下这个Map的数据的话,直接把Map数据写到磁盘上,在本地目录创建一个文件,从HTTP流中读取数据然后写到磁盘,使用的缓存区大小是64K
2)merge sort
这里使用的Merge和Map端使用的Merge过程一样。Map的输出数据已经是有序的,Merge进行一次归并排序,所谓Reduce端的sort过程就是这个合并的过程。一般Reduce是一边copy一边sort,即copy和sort两个阶段是重叠而不是完全分开的。
2.5 ReduceTask工作机制
整体流程:copy阶段—> merge阶段—> reducer阶段

1) Copy: Reducer通过Http方式得到输出文件的分区
reduce端可能从n个map的结果中获取数据,而这些map的执行速度不尽相同,当其中一个map运行结束时,reduce就会从JobTracker中获取该信息。map运行结束后TaskTracker会得到消息,进而将消息汇报给JobTracker,reduce定时从JobTracker获取该信息,reduce端默认有5个数据复制线程从map端复制数据
2)Merge如果形成多个磁盘文件会进行合并
从map端复制来的数据首先写到reduce端的缓存中,同样缓存占用到达一定阈值后会将数据写到磁盘中,同样会进行partition、combine、排序等过程。如果形成了多个磁盘文件还会进行合并,最后一次合并的结果作为reduce的输入而不是写入到磁盘中。
3) Reducer: 最后将合并后的结果作为输入传入Reduce任务中
Reduce任务个数的指定

2.6 . 并行度测定
maptask的并行度决定map阶段的任务处理并发度,进而影响到整个job的处理速度 那么,mapTask并行实例是否越多越好呢?其并行度又是如何决定呢
2.6.1 MapTask并行度
一个job的map阶段并行度由客户端在提交job时决定而客户端对map阶段并行度的规划的基本逻辑为: 将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多个split),然后每一个split分配一个mapTask并行实例处理。这段逻辑及形成的切片规划描述文件,由FileInputFormat实现类的getSplits()方法完成,其过程如下图:

选择并发数的影响因素:
运算节点的硬件配置
运算任务的类型:CPU密集型(PI)还是IO密集型(distcp)
CPU密集: CPU/MEM = 1:2 (16C32G, 32C64GB)
IO密集:CPU/MEM=1:4; (8C32GB + 20TB, 4TB *5 )
运算任务的数据量
2.6.2 ReduceTask并行度
reducetask的并行度同样影响整个job的执行并发度和执行效率,但与maptask的并发数由切片数决定不同,Reducetask数量的决定是可以直接手动设置:
//默认值是1,手动设置为4
job.setNumReduceTasks(4); |
如果数据分布不均匀,就有可能在reduce阶段产生数据倾斜
注意: reducetask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个reducetask
尽量不要运行太多的reduce task。对大多数job来说,最好rduce的个数最多和集群中的reduce持平,或者比集群的 reduce slots小。这个对于小集群而言,尤其重要
3.MapReduce 性能调优
如果能够根据情况对shuffle过程进行调优,对于提供MapReduce性能很有帮助
一个通用的原则是给shuffle过程分配尽可能大的内存,当然你需要确保map和reduce有足够的内存来运行业务逻辑。因此在实现Mapper和Reducer时,应该尽量减少内存的使用,例如避免在Map中不断地叠加
运行map和reduce任务的JVM,内存通过mapred.child.java.opts属性来设置,尽可能设大内存。容器的内存大小通过mapreduce.map.memory.mb和mapreduce.reduce.memory.mb来设置,默认都是1024M
3.1 map优化:
在map端,避免写入多个spill文件可能达到最好的性能,一个spill文件是最好的。通过估计map的输出大小,设置合理的mapreduce.task.io.sort.*属性,使得spill文件数量最小。例如尽可能调大mapreduce.task.io.sort.mb
map端相关的属性如下表:
属性名 | 值类型 | 默认值 | 说明 |
mapreduce.task.io.sort.mb | int | 100 | 用于map输出排序的内存大小 |
mapreduce.map.sort.spill.percent | float | 0.80 | 开始spill的缓冲池阈值 |
mapreduce.task.io.sort.factor | int | 10 | 合并文件数最大值,与reduce共用 |
mapreduce.map.combine.minspills | int | 3 | 运行combiner的最低spill文件数 |
mapreduce.map.out.compress | boolean | false | 输出是否压缩 |
mapreduce.map.out.compress | 类名 | DefaultCodec | 压缩算法 |
mapreduce.shuffle.max.threads | int | 0 | 服务于reduce提取结果的线程数量 |
3.2 Reduce优化:
在reduce端,如果能够让所有数据都保存在内存中,可以达到最佳的性能。通常情况下,内存都保留给reduce函数,但是如果reduce函数对内存需求不是很高,将mapreduce.reduce.merge.inmem.threshold(触发合并的map输出文件数)设为0,mapreduce.reduce.input.buffer.percent(用于保存map输出文件的堆内存比例)设为1.0,可以达到很好的性能提升
属性名 | 类型 | 默认值 | 说明 |
mapreduce.reduce.shuffle.parallelcopies | int | 5 | 提取map输出的copier线程数 |
mapreduce.reduce.shuffle.maxfetchfailures | int | 10 | 提取map输出最大尝试次数,超出后报错 |
mapreduce.task.io.sort.factor | int | 10 | 合并文件数最大值,与map共用 |
mapreduce.reduce.shuffle.input.buffer.percent | float | 0.70 | copy阶段用于保存map输出的堆内存比例 |
mapreduce.reduce.shuffle.merge.percent | float | 0.66 | 开始spill的缓冲池比例阈值 |
mapreduce.reduce.shuffle.inmem.threshold | int | 1000 | 开始spill的map输出文件数阈值,小于等于0表示没有阈值,此时只由缓冲池比例来控制 |
mapreduce.reduce.input.buffer.percent | float | 0.0 | reduce函数开始运行时,内存中的map输出所占的堆内存比例不得高于这个值,默认情况内存都用于reduce函数,也就是map输出都写入到磁盘 |
3.3 通用优化
Hadoop默认使用4KB作为缓冲,这个算是很小的,可以通过io.file.buffer.size来调高缓冲池大小