分区计算

Employee

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package cn.studybigdata.hadoop.mapred.dpartition;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Hadoop {@link Writable} value type carrying one employee record.
 *
 * Input line layout (comma separated), e.g.:
 *   7369,SMITH,CLERK,7902,1980/12/17,800, ,20
 *   employee id, name, position, manager id, hire date, salary, bonus, dept id
 *
 * NOTE: write() and readFields() must serialize the fields in the exact
 * same order — Hadoop reconstructs the object by replaying that order.
 */
public class Employee implements Writable {

    private int empId;
    private String name;
    private String position;
    private int otherId;
    private String hireDate;
    private int salary;
    private int bonus;
    private int deptId;

    /** No-arg constructor required by Hadoop's reflective instantiation. */
    public Employee() {
    }

    public Employee(int empId, String name, String position, int otherId,
                    String hireDate, int salary, int bonus, int deptId) {
        this.empId = empId;
        this.name = name;
        this.position = position;
        this.otherId = otherId;
        this.hireDate = hireDate;
        this.salary = salary;
        this.bonus = bonus;
        this.deptId = deptId;
    }

    /**
     * Serialization: writes every field, in declaration order.
     * @param out sink provided by the framework
     * @throws IOException on write failure
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(empId);
        out.writeUTF(name);
        out.writeUTF(position);
        out.writeInt(otherId);
        out.writeUTF(hireDate);
        out.writeInt(salary);
        out.writeInt(bonus);
        out.writeInt(deptId);
    }

    /**
     * Deserialization: reads fields back in the same order write() emitted them.
     * @param in source provided by the framework
     * @throws IOException on read failure
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        empId = in.readInt();
        name = in.readUTF();
        position = in.readUTF();
        otherId = in.readInt();
        hireDate = in.readUTF();
        salary = in.readInt();
        bonus = in.readInt();
        deptId = in.readInt();
    }

    public int getEmpId() {
        return empId;
    }

    public void setEmpId(int empId) {
        this.empId = empId;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPosition() {
        return position;
    }

    public void setPosition(String position) {
        this.position = position;
    }

    public int getOtherId() {
        return otherId;
    }

    public void setOtherId(int otherId) {
        this.otherId = otherId;
    }

    public String getHireDate() {
        return hireDate;
    }

    public void setHireDate(String hireDate) {
        this.hireDate = hireDate;
    }

    public int getSalary() {
        return salary;
    }

    public void setSalary(int salary) {
        this.salary = salary;
    }

    public int getBonus() {
        return bonus;
    }

    public void setBonus(int bonus) {
        this.bonus = bonus;
    }

    public int getDeptId() {
        return deptId;
    }

    public void setDeptId(int deptId) {
        this.deptId = deptId;
    }

    /** Same textual form as the original (used verbatim as reducer output). */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("Employee{");
        sb.append("empId=").append(empId)
          .append(", name='").append(name).append('\'')
          .append(", position='").append(position).append('\'')
          .append(", otherId=").append(otherId)
          .append(", hireDate='").append(hireDate).append('\'')
          .append(", salary=").append(salary)
          .append(", bonus=").append(bonus)
          .append(", deptId=").append(deptId)
          .append('}');
        return sb.toString();
    }

}

Mapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package cn.studybigdata.hadoop.mapred.dpartition;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Parses one CSV employee line and emits (deptId, Employee) so that the
 * partitioner can route records by department.
 *
 * Input line example: 7369,SMITH,CLERK,7902,1980/12/17,800,,20
 * Fields 3 (manager id) and 6 (bonus) may be blank; they fall back to -1 and 0.
 */
public class DeptMapper extends Mapper<LongWritable, Text, IntWritable, Employee> {

    // Reused output key: avoids allocating one IntWritable per input record
    // (the framework serializes the key on write, so reuse is safe).
    private final IntWritable outKey = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String lineContent = value.toString();
        // [7369,SMITH,CLERK,7902,1980/12/17,800,,20]
        String[] empArray = lineContent.split(",");

