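Merge all the XML files under a local directory into one file, then upload the merged file to HDFS. The full program: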
package njt.song.study.hadoop;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class NjtHadoopMergeFile {

    public static final int BUFSIZE = 1024 * 8;

    /*
     * Merge the given files into a single output file.
     */
    public static void mergeFiles(String outFile, ArrayList<String> files) {
        FileChannel outChannel = null;
        try {
            outChannel = new FileOutputStream(outFile).getChannel();
            for (String f : files) {
                FileInputStream fileInputStream = new FileInputStream(f);
                FileChannel fc = fileInputStream.getChannel();
                ByteBuffer bb = ByteBuffer.allocate(BUFSIZE);
                // Copy the source into the output channel, BUFSIZE bytes at a time.
                while (fc.read(bb) != -1) {
                    bb.flip();
                    // write() may not drain the buffer in one call; loop until empty.
                    while (bb.hasRemaining()) {
                        outChannel.write(bb);
                    }
                    bb.clear();
                }
                fc.close();
                fileInputStream.close();
            }
        } catch (IOException ioe) {
            ioe.printStackTrace();
        } finally {
            try {
                if (outChannel != null) {
                    outChannel.close();
                }
            } catch (IOException ignore) {
            }
        }
    }
    /*
     * List all files in the given directory (non-recursive).
     */
    public static ArrayList<String> getListFiles(String filepath) {
        ArrayList<String> result = new ArrayList<String>();
        File directory = new File(filepath);
        // Collect only the files that pass the .xml extension filter.
        File[] files = directory.listFiles(getFileExtensionFilter(".xml"));
        if (files == null) {
            return result; // path does not exist or is not a directory
        }
        for (File f : files) {
            result.add(f.toString());
        }
        return result;
    }
    public static FilenameFilter getFileExtensionFilter(String extension) {
        final String _extension = extension;
        return new FilenameFilter() {
            public boolean accept(File file, String name) {
                return name.endsWith(_extension);
            }
        };
    }
    public static void main(String[] args) throws IOException {
        String localPath = args[0];   // local source directory
        String remotePath = args[1];  // target HDFS directory
        // Read the .xml files from the local directory.
        ArrayList<String> files = getListFiles(localPath);
        // Merge them into a temporary local file, then upload that file.
        mergeFiles("/home/sjf/merge.data", files);
        Configuration conf = new Configuration(); // load the Hadoop configuration
        FileSystem fs = FileSystem.get(conf);
        Path src = new Path("/home/sjf/merge.data");
        Path dst = new Path(remotePath);
        fs.copyFromLocalFile(src, dst); // upload the merged file
        fs.close();
    }
}
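For reference, Hadoop 2.x also ships a built-in helper for this job: org.apache.hadoop.fs.FileUtil.copyMerge merges every file under a source directory into a single destination file. A minimal sketch follows (the class name CopyMergeSketch is made up for illustration; copyMerge takes the whole directory, so it does not apply the .xml filter used above, and it was removed in Hadoop 3.x):

package njt.song.study.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
public class CopyMergeSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem local = FileSystem.getLocal(conf); // source: local file system
        FileSystem hdfs = FileSystem.get(conf);       // destination: default FS (HDFS)
        // Merge every file under args[0] into the single HDFS file args[1].
        FileUtil.copyMerge(local, new Path(args[0]),
                hdfs, new Path(args[1]),
                false, // keep the source files
                conf,
                null); // no separator string between files
    }
}

Either way, the build needs the Hadoop client artifacts: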
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.5.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.5.1</version>
</dependency>
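Package the class into a jar and launch it with the hadoop runner, passing the local source directory and the target HDFS directory. Because hadoop jar puts the cluster's own Hadoop libraries on the classpath, the two artifacts above are only needed at compile time. Then list the target directory to verify the upload: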
bin/hadoop jar /home/sjf/NjtHadoopMergeFile.jar "/opt/modules/hadoop-2.5.0/etc/hadoop" "/input"
bin/hdfs dfs -ls /input
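If /input already exists as a directory, copyFromLocalFile keeps the source file name, so the listing should show /input/merge.data; if it does not exist, the merged data is written to a file named /input itself.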