@xtccc
2016-03-31T04:16:21.000000Z
字数 1882
阅读 2375
HDFS
对于系统提供的TextInputFormat,对每个split调用next时返回的结果是: key是当前行的起始位置在输入文件中的字节偏移量(offset), value是当前行的文本内容。默认的行分隔符是\n、\r或者\r\n。
如果希望自己写一个TextInputFormatWithFilePath,使得每次对输入split调用next时,返回的key是该split所在文件的路径,value是一行文本,那么可以如下定义:
// TextInputFormatWithFilePath.java 文件public class TextInputFormatWithFilePathextends FileInputFormat<Text, Text> implements JobConfigurable{private CompressionCodecFactory codecsFactory = null;@Overridepublic void configure(JobConf conf) {codecsFactory = new CompressionCodecFactory(conf);}@Overridepublic RecordReader getRecordReader(InputSplit split,JobConf job, Reporter reporter) throws IOException {reporter.setStatus(split.toString());String delimiter = job.get("textinputformat.record.delimiter");byte[] recordDelimiter = null;if (null != delimiter)recordDelimiter = delimiter.getBytes(Charsets.UTF_8);return new LineRecordReaderWithFilePath(job,(FileSplit)split, recordDelimiter);}}// LineRecordReaderWithFilePath.java 文件public class LineRecordReaderWithFilePath implements RecordReader<Text, Text> {private LineRecordReader lineReader;private LongWritable offset;private Text line;private String filepath;public LineRecordReaderWithFilePath(Configuration conf,FileSplit split, byte[] delimiterBytes) throws IOException {lineReader = new LineRecordReader(conf, split, delimiterBytes);offset = lineReader.createKey();line = lineReader.createValue();filepath = split.getPath().toString();}@Overridepublic boolean next(Text key, Text value) throws IOException {if (!lineReader.next(offset, line))return false;key.set(filepath);value.set(line);return true;}@Overridepublic Text createKey() {return new Text();}@Overridepublic Text createValue() {return new Text();}@Overridepublic long getPos() throws IOException {return lineReader.getPos();}@Overridepublic void close() throws IOException {lineReader.close();}@Overridepublic float getProgress() throws IOException {return lineReader.getProgress();}}
使用:
// Select the custom input format for the job.
// NOTE(review): `job` is presumably an org.apache.hadoop.mapred.JobConf (old API) — confirm.
job.setInputFormat(TextInputFormatWithFilePath.class);
