oplus_1048579

1.mapreduce特点:

1:良好的扩展性

2:高容错性

3:不擅长DAG(有向图)计算

多个应用程序存在依赖关系,后一个应用程序的输入为前一个应用程序的输出,在这种情况下,Mapreduce并不是不能做,而是使用后每个Mapreduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常低下

2.MapReduce进程

1. MRAppMaster:负责整个程序过程调度及状态协调

2. MapTask:负责map阶段整个数据处理流程

3. ReduceTask:负责reduce阶段整个数据处理流程

3. MR框架原理

4.mr整体流程

计算流程是:输入分片 —> map阶段 —> combiner阶段(可选) —> shuffle阶段 —> reduce阶段 —> 数据输出

4.1 inputformat数据输入

4.2 输入文件指定

对于文件的应用,提供四类静态方法提供job的输入路径,用来指定文件的读取路径

  • public static void addInputPath(Job job,Path path) throws IOException
  • public static void addInputPaths(Job job, String commaSeparatedPaths) throws IOException
  • public static void setInputPaths(Job job, Path… inputPaths) throws IOException
  • public static void setInputPaths(Job job,String commaSeparatedPaths) throws IOException

4.2.1FileInputFormat切片机制☆

  • 简单按照文件的内容长度进行切片
  • 切片大小,默认等于Block大小
  • 切片时不考虑数据集整体,而是逐个针对每个文件进行单独的切片切片大小,默认等于Block大小
切片前数据切片后数据
file1.txt      320M
file2.txt      10M
file1.txt.split1– 0~128
file1.txt.split2– 128~256
file1.txt.split3– 256~320
file2.txt.split1– 0~10M

源码中的切片大小设置

Math.max(minSize, Math.min(maxSize,blockSize));
mapreduce.input.fileinputformat.split.minsize=1 默认值1
mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue  
默认为Long.MAXValue

因此默认情况下,切片大小等于= block

切片大小设置:

  • maxsize(切片最大值):参数如果调得比blockSize小,则会让切片变小,而且就等于配置的这个参数的值
  • minsize(切片最小值):参数调的比blockSize大,则可以让切片变得比bolckSize还大

4.2.2TextInputFormate切片机制

MapReduce默认使用的切片机制。在切片时不考虑数据集整体,也就是不考虑输入路径下其他文件,而是逐个对每一个文件单独切片

#文本内容
abc def ghi
hello world
Sadge Sadge Sadge

#键值对内容
< 0, abc def ghi >
< 13, hello world >
< 26, Sadge Sadge Sadge>

4.2.3 KeyValue切片机制

每一行均为一条记录,被分隔符分割为key,value。可以通过在驱动类中设置conf.set(KeyValueLineRecordReaderKEY_ VALUE_ SEPERATOR, “\t”);来设定分隔符。默认分隔符是tab (\t)

以下是一个示例,输入是一个包含4条记录的分片。 其中—->表示一个(水平方向的)制表符。

line1—->Rich learning form
line2—->Intelligent learning engine
line3—->Learning more convenient
line4—->From the real demand for more close to the enterprise

切片后结果

(1ine1,Rich learning form)
(1ine2,Intelligent learning engine)
(line3,Learning more convenient)
(line4,From the real demand for more close to the enterprise)

4.2.4 NLineInputFormat切片机制

NlineInputFormat,代表每个map进程处理的InputSplit不再按Block块去划分,而是按NlineInputFormat指定的行数N来划分。即输入文件的总行数/N=切片数,如果不整除,切片数=/商+ 1。

Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enteprise

如果N是2,则每个输入分片包含两行,开启2个MapTask

第一个Map任务,执行首两行

(0,Rich learning form)
(19,Inelligent 1eaming engine)

第二个Map,执行后两行

(47,Learning more convenient)
(72,From the real demand for more close to the enterprise)

4.2.5 CombinTextInputFormat切片机制

默认情况下TextInputformat对任务的切片机制是按文件规划切片,不管文件多小,都会有一个单独的切片,都会交给一个maptask,如果有大量的小文件,就会产生大量的maptask,处理效率及其低下

CombineTextInputFormat:用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个 MapTask 处理

设置方法:

// 如果不设置InputFormat,它默认用的是TextInputFormat.classjob.setInputFormatClass(CombineTextInputFormat.class); //虚拟存储切片最大值设置4mCombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

假设 setMaxInputSplitSize 值为 4M,有如下四个文件

a.txt 1.7M

b.txt 5.1M

c.txt 3.4M

