@awsekfozc
2016-01-23T06:56:22.000000Z
Importing TSV-format data into HBase
export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
$HADOOP_HOME/bin/yarn jar \
$HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar \
importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,\
info:name,info:age,info:sex,info:address,info:phone \
student \
hdfs://hadoop.zc.com:8020/user/zc/hbase/importtsv/input
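importtsv writes Puts into an existing table, so the student table with the info column family has to be created and the TSV input uploaded to HDFS before the command above will work. A minimal sketch of that preparation, assuming the paths above; the student.tsv file name and the sample row are only illustrations of the column order declared in -Dimporttsv.columns:

```bash
# Create the target table with the 'info' column family used above
echo "create 'student', 'info'" | ${HBASE_HOME}/bin/hbase shell

# Stage a tab-separated sample file in the input directory
# (hypothetical row: rowkey, name, age, sex, address, phone)
printf '10001\tzhangsan\t20\tmale\tbeijing\t13800000000\n' > student.tsv
$HADOOP_HOME/bin/hdfs dfs -mkdir -p /user/zc/hbase/importtsv/input
$HADOOP_HOME/bin/hdfs dfs -put student.tsv /user/zc/hbase/importtsv/input
```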
### For large-volume imports, add -Dimporttsv.bulk.output
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
$HADOOP_HOME/bin/yarn jar \
$HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar \
importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,\
info:name,info:age,info:sex,info:address,info:phone \
-Dimporttsv.bulk.output=hdfs://hadoop.zc.com:8020/user/zc/hbase/importtsv/output \
student \
hdfs://hadoop.zc.com:8020/user/zc/hbase/importtsv/input
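With -Dimporttsv.bulk.output set, the job no longer writes to the table directly; it writes HFiles under the given output path, one subdirectory per column family. A quick way to inspect the result before loading it (paths as above):

```bash
# List the generated HFiles; the 'info' subdirectory matches the column family
$HADOOP_HOME/bin/hdfs dfs -ls -R /user/zc/hbase/importtsv/output
```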
### Loading the HFiles generated by -Dimporttsv.bulk.output
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
$HADOOP_HOME/bin/yarn jar \
$HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar \
completebulkload \
hdfs://hadoop.zc.com:8020/user/zc/hbase/importtsv/output \
student
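completebulkload moves the generated HFiles into the regions of the target table, so the rows become visible without going through the normal write path. A simple sanity check after loading (the LIMIT value is arbitrary):

```bash
# Scan a few rows of the target table to confirm the bulk load worked
echo "scan 'student', {LIMIT => 5}" | ${HBASE_HOME}/bin/hbase shell
```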
For migrating large volumes of data into HBase, generating HFiles with map tasks and bulk-loading them is far cheaper and faster than writing through importtsv's normal put path. The MapReduce job below performs the same transformation by hand: each tab-separated line is parsed into a Put, and HFileOutputFormat2.configureIncrementalLoad sets the job up so that its output can be loaded into the table with completebulkload.
package com.zc.bigdata.hadoop.hbase.util;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class TransformHFileMapReduce extends Configured implements Tool {

    public static final String COLUMN_FAMILY = "info";
    public static final String[] COLUMNS = new String[] {
        "rowkey", "name", "age", "sex", "address", "phone"
    };

    public static class TransformHFileMapper extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

        private ImmutableBytesWritable rowkey = new ImmutableBytesWritable();

        @Override
        protected void map(LongWritable key, Text value,
                Context context) throws IOException, InterruptedException {
            // line value
            String lineValue = value.toString();
            // split
            String[] vals = lineValue.split("\t");
            // create rowkey
            rowkey.set(Bytes.toBytes(vals[0]));
            // create Put instance
            Put put = new Put(rowkey.get());
            for (int index = 1; index < vals.length; index++) {
                put.add(
                    Bytes.toBytes(COLUMN_FAMILY),
                    Bytes.toBytes(COLUMNS[index]),
                    Bytes.toBytes(vals[index])
                );
            }
            // context write
            context.write(rowkey, put);
        }
    }

    public int run(String[] args) throws Exception {
        // get configuration
        Configuration conf = this.getConf();
        // create job
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        // set run job class
        job.setJarByClass(TransformHFileMapReduce.class);

        // step 1: set input
        Path inPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, inPath);

        // step 2: set map class
        job.setMapperClass(TransformHFileMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        // step 3: set reduce class
        job.setReducerClass(PutSortReducer.class);
        // job.setOutputKeyClass(ImmutableBytesWritable.class);
        // job.setOutputValueClass(KeyValue.class);

        // step 4: set output
        Path outputDir = new Path(args[2]);
        FileOutputFormat.setOutputPath(job, outputDir);

        // get table instance
        HTable table = new HTable(conf, args[0]);
        HFileOutputFormat2.configureIncrementalLoad(job, table);

        // submit job
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // get configuration
        Configuration conf = HBaseConfiguration.create();
        // run job
        int status = ToolRunner.run(conf, new TransformHFileMapReduce(), args);
        // exit program
        System.exit(status);
    }
}
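run() expects three arguments: the table name (args[0]), the TSV input path (args[1]), and the HFile output path (args[2]). A sketch of how the job might be launched, assuming the class is packaged into a jar named transform-hfile.jar and reusing the environment variables from the commands above (the jar name and the output path are only examples); the resulting HFiles are then loaded with completebulkload exactly as shown earlier:

```bash
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
$HADOOP_HOME/bin/yarn jar transform-hfile.jar \
com.zc.bigdata.hadoop.hbase.util.TransformHFileMapReduce \
student \
hdfs://hadoop.zc.com:8020/user/zc/hbase/importtsv/input \
hdfs://hadoop.zc.com:8020/user/zc/hbase/importtsv/output2
```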