MapReduce工作流程

image-20220612193029275

  • 不同的Map任务之间不会进行通信

  • 不同的Reduce任务之间也不会发生任何信息交换

  • 用户不能显式地从一台机器向另一台机器发送消息

  • 所有的数据交换都是通过MapReduce框架自身去实现的

Split

image-20220612193157103

HDFS 以固定大小的block 为基本单位存储数据,而对于MapReduce 而言,其处理单位是split。split 是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。

默认切片规则

file < Block*1.1

每个文件对应一个切片。一个切片对应一个Map进程,所以对于大量小文件场景,会启动很多的Map进程。

Map数量

image-20220612193234798

Hadoop为每个split创建一个Map任务,split 的多少决定了Map任务的数目。大多数情况下,理想的分片大小是一个HDFS块

Reduce数量

与Partitioner有关;

最优的Reduce任务个数取决于集群中可用的reduce任务槽(slot)的数目

•通常设置比reduce任务槽数目稍微小一些的Reduce任务个数(这样可以预留一些系统资源处理可能发生的错误)

Shuffle过程详解

image-20220612193504374

Map端Shuffle

image-20220612193545292

•每个Map任务分配一个缓存,默认100MB;

•设置溢写比例0.8

•分区默认采用哈希函数

•排序是默认的操作

•排序后可以合并(Combine)合并不能改变最终结果(比如平均值,不可使用combiner)

•在Map任务全部结束之前进行归并

•归并得到一个大的文件,放在本地磁盘

•文件归并时,如果溢写文件数量大于预定值(默认是3)则可以再次启动Combiner,少于3不需要

•JobTracker会一直监测Map任务的执行,并通知Reduce任务来领取数据

合并(Combine)和归并(Merge)的区别:

两个键值对<“a”,1>和<“a”,1>,如果合并,会得到<“a”,2>,如果归并,会得到<“a”,<1,1>>

Reduce端Shuffle

image-20220612193624044

•Reduce任务通过RPC向JobTracker询问Map任务是否已经完成,若完成,则领取数据

•Reduce领取数据先放入缓存,来自不同Map机器,先归并,再合并,写入磁盘

•多个溢写文件归并成一个或多个大文件,文件中的键值对是排序的

•当数据很少时,不需要溢写到磁盘,直接在缓存中归并,然后输出给Reduce

MapReduce执行过程回顾

image-20220612193731268

小文件

1. 小文件概述

  小文件通常指文件大小要比HDFS块大小还要小很多的文件(在hadoop1.x版本的时候可以通过dfs.blocksize来设置,默认块大小为64M;在hadoop2.x版本的时候,则需要通过dfs.block.size设置,且默认大小为128M)

  如果存在大量小文件,则会对整个存储系统有一定影响:

  (1)1个文件块占用namenode 150字节内存,大量小文件会占用namenode内存,影响HDFS的横向扩展能力

  (2)如果使用mapreduce处理小文件,则会增加map任务数量,增加寻址次数

2. 如何解决

1. 采用HAR(Hadoop Archives)的归档方式。

  HAR为构建在其它文件系统上用于文件存档的文件系统,通常将hdfs中的多个文件打包成一个存档文件,减少namenode内存的使用,可以直接使用hadoop archive命令创建HAR文件。创建HAR的过程是在运行一个mr作业。

  har在对小文件进行存档后,原文件不会被删除,且创建之后不能改变,文件名中也不能有空格存在,否则会报异常。

具体归档操作流程

https://blog.csdn.net/asd623444055/article/details/123919597

2. 采用CombineFileInputFormat

  CombineFileInputFormat是一种新的inputformat,用于将多个文件合成一个单独的split,而且它还可以考虑数据的存储位置

3. 开启JVM重用

  JVM重用可以使得JVM实例在同一个job中重新使用N次,N的值可以在hadoop的mapred-site.xml文件中进行配置,通常在10-20之间。如果没有小文件,不要开启JVM重用,因为会一直占用使用到的task卡槽,直到任务完成才释放。

1
2
3
4
5
<property>
  <name>mapreduce.job.jvm.numtasks</name>
  <value>10</value>
  <description>How many tasks to run per jvm,if set to -1 ,there is no limit</description>
</property>

转载自http://t.zoukankan.com/s1023-p-13216260.html

数据倾斜

  1. Key前加1-n
  2. 增加Reducer
  3. 重写Partitioner

Windows开发环境搭建

Maven

Maven下载

1
https://archive.apache.org

Maven配置

MAVEN_HOME/conf/settings.xml
1
2
3
4
5
6
7
8
9
10
11
12
<!-- 本地仓库: 根据自己的情况修改保存位置 -->
<localRepository>D:\maven-repo</localRepository>

<!-- 远程仓库: aliyun仓库 -->
<mirrors>
<mirror>
<id>alimaven</id>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<mirrorOf>central</mirrorOf>
</mirror>
</mirrors>

安装IDEA

IDEA绑定Maven

image-20221029182326941

创建Maven项目

image-20221029182538750

安装Hadoop

替换bin目录

1
2
3
https://github.com/cdarlint/winutils
或者
https://github.com/A-stranger/studybigdata/tree/master/hadoop-2.7.3/bin

配置环境变量

HADOOP_HOME

image-20221028150513734

PATH

image-20221030141057578

WordCount

WordCountMapper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package cn.studybigdata.hadoop.mapred.awordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String[] wordArray = value.toString().split(" ");

IntWritable one = new IntWritable(1);

for (String word : wordArray) {
context.write(new Text(word), one);
}
}
}
WordCountReducer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package cn.studybigdata.hadoop.mapred.awordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

int count = 0;
for (IntWritable value : values) {
int i = value.get();
count += i;
}
context.write(key, new IntWritable(count));
}
}
WordCountMain
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package cn.studybigdata.hadoop.mapred.awordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.io.Text;
import java.io.IOException;

public class WordCountMain {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);

job.setJarByClass(WordCountMain.class);

job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

job.setReducerClass(WordCountReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.waitForCompletion(true);

}
}
pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
</dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.6</version>
<configuration>
<archive>
<manifest>
<mainClass>cn.studybigdata.hadoop.mapred.awordcount.WordCountMain</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
Run Configuration

这样我们就可以读取本地文件,进行代码调试。

image-20221030141758287