d.txt 6.8M

1)虚拟存储过程

1.1)将输入目录下所有文件大小,依次和设置的 setMaxInputSplitSize 值比较,如果不大于设置的最大值,逻辑上划分一个块。
1.2)如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块,当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。

1.7M < 4M 划分一块

5.1M > 4M 但是小于 2*4M 划分二块:块1=2.55M,块2=2.55M

3.4M < 4M 划分一块

6.8M > 4M 但是小于 2*4M 划分二块:块1=3.4M,块2=3.4M

最终存储的文件:

1.7M

2.55M,2.55M

3.4M

3.4M,3.4M

2)切片过程

2.1)判断虚拟存储的文件大小是否大于 setlMaxIputSplitSize 值,大于等于则单独形成一个切片。

2.2)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。

最终会形成3个切片:

(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M

2.3 MapTask工作机制

整体流程:Read阶段—> Map阶段—> Collect阶段—> spill阶段—-> Combine阶段

2.4 Shuffle工作机制

Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle

2.4.1. shuffle的Map端流程

整体流程:collect阶段—> Map阶段—> Collect阶段—> spill阶段—-> Combine阶段

sort

先把Kvbuffer中的数据按照partition值和key两个关键字升序排序,移动的只是索引数据,排序结果是Kvmeta中数据按照partition为单位聚集在一起,同一partition内的按照key有序。

环形缓冲区spill溢出写入到磁盘的数据存储如下形式

merge

Map任务如果输出数据量很大,可能会进行好几次Spill,out文件和Index文件会产生很多,分布在不同的磁盘上。最后把这些文件进行合并的merge。

从所有的本地目录上扫描得到Index文件,然后把索引信息存储在一个列表里。然后为merge过程创建一个叫file.out的文件和一个叫file.out.Index的文件用来存储最终的输出和索引。一个partition一个partition的进行合并输出。然后对这个partition对应的所有的segment进行合并,目标是合并成一个segment。当这个partition对应很多个segment时,会分批地进行合并

Partition分区

在进行MapReduce计算时,有时候须要把最终的输入数据分到不同的文件中,这时候,就须要用到分区。

通过job.setNumReduceTasks(x)设置多个分区,Partition的默认实现是根据key的hashCode对ReduceTask个数取模得到的,用户没法控制哪个key存储到哪个分区

如果不设置ReduceTask个数,它默认分区个数为1个

2.4.2 shuffle的Reduce端流程

1)Copy

Reduce任务通过HTTP向各个Map任务拖取它所需要的数据

每个节点都会启动一个常驻的HTTP server,其中一项服务就是响应Reduce拖取Map数据

内存存放:如果在内存中能放得下这次数据的话就直接把数据写到内存中,Reduce要向每个Map去拖取数据,在内存中每个Map对应一块数据,当内存中存储的Map数据占用空间达到一定程度的时候,开始启动内存中merge,把内存中的数据merge输出到磁盘上一个文件中

磁盘存放:如果在内存中不能放得下这个Map的数据的话,直接把Map数据写到磁盘上,在本地目录创建一个文件,从HTTP流中读取数据然后写到磁盘,使用的缓存区大小是64K

2)merge sort

这里使用的Merge和Map端使用的Merge过程一样。Map的输出数据已经是有序的,Merge进行一次归并排序,所谓Reduce端的sort过程就是这个合并的过程。一般Reduce是一边copy一边sort,即copy和sort两个阶段是重叠而不是完全分开的。

2.5 ReduceTask工作机制

整体流程:copy阶段—> merge阶段—> reducer阶段

1)  Copy: Reducer通过Http方式得到输出文件的分区

reduce端可能从n个map的结果中获取数据,而这些map的执行速度不尽相同,当其中一个map运行结束时,reduce就会从JobTracker中获取该信息。map运行结束后TaskTracker会得到消息,进而将消息汇报给JobTracker,reduce定时从JobTracker获取该信息,reduce端默认有5个数据复制线程从map端复制数据

2)Merge如果形成多个磁盘文件会进行合并

从map端复制来的数据首先写到reduce端的缓存中,同样缓存占用到达一定阈值后会将数据写到磁盘中,同样会进行partition、combine、排序等过程。如果形成了多个磁盘文件还会进行合并,最后一次合并的结果作为reduce的输入而不是写入到磁盘中。

3) Reducer: 最后将合并后的结果作为输入传入Reduce任务中

Reduce任务个数的指定

2.6 . 并行度测定

