@xtccc 2016-03-31T12:16:21.000000Z

Custom File Input Format



HDFS



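With the built-in TextInputFormat, each call to next on a split returns a key holding the byte offset of the start of the current line within the input file and a value holding the text of that line. By default a line ends at \n, \r, or \r\n.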
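To make the (offset, line) pairs concrete, here is a minimal plain-Java sketch that produces the same kind of records from an in-memory byte array. It does not use Hadoop: `LineOffsets`, `OffsetLine`, and `split` are hypothetical names made up for this illustration, and the sketch ignores split boundaries, compression, and custom delimiters.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a Hadoop class): mimics the (offset, line) records described above. */
class LineOffsets {

    /** One record: byte offset of the line start, and the line text without its terminator. */
    static final class OffsetLine {
        public final long offset;
        public final String line;

        OffsetLine(long offset, String line) {
            this.offset = offset;
            this.line = line;
        }
    }

    /** Splits UTF-8 data into lines terminated by \n, \r, or \r\n. */
    static List<OffsetLine> split(byte[] data) {
        List<OffsetLine> records = new ArrayList<>();
        int start = 0; // byte offset where the current line begins
        int i = 0;
        while (i < data.length) {
            byte b = data[i];
            if (b == '\n' || b == '\r') {
                records.add(new OffsetLine(start,
                        new String(data, start, i - start, StandardCharsets.UTF_8)));
                // Treat "\r\n" as a single terminator.
                if (b == '\r' && i + 1 < data.length && data[i + 1] == '\n') {
                    i++;
                }
                i++;
                start = i;
            } else {
                i++;
            }
        }
        // Last line without a trailing terminator.
        if (start < data.length) {
            records.add(new OffsetLine(start,
                    new String(data, start, data.length - start, StandardCharsets.UTF_8)));
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] data = "ab\ncd\r\nef\rgh".getBytes(StandardCharsets.UTF_8);
        for (OffsetLine r : split(data)) {
            System.out.println(r.offset + "\t" + r.line);
        }
    }
}
```

For the input `"ab\ncd\r\nef\rgh"` this yields the records (0, "ab"), (3, "cd"), (7, "ef"), and (10, "gh"): the key always points at the first byte of the line, whichever of the three terminators ended the previous one.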

To have next return the path of the file that the split belongs to as the key, with a line of text as the value, you can write your own TextInputFormatWithFilePath as follows:


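    // TextInputFormatWithFilePath.java
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.io.compress.SplittableCompressionCodec;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobConfigurable;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;

    // Old-API (org.apache.hadoop.mapred) input format: key = file path, value = one line.
    public class TextInputFormatWithFilePath
            extends FileInputFormat<Text, Text> implements JobConfigurable {

        private CompressionCodecFactory codecsFactory = null;

        @Override
        public void configure(JobConf conf) {
            codecsFactory = new CompressionCodecFactory(conf);
        }

        // As in TextInputFormat: never split a file compressed with a non-splittable codec.
        @Override
        protected boolean isSplitable(FileSystem fs, Path file) {
            CompressionCodec codec = codecsFactory.getCodec(file);
            return codec == null || codec instanceof SplittableCompressionCodec;
        }

        @Override
        public RecordReader<Text, Text> getRecordReader(InputSplit split,
                JobConf job, Reporter reporter) throws IOException {
            reporter.setStatus(split.toString());
            // Optional custom record delimiter; null means the default line endings.
            String delimiter = job.get("textinputformat.record.delimiter");
            byte[] recordDelimiter = null;
            if (delimiter != null) {
                recordDelimiter = delimiter.getBytes(StandardCharsets.UTF_8);
            }
            return new LineRecordReaderWithFilePath(job, (FileSplit) split, recordDelimiter);
        }
    }

    // LineRecordReaderWithFilePath.java
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.LineRecordReader;
    import org.apache.hadoop.mapred.RecordReader;

    // Wraps LineRecordReader and replaces its offset key with the path of the split's file.
    public class LineRecordReaderWithFilePath implements RecordReader<Text, Text> {
        private final LineRecordReader lineReader;
        private final LongWritable offset;  // filled by the wrapped reader, never emitted
        private final Text line;            // reused buffer for the current line
        private final String filepath;

        public LineRecordReaderWithFilePath(Configuration conf,
                FileSplit split, byte[] delimiterBytes) throws IOException {
            lineReader = new LineRecordReader(conf, split, delimiterBytes);
            offset = lineReader.createKey();
            line = lineReader.createValue();
            filepath = split.getPath().toString();
        }

        @Override
        public boolean next(Text key, Text value) throws IOException {
            // Read the next line through the wrapped reader; false means end of split.
            if (!lineReader.next(offset, line)) {
                return false;
            }
            key.set(filepath);
            value.set(line);
            return true;
        }

        @Override
        public Text createKey() {
            return new Text();
        }

        @Override
        public Text createValue() {
            return new Text();
        }

        @Override
        public long getPos() throws IOException {
            return lineReader.getPos();
        }

        @Override
        public void close() throws IOException {
            lineReader.close();
        }

        @Override
        public float getProgress() throws IOException {
            return lineReader.getProgress();
        }
    }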



Usage:

    job.setInputFormat(TextInputFormatWithFilePath.class);