@awsekfozc
2016-01-22T22:56:22.000000Z
Importing TSV data into HBase
Run importtsv directly against the table:

```bash
export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0

HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
$HADOOP_HOME/bin/yarn jar \
$HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar \
importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,\
info:name,info:age,info:sex,info:address,info:phone \
student \
hdfs://hadoop.zc.com:8020/user/zc/hbase/importtsv/input
```

### For large data imports, add -Dimporttsv.bulk.output

```bash
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
$HADOOP_HOME/bin/yarn jar \
$HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar \
importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,\
info:name,info:age,info:sex,info:address,info:phone \
-Dimporttsv.bulk.output=hdfs://hadoop.zc.com:8020/user/zc/hbase/importtsv/output \
student \
hdfs://hadoop.zc.com:8020/user/zc/hbase/importtsv/input
```

### Load the HFiles generated by -Dimporttsv.bulk.output

```bash
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
$HADOOP_HOME/bin/yarn jar \
$HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar \
completebulkload \
hdfs://hadoop.zc.com:8020/user/zc/hbase/importtsv/output \
student
```
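Before running importtsv, the `student` table must already exist with the `info` column family, and the TSV files must be staged in the HDFS input directory. A minimal setup sketch; the sample rows and the file name `student.tsv` are made-up illustrations, not from the original post:

```bash
# create the target table with the 'info' column family
# (table and family names must match the -Dimporttsv.columns spec above)
echo "create 'student', 'info'" | $HBASE_HOME/bin/hbase shell

# hypothetical sample rows: tab-separated, one field per declared column
# HBASE_ROW_KEY  info:name  info:age  info:sex  info:address  info:phone
printf '10001\tzhangsan\t25\tmale\tbeijing\t13500000000\n'  > student.tsv
printf '10002\tlisi\t30\tfemale\tshanghai\t13600000000\n'  >> student.tsv

# stage the file in the HDFS input directory used by the commands above
$HADOOP_HOME/bin/hdfs dfs -mkdir -p /user/zc/hbase/importtsv/input
$HADOOP_HOME/bin/hdfs dfs -put student.tsv /user/zc/hbase/importtsv/input
```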
For large-scale data migration into HBase, using map tasks to generate HFiles and then bulk-loading them into the table greatly reduces the time cost compared with writing rows one Put at a time. The MapReduce job below reads TSV lines and emits Puts that HFileOutputFormat2 turns into HFiles:
```java
package com.zc.bigdata.hadoop.hbase.util;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TransformHFileMapReduce extends Configured implements Tool {

    public static final String COLUMN_FAMILY = "info";
    public static final String[] COLUMNS = new String[] {
        "rowkey", "name", "age", "sex", "address", "phone"
    };

    public static class TransformHFileMapper extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

        private ImmutableBytesWritable rowkey = new ImmutableBytesWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // split the TSV line into fields
            String lineValue = value.toString();
            String[] vals = lineValue.split("\t");

            // the first field is the row key
            rowkey.set(Bytes.toBytes(vals[0]));

            // build a Put covering the remaining columns
            Put put = new Put(rowkey.get());
            for (int index = 1; index < vals.length; index++) {
                put.add(
                    Bytes.toBytes(COLUMN_FAMILY),
                    Bytes.toBytes(COLUMNS[index]),
                    Bytes.toBytes(vals[index])
                );
            }

            context.write(rowkey, put);
        }
    }

    public int run(String[] args) throws Exception {
        // get configuration
        Configuration conf = this.getConf();

        // create job
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(TransformHFileMapReduce.class);

        // step 1: set input
        Path inPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, inPath);

        // step 2: set map class
        job.setMapperClass(TransformHFileMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        // step 3: set reduce class (sorts Puts into HFile order)
        job.setReducerClass(PutSortReducer.class);
        // job.setOutputKeyClass(ImmutableBytesWritable.class);
        // job.setOutputValueClass(KeyValue.class);

        // step 4: set output directory for the generated HFiles
        Path outputDir = new Path(args[2]);
        FileOutputFormat.setOutputPath(job, outputDir);

        // configure the job to emit HFiles matching the table's regions
        HTable table = new HTable(conf, args[0]);
        HFileOutputFormat2.configureIncrementalLoad(job, table);

        // submit job
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // get configuration
        Configuration conf = HBaseConfiguration.create();

        // run job
        int status = ToolRunner.run(conf, new TransformHFileMapReduce(), args);

        // exit program
        System.exit(status);
    }
}
```
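To run this end to end: package the class into a jar, submit it with the table name, TSV input directory, and a fresh HFile output directory as arguments (matching `args[0]`, `args[1]`, `args[2]` in `run()`), then load the result with completebulkload. A sketch, where the jar name `transform-hfile.jar` and the `output2` directory are assumptions for illustration; the output directory must not already exist:

```bash
# args: <table> <tsv input dir> <hfile output dir>
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
$HADOOP_HOME/bin/yarn jar transform-hfile.jar \
com.zc.bigdata.hadoop.hbase.util.TransformHFileMapReduce \
student \
hdfs://hadoop.zc.com:8020/user/zc/hbase/importtsv/input \
hdfs://hadoop.zc.com:8020/user/zc/hbase/importtsv/output2

# load the generated HFiles into the table, same as with importtsv's output
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
$HADOOP_HOME/bin/yarn jar \
$HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar \
completebulkload \
hdfs://hadoop.zc.com:8020/user/zc/hbase/importtsv/output2 \
student
```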