maptask的并行度决定map阶段的任务处理并发度,进而影响到整个job的处理速度 那么,mapTask并行实例是否越多越好呢?其并行度又是如何决定呢

2.6.1 MapTask并行度

一个job的map阶段并行度由客户端在提交job时决定而客户端对map阶段并行度的规划的基本逻辑为: 将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多个split),然后每一个split分配一个mapTask并行实例处理。这段逻辑及形成的切片规划描述文件,由FileInputFormat实现类的getSplits()方法完成,其过程如下图:

选择并发数的影响因素:

    运算节点的硬件配置
    运算任务的类型:CPU密集型(PI)还是IO密集型(distcp)
     CPU密集: CPU/MEM = 1:2 (16C32G, 32C64GB)

IO密集:CPU/MEM=1:4; (8C32GB + 20TB, 4TB *5 )

运算任务的数据量

2.6.2 ReduceTask并行度

reducetask的并行度同样影响整个job的执行并发度和执行效率,但与maptask的并发数由切片数决定不同,Reducetask数量的决定是可以直接手动设置:

//默认值是1,手动设置为4

job.setNumReduceTasks(4);

如果数据分布不均匀,就有可能在reduce阶段产生数据倾斜

注意: reducetask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个reducetask
  尽量不要运行太多的reduce task。对大多数job来说,最好rduce的个数最多和集群中的reduce持平,或者比集群的 reduce slots小。这个对于小集群而言,尤其重要

3.MapReduce 性能调优

如果能够根据情况对shuffle过程进行调优,对于提供MapReduce性能很有帮助

一个通用的原则是给shuffle过程分配尽可能大的内存,当然你需要确保map和reduce有足够的内存来运行业务逻辑。因此在实现Mapper和Reducer时,应该尽量减少内存的使用,例如避免在Map中不断地叠加

运行map和reduce任务的JVM,内存通过mapred.child.java.opts属性来设置,尽可能设大内存。容器的内存大小通过mapreduce.map.memory.mb和mapreduce.reduce.memory.mb来设置,默认都是1024M

3.1 map优化:

在map端,避免写入多个spill文件可能达到最好的性能,一个spill文件是最好的。通过估计map的输出大小,设置合理的mapreduce.task.io.sort.*属性,使得spill文件数量最小。例如尽可能调大mapreduce.task.io.sort.mb

map端相关的属性如下表:

属性名值类型默认值说明
mapreduce.task.io.sort.mbint100用于map输出排序的内存大小
mapreduce.map.sort.spill.percentfloat0.80开始spill的缓冲池阈值
mapreduce.task.io.sort.factorint10合并文件数最大值,与reduce共用
mapreduce.map.combine.minspillsint3运行combiner的最低spill文件数
mapreduce.map.out.compressbooleanfalse输出是否压缩
mapreduce.map.out.compress类名DefaultCodec压缩算法
mapreduce.shuffle.max.threadsint0服务于reduce提取结果的线程数量

3.2 Reduce优化:

在reduce端,如果能够让所有数据都保存在内存中,可以达到最佳的性能。通常情况下,内存都保留给reduce函数,但是如果reduce函数对内存需求不是很高,将mapreduce.reduce.merge.inmem.threshold(触发合并的map输出文件数)设为0,mapreduce.reduce.input.buffer.percent(用于保存map输出文件的堆内存比例)设为1.0,可以达到很好的性能提升

属性名类型默认值说明
mapreduce.reduce.shuffle.parallelcopiesint5提取map输出的copier线程数
mapreduce.reduce.shuffle.maxfetchfailuresint10提取map输出最大尝试次数,超出后报错
mapreduce.task.io.sort.factorint10合并文件数最大值,与map共用
mapreduce.reduce.shuffle.input.buffer.percentfloat0.70copy阶段用于保存map输出的堆内存比例
mapreduce.reduce.shuffle.merge.percentfloat0.66开始spill的缓冲池比例阈值
mapreduce.reduce.shuffle.inmem.thresholdint1000开始spill的map输出文件数阈值,小于等于0表示没有阈值,此时只由缓冲池比例来控制
mapreduce.reduce.input.buffer.percentfloat0.0reduce函数开始运行时,内存中的map输出所占的堆内存比例不得高于这个值,默认情况内存都用于reduce函数,也就是map输出都写入到磁盘

3.3 通用优化

Hadoop默认使用4KB作为缓冲,这个算是很小的,可以通过io.file.buffer.size来调高缓冲池大小

作者 admin

张宴银,大数据开发工程师

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注