        int empId = Integer.parseInt(empArray[0]);
        String name = empArray[1];
        String position = empArray[2];
        int otherId = -1;
        // Only the number parse may fail here; catching Exception broadly would
        // also hide genuine bugs such as a short/malformed line.
        try {
            otherId = Integer.parseInt(empArray[3]);
        } catch (NumberFormatException e) {
            System.out.println("员工老板的id没有获取到,使用了前面设置的默认值: -1");
        }
        String hireDate = empArray[4];
        int salary = Integer.parseInt(empArray[5]);
        int bonus = 0;
        try {
            bonus = Integer.parseInt(empArray[6]);
        } catch (NumberFormatException e) {
            System.out.println("无奖金,bonus设置为零");
        }
        int deptId = Integer.parseInt(empArray[7]);

        Employee employee = new Employee(empId, name, position, otherId, hireDate, salary, bonus, deptId);

        outKey.set(deptId);
        context.write(outKey, employee);
    }
}

Partitioner

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package cn.studybigdata.hadoop.mapred.dpartition;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Routes records to reducers by department id.
 *
 * Bug fixed: the original if/else chain mapped every remainder >= 2 to
 * partition 2, so with more than 3 reducers the partitions 3..n-1 were never
 * used and reducer 2 received a disproportionate share of the data. The
 * standard HashPartitioner idiom below distributes keys over ALL partitions
 * and stays non-negative even for negative department ids.
 */
public class DeptPartitioner extends Partitioner<IntWritable, Employee> {

    /**
     * @param key           department id emitted by the mapper
     * @param value         employee record (not used for routing)
     * @param numPartitions number of reduce tasks configured on the job
     * @return a partition index in [0, numPartitions)
     */
    @Override
    public int getPartition(IntWritable key, Employee value, int numPartitions) {
        // Mask off the sign bit before the modulo so the result is never negative.
        return (key.get() & Integer.MAX_VALUE) % numPartitions;
    }
}

Reducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package cn.studybigdata.hadoop.mapred.dpartition;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Writes every employee of a department as one (deptId, employee-text) output
 * line; the grouping by department was already done by the shuffle.
 */
public class DeptReduce extends Reducer<IntWritable, Employee, IntWritable, Text> {

    @Override
    protected void reduce(IntWritable dept, Iterable<Employee> employees, Context context) throws IOException, InterruptedException {

        // Single reusable buffer: context.write serializes immediately,
        // so resetting the same Text per record yields identical output.
        Text line = new Text();
        for (Employee employee : employees) {
            line.set(employee.toString());
            context.write(dept, line);
        }
    }
}
Main
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package cn.studybigdata.hadoop.mapred.dpartition;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver: wires mapper, custom partitioner and reducer into one job.
 *
 * Usage: hadoop jar xxx.jar &lt;input path&gt; &lt;output path&gt;
 *
 * Fix: the original ignored the boolean returned by waitForCompletion, so a
 * failed job still exited with status 0 and shell pipelines could not detect
 * the failure; it also crashed with ArrayIndexOutOfBoundsException when the
 * two path arguments were missing.
 */
public class DeptMain {

    //hadoop jar xxx.jar arg0 arg1
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        if (args.length < 2) {
            System.err.println("Usage: DeptMain <input path> <output path>");
            System.exit(2);
        }

        Configuration configuration = new Configuration();
        //create a mapreduce job
        Job job = Job.getInstance(configuration);

        job.setJarByClass(DeptMain.class);

        job.setMapperClass(DeptMapper.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Employee.class);

        // Three reduce tasks, one output file per partition produced by
        // DeptPartitioner.
        job.setNumReduceTasks(3);
        job.setPartitionerClass(DeptPartitioner.class);

        job.setReducerClass(DeptReduce.class);
        //<word,3>
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        //input path
        //output path
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Propagate job success/failure to the caller's exit status.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

pom

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

<!-- Hadoop client pulls in the MapReduce and HDFS client APIs used above. -->
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>
<!-- Log4j 2 so Hadoop's client-side logging has a backing implementation. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.18.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.18.0</version>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Bake the driver class into the jar manifest so the job can be launched
     as "hadoop jar xxx.jar <in> <out>" without naming the main class. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.6</version>
<configuration>
<archive>
<manifest>
<mainClass>cn.studybigdata.hadoop.mapred.dpartition.DeptMain</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>