HBase详解

文章目录

HBase简介

HBase – Hadoop Database,是一个高可靠性、高性能、面向列、可伸缩、实时读写的分布式数据库

利用Hadoop HDFS作为其文件存储系统,利用Hadoop MapReduce来处理HBase中的海量数据,利用Zookeeper作为其分布式协同服务,主要用来存储非结构化和半结构化的松散数据(列存 NoSQL 数据库)。

HBase特点

  1. 大:一个表可以有上亿行,上百万列。

  2. 面向列:面向列表(簇)的存储和权限控制,列(簇)独立检索。

  3. 稀疏:对于为空(NULL)的列,并不占用存储空间,因此,表可以设计的非常稀疏。

  4. 无模式:每一行都有一个可以排序的主键和任意多的列,列可以根据需要动态增 加,同一张表中不同的行可以有截然不同的列。

  5. 数据多版本:每个单元中的数据可以有多个版本,默认情况下,版本号自动分配, 版本号就是单元格插入时的时间戳。

  6. 数据类型单一:HBase中的数据都是字节数组,没有类型。

HBase架构

HBase详解

Master

  • 为Region server分配region
  • 负责Region server的负载均衡
  • 发现失效的Region server并重新分配其上的region
  • 管理用户对table的增删改操作

RegionServer

  • Region server维护region,处理对这些region的IO请求
  • Region server负责切分在运行过程中变得过大的region

Region

  • HBase自动把表水平划分成多个区域(region),每个region会保存一个表里面某段连续的数据;每个表一开始只有一个region,随着数据不断插入表,region不断增大,当增大到一个阀值的时候,region就会等分会两个新的region(裂变)。

  • 当table中的行不断增多,就会有越来越多的region。这样一张完整的表被保存在多个Regionserver 上。

Memstore&Storefile

  • 一个region由多个store组成,一个store对应一个CF(列族)store包括位于内存中的memstore和位于磁盘的storefile写操作先写入memstore,当memstore中的数据达到某个阈值,hregionserver会启动flashcache进程写入storefile,每次写入形成单独的一个storefile

  • 当storefile文件的数量增长到一定阈值后,系统会进行合并(minor、major compaction),在合并过程中会进行版本合并和删除工作(majar),形成更大的storefile

  • 当一个region所有storefile的大小和数量超过一定阈值后,会把当前的region分割为两个,并由hmaster分配到相应的regionserver服务器,实现负载均衡

  • 客户端检索数据,先在memstore找,找不到再找storefile

HLog

  • HLog文件就是一个普通的Hadoop Sequence File,Sequence File 的Key是HLogKey对象,HLogKey中记录了写入数据的归属信息,除了table和region名字外,同时还包括 sequence number和timestamp,timestamp是”写入时间”,sequence number的起始值为0,或者是最近一次存入文件系统中sequence number。

  • HLog SequeceFile的Value是HBase的KeyValue对象,即对应HFile中的KeyValue。

HBase数据模型

HBase详解

RowKey(行键)

  • 唯一标识一行数据

  • 可以通过RowKey获取一行数据

  • 按照字典顺序排序的。

  • Row key只能存储64k的字节数据 10-100byte

ColumnFamily&Qualifier(列簇和列)

  • HBase表中的每个列都归属于某个列族,列族必须作为表模式(schema)定义的一部分预先给出。如 create ‘test’, ‘course’。

  • 列名以列族作为前缀,每个“列族”都可以有多个列成员(column);如course:math, course:english, 新的列族成员(列)可以随后按需、动态加入。

  • 权限控制、存储以及调优都是在列族层面进行的;

  • HBase把同一列族里面的数据存储在同一目录下,由几个文件保存。

TimeStamp(时间戳)

  • 在HBase每个cell存储单元对同一份数据有多个版本,根据唯一的时间戳来区分每个版本之间的差异,不同版本的数据按照时间倒序排序,最新的数据版本排在最前面。

  • 时间戳的类型是 64位整型。

  • 时间戳可以由HBase(在数据写入时自动)赋值,此时时间戳是精确到毫秒的当前系统时间

  • 时间戳也可以由客户显式赋值,如果应用程序要避免数据版本冲突,就必须自己生成具有唯一性的时间戳。

Cell(存储单元)

  • 由行和列的坐标交叉决定。

  • 单元格是有版本的。

  • 单元格的内容是未解析的字节数组。

  • 由{row key, column( = +), version} 唯一确定的单元。cell中的数据是没有类型的,全部是字节码形式存贮。

HBase读写流程

HBase详解

HBase Shell

HBase详解

HBaseAPI

常用java类

java类 HBase数据模型
Admin / HBaseAdmin/ HBaseConfiguration 数据库
HTable/HTableDescriptor
HColumnDescriptor 列簇
Put/Delete/Get/Scan/ResultScanner/
CellUtil 存储单元

