[关闭]
@awsekfozc 2016-01-23T06:56:22.000000Z 字数 4082 阅读 1976

Hbase数据迁移

Hbase

来源

RDBMS

  1. sqoop
  2. kettle
  3. APP

日志文件

  1. flume
  2. MapReduce
  3. completebulkload($HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar)

importtsv

tsv格式数据导入Hbase

  ### Load tab-separated (TSV) data from HDFS into the HBase table "student" with importtsv
  1. export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
  2. export HADOOP_HOME=/opt/modules/hadoop-2.5.0
  3. HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
  4. $HADOOP_HOME/bin/yarn jar \
  5. $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar \
  6. importtsv \
  7. -Dimporttsv.columns=HBASE_ROW_KEY,\
  8. info:name,info:age,info:sex,info:address,info:phone \
  9. student \
  10. hdfs://hadoop.zc.com:8020/user/zc/hbase/importtsv/input
  11. ### For large data volumes, add -Dimporttsv.bulk.output so the job generates HFiles under that path
  12. HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
  13. $HADOOP_HOME/bin/yarn jar \
  14. $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar \
  15. importtsv \
  16. -Dimporttsv.columns=HBASE_ROW_KEY,\
  17. info:name,info:age,info:sex,info:address,info:phone \
  18. -Dimporttsv.bulk.output=hdfs://hadoop.zc.com:8020/user/zc/hbase/importtsv/output \
  19. student \
  20. hdfs://hadoop.zc.com:8020/user/zc/hbase/importtsv/input
  21. ### Load the HFiles generated by -Dimporttsv.bulk.output into the "student" table
  22. HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
  23. $HADOOP_HOME/bin/yarn jar \
  24. $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar \
  25. completebulkload \
  26. hdfs://hadoop.zc.com:8020/user/zc/hbase/importtsv/output \
  27. student

Hfile

大批量数据迁移至HBase时,可先用MapReduce的Map任务生成HFile文件,再通过completebulkload将HFile直接加载至HBase表中,导入耗时大大减少。
  1. package com.zc.bigdata.hadoop.hbase.util;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.conf.Configured;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.hbase.HBaseConfiguration;
  7. import org.apache.hadoop.hbase.client.HTable;
  8. import org.apache.hadoop.hbase.client.Put;
  9. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  10. import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
  11. import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
  12. import org.apache.hadoop.hbase.util.Bytes;
  13. import org.apache.hadoop.io.LongWritable;
  14. import org.apache.hadoop.io.Text;
  15. import org.apache.hadoop.mapreduce.Job;
  16. import org.apache.hadoop.mapreduce.Mapper;
  17. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  18. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  19. import org.apache.hadoop.util.Tool;
  20. import org.apache.hadoop.util.ToolRunner;
  21. public class TransformHFileMapReduce extends Configured implements Tool {
  22. public static final String COLUMN_FAMILY = "info" ;
  23. public static final String[] COLUMNS = new String[]{
  24. "rowkey","name","age","sex","address","phone"
  25. } ;
  26. public static class TransformHFileMapper extends //
  27. Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
  28. private ImmutableBytesWritable rowkey = new ImmutableBytesWritable();
  29. //
  30. @Override
  31. protected void map(LongWritable key, Text value, //
  32. Context context) throws IOException, InterruptedException {
  33. // line value
  34. String lineValue = value.toString() ;
  35. // split
  36. String[] vals = lineValue.split("\t") ;
  37. // create rowkey
  38. rowkey.set(Bytes.toBytes(vals[0]));
  39. // create Put instance
  40. Put put = new Put(rowkey.get()) ;
  41. for(int index = 1 ;index < vals.length ; index++){
  42. put.add(//
  43. Bytes.toBytes(COLUMN_FAMILY), //
  44. Bytes.toBytes(COLUMNS[index]), //
  45. Bytes.toBytes(vals[index]) //
  46. );
  47. }
  48. // context write
  49. context.write(rowkey, put);
  50. }
  51. }
  52. public int run(String[] args) throws Exception {
  53. // get configuration
  54. Configuration conf = this.getConf() ;
  55. // create job
  56. Job job = Job.getInstance(conf, //
  57. this.getClass().getSimpleName()
  58. );
  59. // set run job class
  60. job.setJarByClass(TransformHFileMapReduce.class);
  61. // set job
  62. // step 1: set input
  63. Path inPath = new Path(args[1]) ;
  64. FileInputFormat.addInputPath(job, inPath);
  65. // step 2: set map class
  66. job.setMapperClass(TransformHFileMapper.class);
  67. job.setMapOutputKeyClass(ImmutableBytesWritable.class);
  68. job.setMapOutputValueClass(Put.class);
  69. // step 3: set reduce class
  70. job.setReducerClass(PutSortReducer.class);
  71. // job.setOutputKeyClass(ImmutableBytesWritable.class);
  72. // job.setOutputValueClass(KeyValue.class);
  73. // step 4 : output
  74. Path outputDir = new Path(args[2]) ;
  75. FileOutputFormat.setOutputPath(job, outputDir);
  76. // set
  77. // get table instance
  78. HTable table = new HTable(conf, args[0]) ;
  79. HFileOutputFormat2.configureIncrementalLoad(job, table);
  80. // submit job
  81. return job.waitForCompletion(true) ? 0: 1;
  82. }
  83. public static void main(String[] args) throws Exception {
  84. // get configuration
  85. Configuration conf = HBaseConfiguration.create() ;
  86. // run job
  87. int status = ToolRunner.run(//
  88. conf,//
  89. new TransformHFileMapReduce(),//
  90. args //
  91. ) ;
  92. // exit program
  93. System.exit(status);
  94. }
  95. }

在此输入正文

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注