@zhangyy 2018-11-28T10:38:15.000000Z

Working with HBase in Depth and Loading Data

HBase topics


  • Part 1: HBase data retrieval flow
  • Part 2: Calling the HBase Java API
  • Part 3: What each HBase service does
  • Part 4: Integrating HBase with MapReduce

Part 1: HBase data retrieval flow

1.1 HBase data retrieval flow diagram:

image_1al15815q17ek1imf1qisme015r89.png-184.9kB

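1.2 HBase read path:

  1. The client asks the ZooKeeper cluster where the root/meta region (meta) is located.
     The meta table records which tables exist, which regions each table has, and each region's start row and stop row.
  2. The client locates the region server that hosts the target region.
  3. The region server responds to the client's request.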
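
Steps 1 and 2 amount to a lookup in a sorted map keyed by each region's start row. The following is a minimal sketch in plain Java, not HBase client code. The class name `RegionLookupSketch`, the server names, and the split points are all made up for illustration, and `String` ordering stands in for HBase's byte-wise row key ordering (the two agree for ASCII keys).

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model of the meta lookup: each region is identified by its start row
// and covers the range [startRow, nextRegionStartRow).
// All names and values here are hypothetical.
class RegionLookupSketch {
    // start row -> region server hosting that region
    private final TreeMap<String, String> regionsByStartRow = new TreeMap<>();

    void addRegion(String startRow, String regionServer) {
        regionsByStartRow.put(startRow, regionServer);
    }

    // The region for a row is the one with the greatest start row <= row key.
    String locate(String rowKey) {
        Map.Entry<String, String> entry = regionsByStartRow.floorEntry(rowKey);
        if (entry == null) {
            throw new IllegalStateException("no region covers row " + rowKey);
        }
        return entry.getValue();
    }

    public static void main(String[] args) {
        RegionLookupSketch meta = new RegionLookupSketch();
        meta.addRegion("", "rs1");      // the first region starts at the empty key
        meta.addRegion("1003", "rs2");
        meta.addRegion("2000", "rs3");
        System.out.println("1001 -> " + meta.locate("1001"));
        System.out.println("1004 -> " + meta.locate("1004"));
    }
}
```

A real client also caches the result of this lookup, so later requests for the same region can skip the meta query.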

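1.3 HBase write path

  1. The client asks the ZooKeeper cluster which region the data should be written to.
  2. The client sends a write request to the region server that hosts that region.
  3. The data is first written to the HLog (WAL).
  4. The data is then written to the memstore.
  5. When the memstore reaches its threshold, it is written out as a storefile (flush).
  6. When the storefiles reach their threshold, they are merged into a new storefile (compact).
  7. When the region reaches its threshold, it is divided into two new regions (split).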

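1.4 The three core storage mechanisms behind HBase reads and writes

  1. Flush: when the memstore is full, it is flushed into a storefile.
  2. Compact: when the storefiles reach their threshold, they are merged; during the merge, cell versions are consolidated and deleted data is removed.
  3. Split: as a region keeps growing and reaches its threshold, it is split into two new regions.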
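
The flush and compact mechanisms can be pictured with a small in-memory model. This is a sketch under simplifying assumptions, not how HBase implements them: the class `StoreSketch` and its thresholds are hypothetical, thresholds count cells and files rather than bytes, each key keeps only its newest value, and deletes and region splits are not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model of one store: a sorted in-memory buffer (memstore) that is
// flushed into immutable sorted files, which are later compacted.
// Thresholds are tiny on purpose and are not HBase defaults.
class StoreSketch {
    private final int flushThreshold;    // cells in memory that trigger a flush
    private final int compactThreshold;  // number of files that triggers a compaction
    private TreeMap<String, String> memstore = new TreeMap<>();
    private final List<TreeMap<String, String>> storeFiles = new ArrayList<>();

    StoreSketch(int flushThreshold, int compactThreshold) {
        this.flushThreshold = flushThreshold;
        this.compactThreshold = compactThreshold;
    }

    void put(String rowKey, String value) {
        memstore.put(rowKey, value);
        if (memstore.size() >= flushThreshold) {
            flush();
        }
    }

    // Flush: the full memstore becomes a new immutable store file.
    private void flush() {
        storeFiles.add(memstore);
        memstore = new TreeMap<>();
        if (storeFiles.size() >= compactThreshold) {
            compact();
        }
    }

    // Compact: merge all files into one; iterating oldest first means
    // values from newer files overwrite older ones for the same key.
    private void compact() {
        TreeMap<String, String> merged = new TreeMap<>();
        for (TreeMap<String, String> file : storeFiles) {
            merged.putAll(file);
        }
        storeFiles.clear();
        storeFiles.add(merged);
    }

    // Read: check the memstore first, then the files from newest to oldest.
    String get(String rowKey) {
        if (memstore.containsKey(rowKey)) {
            return memstore.get(rowKey);
        }
        for (int i = storeFiles.size() - 1; i >= 0; i--) {
            String value = storeFiles.get(i).get(rowKey);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    int storeFileCount() {
        return storeFiles.size();
    }

    public static void main(String[] args) {
        StoreSketch store = new StoreSketch(2, 3);
        store.put("a", "1");
        store.put("b", "2");   // memstore full: first flush
        store.put("a", "3");
        store.put("c", "4");   // second flush
        store.put("d", "5");
        store.put("e", "6");   // third flush, which triggers a compaction
        System.out.println("files=" + store.storeFileCount() + " a=" + store.get("a"));
    }
}
```

The read method shows why compaction matters: the more store files there are, the more places a read may have to check.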

Part 2: Calling the HBase Java API

2.1 Eclipse environment setup

Switch the Maven repository by uploading repository.tar.gz, then copy the HBase configuration files into the project:

    cd .m2
    mv repository repository.bak2016612
    rz repository.tar.gz
    tar -zxvf repository.tar.gz
    cd /home/hadoop/yangyang/hbase
    cp -p hbase-site.xml log4j.properties /home/hadoop/workspace/studyhbase/src/main/resources

Edit the project's pom.xml in Eclipse

Add:

    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>0.98.6-hadoop2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>0.98.6-hadoop2</version>
    </dependency>

2.2 Calling the HBase Java API:

    package org.apache.hadoop.hbase;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class HbaseOperation {

        /**
         * Opens a table using the hbase-site.xml found on the classpath.
         *
         * @param tableName name of an existing HBase table
         * @throws IOException if the table cannot be opened
         */
        public static HTable getTable(String tableName) throws IOException {
            // Get configuration
            Configuration conf = HBaseConfiguration.create();
            // Get Table
            HTable table = new HTable(conf, tableName);
            return table;
        }

        /**
         * Reads row "1001" from the user table and prints every cell.
         *
         * @throws IOException if the read fails
         */
        public static void getData() throws IOException {
            HTable table = HbaseOperation.getTable("user");
            try {
                // Get Data
                Get get = new Get(Bytes.toBytes("1001"));
                Result result = table.get(get);
                Cell[] cells = result.rawCells();
                for (Cell cell : cells) {
                    System.out.print(Bytes.toString(CellUtil.cloneFamily(cell)) + ":");
                    System.out.print(Bytes.toString(CellUtil.cloneQualifier(cell))
                            + "==>");
                    System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
                }
            } finally {
                table.close();
            }
        }

        /**
         * Writes row "1004" with three columns in the info family.
         *
         * @throws IOException if the write fails
         */
        public static void putData() throws IOException {
            HTable table = HbaseOperation.getTable("user");
            try {
                Put put = new Put(Bytes.toBytes("1004"));
                put.add(Bytes.toBytes("info"), Bytes.toBytes("name"),
                        Bytes.toBytes("zhaoliu"));
                put.add(Bytes.toBytes("info"), Bytes.toBytes("age"),
                        Bytes.toBytes("50"));
                put.add(Bytes.toBytes("info"), Bytes.toBytes("sex"),
                        Bytes.toBytes("male"));
                table.put(put);
            } finally {
                table.close();
            }
        }

        /** Scans the user table from row "1001" (inclusive) to "1002" (exclusive). */
        public static void main(String[] args) throws IOException {
            HTable table = HbaseOperation.getTable("user");
            Scan scan = new Scan();
            scan.setStartRow(Bytes.toBytes("1001"));
            scan.setStopRow(Bytes.toBytes("1002"));
            ResultScanner resultScanner = table.getScanner(scan);
            try {
                for (Result res : resultScanner) {
                    Cell[] ress = res.rawCells();
                    for (Cell cell : ress) {
                        System.out.print(Bytes.toString(CellUtil.cloneRow(cell))
                                + "\t");
                        System.out.print(Bytes.toString(CellUtil.cloneFamily(cell))
                                + ":");
                        System.out.print(Bytes.toString(CellUtil.cloneQualifier(cell))
                                + "==>");
                        System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
                    }
                }
            } finally {
                // Close the scanner and the table only after the whole scan is done.
                resultScanner.close();
                table.close();
            }
        }
    }
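
The scan in `main` sets a start row and a stop row. In HBase the start row is inclusive, the stop row is exclusive, and row keys are ordered byte by byte rather than numerically. The sketch below imitates that behavior with a plain `TreeMap`, so it runs without a cluster. The class `ScanRangeSketch` and the sample rows are made up for illustration, and `String` ordering is used as a stand-in for byte ordering (the two agree for ASCII keys).

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Imitates Scan start/stop row semantics on an in-memory sorted map.
// This is not HBase code; the rows below are hypothetical sample data.
class ScanRangeSketch {

    // subMap(from, to) includes 'from' and excludes 'to', which matches
    // an HBase scan with the same start and stop rows.
    static SortedMap<String, String> scan(TreeMap<String, String> table,
                                          String startRow, String stopRow) {
        return table.subMap(startRow, stopRow);
    }

    public static void main(String[] args) {
        TreeMap<String, String> table = new TreeMap<>();
        table.put("1001", "row A");
        table.put("10010", "row B");  // sorts between "1001" and "1002"
        table.put("1002", "row C");
        table.put("1004", "row D");

        // Iterates the rows in [1001, 1002) in key order.
        for (String rowKey : scan(table, "1001", "1002").keySet()) {
            System.out.println(rowKey);
        }
    }
}
```

Because the ordering is lexicographic, a key such as 10010 falls inside the range 1001 to 1002 even though it is numerically larger. Fixed-width or zero-padded row keys avoid this kind of surprise.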

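Part 3: What each HBase service does

3.1 HMaster responsibilities:

  1. Assigns regions to region servers.
  2. Handles load balancing across region servers.
  3. Detects failed region servers and reassigns their regions.
  4. Watches ZooKeeper to learn when region servers come online or go offline.
  5. Relies on ZooKeeper to provide HA.
  6. Does not take part in client reads and writes.
  7. Carries a light load (it can usually share a machine with other services such as the NN/SNN).
  8. Is not a single point of failure (SPOF).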

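3.2 HRegionServer responsibilities:

  1. Maintains the regions that the master assigns to it.
  2. Responds to client I/O requests (reads and writes).
  3. Handles region flush, compact, and split.
  4. Maintains the region cache.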

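3.3 ZooKeeper responsibilities:

  1. Ensures that the cluster has only one active master (HA).
  2. Stores the location of the root region (meta), which is the entry point for access.
  3. Monitors region server status in real time and promptly notifies the master when region servers come online or go offline.
  4. Stores the HBase schema, including which tables exist and which column families each table has.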

Part 4: Integrating HBase with MapReduce

4.1 Command that prints the jars HBase needs for MapReduce

    bin/hbase mapredcp

4.2 Configure environment variables

    vim .bash_profile
    # add the following lines to .bash_profile
    export HADOOP_HOME=/home/hadoop/yangyang/hadoop
    export HBASE_HOME=/home/hadoop/yangyang/hbase
    export HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp`
    PATH=$PATH:$HOME/bin:$JAVA_HOME/bin:${HADOOP_HOME}/bin:${MAVEN_HOME}/bin:${HBASE_HOME}/bin
    # reload the profile after saving
    source .bash_profile

image_1al1qck6g2cjpmg402129542dm.png-38.6kB

4.3 Count the rows in an HBase table:

    cd /home/hadoop/yangyang/hadoop
    bin/yarn jar /home/hadoop/yangyang/hbase/lib/hbase-server-0.98.6-cdh5.3.6.jar rowcounter user

image_1al1rbgq1v5moue17q1jr3miq13.png-73kB
image_1al1rc677gpm1ninpeu1v7s1vlr1g.png-50.4kB

4.4 Import a prepared in.tsv file into an HBase table

Create in.tsv with the rows below. The fields on each line must be separated by tabs, which is the default separator for importtsv.

    vim in.tsv
    10010 zhangsan 30 shanghai
    10011 lisi 31 beijin
    10012 wangwu 32 shanghai
    10013 zaoliu 30 beijin

Upload the file to HDFS and run the import:

    hdfs dfs -put in.tsv /input
    yarn jar /home/hadoop/yangyang/hbase/lib/hbase-server-0.98.6-cdh5.3.6.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age,info:address user /input/in.tsv
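
The `-Dimporttsv.columns` option pairs each field of a line, by position, with either the row key or a `family:qualifier` column. The sketch below shows only that positional pairing. It is not the ImportTsv implementation, and the class `TsvMappingSketch` and its method are hypothetical names introduced for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrates how a column spec lines up with the fields of one TSV line.
// Hypothetical helper; the real tool does this inside a MapReduce job.
class TsvMappingSketch {

    // Returns column name -> field value, in spec order. The row key is
    // stored under the HBASE_ROW_KEY entry of the spec.
    static Map<String, String> mapLine(String columnsSpec, String line) {
        String[] columns = columnsSpec.split(",");
        String[] fields = line.split("\t", -1);  // -1 keeps trailing empty fields
        if (fields.length != columns.length) {
            throw new IllegalArgumentException(
                    "expected " + columns.length + " fields but found " + fields.length);
        }
        Map<String, String> cells = new LinkedHashMap<>();
        for (int i = 0; i < columns.length; i++) {
            cells.put(columns[i], fields[i]);
        }
        return cells;
    }

    public static void main(String[] args) {
        String spec = "HBASE_ROW_KEY,info:name,info:age,info:address";
        Map<String, String> cells = mapLine(spec, "10010\tzhangsan\t30\tshanghai");
        for (Map.Entry<String, String> cell : cells.entrySet()) {
            System.out.println(cell.getKey() + " = " + cell.getValue());
        }
    }
}
```

A line whose fields are separated by spaces instead of tabs would be seen as a single field and rejected, which is why the separator in in.tsv matters.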

image_1al1vgrgg1tg4im61262ais10971t.png-72.6kB
image_1al1vhrb1jpjqs81g581le51rd52a.png-48.5kB
image_1al1vltnpsd199rg4t1etj1s8u34.png-59.9kB

4.5 Load data with BulkLoad

    vim out.tsv
    110 zhangsan 30 shanghai
    111 lisi 31 beijin
    112 wangwu 32 shanghai
    113 zaoliu 30 beijin

    hdfs dfs -put out.tsv /input
    # Convert the tsv file into HFiles (on HDFS)
    yarn jar /home/hadoop/yangyang/hbase/lib/hbase-server-0.98.6-cdh5.3.6.jar importtsv -Dimporttsv.bulk.output=/hfileoutput/ -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age,info:tel user /input/out.tsv
    # Load the HFiles into the HBase table
    yarn jar /home/hadoop/yangyang/hbase/lib/hbase-server-0.98.6-cdh5.3.6.jar completebulkload /hfileoutput user

image_1al3mesfr1iml1mvk1e8i1t5c18pv3h.png-75.4kB
image_1al3mflhuue71rph157sbgm1ujg3u.png-35.4kB
image_1al3msr5519p8is866r19711avh4b.png-85.3kB
image_1al3mtd9v8a1pll1a7ced9aom4o.png-23.9kB
image_1al3mu0emepusmu7tg1n9p1st255.png-39.5kB

4.6 Extract selected fields from an HBase table into a new table

Extract the name and age columns from the user table in HBase and write them to a new table, student:

    package org.apache.hadoop.studyhbase;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class User2StudentMapReduce extends Configured implements Tool {

        // step 1: Mapper
        public static class ReadUserMapper
                extends TableMapper<ImmutableBytesWritable, Put> {

            @Override
            protected void map(ImmutableBytesWritable key, Result value,
                    Context context)
                    throws IOException, InterruptedException {
                // user: name & age -> student: name & age : put
                // create Put
                Put put = new Put(key.get());
                // add column
                for (Cell cell : value.rawCells()) {
                    // add family: info
                    if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                        // add column: name
                        if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                            put.add(cell);
                            // CellUtil.cloneValue(cell)
                            // put.add(family, qualifier, value) ;
                        }
                        // add column: age
                        else if ("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                            put.add(cell);
                        }
                    }
                }
                // context output
                context.write(key, put);
            }
        }

        // step 2: Reducer
        public static class WriteStudentReducer
                extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

            @Override
            protected void reduce(ImmutableBytesWritable key, Iterable<Put> values,
                    Context context) throws IOException, InterruptedException {
                for (Put put : values) {
                    context.write(NullWritable.get(), put);
                }
            }
        }

        // step 3: Driver
        public int run(String[] args) throws Exception {
            // 1) Configuration
            Configuration conf = this.getConf();
            // 2) create job
            Job job = Job.getInstance(conf, this.getClass().getSimpleName());
            job.setJarByClass(User2StudentMapReduce.class);
            // 3) set job
            // input -> mapper -> shuffle -> reducer -> output
            Scan scan = new Scan();
            scan.setCacheBlocks(false);
            scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
            TableMapReduceUtil.initTableMapperJob(
                    "user",                       // input table
                    scan,                         // Scan instance to control CF and attribute selection
                    ReadUserMapper.class,         // mapper class
                    ImmutableBytesWritable.class, // mapper output key
                    Put.class,                    // mapper output value
                    job
            );
            TableMapReduceUtil.initTableReducerJob(
                    "student",                    // output table
                    WriteStudentReducer.class,    // reducer class
                    job
            );
            job.setNumReduceTasks(1); // at least one, adjust as required
            boolean isSuccess = job.waitForCompletion(true);
            if (!isSuccess) {
                throw new IOException("error with job!");
            }
            return isSuccess ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            int status = ToolRunner.run(
                    conf,
                    new User2StudentMapReduce(),
                    args
            );
            System.exit(status);
        }
    }

Create the empty student table in the HBase shell before running the job:

    create 'student','info'

image_1al3opgk49tnbas1tn2d041dd35i.png-8.8kB

Export the newly built jar, User2Student.jar, and run it:

    cd /home/hadoop/jars
    yarn jar User2Student.jar

image_1al3oud6fv38rl41iov9hg1g415v.png-46.8kB
image_1al3ousmm1v4f1un51u2e147onn06c.png-32.6kB

Query HBase to verify:

image_1al3p0c7e1mtlq351denjs713jm6p.png-72.4kB