示例代码

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class Demo2API {
    Configuration conf = null;
    Connection conn = null;

    @Before
    public void init() {
        conf = HBaseConfiguration.create();
        // 其他配置参考http://hbase.apache.org/1.4/book.html#config.files
        conf.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181");
        try {
            conn = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // 创建表
    @Test
    public void create_table() throws IOException {
        Admin admin = conn.getAdmin();
        // admin 相当于HBase的管理员
        // 创建表 传入表名(TableName.valueOf())
        HTableDescriptor tableName = new HTableDescriptor(TableName.valueOf("tableName"));
        // 创建列簇 传入列簇名
        HColumnDescriptor cf1 = new HColumnDescriptor("cf1");
        // 对列簇进行一些配置
        cf1.setMaxVersions(5); // 设置版本号
        cf1.setTimeToLive(30); // 设置TTL时间
        // 将创建好的列簇加入表
        tableName.addFamily(cf1);
        // 使用admin对象创建表
        admin.createTable(tableName);
    }

    // 删除表
    @Test
    public void drop_table() throws IOException {
        Admin admin = conn.getAdmin();
        String tableName = "tableName";
        admin.disableTable(TableName.valueOf(tableName));
        admin.deleteTable(TableName.valueOf(tableName));
    }

    // 添加一条数据
    @Test
    public void put() throws IOException {
        // 如果想要插入数据 查询数据 需要使用Table对象
        // 如果需要对表进行修改,获取表的一些配置、结构 使用HTableDescriptor对象
        Table testJavaAPI = conn.getTable(TableName.valueOf("testJavaAPI"));

        Put put = new Put("00001".getBytes());
        put.addColumn("cf1".getBytes(), "name".getBytes(), "zhangSan".getBytes());

        testJavaAPI.put(put);


    }

    // 获取一条数据
    @Test
    public void get() throws IOException {
        Table testJavaAPI = conn.getTable(TableName.valueOf("testJavaAPI"));
        Get get = new Get("00001".getBytes());
        Result rs = testJavaAPI.get(get);
        byte[] value = rs.getValue("cf1".getBytes(), "name".getBytes());
        System.out.println(Bytes.toString(value));
    }

    // 指定rowkey范围 扫描表
    @Test
    public void scan() throws IOException {
        Table testJavaAPI = conn.getTable(TableName.valueOf("test1"));
        Scan scan = new Scan();
        // 包含startRow 不包含 endRow
        scan.withStartRow("001".getBytes());
        scan.withStopRow("007".getBytes());
        ResultScanner scanner = testJavaAPI.getScanner(scan);
        Result rs = scanner.next();
        while (rs != null) {

            byte[] row = rs.getRow();// 获取rowkey
            String rk = Bytes.toString(row);
            System.out.println();
            if ("001".equals(rk)) {
                byte[] value = rs.getValue("cf1".getBytes(), "name".getBytes());
                System.out.println(Bytes.toString(value));
            } else if ("002".equals(rk)) {
                byte[] value = rs.getValue("cf1".getBytes(), "name0".getBytes());
                System.out.println(Bytes.toString(value));
                byte[] value1 = rs.getValue("cf1".getBytes(), "name1".getBytes());
                System.out.println(Bytes.toString(value1));
                byte[] value2 = rs.getValue("cf1".getBytes(), "name100".getBytes());
                System.out.println(Bytes.toString(value2));
                byte[] value3 = rs.getValue("cf1".getBytes(), "name2".getBytes());
                System.out.println(Bytes.toString(value3));
                byte[] value4 = rs.getValue("cf1".getBytes(), "name3".getBytes());
                System.out.println(Bytes.toString(value4));
                byte[] value5 = rs.getValue("cf1".getBytes(), "name4".getBytes());
                System.out.println(Bytes.toString(value5));
                byte[] value6 = rs.getValue("cf1".getBytes(), "name5".getBytes());
                System.out.println(Bytes.toString(value6));
            } else if ("007".equals(rk)) {
                byte[] value6 = rs.getValue("cf1".getBytes(), "name".getBytes());
                System.out.println(Bytes.toString(value6));
                byte[] value7 = rs.getValue("cf1".getBytes(), "age1".getBytes());
                System.out.println(Bytes.toString(value7));
            }

            rs = scanner.next();
        }
    }

    @Test
    public void cellUtil() throws IOException {
        Table testJavaAPI = conn.getTable(TableName.valueOf("test1"));
        Scan scan = new Scan();
        // 包含startRow 不包含 endRow
        scan.withStartRow("001".getBytes());
        scan.withStopRow("007".getBytes());
        ResultScanner scanner = testJavaAPI.getScanner(scan);
        Result rs = scanner.next();
        while (rs != null) {
            for (Cell cell : rs.listCells()) {
                byte[] rk = CellUtil.cloneRow(cell);
                byte[] cf = CellUtil.cloneFamily(cell);
                byte[] qualifier = CellUtil.cloneQualifier(cell);
                byte[] value = CellUtil.cloneValue(cell);

                System.out.println("rowkey:" + Bytes.toString(rk) + ",columnsFamily:" + Bytes.toString(cf) + ",qualifier:" + Bytes.toString(qualifier) + ",value:" + Bytes.toString(value));
            }

            rs = scanner.next();
        }
    }

    // 读取文件并写入HBase
    @Test
    public void putAll() throws IOException {
        Admin admin = conn.getAdmin();
        // 判断表是否存在,不存在即创建
        if (!admin.tableExists(TableName.valueOf("students"))) {
            HTableDescriptor students = new HTableDescriptor(TableName.valueOf("students"));
            HColumnDescriptor info = new HColumnDescriptor("info");
            students.addFamily(info);
            admin.createTable(students);
        }

        Table students = conn.getTable(TableName.valueOf("students"));
        // 读取文件
        BufferedReader br = new BufferedReader(new FileReader("data/students.txt"));
        String line = br.readLine();
        while (line != null) {
            String[] splits = line.split(",");
            String rk = splits[0];
            String name = splits[1];
            String age = splits[2];
            String gender = splits[3];
            String clazz = splits[4];

            Put put = new Put(Bytes.toBytes(rk));
            put.addColumn("info".getBytes(), "name".getBytes(), name.getBytes());
            put.addColumn("info".getBytes(), "age".getBytes(), age.getBytes());
            put.addColumn("info".getBytes(), "gender".getBytes(), gender.getBytes());
            put.addColumn("info".getBytes(), "clazz".getBytes(), clazz.getBytes());

            students.put(put);

            line = br.readLine();
        }
        br.close();

    }

    @After
    public void closeAll() throws IOException {
        if (conn != null) {
            conn.close();
        }
    }
}

HBase过滤器

作用

  • 过滤器的作用是在服务端判断数据是否满足条件,然后只将满足条件的数据返回给客户端
  • 过滤器的类型很多,但是可以分为两大类:
    • 比较过滤器:可应用于rowkey、列簇、列、列值过滤器
    • 专用过滤器:只能适用于特定的过滤器

比较过滤器

比较运算符
  • LESS <

  • LESS_OR_EQUAL <=

  • EQUAL =

  • NOT_EQUAL <>

  • GREATER_OR_EQUAL >=

  • GREATER >

  • NO_OP 排除所有

常见的六大比较过滤器
BinaryComparator
  • 按字节索引顺序比较指定字节数组,采用Bytes.compareTo(byte[])
BinaryPrefixComparator
  • 通BinaryComparator,只是比较左端前缀的数据是否相同
NullComparator
  • 判断给定的是否为空
BitComparator
  • 按位比较
RegexStringComparator
  • 提供一个正则的比较器,仅支持 EQUAL 和非EQUAL
SubstringComparator
  • 判断提供的子串是否出现在中
示例代码
rowKey过滤器:RowFilter

通过RowFilter与BinaryComparator过滤比rowKey 1500100010小的所有值出来

    @Test
    // 通过RowFilter过滤比rowKey 1500100010 小的所有值出来
    public void BinaryComparatorFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));
        BinaryComparator binaryComparator = new BinaryComparator(Bytes.toBytes(1500100010));
        RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.LESS, binaryComparator);
        Scan scan = new Scan();
        scan.setFilter(rowFilter);
        ResultScanner scanner = students.getScanner(scan);
        Result rs = scanner.next();
        while (rs != null) {
            String id = Bytes.toString(rs.getRow());
            String name = Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));
            int age = Bytes.toInt(rs.getValue("info".getBytes(), "age".getBytes()));
            String gender = Bytes.toString(rs.getValue("info".getBytes(), "gender".getBytes()));
            String clazz = Bytes.toString(rs.getValue("info".getBytes(), "clazz".getBytes()));

            System.out.println(id + "\t" + name + "\t" + age + "\t" + gender + "\t" + clazz + "\t");

            rs = scanner.next();
        }

    }
