Hadoop MapReduce 自定义序列化实战:部门工资总额统计

一、实验基本信息

  • 实验名称:基于 Hadoop MapReduce 的部门工资总额统计(自定义 Writable 序列化)
  • 实验目标:
    1. 巩固 Hadoop MapReduce 核心架构(Mapper、Reducer)知识
    2. 掌握自定义 Writable 序列化对象的编写规范
    3. 实现从原始员工数据解析到部门工资总额聚合的完整流程
    4. 理解分布式计算中数据传输的序列化原理
  • 实验环境:Hadoop 2.x/3.x 集群(或本地单机模式)、JDK 1.8、IDE(IntelliJ IDEA/Eclipse)

二、实验原理与基础知识巩固(入门阶段)

2.1 核心概念回顾

  1. MapReduce 核心流程:输入数据 → Mapper 解析映射 → Shuffle(排序+分组) → Reducer 聚合计算 → 输出结果
  2. 序列化的意义:Hadoop 分布式计算中,数据需在节点间传输,序列化将对象转为字节流,反序列化则恢复对象,自定义 Writable 可适配业务数据类型
  3. 关键组件职责
    • Mapper:读取原始数据,解析为业务对象,输出 < 分组键, 业务对象 > 中间结果
    • Reducer:接收同一分组键的所有值,进行聚合计算(如求和、计数)
    • Writable 接口:Hadoop 序列化标准,需实现 write()(序列化)和 readFields()(反序列化)方法

2.2 基础思考题(巩固认知)

  1. 为什么 Hadoop 不直接使用 Java 自带的 Serializable 接口,而需要自定义 Writable?

    答案:Writable 更轻量、高效,适配 Hadoop 分布式场景的性能需求

  2. Mapper 的输入键(LongWritable)对应的是原始数据的什么信息?

    答案:数据行号,无业务意义,仅为 Hadoop 默认生成

三、实验步骤(循序渐进)

