public class ReducerJoin {
public static class ValueFlag implements Writable {
private String value;
private String flag;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
public String getFlag() {
return flag;
}
public void setFlag(String flag) {
this.flag = flag;
}
public void write(DataOutput out) throws IOException {
out.writeUTF(value);
out.writeUTF(flag);
}
public void readFields(DataInput in) throws IOException {
this.value = in.readUTF();
this.flag = in.readUTF();
}
}
/**
 * Map side of the reduce-side join.
 *
 * <p>Reads lines from both input files, tags every record with the file it
 * came from, and emits it keyed by the join field (the first column) so that
 * records from both files sharing a key arrive at the same reduce call.
 *
 * <p>Each line is split on single whitespace characters. Column 0 becomes the
 * output key; columns 1 and 2 are joined with a tab and become the payload.
 * Records from files that are not one of the two expected inputs, and lines
 * with fewer than three columns, are skipped and counted in job counters
 * instead of failing the task.
 */
public static class ReducerJoinMap extends Mapper<LongWritable, Text, Text, ValueFlag> {

    /** Counter group used for records this mapper skips. */
    private static final String COUNTER_GROUP = "ReducerJoin";

    /** Source tag for the current split; stays {@code null} when the file is not recognized. */
    private String fileName;

    // Output objects are reused across map() calls; context.write serializes them immediately.
    private final Text oKey = new Text();
    private final ValueFlag oValue = new ValueFlag();

    /**
     * Derives the source tag from the path of the split being processed.
     *
     * @param context task context supplying the input split
     */
    @Override
    protected void setup(Mapper<LongWritable, Text, Text, ValueFlag>.Context context)
            throws IOException, InterruptedException {
        String splitPath = ((FileSplit) context.getInputSplit()).getPath().toString();
        if (splitPath.contains("user-logs-large.txt")) {
            fileName = "userLogsLarge";
        } else if (splitPath.contains("user_info.txt")) {
            fileName = "userInfo";
        }
        // Any other file leaves fileName null; map() skips its records.
    }

    /**
     * Emits one tagged record per well-formed input line.
     *
     * @param key     byte offset of the line (unused)
     * @param value   one line of input text
     * @param context task context used to emit the tagged record
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, ValueFlag>.Context context)
            throws IOException, InterruptedException {
        if (fileName == null) {
            // Split belongs to a file that is not part of the join.
            context.getCounter(COUNTER_GROUP, "UNRECOGNIZED_INPUT_RECORDS").increment(1);
            return;
        }

        String[] infos = value.toString().split("\\s");
        if (infos.length < 3) {
            // Blank or truncated line: key plus two payload columns are required.
            context.getCounter(COUNTER_GROUP, "MALFORMED_RECORDS").increment(1);
            return;
        }

        // Both inputs share the same layout: join key, then two payload columns.
        oKey.set(infos[0]);
        oValue.setFlag(fileName);
        oValue.setValue(infos[1] + "\t" + infos[2]);
        context.write(oKey, oValue);
    }
}
/**
 * Reduce side of the join.
 *
 * <p>Receives every tagged value the mappers emitted for one key and uses the
 * flag to split them into two groups, one per source file. The Cartesian
 * product of the two groups is the join result for that key; each pair is
 * written as one {@code UserActionLog} Avro record. A key that appears in only
 * one of the inputs produces no output.
 */
public static class ReducerJoinReducer extends Reducer<Text, ValueFlag, AvroKey<UserActionLog>, NullWritable> {

    // Output objects are reused across writes.
    private final NullWritable outValue = NullWritable.get();
    private final AvroKey<UserActionLog> outKey = new AvroKey<UserActionLog>();

    /**
     * Joins the log entries and user-info entries that share {@code key}.
     *
     * @param key     join key; written to the record's user name field
     * @param values  tagged payloads for this key from both inputs
     * @param context task context used to emit the joined records
     */
    @Override
    protected void reduce(Text key, Iterable<ValueFlag> values,
            Reducer<Text, ValueFlag, AvroKey<UserActionLog>, NullWritable>.Context context)
            throws IOException, InterruptedException {
        // Split each payload once while grouping, not once per pair in the product.
        // Limit -1 keeps a trailing empty column, so a payload such as "action\t"
        // still yields two fields instead of failing on index 1.
        List<String[]> userLogsLargeList = new ArrayList<String[]>();
        List<String[]> userInfosList = new ArrayList<String[]>();
        for (ValueFlag value : values) {
            String flag = value.getFlag();
            if ("userLogsLarge".equals(flag)) {
                userLogsLargeList.add(value.getValue().split("\\s", -1));
            } else if ("userInfo".equals(flag)) {
                userInfosList.add(value.getValue().split("\\s", -1));
            }
        }

        String userName = key.toString();

        // Cartesian product of the two groups.
        for (String[] logFields : userLogsLargeList) {
            for (String[] infoFields : userInfosList) {
                UserActionLog.Builder build = UserActionLog.newBuilder();
                // Log payload: action type, then IP address.
                build.setActionType(logFields[0]);
                build.setIpAddress(logFields[1]);
                // User-info payload: gender, then province. "man" maps to 0, anything else to 1.
                build.setGender("man".equals(infoFields[0]) ? 0 : 1);
                build.setProvience(infoFields[1]);
                build.setUserName(userName);

                // Wrap the record in the reusable AvroKey and emit it.
                outKey.datum(build.build());
                context.write(outKey, outValue);
            }
        }
    }
}
/**
 * Configures and runs the reduce-side join job.
 *
 * <p>Reads from {@code /mapjoin}, writes Avro {@code UserActionLog} records to
 * {@code /ReducerJoin} (recursively deleting that path first), waits for the
 * job to finish, and exits with status 0 on success or 1 on failure.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration);
    job.setJarByClass(ReducerJoin.class);
    job.setJobName("reducer联合");

    job.setMapperClass(ReducerJoinMap.class);
    job.setReducerClass(ReducerJoinReducer.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(ValueFlag.class);

    job.setOutputKeyClass(AvroKey.class);
    // The reducer emits NullWritable values, so that is the class to declare here.
    job.setOutputValueClass(NullWritable.class);

    // Write the output as Avro keys.
    job.setOutputFormatClass(AvroKeyOutputFormat.class);
    // Schema of the Avro output key.
    AvroJob.setOutputKeySchema(job, UserActionLog.SCHEMA$);

    FileInputFormat.addInputPath(job, new Path("/mapjoin"));

    Path outputPath = new Path("/ReducerJoin");
    // Remove output left by a previous run before pointing the job at the path.
    outputPath.getFileSystem(configuration).delete(outputPath, true);
    FileOutputFormat.setOutputPath(job, outputPath);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}