列簇过滤器:FamilyFilter

通过FamilyFilter与SubstringComparator查询列簇名包含in的所有列簇下面的数据

    @Test
    // 通过FamilyFilter查询列簇名包含in的所有列簇下面的数据
    public void SubstringComparatorFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));
        SubstringComparator substringComparator = new SubstringComparator("in");
        FamilyFilter familyFilter = new FamilyFilter(CompareFilter.CompareOp.EQUAL, substringComparator);
        Scan scan = new Scan();
        scan.setFilter(familyFilter);
        ResultScanner scanner = students.getScanner(scan);
        Result rs = scanner.next();
        while (rs != null) {
            String id = Bytes.toString(rs.getRow());
            String name = Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));
            int age = Bytes.toInt(rs.getValue("info".getBytes(), "age".getBytes()));
            String gender = Bytes.toString(rs.getValue("info".getBytes(), "gender".getBytes()));
            String clazz = Bytes.toString(rs.getValue("info".getBytes(), "clazz".getBytes()));

            System.out.println(id + "\t" + name + "\t" + age + "\t" + gender + "\t" + clazz + "\t");

            rs = scanner.next();
        }

    }

通过FamilyFilter与 BinaryPrefixComparator 过滤出列簇以info开头的列簇下的所有数据

// 通过FamilyFilter与 BinaryPrefixComparator 过滤出列簇以info开头的所有列簇下的所有数据
    @Test
    public void BinaryPrefixComparatorFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));

        // 二进制前缀比较器
        BinaryPrefixComparator binaryPrefixComparator = new BinaryPrefixComparator("info".getBytes());
        // FamilyFilter 作用于列簇的过滤器
        FamilyFilter familyFilter = new FamilyFilter(CompareFilter.CompareOp.EQUAL, binaryPrefixComparator);

        Scan scan = new Scan();

        scan.withStartRow("1500100001".getBytes());
        scan.withStopRow("1500100011".getBytes());
        // 通过setFilter方法设置过滤器
        scan.setFilter(familyFilter);

        ResultScanner scanner = students.getScanner(scan);

        printRS(scanner);

    }
列过滤器:QualifierFilter

通过QualifierFilter与SubstringComparator查询列名包含in的列的值

    public void printRS(ResultScanner scanner) throws IOException {
        for (Result rs : scanner) {
            String rowkey = Bytes.toString(rs.getRow());
            System.out.println("当前行的rowkey为:" + rowkey);
            for (Cell cell : rs.listCells()) {
                String family = Bytes.toString(CellUtil.cloneFamily(cell));
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                byte[] bytes = CellUtil.cloneValue(cell);
                if ("age".equals(qualifier)) {
                    int value = Bytes.toInt(bytes);
                    System.out.println(family + ":" + qualifier + "的值为" + value);
                } else {
                    String value = Bytes.toString(bytes);
                    System.out.println(family + ":" + qualifier + "的值为" + value);
                }
            }
        }
    }

    @Test
    // 通过FamilyFilter查询列簇名包含in的所有列簇下面的数据
    public void SubstringComparatorFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));
        SubstringComparator substringComparator = new SubstringComparator("in");
        FamilyFilter familyFilter = new FamilyFilter(CompareFilter.CompareOp.EQUAL, substringComparator);
        Scan scan = new Scan();
        scan.setFilter(familyFilter);
        ResultScanner scanner = students.getScanner(scan);
        Result rs = scanner.next();
        while (rs != null) {
            String id = Bytes.toString(rs.getRow());
            String name = Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));
            int age = Bytes.toInt(rs.getValue("info".getBytes(), "age".getBytes()));
            String gender = Bytes.toString(rs.getValue("info".getBytes(), "gender".getBytes()));
            String clazz = Bytes.toString(rs.getValue("info".getBytes(), "clazz".getBytes()));

            System.out.println(id + "\t" + name + "\t" + age + "\t" + gender + "\t" + clazz + "\t");

            rs = scanner.next();
        }

    }

过滤出 列的名字 中 包含 “am” 所有的列 及列的值

    // 过滤出 列的名字 中 包含 "am" 所有的列 及列的值
    @Test
    public void SubstringComparatorQualifierFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));

        SubstringComparator substringComparator = new SubstringComparator("am");
        // 作用在列名上的过滤器
        QualifierFilter qualifierFilter = new QualifierFilter(CompareFilter.CompareOp.EQUAL, substringComparator);
        Scan scan = new Scan();

        scan.withStartRow("1500100001".getBytes());
        scan.withStopRow("1500100011".getBytes());
        // 通过setFilter方法设置过滤器
        scan.setFilter(qualifierFilter);

        ResultScanner scanner = students.getScanner(scan);

        printRS(scanner);

    }
列值过滤器:ValueFilter

通过ValueFilter与BinaryPrefixComparator过滤出所有的cell中值以 “张” 开头的学生

    @Test
    // 通过ValueFilter与BinaryPrefixComparator过滤出所有的cell中值以 "张" 开头的学生
    public void BinaryPrefixComparatorFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));
        BinaryPrefixComparator binaryPrefixComparator = new BinaryPrefixComparator("张".getBytes());
        ValueFilter valueFilter = new ValueFilter(CompareFilter.CompareOp.EQUAL, binaryPrefixComparator);
        Scan scan = new Scan();
        scan.setFilter(valueFilter);
        ResultScanner scanner = students.getScanner(scan);

        printRS(scanner);

    }

过滤出文科的学生,只会返回clazz列,其他列的数据不符合条件,不会返回

    // 过滤出文科的学生
    // 只会返回clazz列,其他列的数据不符合条件,不会返回
    @Test
    public void RegexStringComparatorFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));

        // 使用正则表达式比较器
        RegexStringComparator regexStringComparator = new RegexStringComparator("^文科.*");
        // ValueFilter 会返回符合条件的cell,并不会返回整条数据
        ValueFilter valueFilter = new ValueFilter(CompareFilter.CompareOp.EQUAL, regexStringComparator);

        Scan scan = new Scan();

        scan.withStartRow("1500100001".getBytes());
        scan.withStopRow("1500100011".getBytes());
        // 通过setFilter方法设置过滤器
        scan.setFilter(valueFilter);

        ResultScanner scanner = students.getScanner(scan);

        printRS(scanner);
    }

专用过滤器

单列值过滤器:SingleColumnValueFilter
  • SingleColumnValueFilter会返回满足条件的cell所在行的所有cell的值(即会返回一行数据)