阶段一:环境准备与项目搭建

  1. 创建 Maven 项目,在 pom.xml 中添加 Hadoop 核心依赖(MapReduce、HDFS 相关)

    1
    2
    3
    4
    5
    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.3</version>
    </dependency>
  2. 准备测试数据:在本地创建 emp.txt 文件,写入员工数据(格式:empId,name,position,otherId,hireDate,salary,bonus,deptId

    1
    2
    3
    4
    5
    7369,SMITH,CLERK,7902,1980/12/17,800,,20
    7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
    7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
    7566,JONES,MANAGER,7839,1981/4/2,2975,,20
    7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30

阶段二:编写自定义 Writable 类(Employee)

任务目标:实现员工对象的序列化与反序列化,适配 Hadoop 数据传输

  1. 创建 Employee 类,实现 Writable 接口

  2. 定义员工属性:empId(int)、name(String)、position(String)、otherId(int)、hireDate(String)、salary(int)、bonus(int)、deptId(int)

    1
    2
    3
    4
    5
    6
    7
    8
    private int empId;
    private String name;
    private String position;
    private int otherId;
    private String hireDate;
    private int salary;
    private int bonus;
    private int deptId;
  3. 编写 getter/setter 方法(用于后续 Mapper/Reducer 获取属性值)

    1
       
  4. 编写构造函数:必须包含无参构造函数(Hadoop 反射实例化需要),以及带参构造函数(快速初始化对象)

    1
       
  5. 实现 write() 方法(序列化逻辑):

    • 按属性顺序“int→String→String→int→String→int→int→int”写入字段
    • 注意:int 类型用 out.writeInt()String 类型用 out.writeUTF()
    1
    2
    3
    public void write(DataOutput out) throws IOException {

    }
  6. 实现 readFields() 方法(反序列化逻辑):

    • 必须与 write() 方法的字段顺序完全一致(否则解析错乱)
    • in.readInt() 读取 int 字段,in.readUTF() 读取 String 字段
    1
    2
    3
    public void readFields(DataInput in) throws IOException {

    }
  7. 测试验证:编写简单main方法,将 Employee 对象序列化后再反序列化,检查属性值是否一致

阶段三:编写 Mapper 组件(DeptMapper)

任务目标:解析原始文本数据,生成 < 部门号, 员工对象 > 中间键值对

  1. 创建 DeptMapper 类,继承 Mapper<LongWritable, Text, IntWritable, Employee>
    • 输入泛型:< 行号(LongWritable),行数据(Text)>
    • 输出泛型:< 部门号(IntWritable),员工对象(Employee)>
  2. 实现 map() 方法核心逻辑:
    • 第一步:将输入 value(Text 类型)转为字符串,用逗号 , 分割为字符串数组
    • 第二步:解析数组元素,处理异常场景:
      • otherId 可能为空,解析失败时设默认值 -1
      • bonus 可能为空,解析失败时设默认值 0
      • 其他字段(如 empIdsalary)直接用 Integer.parseInt() 解析
    • 第三步:用解析后的字段创建 Employee 对象
    • 第四步:通过 context.write(new IntWritable(deptId), employee) 输出中间结果
  3. 注意事项:分割数组长度需为 8,避免数组越界异常;解析数字时需捕获 NumberFormatException

记得空值判断或者捕获异常

阶段四:编写 Reducer 组件(DeptReduce)

任务目标:按部门号分组,累加该部门所有员工的工资总额

  1. 创建 DeptReducer 类,继承 Reducer<IntWritable, Employee, IntWritable, IntWritable>
    • 输入泛型:< 部门号(IntWritable),该部门所有员工(Iterable)>
    • 输出泛型:< 部门号(IntWritable),工资总额(IntWritable)>
  2. 实现 reduce() 方法核心逻辑:
    • 第一步:初始化累加变量 totalSalary,初始值为 0
    • 第二步:遍历 Iterable<Employee> 集合,获取每个员工的 salary 属性,累加到 totalSalary
    • 第三步:通过 context.write(deptId, new IntWritable(totalSalary)) 输出最终结果
  3. 关键理解:Iterable<Employee> 对应的是同一部门的所有员工对象,MapReduce 已自动完成分组

阶段五:编写 Driver 驱动类(DeptDriver)

任务目标:配置 MapReduce 作业,指定各组件、输入输出路径,提交作业

1
job.setMapOutputValueClass(Employee.class);

阶段六:程序运行与结果验证

  1. 本地模式运行:在 IDE 中直接运行 DeptDriver 的 main 方法,需确保 Hadoop 配置正确

    1
    file:///D:/input/emp.csv file:///D:/out
  2. 集群模式运行:

    • 将项目打包为 JAR 文件(如 dept-salary.jar
    • 上传 JAR 文件和 emp.txt 到 Hadoop 集群
    • 执行命令:hadoop jar dept-salary.jar cn.studybigdata.hadoop.mapred.cserialize.DeptDriver /input/emp.txt /output/dept_salary
  3. 结果验证:查看输出目录下的 part-r-00000 文件,确认部门工资总额是否正确

    • 预期结果(基于测试数据):20 部门工资总额 3775(800+2975),30 部门工资总额 4100(1600+1250+1250)

阶段七:异常排查

  • 序列化/反序列化错乱:检查 write()readFields() 方法的字段顺序是否一致
  • 数组越界异常:确保原始数据每行字段数为 8,无多余或缺失的逗号
  • 输出路径已存在:修改输出路径为不存在的目录,或先删除原有目录

四、实验拓展

  1. 功能拓展:在原有基础上,计算每个部门的平均工资(需统计员工人数)
  2. 数据拓展:增加员工奖金参与计算(工资+奖金总和),处理奖金为空的场景

五、实验总结

  1. 核心知识点回顾:自定义 Writable 序列化规范、MapReduce 组件协作流程、分布式数据聚合逻辑
  2. 重点难点梳理:序列化/反序列化的字段顺序一致性、原始数据的异常处理、作业配置的关键参数
  3. 实践收获:理解 Hadoop 分布式计算的核心思想,掌握从业务需求到 MapReduce 程序实现的完整流程