MultipleOutputs实现MapReduce多个输出

在默认的情况下,MapReduce在HDFS上新建一个目录作为输出目录。目录结构如下:

- _SUCCESS
- part-r-00000
- part-r-00001

其中_SUCCESS是一个空文件,表示任务成功,其他是输出文件。输出文件的文件名有三个部分:part是固定的,r表示是reduce的输出,最后的5位数表示是第几个reduce。每个reduce的输出都是独立的,利用这点可以实现将记录分类输出到不同文件。具体方式是将用于分类的字段放到map output的key中,并实现partitioner按照自己设定的规则分类。

但是如果我需要像Flume一样将输出的记录分类写入层级目录,或者希望输出文件以自己的方式命名怎么办呢?用普通的输出方式是做不到的,这时就需要用到MultipleOutputs。(MultipleOutputs有新旧两个版本,这里只讨论新版本 org.apache.hadoop.mapreduce.lib.output.MultipleOutputs)。
MultipleOutputs从名字就可以看出用于将输出数据写到多个输出流。官方描述如下:

The MultipleOutputs class simplifies writing output data to multiple outputs
Case one: writing to additional outputs other than the job default output. Each additional output, or named output, may be configured with its own OutputFormat, with its own key class and with its own value class.
Case two: to write data to different files provided by user

翻译成中文:
MultipleOutputs类简化了将输出写到多个输出流的方式。
应用场景一:除了默认的输出,还需要将输出数据写到其他附加的输出。每个附加输出,或者说命名输出(下文会讲到),可以设置独立的OutputFormat,独立的key class和value class。
应用场景二:写数据到用户提供的多个不同的文件中。

下面我说说在这两种场景下MultipleOutputs分别的用法:
1.利用命名输出将数据写到默认输出目录的不同文件中,并自定义文件名。
在Job的初始化提交函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Job job = new Job();
FileInputFormat.setInputPath(job, inDir);
FileOutputFormat.setOutputPath(job, outDir);
job.setMapperClass(MOMap.class);
job.setReducerClass(MOReduce.class);
// 定义命名输出“text”。每个reduce都将不同的命名输出分别写到一个文件中,文件会以 text-r-00000 方式命名。
// 聪明的读者已经可以猜出,part是默认的命名输出。
MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
LongWritable.class, Text.class);
// 定义命名输出“seq”。
MultipleOutputs.addNamedOutput(job, "seq"
SequenceFileOutputFormat.class,
LongWritable.class, Text.class);
job.waitForCompletion(true);

定义完命名输出,即可在Reducer中使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class MOReduce extends
Reducer<WritableComparableWritableWritableComparableWritable> {
private MultipleOutputs mos;
public void setup(Context context) {
// 用context初始化MultipleOutputs
mos = new MultipleOutputs(context);
}
public void reduce(WritableComparable key, Iterator<Writable> values,
Context context)
throws IOException {
// 使用mos.write输出,而不是context.write
mos.write("text", , key, new Text("Hello"));
mos.write("seq", LongWritable(1), new Text("Bye"), "seq_a");
mos.write("seq", LongWritable(2), key, new Text("Chau"), "seq_b");
}
public void cleanup(Context) throws IOException {
mos.close();
}

可以看出,不同于一般的OutputFormat,它是Context的封装。

2.不使用命名输出,直接将数据写到指定的路径。
在Job初始化时和普通MapReduce一样,不需要定义命名输出。在Reduce中使用 MultipleOutputs.write(KEYOUT key, VALUEOUT value, String baseOutputPath) 输出数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private MultipleOutputs<Text, Text> out;
public void setup(Context context) {
out = new MultipleOutputs<Text, Text>(context);
}
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text t : values) {
out.write(key, t, generateFileName(<parameter list...>));
}
}
protected void cleanup(Context context) throws IOException, InterruptedException {
out.close();
}

generateFileName()根据参数生成输出路径。这个路径是相对的,当前路径是Job指定的OutputPath,路径最后一个斜杠后的字符串会作为文件名。

1
2
3
4
5
6
7
8
9
10
11
private String generateFileName(Text k) {
// expect Text k in format "Surname|Forename"
String[] kStr = k.toString().split("\\|");
String sName = kStr[0];
String fName = kStr[1];
// example for k = Smith|John
// output written to /user/hadoop/path/to/output/Smith/John-r-00000 (etc)
return sName + "/" + fName;
}

有了这两个用法,基本就可以满足自定义MapReduce输出的需求啦。

参文献:
1.Hadoop-3.0.0alpha1 API: MultipleOutputs
2.Hadoop多文件输出:MultipleOutputFormat和MutipleOutputs深究(二)

本文是原创文章,转载请注明:时间与精神的小屋 - MultipleOutputs实现MapReduce多个输出