通过SingleColumnValueFilter与查询文科班所有学生信息

    @Test
    // 通过SingleColumnValueFilter与查询文科班所有学生信息
    public void RegexStringComparatorFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));
        SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
                "info".getBytes(),
                "clazz".getBytes(),
                CompareFilter.CompareOp.EQUAL,
                new RegexStringComparator("^文科.*")
        );

        Scan scan = new Scan();
        scan.setFilter(singleColumnValueFilter);
        ResultScanner scanner = students.getScanner(scan);

        Result rs = scanner.next();
        while (rs != null) {
            String id = Bytes.toString(rs.getRow());
            String name = Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));
            int age = Bytes.toInt(rs.getValue("info".getBytes(), "age".getBytes()));
            String gender = Bytes.toString(rs.getValue("info".getBytes(), "gender".getBytes()));
            String clazz = Bytes.toString(rs.getValue("info".getBytes(), "clazz".getBytes()));

            System.out.println(id + "\t" + name + "\t" + age + "\t" + gender + "\t" + clazz + "\t");

            rs = scanner.next();
        }

    }
列值排除过滤器:SingleColumnValueExcludeFilter
  • 与SingleColumnValueFilter相反,会排除掉指定的列,其他的列全部返回

通过SingleColumnValueExcludeFilter与BinaryComparator查询文科一班所有学生信息,最终不返回clazz列

    @Test
    // 通过SingleColumnValueExcludeFilter与BinaryComparator查询文科一班所有学生信息,最终不返回clazz列
    public void RegexStringComparatorExcludeFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));
        SingleColumnValueExcludeFilter singleColumnValueExcludeFilter = new SingleColumnValueExcludeFilter(
                "info".getBytes(),
                "clazz".getBytes(),
                CompareFilter.CompareOp.EQUAL,
                new BinaryComparator("文科一班".getBytes())
        );

        Scan scan = new Scan();
        scan.setFilter(singleColumnValueExcludeFilter);
        ResultScanner scanner = students.getScanner(scan);

        Result rs = scanner.next();
        while (rs != null) {
            String id = Bytes.toString(rs.getRow());
            String name = Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));
            int age = Bytes.toInt(rs.getValue("info".getBytes(), "age".getBytes()));
            String gender = Bytes.toString(rs.getValue("info".getBytes(), "gender".getBytes()));
            // clazz列为空
            String clazz = Bytes.toString(rs.getValue("info".getBytes(), "clazz".getBytes()));

            System.out.println(id + "\t" + name + "\t" + age + "\t" + gender + "\t" + clazz + "\t");

            rs = scanner.next();
        }

    }
rowkey前缀过滤器:PrefixFilter

通过PrefixFilter查询以150010008开头的所有前缀的rowkey

    @Test
    // 通过PrefixFilter查询以150010008开头的所有前缀的rowkey
    public void PrefixFilterFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));
        PrefixFilter prefixFilter = new PrefixFilter("150010008".getBytes());
        Scan scan = new Scan();
        scan.setFilter(prefixFilter);
        ResultScanner scanner = students.getScanner(scan);
        Result rs = scanner.next();
        while (rs != null) {
            String id = Bytes.toString(rs.getRow());
            String name = Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));
            int age = Bytes.toInt(rs.getValue("info".getBytes(), "age".getBytes()));
            String gender = Bytes.toString(rs.getValue("info".getBytes(), "gender".getBytes()));
            // clazz列为空
            String clazz = Bytes.toString(rs.getValue("info".getBytes(), "clazz".getBytes()));

            System.out.println(id + "\t" + name + "\t" + age + "\t" + gender + "\t" + clazz + "\t");

            rs = scanner.next();
        }
    }
分页过滤器PageFilter
  • 通过PageFilter查询第三页的数据,每页10条
  • 使用PageFilter分页效率比较低,每次都需要扫描前面的数据,直到扫描到所需要查的数据
  • 可设计一个合理的rowkey来实现分页需求
    @Test
    // 通过PageFilter查询第三页的数据,每页10条
    public void PageFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));
        int PageNum = 3;
        int PageSize = 10;
        Scan scan = new Scan();
        if (PageNum == 1) {
            scan.withStartRow("".getBytes());
            //使用分页过滤器,实现数据的分页
            PageFilter pageFilter = new PageFilter(PageSize);
            scan.setFilter(pageFilter);
            ResultScanner scanner = students.getScanner(scan);
            printRS(scanner);
        } else {
            String current_page_start_rows = "";
            int scanDatas = (PageNum - 1) * PageSize + 1;
            PageFilter pageFilter = new PageFilter(scanDatas);
            scan.setFilter(pageFilter);
            ResultScanner scanner = students.getScanner(scan);
            for (Result rs : scanner) {
                current_page_start_rows = Bytes.toString(rs.getRow());
            }
            scan.withStartRow(current_page_start_rows.getBytes());
            PageFilter pageFilter1 = new PageFilter(PageSize);
            scan.setFilter(pageFilter1);
            ResultScanner scanner1 = students.getScanner(scan);
            printRS(scanner1);

        }

    }

通过合理的设置rowkey来实现分页功能

    @Test
    // 通过合理的设置rowkey来实现分页功能,提高效率
    public void PageFilterTest2() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));
        int PageSize = 10;
        int PageNum = 3;

        int baseId = 1500100000;
        int start_row = baseId + (PageNum - 1) * PageSize + 1;
        int end_row = start_row + PageSize;
        Scan scan = new Scan();
        scan.withStartRow(String.valueOf(start_row).getBytes());
        scan.withStopRow(String.valueOf(end_row).getBytes());

        ResultScanner scanner = students.getScanner(scan);

        printRS(scanner);


    }

多过滤器综合查询

查询文科班中的学生中学号以150010008开头并且年龄小于23的学生信息

    @Test
    // 查询文科班中的学生中学号以150010008开头并且年龄小于23的学生信息
    public void FilterListFilter() throws IOException {
        Table students = conn.getTable(TableName.valueOf("students"));
        Scan scan = new Scan();
        SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
                "info".getBytes()
                , "clazz".getBytes()
                , CompareFilter.CompareOp.EQUAL
                , new RegexStringComparator("^文科.*"));
        PrefixFilter prefixFilter = new PrefixFilter("150010008".getBytes());
        SingleColumnValueFilter singleColumnValueFilter1 = new SingleColumnValueFilter(
                "info".getBytes()
                , "age".getBytes()
                , CompareFilter.CompareOp.LESS
                , new BinaryComparator(Bytes.toBytes(23)));

        FilterList filterList = new FilterList();
        filterList.addFilter(singleColumnValueFilter);
        filterList.addFilter(prefixFilter);
        filterList.addFilter(singleColumnValueFilter1);
        scan.setFilter(filterList);
        ResultScanner scanner = students.getScanner(scan);
        printRS(scanner);

    }

