@songlaf 2016-04-20

Assignment 4: Basic HDFS API Operations

北风网 Big Data Training

1) Java code:

    package njt.song.study.hadoop;

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.FilenameFilter;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.util.ArrayList;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class NjtHadoopMergeFile {

        public static final int BUFSIZE = 1024 * 8;

        /*
         * Merge the given local files into a single output file using NIO channels.
         */
        public static void mergeFiles(String outFile, ArrayList<String> files) {
            FileChannel outChannel = null;
            try {
                outChannel = new FileOutputStream(outFile).getChannel();
                for (String f : files) {
                    FileInputStream fileInputStream = new FileInputStream(f);
                    FileChannel fc = fileInputStream.getChannel();
                    ByteBuffer bb = ByteBuffer.allocate(BUFSIZE);
                    while (fc.read(bb) != -1) {
                        bb.flip();           // switch the buffer from reading to writing
                        outChannel.write(bb);
                        bb.clear();          // reset the buffer for the next read
                    }
                    fc.close();
                    fileInputStream.close();
                }
            } catch (IOException ioe) {
                ioe.printStackTrace();
            } finally {
                try {
                    if (outChannel != null) {
                        outChannel.close();
                    }
                } catch (IOException ignore) {
                }
            }
        }

        /*
         * Collect all files in the given directory, keeping only .xml files.
         */
        public static ArrayList<String> getListFiles(String filepath) {
            ArrayList<String> result = new ArrayList<String>();
            File directory = new File(filepath);
            File[] files = directory.listFiles(getFileExtensionFilter(".xml")); // filter to .xml files
            for (int i = 0; i < files.length; i++) {
                result.add(files[i].toString());
            }
            return result;
        }

        public static FilenameFilter getFileExtensionFilter(String extension) {
            final String _extension = extension;
            return new FilenameFilter() {
                public boolean accept(File file, String name) {
                    return name.endsWith(_extension);
                }
            };
        }

        public static void main(String[] args) throws IOException {
            String localPath = args[0];   // local source directory
            String remotePath = args[1];  // target HDFS directory
            ArrayList<String> files = getListFiles(localPath);  // read the .xml files in the local directory
            mergeFiles("/home/sjf/merge.data", files);          // merge into a local temp file, then upload
            Configuration conf = new Configuration();           // load the Hadoop configuration
            FileSystem fs = FileSystem.get(conf);
            Path src = new Path("/home/sjf/merge.data");
            Path dst = new Path(remotePath);
            fs.copyFromLocalFile(src, dst);                     // upload the merged file
            fs.close();
        }
    }
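
For comparison, the merge can also be streamed straight into HDFS, skipping the local temp file. The sketch below is an alternative, not part of the original assignment; the class name NjtHadoopMergeDirect is hypothetical, while FileSystem.create and IOUtils.copyBytes are standard Hadoop 2.x API calls.

    package njt.song.study.hadoop;

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;

    // Hypothetical alternative: merge local files directly into one HDFS file.
    public class NjtHadoopMergeDirect {

        public static void mergeToHdfs(List<String> files, String hdfsFile) throws IOException {
            FileSystem fs = FileSystem.get(new Configuration());
            FSDataOutputStream out = fs.create(new Path(hdfsFile)); // overwrites by default
            try {
                for (String f : files) {
                    FileInputStream in = new FileInputStream(f);
                    try {
                        // false: keep the HDFS output stream open across inputs
                        IOUtils.copyBytes(in, out, 4096, false);
                    } finally {
                        in.close();
                    }
                }
            } finally {
                out.close();
                fs.close();
            }
        }
    }

This avoids writing the merged data to local disk twice, at the cost of having to redo the whole upload if any input fails mid-stream.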

2) POM.xml dependencies:

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.5.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.5.1</version>
    </dependency>
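
For context, these two entries sit inside the <dependencies> element of a full pom.xml. A minimal skeleton might look like the following; the project coordinates are hypothetical placeholders, not from the original post.

    <project xmlns="http://maven.apache.org/POM/4.0.0">
        <modelVersion>4.0.0</modelVersion>
        <!-- hypothetical coordinates -->
        <groupId>njt.song.study</groupId>
        <artifactId>njt-hadoop-merge</artifactId>
        <version>1.0</version>
        <dependencies>
            <!-- the two <dependency> entries above go here -->
        </dependencies>
    </project>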

3) Run the command:

    ./hadoop jar /home/sjf/NjtHadoopMergeFile.jar "/opt/modules/hadoop-2.5.0/etc/hadoop" "/input"
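
The first argument ("/opt/modules/hadoop-2.5.0/etc/hadoop", a directory containing the Hadoop *.xml config files) becomes args[0], and the second ("/input") becomes args[1]. Since no main class is passed on the command line, the jar presumably declares NjtHadoopMergeFile as Main-Class in its manifest.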

4) Check the result:

    bin/hdfs dfs -ls /input
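
As an aside, the same check can be done through the HDFS API instead of the shell. A minimal sketch, assuming a hypothetical class name ListInput; FileSystem.listStatus and FileStatus are standard Hadoop 2.x API calls:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical helper: print each entry under /input with its size.
    public class ListInput {
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());
            for (FileStatus st : fs.listStatus(new Path("/input"))) {
                System.out.println(st.getPath() + "\t" + st.getLen() + " bytes");
            }
            fs.close();
        }
    }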

(Screenshot: output of `hdfs dfs -ls /input` showing the uploaded file.)
