Hadoop Serialize
Hadoop MapReduce 自定义序列化实战:部门工资总额统计
一、实验基本信息
- 实验名称:基于 Hadoop MapReduce 的部门工资总额统计(自定义 Writable 序列化)
- 实验目标:
- 巩固 Hadoop MapReduce 核心架构(Mapper、Reducer)知识
- 掌握自定义 Writable 序列化对象的编写规范
- 实现从原始员工数据解析到部门工资总额聚合的完整流程
- 理解分布式计算中数据传输的序列化原理
- 实验环境:Hadoop 2.x/3.x 集群(或本地单机模式)、JDK 1.8、IDE(IntelliJ IDEA/Eclipse)
二、实验原理与基础知识巩固(入门阶段)
2.1 核心概念回顾
- MapReduce 核心流程:输入数据 → Mapper 解析映射 → Shuffle(排序+分组) → Reducer 聚合计算 → 输出结果
- 序列化的意义:Hadoop 分布式计算中,数据需在节点间传输,序列化将对象转为字节流,反序列化则恢复对象,自定义 Writable 可适配业务数据类型
- 关键组件职责:
- Mapper:读取原始数据,解析为业务对象,输出 < 分组键, 业务对象 > 中间结果
- Reducer:接收同一分组键的所有值,进行聚合计算(如求和、计数)
- Writable 接口:Hadoop 序列化标准,需实现
write()(序列化)和readFields()(反序列化)方法
2.2 基础思考题(巩固认知)
为什么 Hadoop 不直接使用 Java 自带的 Serializable 接口,而需要自定义 Writable?
答案:Writable 更轻量、高效,适配 Hadoop 分布式场景的性能需求
Mapper 的输入键(LongWritable)对应的是原始数据的什么信息?
答案:数据行号,无业务意义,仅为 Hadoop 默认生成
三、实验步骤(循序渐进)
阶段一:环境准备与项目搭建
创建 Maven 项目,在
pom.xml中添加 Hadoop 核心依赖(MapReduce、HDFS 相关)1
2
3
4
5<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>准备测试数据:在本地创建
emp.txt文件,写入员工数据(格式:empId,name,position,otherId,hireDate,salary,bonus,deptId)1
2
3
4
57369,SMITH,CLERK,7902,1980/12/17,800,,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
阶段二:编写自定义 Writable 类(Employee)
任务目标:实现员工对象的序列化与反序列化,适配 Hadoop 数据传输
创建
Employee类,实现Writable接口定义员工属性:
empId(int)、name(String)、position(String)、otherId(int)、hireDate(String)、salary(int)、bonus(int)、deptId(int)1
2
3
4
5
6
7
8private int empId;
private String name;
private String position;
private int otherId;
private String hireDate;
private int salary;
private int bonus;
private int deptId;编写 getter/setter 方法(用于后续 Mapper/Reducer 获取属性值)
1
编写构造函数:必须包含无参构造函数(Hadoop 反射实例化需要),以及带参构造函数(快速初始化对象)
1
实现
write()方法(序列化逻辑):- 按属性顺序“int→String→String→int→String→int→int→int”写入字段
- 注意:
int类型用out.writeInt(),String类型用out.writeUTF()
1
2
3public void write(DataOutput out) throws IOException {
}实现
readFields()方法(反序列化逻辑):- 必须与
write()方法的字段顺序完全一致(否则解析错乱) - 用
in.readInt()读取 int 字段,in.readUTF()读取 String 字段
1
2
3public void readFields(DataInput in) throws IOException {
}- 必须与
测试验证:编写简单main方法,将 Employee 对象序列化后再反序列化,检查属性值是否一致
阶段三:编写 Mapper 组件(DeptMapper)
任务目标:解析原始文本数据,生成 < 部门号, 员工对象 > 中间键值对
- 创建
DeptMapper类,继承Mapper<LongWritable, Text, IntWritable, Employee>- 输入泛型:< 行号(LongWritable),行数据(Text)>
- 输出泛型:< 部门号(IntWritable),员工对象(Employee)>
- 实现
map()方法核心逻辑:- 第一步:将输入
value(Text 类型)转为字符串,用逗号,分割为字符串数组 - 第二步:解析数组元素,处理异常场景:
otherId可能为空,解析失败时设默认值 -1bonus可能为空,解析失败时设默认值 0- 其他字段(如
empId、salary)直接用Integer.parseInt()解析
- 第三步:用解析后的字段创建
Employee对象 - 第四步:通过
context.write(new IntWritable(deptId), employee)输出中间结果
- 第一步:将输入
- 注意事项:分割数组长度需为 8,避免数组越界异常;解析数字时需捕获
NumberFormatException
记得空值判断或者捕获异常
阶段四:编写 Reducer 组件(DeptReduce)
任务目标:按部门号分组,累加该部门所有员工的工资总额
- 创建
DeptReducer类,继承Reducer<IntWritable, Employee, IntWritable, IntWritable>- 输入泛型:< 部门号(IntWritable),该部门所有员工(Iterable
)> - 输出泛型:< 部门号(IntWritable),工资总额(IntWritable)>
- 输入泛型:< 部门号(IntWritable),该部门所有员工(Iterable
- 实现
reduce()方法核心逻辑:- 第一步:初始化累加变量
totalSalary,初始值为 0 - 第二步:遍历
Iterable<Employee>集合,获取每个员工的salary属性,累加到totalSalary - 第三步:通过
context.write(deptId, new IntWritable(totalSalary))输出最终结果
- 第一步:初始化累加变量
- 关键理解:
Iterable<Employee>对应的是同一部门的所有员工对象,MapReduce 已自动完成分组
阶段五:编写 Driver 驱动类(DeptDriver)
任务目标:配置 MapReduce 作业,指定各组件、输入输出路径,提交作业
1 | job.setMapOutputValueClass(Employee.class); |
阶段六:程序运行与结果验证
本地模式运行:在 IDE 中直接运行
DeptDriver的 main 方法,需确保 Hadoop 配置正确1
file:///D:/input/emp.csv file:///D:/out
集群模式运行:
- 将项目打包为 JAR 文件(如
dept-salary.jar) - 上传 JAR 文件和
emp.txt到 Hadoop 集群 - 执行命令:
hadoop jar dept-salary.jar cn.studybigdata.hadoop.mapred.cserialize.DeptDriver /input/emp.txt /output/dept_salary
- 将项目打包为 JAR 文件(如
结果验证:查看输出目录下的
part-r-00000文件,确认部门工资总额是否正确- 预期结果(基于测试数据):20 部门工资总额 3775(800+2975),30 部门工资总额 4100(1600+1250+1250)
阶段七:异常排查
- 序列化/反序列化错乱:检查
write()和readFields()方法的字段顺序是否一致 - 数组越界异常:确保原始数据每行字段数为 8,无多余或缺失的逗号
- 输出路径已存在:修改输出路径为不存在的目录,或先删除原有目录
四、实验拓展
- 功能拓展:在原有基础上,计算每个部门的平均工资(需统计员工人数)
- 数据拓展:增加员工奖金参与计算(工资+奖金总和),处理奖金为空的场景
五、实验总结
- 核心知识点回顾:自定义 Writable 序列化规范、MapReduce 组件协作流程、分布式数据聚合逻辑
- 重点难点梳理:序列化/反序列化的字段顺序一致性、原始数据的异常处理、作业配置的关键参数
- 实践收获:理解 Hadoop 分布式计算的核心思想,掌握从业务需求到 MapReduce 程序实现的完整流程