Phoenix

Phoenix搭建

Phoenix 4.15 HBase 1.4.6 hadoop 2.7.6

关闭hbase集群,在master中执行
stop-hbase.sh
上传解压配置环境变量

解压

tar -xvf apache-phoenix-4.15.0-HBase-1.4-bin.tar.gz 

改名

mv apache-phoenix-4.15.0-HBase-1.4-bin phoenix-4.15.0
将phoenix-4.15.0-HBase-1.4-server.jar复制到所有节点的hbase lib目录下
scp /usr/local/soft/phoenix-4.15.0/phoenix-4.15.0-HBase-1.4-server.jar master:/usr/local/soft/hbase-1.4.6/lib/

scp /usr/local/soft/phoenix-4.15.0/phoenix-4.15.0-HBase-1.4-server.jar node1:/usr/local/soft/hbase-1.4.6/lib/

scp /usr/local/soft/phoenix-4.15.0/phoenix-4.15.0-HBase-1.4-server.jar node2:/usr/local/soft/hbase-1.4.6/lib/

启动hbase , 在master中执行
start-hbase.sh
配置环境变量
vim /etc/profile
增加:export PHOENIX_HOME=/usr/local/soft/phoenix-4.15.0
	path:$PHOENIX_HOME/bin
source /etc/profile

Phoenix使用

连接sqlline
sqlline.py master,node1,node2

# 出现
163/163 (100%) Done
Done
sqlline version 1.5.0
0: jdbc:phoenix:master,node1,node2> 


常用命令
# 1、创建表

CREATE TABLE IF NOT EXISTS STUDENT (
 id VARCHAR NOT NULL PRIMARY KEY, 
 name VARCHAR,
 age BIGINT, 
 gender VARCHAR ,
 clazz VARCHAR
);

# 2、显示所有表
 !table

# 3、插入数据
upsert into STUDENT values('1500100004','葛德曜',24,'男','理科三班');
upsert into STUDENT values('1500100005','宣谷芹',24,'男','理科六班');
upsert into STUDENT values('1500100006','羿彦昌',24,'女','理科三班');


# 4、查询数据,支持大部分sql语法,
select * from STUDENT ;
select * from STUDENT where age=24;
select gender ,count(*) from STUDENT group by gender;
select * from student order by gender;

# 5、删除数据
delete from STUDENT where id='1500100004';


# 6、删除表
drop table STUDENT;
 
 
# 7、退出命令行
!quit

更多语法参照官网
https://phoenix.apache.org/language/index.html#upsert_select

phoenix映射
  • 默认情况下,直接在hbase中创建的表,通过phoenix是查看不到的

  • 如果需要在phoenix中操作直接在hbase中创建的表,则需要在phoenix中进行表的映射。映射方式有两种:视图映射和表映射

视图映射

Phoenix创建的视图是只读的,所以只能用来做查询,无法通过视图对源数据进行修改等操作

# hbase shell 进入hbase命令行
hbase shell 

# 创建hbase表
create 'test','name','company' 

# 插入数据
put 'test','001','name:firstname','zhangsan'
put 'test','001','name:lastname','zhangsan'
put 'test','001','company:name','数加'
put 'test','001','company:address','合肥'


# 在phoenix创建视图, primary key 对应到hbase中的rowkey

create view "test" (
empid varchar primary key,
"name"."firstname" varchar,
"name"."lastname"  varchar,
"company"."name"  varchar,
"company"."address" varchar
);



CREATE view "students" (
 id VARCHAR NOT NULL PRIMARY KEY, 
 "info"."name" VARCHAR,
 "info"."age" VARCHAR, 
 "info"."gender" VARCHAR ,
 "info"."clazz" VARCHAR
) column_encoded_bytes=0;

# 在phoenix查询数据,表名通过双引号引起来
select * from "test";

# 删除视图
drop view "test";

表映射

使用Apache Phoenix创建对HBase的表映射,有两类:

1) 当HBase中已经存在表时,可以以类似创建视图的方式创建关联表,只需要将create view改为create table即可。

2)当HBase中不存在表时,可以直接使用create table指令创建需要的表,并且在创建指令中可以根据需要对HBase表结构进行显示的说明。

第1)种情况下,如在之前的基础上已经存在了test表,则表映射的语句如下:

create table "test" (
empid varchar primary key,
"name"."firstname" varchar,
"name"."lastname"varchar,
"company"."name"  varchar,
"company"."address" varchar
)column_encoded_bytes=0;

upsert into  "test"  values('1','2','3','4','5');

CREATE table  "students" (
 id VARCHAR NOT NULL PRIMARY KEY, 
 "info"."name" VARCHAR,
 "info"."age" VARCHAR, 
 "info"."gender" VARCHAR ,
 "info"."clazz" VARCHAR
) column_encoded_bytes=0;

upsert into "students" values('1500110004','葛德曜','24','男','理科三班');

使用create table创建的关联表,如果对表进行了修改,源数据也会改变,同时如果关联表被删除,源表也会被删除。但是视图就不会,如果删除视图,源数据不会发生改变。

Phoenix二级索引

  • 建立行键与列值的映射关系

  • 对于Hbase,如果想精确定位到某行记录,唯一的办法就是通过rowkey查询。如果不通过rowkey查找数据,就必须逐行比较每一行的值,对于较大的表,全表扫描的代价是不可接受的。
    HBase详解

开启索引支持
# 关闭hbase集群
stop-hbase.sh

# 在/usr/local/soft/hbase-1.4.6/conf/hbase-site.xml中增加如下配置

<property>
  <name>hbase.regionserver.wal.codec</name>
  <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
</property>
<property>
    <name>hbase.rpc.timeout</name>
    <value>60000000</value>
</property>
<property>
    <name>hbase.client.scanner.timeout.period</name>
    <value>60000000</value>
</property>
<property>
    <name>phoenix.query.timeoutMs</name>
    <value>60000000</value>
</property>


# 同步到所有节点
scp hbase-site.xml node1:`pwd`
scp hbase-site.xml node2:`pwd`

# 修改phoenix目录下的bin目录中的hbase-site.xml
<property>
    <name>hbase.rpc.timeout</name>
    <value>60000000</value>
</property>
<property>
    <name>hbase.client.scanner.timeout.period</name>
    <value>60000000</value>
</property>
<property>
    <name>phoenix.query.timeoutMs</name>
    <value>60000000</value>
</property>


# 启动hbase
start-hbase.sh
# 重新进入phoenix客户端
sqlline.sql master,node1,node2
创建索引
全局索引

