@xtccc
2016-03-31T12:16:21.000000Z
字数 1882
阅读 2096
HDFS
对于系统提供的TextInputFormat,对于每个split调用next的返回结果: key是当前行开头在输入文件的offset, value是当前行的文本内容,默认的分隔符是\n,\r,或者\r\n
如果希望自己写一个TextInputFormatWithFilePath,使得每次对输入split调用next时,返回的key是该split所在文件的路径,value是一行文本,那么可以如如下定义:
// TextInputFormatWithFilePath.java 文件
public class TextInputFormatWithFilePath
extends FileInputFormat<Text, Text> implements JobConfigurable{
private CompressionCodecFactory codecsFactory = null;
@Override
public void configure(JobConf conf) {
codecsFactory = new CompressionCodecFactory(conf);
}
@Override
public RecordReader getRecordReader(InputSplit split,
JobConf job, Reporter reporter) throws IOException {
reporter.setStatus(split.toString());
String delimiter = job.get("textinputformat.record.delimiter");
byte[] recordDelimiter = null;
if (null != delimiter)
recordDelimiter = delimiter.getBytes(Charsets.UTF_8);
return new LineRecordReaderWithFilePath(job,
(FileSplit)split, recordDelimiter);
}
}
// LineRecordReaderWithFilePath.java 文件
public class LineRecordReaderWithFilePath implements RecordReader<Text, Text> {
private LineRecordReader lineReader;
private LongWritable offset;
private Text line;
private String filepath;
public LineRecordReaderWithFilePath(Configuration conf,
FileSplit split, byte[] delimiterBytes) throws IOException {
lineReader = new LineRecordReader(conf, split, delimiterBytes);
offset = lineReader.createKey();
line = lineReader.createValue();
filepath = split.getPath().toString();
}
@Override
public boolean next(Text key, Text value) throws IOException {
if (!lineReader.next(offset, line))
return false;
key.set(filepath);
value.set(line);
return true;
}
@Override
public Text createKey() {
return new Text();
}
@Override
public Text createValue() {
return new Text();
}
@Override
public long getPos() throws IOException {
return lineReader.getPos();
}
@Override
public void close() throws IOException {
lineReader.close();
}
@Override
public float getProgress() throws IOException {
return lineReader.getProgress();
}
}
使用:
job.setInputFormat(TextInputFormatWithFilePath.class);