MapReduce工作流程
Split
HDFS 以固定大小的block 为基本单位存储数据,而对于MapReduce 而言,其处理单位是split。split 是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。
默认切片规则 file < Block*1.1 每个文件对应一个切片。一个切片对应一个Map进程,所以对于大量小文件场景,会启动很多的Map进程。
Map数量
Hadoop为每个split创建一个Map任务,split 的多少决定了Map任务的数目。大多数情况下,理想的分片大小是一个HDFS块
Reduce数量 与Partitioner有关;
最优的Reduce任务个数取决于集群中可用的reduce任务槽(slot)的数目
•通常设置比reduce任务槽数目稍微小一些的Reduce任务个数(这样可以预留一些系统资源处理可能发生的错误)
Shuffle过程详解
Map端Shuffle
•每个Map任务分配一个缓存,默认100MB;
•设置溢写比例0.8
•分区默认采用哈希函数
•排序是默认的操作
•排序后可以合并(Combine)合并不能改变最终结果(比如平均值,不可使用combiner)
•在Map任务全部结束之前进行归并
•归并得到一个大的文件,放在本地磁盘
•文件归并时,如果溢写文件数量大于预定值(默认是3)则可以再次启动Combiner,少于3不需要
•JobTracker会一直监测Map任务的执行,并通知Reduce任务来领取数据
合并(Combine)和归并(Merge)的区别:
两个键值对<“a”,1>和<“a”,1>,如果合并,会得到<“a”,2>,如果归并,会得到<“a”,<1,1>>
Reduce端Shuffle
•Reduce任务通过RPC向JobTracker询问Map任务是否已经完成,若完成,则领取数据
•Reduce领取数据先放入缓存,来自不同Map机器,先归并,再合并,写入磁盘
•多个溢写文件归并成一个或多个大文件,文件中的键值对是排序的
•当数据很少时,不需要溢写到磁盘,直接在缓存中归并,然后输出给Reduce
MapReduce执行过程回顾
小文件 1. 小文件概述 小文件通常指文件大小要比HDFS块大小还要小很多的文件(在hadoop1.x版本的时候可以通过dfs.blocksize来设置,默认块大小为64M;在hadoop2.x版本的时候,则需要通过dfs.block.size设置,且默认大小为128M)
如果存在大量小文件,则会对整个存储系统有一定影响:
(1)1个文件块占用namenode 150字节内存,大量小文件会占用namenode内存,影响HDFS的横向扩展能力
(2)如果使用mapreduce处理小文件,则会增加map任务数量,增加寻址次数
2. 如何解决 1. 采用HAR(Hadoop Archives)的归档方式。 HAR为构建在其它文件系统上用于文件存档的文件系统,通常将hdfs中的多个文件打包成一个存档文件,减少namenode内存的使用,可以直接使用hadoop archive命令创建HAR文件。创建HAR的过程是在运行一个mr作业。
har在对小文件进行存档后,原文件不会被删除,且创建之后不能改变,文件名中也不能有空格存在,否则会报异常。
具体归档操作流程
https://blog.csdn.net/asd623444055/article/details/123919597
CombineFileInputFormat 是一种新的inputformat ,用于将多个文件合成一个单独的split ,而且它还可以考虑数据的存储位置
3. 开启JVM重用 JVM重用可以使得JVM实例在同一个job中重新使用N次,N的值可以在hadoop的mapred-site.xml文件中进行配置,通常在10-20之间。如果没有小文件,不要开启JVM重用,因为会一直占用使用到的task卡槽,直到任务完成才释放。
1 2 3 4 5 <property > <name > mapreduce.job.jvm.numtasks</name > <value > 10</value > <description > How many tasks to run per jvm,if set to -1 ,there is no limit</description > </property >
转载自http://t.zoukankan.com/s1023-p-13216260.html
数据倾斜
Key前加1-n
增加Reducer
重写Partitioner
Windows开发环境搭建 Maven Maven下载 1 https://archive.apache.org
Maven配置 MAVEN_HOME/conf/settings.xml 1 2 3 4 5 6 7 8 9 10 11 12 <localRepository > D:\maven-repo</localRepository > <mirrors > <mirror > <id > alimaven</id > <name > aliyun maven</name > <url > http://maven.aliyun.com/nexus/content/groups/public/</url > <mirrorOf > central</mirrorOf > </mirror > </mirrors >
安装IDEA IDEA绑定Maven
创建Maven项目
安装Hadoop 替换bin目录 1 2 3 https://github.com/cdarlint/winutils 或者 https://github.com/A-stranger/studybigdata/tree/master/hadoop-2.7.3/bin
配置环境变量 HADOOP_HOME
PATH
WordCount WordCountMapper 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 package cn.studybigdata.hadoop.mapred.awordcount;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class WordCountMapper extends Mapper <LongWritable, Text, Text, IntWritable> { @Override protected void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] wordArray = value.toString().split(" " ); IntWritable one = new IntWritable (1 ); for (String word : wordArray) { context.write(new Text (word), one); } } }
WordCountReducer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 package cn.studybigdata.hadoop.mapred.awordcount;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class WordCountReduce extends Reducer <Text, IntWritable, Text, IntWritable> { @Override protected void reduce (Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0 ; for (IntWritable value : values) { int i = value.get(); count += i; } context.write(key, new IntWritable (count)); } }
WordCountMain 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package cn.studybigdata.hadoop.mapred.awordcount;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.io.Text;import java.io.IOException;public class WordCountMain { public static void main (String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration configuration = new Configuration (); Job job = Job.getInstance(configuration); job.setJarByClass(WordCountMain.class); job.setMapperClass(WordCountMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setReducerClass(WordCountReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path (args[0 ])); FileOutputFormat.setOutputPath(job, new Path (args[1 ])); job.waitForCompletion(true ); } }
pom.xml 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 </dependencies > <dependency > <groupId > org.apache.hadoop</groupId > <artifactId > hadoop-client</artifactId > <version > 2.7.3</version > </dependency > </dependencies > <build > <plugins > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-jar-plugin</artifactId > <version > 2.6</version > <configuration > <archive > <manifest > <mainClass > cn.studybigdata.hadoop.mapred.awordcount.WordCountMain</mainClass > </manifest > </archive > </configuration > </plugin > </plugins > </build >
Run Configuration 这样我们就可以读取本地文件,进行代码调试。