全局索引适合读多写少的场景。如果使用全局索引,读数据基本不损耗性能,所有的性能损耗都来源于写数据。数据表的添加、删除和修改都会更新相关的索引表(数据删除了,索引表中的数据也会删除;数据增加了,索引表的数据也会增加)

注意: 对于全局索引在默认情况下,在查询语句中检索的列如果不在索引表中,Phoenix不会使用索引表将,除非使用hint。

# 创建DIANXIN.sql
CREATE TABLE IF NOT EXISTS DIANXIN (
     mdn VARCHAR ,
     start_date VARCHAR ,
     end_date VARCHAR ,
     county VARCHAR,
     x DOUBLE ,
     y  DOUBLE,
     bsid VARCHAR,
     grid_id  VARCHAR,
     biz_type VARCHAR, 
     event_type VARCHAR , 
     data_source VARCHAR ,
     CONSTRAINT PK PRIMARY KEY (mdn,start_date)
) column_encoded_bytes=0;

# 上传数据DIANXIN.csv

# 导入数据
psql.py master,node1,node2 DIANXIN.sql DIANXIN.csv

# 创建全局索引
CREATE INDEX DIANXIN_INDEX ON DIANXIN ( end_date );

# 查询数据 ( 索引未生效)
select * from DIANXIN where end_date = '20180503154014';

# 强制使用索引 (索引生效) hint
select /*+ INDEX(DIANXIN DIANXIN_INDEX) */  * from DIANXIN where end_date = '20180503154014';

select /*+ INDEX(DIANXIN DIANXIN_INDEX) */  * from DIANXIN where end_date = '20180503154014'  and start_date = '20180503154614';

# 取索引列,(索引生效)
select end_date from DIANXIN where end_date = '20180503154014';

# 创建多列索引
CREATE INDEX DIANXIN_INDEX1 ON DIANXIN ( end_date,COUNTY );

# 多条件查询 (索引生效)
select end_date,MDN,COUNTY from DIANXIN where end_date = '20180503154014' and COUNTY = '8340104';

# 查询所有列 (索引未生效)
select  * from DIANXIN where end_date = '20180503154014'  and COUNTY = '8340104';

# 查询所有列 (索引生效)
select /*+ INDEX(DIANXIN DIANXIN_INDEX1) */ * from DIANXIN where end_date = '20180503154014' and COUNTY = '8340104';

# 单条件  (索引未生效)
select end_date from DIANXIN where  COUNTY = '8340103';

本地索引

本地索引适合写多读少的场景,或者存储空间有限的场景。和全局索引一样,Phoenix也会在查询的时候自动选择是否使用本地索引。本地索引因为索引数据和原数据存储在同一台机器上,避免网络数据传输的开销,所以更适合写多的场景。由于无法提前确定数据在哪个Region上,所以在读数据的时候,需要检查每个Region上的数据从而带来一些性能损耗。

注意:对于本地索引,查询中无论是否指定hint或者是查询的列是否都在索引表中,都会使用索引表。

@ 创建本地索引
CREATE LOCAL INDEX DIANXIN_LOCAL_IDEX ON DIANXIN(grid_id);

# 索引生效
select grid_id from dianxin where grid_id='117285031820040';

# 索引生效
select * from dianxin where grid_id='117285031820040';

全局索引与本地索引区别
  • 全局索引:读多写少, 会单独建立索引表

  • 本地索引:读少写多, 索引数据和原数据保存在同一台机器上

覆盖索引

覆盖索引是把原数据存储在索引数据表中,这样在查询时不需要再去HBase的原表获取数据就,直接返回查询结果。

注意:查询是 select 的列和 where 的列都需要在索引中出现。

# 创建覆盖索引
CREATE INDEX DIANXIN_INDEX_COVER ON DIANXIN ( x,y ) INCLUDE ( county );

# 查询所有列 (索引未生效)
select * from dianxin where x=117.288 and y =31.822;

# 强制使用索引 (索引生效)
select /*+ INDEX(DIANXIN DIANXIN_INDEX_COVER) */ * from dianxin where x=117.288 and y =31.822;

# 查询索引中的列 (索引生效)
select mdn,x,y,county from dianxin where x=117.288 and y =31.822;

查询条件必须放在索引中  select 中的列可以放在INCLUDE (将数据保存在索引中)


select /*+ INDEX(DIANXIN DIANXIN_INDEX_COVER) */ x,y,count(*) from dianxin group by x,y;

PhoenixJDBC

# 导入依赖

<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-core</artifactId>
    <version>4.15.0-HBase-1.4</version>
</dependency>


Connection conn = DriverManager.getConnection("jdbc:phoenix:master,node1,node2:2181");
Statement stat = conn.createStatement();
ResultSet rs = stat.executeQuery("select * from dianxin limit 10");
while(rs.next()){
	String mdn = rs.getString("mdn");
	System.out.println(mdn);
}
stat.close();
conn.close();


HBase SQL与Hive SQL执行流程

HBase详解

HBase的MapReduce过程

HBase详解#### 代码示例

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

public class Demo7MapReduceReadAndWriteHBase {
    // 读取HBase中的students表,统计每个班级的人数,将结果写回HBase clazz_num表
    public static class ReadHBaseMapper extends TableMapper<Text, IntWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            String rowkey = Bytes.toString(key.get());
            String clazz = Bytes.toString(value.getValue("info".getBytes(), "clazz".getBytes()));

            context.write(new Text(clazz), new IntWritable(1));
        }

    }

    public static class WriteHBaseReducer extends TableReducer<Text, IntWritable, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable i : values) {
                sum += i.get();
            }
            // create 'clazz_num','cf1'
            Put put = new Put(key.toString().getBytes());
            put.addColumn("cf1".getBytes(), "num".getBytes(), Bytes.toBytes(sum));

            context.write(NullWritable.get(), put);

        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master,node1,node2:2181");

        Job job = Job.getInstance(conf);
        job.setJobName(Demo7MapReduceReadAndWriteHBase.class.getName());
        job.setJarByClass(Demo7MapReduceReadAndWriteHBase.class);

        // 配置map任务
        // 使用HBase提供的TableMapReduceUtil工具进行配置
        TableMapReduceUtil.initTableMapperJob(TableName.valueOf("students")
                , new Scan()
                , ReadHBaseMapper.class
                , Text.class
                , IntWritable.class
                , job
        );

        // 配置reduce任务
        // 使用HBase提供的TableMapReduceUtil工具进行配置
        TableMapReduceUtil.initTableReducerJob("clazz_num", WriteHBaseReducer.class, job);

        job.waitForCompletion(true);
        /*
        hadoop jar HBaseJavaAPI11-1.0-jar-with-dependencies.jar  Demo7MapReduceReadAndWriteHBase
         */
    }
}

Hive关联Hbase表

# Hive整合HBase
create external table students_hbase
(
id string,
name string,
age string,
gender string, 
clazz string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties ("hbase.columns.mapping" = "
:key,
info:name,
info:age,
info:gender,
info:clazz
")
tblproperties("hbase.table.name" = "default:students");

HBaseHA

//在node1,node2中启用备用HMaster
hbase-daemon.sh start master

HBase详解

HBase调优

Pre-Creating Regions(预分区)

  • 默认情况下,在创建HBase表的时候会自动创建一个region分区,当导入数据的时候, 所有的HBase客户端都向这一个region写数据,直到这个region足够大了才进行切分。 一种可以加快批量写入速度的方法是通过预先创建一些空的regions,这样当数据写入 HBase时,会按照region分区情况,在集群内做数据的负载均衡。
  • 如果知道hbase数据表的key的分布情况,就可以在建表的时候对hbase进行region的预分区。这样做的好处是防止大数据量插入的热点问题,提高数据插入的效率。
//在phoenix中
CREATE TABLE IF NOT EXISTS STUDENT (
 id VARCHAR NOT NULL PRIMARY KEY, 
 name VARCHAR,
 age BIGINT, 
 gender VARCHAR ,
 clazz VARCHAR
)split on('15001006|','15001007|','15001008|') ;
//在hbase中
create 'split_table_test', 'cf', SPLITS_FILE => 'region_split_info.txt'
create 'split_table_test', 'cf', SPLITS => ['a','e','r']

Rowkey设计

原则:
  • 唯一原则
  • 长度原则(10-100bytes 定长)
  • 散列原则
常用方式
反转

202133 -> 331202
202134 -> 431202
202135 -> 531202
202136 -> 631202
202137 -> 731202
202138 -> 831202
202139 -> 931202
202140 -> 041202

hash
md5 sha1

202133 -> 41DDBBCED55669818B2A40F4FED46F56
202134 -> 19D329403F02E2DA265CFC05D41FD253
202135 -> F6D06AEC4FB72A04F9CD4020BEF5E10F
202136 -> 0B512404B0411E623F64EC8981F8AE21

加上随机前缀
随机散列

第一次:202133 -> 41DDBBCED55669818B2A40F4FED46F56
第二次:202133 -> D55669818B2A40F4
第二次:202133 -> 02E2DA265CFC05D4

需求:查看某个时刻的数据
  • 倒序、hash、 但是不能用随机散列
需求:将最新的数据放到最前面
  • 大数减小数
  • 通常数据里有时间戳
  • 时间戳实际上是跟Long类型非常相似 一个很大的数
  • Long.MAX_VALUE - 值

大数:300000

202137 -> 300000 - 202137 = 97863
202138 -> 300000 - 202138 = 97862
202139 -> 300000 - 202139 = 97861
202140 -> 300000 - 202140 = 97860

加盐

  • 会在rowkey前面加上一个随机的前缀,

  • 优点:不需要知道rowkey的分步情况

  • 缺点:不能再hbase中对数据进行查询和修改

CREATE TABLE IF NOT EXISTS STUDENT (
 id VARCHAR NOT NULL PRIMARY KEY, 
 name VARCHAR,
 age BIGINT, 
 gender VARCHAR ,
 clazz VARCHAR
)salt_buckets=6;

inmemory

创建表的时候,可以通过HColumnDescriptor.setInMemory(true)将表放到 RegionServer的缓存中,保证在读取的时候被cache命中。

maxversion

创建表的时候,可以通过HColumnDescriptor.setMaxVersions(int maxVersions)设置 表中数据的最大版本,如果只需要保存最新版本的数据,那么可以设置 setMaxVersions(1)。

建立索引超时,查询超时

修改配置文件,hbase-site.xml

两个位置
/usr/local/soft/phoenix-4.15.0/bin
/usr/local/soft/hbase-1.4.6/conf/  所有节点

增加配置
<property>
    <name>hbase.rpc.timeout</name>
    <value>60000000</value>
</property>
<property>
    <name>hbase.client.scanner.timeout.period</name>
    <value>60000000</value>
</property>
<property>
    <name>phoenix.query.timeoutMs</name>
    <value>60000000</value>
</property>
需要重启hbase

Compact & Split

Minor Compaction:
  • 指选取一些小的、相邻的StoreFile将他们合并成一个更大的StoreFile,在这个过程中不会处理已经Deleted或Expired的Cell。一次 Minor Compaction 的结果是更少并且更大的StoreFile。
Major Compaction:
  • 指将所有的StoreFile合并成一个StoreFile,这个过程会清理三类没有意义的数据:被删除的数据、TTL过期数据、版本号超过设定版本号的数据。另外,一般情况下,major compaction时间会持续比较长,整个过程会消耗大量系统资源,对上层业务有比较大的影响。因此线上业务都会将关闭自动触发major compaction功能,改为手动在业务低峰期触发。

RegionSplit

  • region中存储的是一张表的数据,当region中的数据条数过多的时候,会直接影响查询效率。当region过大的时候,region会被 拆分为两个region,HMaster会将分裂的region分配到不同的regionserver上,这样可以让请求分散到不同的RegionServer 上,已达到负载均衡 , 这也是Hbase的一个优点 。
ConstantSizeRegionSplitPolicy
  • 0.94版本前,HBase region的默认切分策略

  • 当region中最大的store大小超过某个阈值(hbase.hregion.max.filesize=10G)之后就会触发切分,一个region等分为2个region。但是在生产线上这种切分策略却有相当大的弊端(切分策略对于大表和小表没有明显的区分):

  • 阈值(hbase.hregion.max.filesize)设置较大对大表比较友好,但是小表就有可能不会触发分裂,极端情况下可能就1个,形成热点,这对业务来说并不是什么好事。

  • 如果设置较小则对小表友好,但一个大表就会在整个集群产生大量的region,这对于集群的管理、资源使用、failover来说都不是一件好事。

IncreasingToUpperBoundRegionSplitPolicy
  • 0.94版本~2.0版本默认切分策略

  • 总体看和ConstantSizeRegionSplitPolicy思路相同,一个region中最大的store大小大于设置阈值就会触发切分。
    但是这个阈值并不像ConstantSizeRegionSplitPolicy是一个固定的值,而是会在一定条件下不断调整,调整规则和region所属表在当前regionserver上的region个数有关系.

region split阈值的计算公式是:

  • 设regioncount:是region所属表在当前regionserver上的region的个数

  • 阈值 = regioncount^3 * 128M * 2,当然阈值并不会无限增长,最大不超过MaxRegionFileSize(10G),当region中最大的store的大小达到该阈值的时候进行region split

例如:

  • 第一次split阈值 = 1^3 * 256 = 256MB
  • 第二次split阈值 = 2^3 * 256 = 2048MB
  • 第三次split阈值 = 3^3 * 256 = 6912MB
  • 第四次split阈值 = 4^3 * 256 = 16384MB > 10GB,因此取较小的值10GB
  • 后面每次split的size都是10GB了

特点

  • 相比ConstantSizeRegionSplitPolicy,可以自适应大表、小表;
  • 在集群规模比较大的情况下,对大表的表现比较优秀
  • 对小表不友好,小表可能产生大量的小region,分散在各regionserver上
  • 小表达不到多次切分条件,导致每个split都很小,所以分散在各个regionServer上
SteppingSplitPolicy
  • 2.0版本默认切分策略

  • ​ 相比 IncreasingToUpperBoundRegionSplitPolicy 简单了一些
    ​ region切分的阈值依然和待分裂region所属表在当前regionserver上的region个数有关系

  • 如果region个数等于1,切分阈值为flush size 128M * 2

  • 否则为MaxRegionFileSize。

  • 这种切分策略对于大集群中的大表、小表会比 IncreasingToUpperBoundRegionSplitPolicy 更加友好,小表不会再产生大量的小region,而是适可而止。

KeyPrefixRegionSplitPolicy

根据rowKey的前缀对数据进行分区,这里是指定rowKey的前多少位作为前缀,比如rowKey都是16位的,指定前5位是前缀,那么前5位相同的rowKey在相同的region中。

DelimitedKeyPrefixRegionSplitPolicy
  • 保证相同前缀的数据在同一个region中,例如rowKey的格式为:userid_eventtype_eventid,指定的delimiter为 _ ,则split的的时候会确保userid相同的数据在同一个region中。
  • 按照分隔符进行切分,而KeyPrefixRegionSplitPolicy是按照指定位数切分。
BusyRegionSplitPolicy
  • 按照一定的策略判断Region是不是Busy状态,如果是即进行切分

  • 如果你的系统常常会出现热点Region,而你对性能有很高的追求,那么这种策略可能会比较适合你。它会通过拆分热点Region来缓解热点Region的压力,但是根据热点来拆分Region也会带来很多不确定性因素,因为你也不知道下一个被拆分的Region是哪个。

DisabledRegionSplitPolicy

不启用自动拆分, 需要指定手动拆分

HBase BulkLoading

优点:

  1. 如果我们一次性入库hbase巨量数据,处理速度慢不说,还特别占用Region资源, 一个比较高效便捷的方法就是使用 “Bulk Loading”方法,即HBase提供的HFileOutputFormat类。

  2. 它是利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接生成这种hdfs内存储的数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合mapreduce完成,高效便捷,而且不占用region资源,增添负载。

限制:

  1. 仅适合初次数据导入,即表内数据为空,或者每次入库表内都无数据的情况。

  2. HBase集群与Hadoop集群为同一集群,即HBase所基于的HDFS为生成HFile的MR的集群

代码

  1. 生成HFile部分
package com.shujia;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Demo10BulkLoading {
    public static class BulkLoadingMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] splits = value.toString().split(",");
            String mdn = splits[0];
            String start_time = splits[1];
            // 经度
            String longitude = splits[4];
            // 维度
            String latitude = splits[5];

            String rowkey = mdn + "_" + start_time;

            KeyValue lg = new KeyValue(rowkey.getBytes(), "info".getBytes(), "lg".getBytes(), longitude.getBytes());
            KeyValue lt = new KeyValue(rowkey.getBytes(), "info".getBytes(), "lt".getBytes(), latitude.getBytes());

            context.write(new ImmutableBytesWritable(rowkey.getBytes()), lg);
            context.write(new ImmutableBytesWritable(rowkey.getBytes()), lt);

        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181");


        // 创建Job实例
        Job job = Job.getInstance(conf);
        job.setJarByClass(Demo10BulkLoading.class);
        job.setJobName("Demo10BulkLoading");

        // 保证全局有序
        job.setPartitionerClass(SimpleTotalOrderPartitioner.class);

        // 设置reduce个数
        job.setNumReduceTasks(4);
        // 配置map任务
        job.setMapperClass(BulkLoadingMapper.class);

        // 配置reduce任务
        // KeyValueSortReducer 保证在每个Reduce有序
        job.setReducerClass(KeyValueSortReducer.class);

        // 输入输出路径
        FileInputFormat.addInputPath(job, new Path("/data/DIANXIN/"));
        FileOutputFormat.setOutputPath(job, new Path("/data/hfile"));

        // 创建HBase连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // create 'dianxin_bulk','info'
        // 获取dianxin_bulk 表
        Table dianxin_bulk = conn.getTable(TableName.valueOf("dianxin_bulk"));
        // 获取dianxin_bulk 表 region定位器
        RegionLocator regionLocator = conn.getRegionLocator(TableName.valueOf("dianxin_bulk"));
        // 使用HFileOutputFormat2将输出的数据按照HFile的形式格式化
        HFileOutputFormat2.configureIncrementalLoad(job, dianxin_bulk, regionLocator);

        // 等到MapReduce任务执行完成
        job.waitForCompletion(true);

        // 加载HFile到 dianxin_bulk 中
        LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
        load.doBulkLoad(new Path("/data/hfile"), conn.getAdmin(), dianxin_bulk, regionLocator);

        /**
         *  create 'dianxin_bulk','info'
         *  hadoop jar HBaseJavaAPI10-1.0-jar-with-dependencies.jar com.shujia.Demo10BulkLoading
         */
    }
}

说明

  1. 最终输出结果,无论是map还是reduce,输出部分key和value的类型必须是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>。

  2. 最终输出部分,Value类型是KeyValue 或Put,对应的Sorter分别是KeyValueSortReducer或PutSortReducer。

  3. MR例子中HFileOutputFormat2.configureIncrementalLoad(job, dianxin_bulk, regionLocator);自动对job进行配置。SimpleTotalOrderPartitioner是需要先对key进行整体排序,然后划分到每个reduce中,保证每一个reducer中的的key最小最大值区间范围,是不会有交集的。因为入库到HBase的时候,作为一个整体的Region,key是绝对有序的。

  4. MR例子中最后生成HFile存储在HDFS上,输出路径下的子目录是各个列族。如果对HFile进行入库HBase,相当于move HFile到HBase的Region中,HFile子目录的列族内容没有了,但不能直接使用mv命令移动,因为直接移动不能更新HBase的元数据。

  5. HFile入库到HBase通过HBase中 LoadIncrementalHFiles的doBulkLoad方法,对生成的HFile文件入库

思维导图

HBase详解

上一篇:中文字符的字节数


下一篇:(六)namespace