文章目录
HBase简介
HBase – Hadoop Database,是一个高可靠性、高性能、面向列、可伸缩、实时读写的分布式数据库
利用Hadoop HDFS作为其文件存储系统,利用Hadoop MapReduce来处理HBase中的海量数据,利用Zookeeper作为其分布式协同服务,主要用来存储非结构化和半结构化的松散数据(列存 NoSQL 数据库)。
HBase特点
-
大:一个表可以有上亿行,上百万列。
-
面向列:面向列表(簇)的存储和权限控制,列(簇)独立检索。
-
稀疏:对于为空(NULL)的列,并不占用存储空间,因此,表可以设计的非常稀疏。
-
无模式:每一行都有一个可以排序的主键和任意多的列,列可以根据需要动态增 加,同一张表中不同的行可以有截然不同的列。
-
数据多版本:每个单元中的数据可以有多个版本,默认情况下,版本号自动分配, 版本号就是单元格插入时的时间戳。
-
数据类型单一:HBase中的数据都是字节数组,没有类型。
HBase架构
Master
- 为Region server分配region
- 负责Region server的负载均衡
- 发现失效的Region server并重新分配其上的region
- 管理用户对table的增删改操作
RegionServer
- Region server维护region,处理对这些region的IO请求
- Region server负责切分在运行过程中变得过大的region
Region
-
HBase自动把表水平划分成多个区域(region),每个region会保存一个表里面某段连续的数据;每个表一开始只有一个region,随着数据不断插入表,region不断增大,当增大到一个阀值的时候,region就会等分会两个新的region(裂变)。
-
当table中的行不断增多,就会有越来越多的region。这样一张完整的表被保存在多个Regionserver 上。
Memstore&Storefile
-
一个region由多个store组成,一个store对应一个CF(列族)store包括位于内存中的memstore和位于磁盘的storefile写操作先写入memstore,当memstore中的数据达到某个阈值,hregionserver会启动flashcache进程写入storefile,每次写入形成单独的一个storefile
-
当storefile文件的数量增长到一定阈值后,系统会进行合并(minor、major compaction),在合并过程中会进行版本合并和删除工作(majar),形成更大的storefile
-
当一个region所有storefile的大小和数量超过一定阈值后,会把当前的region分割为两个,并由hmaster分配到相应的regionserver服务器,实现负载均衡
-
客户端检索数据,先在memstore找,找不到再找storefile
HLog
-
HLog文件就是一个普通的Hadoop Sequence File,Sequence File 的Key是HLogKey对象,HLogKey中记录了写入数据的归属信息,除了table和region名字外,同时还包括 sequence number和timestamp,timestamp是”写入时间”,sequence number的起始值为0,或者是最近一次存入文件系统中sequence number。
-
HLog SequeceFile的Value是HBase的KeyValue对象,即对应HFile中的KeyValue。
HBase数据模型
RowKey(行键)
-
唯一标识一行数据
-
可以通过RowKey获取一行数据
-
按照字典顺序排序的。
-
Row key只能存储64k的字节数据 10-100byte
ColumnFamily&Qualifier(列簇和列)
-
HBase表中的每个列都归属于某个列族,列族必须作为表模式(schema)定义的一部分预先给出。如 create ‘test’, ‘course’。
-
列名以列族作为前缀,每个“列族”都可以有多个列成员(column);如course:math, course:english, 新的列族成员(列)可以随后按需、动态加入。
-
权限控制、存储以及调优都是在列族层面进行的;
-
HBase把同一列族里面的数据存储在同一目录下,由几个文件保存。
TimeStamp(时间戳)
-
在HBase每个cell存储单元对同一份数据有多个版本,根据唯一的时间戳来区分每个版本之间的差异,不同版本的数据按照时间倒序排序,最新的数据版本排在最前面。
-
时间戳的类型是 64位整型。
-
时间戳可以由HBase(在数据写入时自动)赋值,此时时间戳是精确到毫秒的当前系统时间
-
时间戳也可以由客户显式赋值,如果应用程序要避免数据版本冲突,就必须自己生成具有唯一性的时间戳。
Cell(存储单元)
-
由行和列的坐标交叉决定。
-
单元格是有版本的。
-
单元格的内容是未解析的字节数组。
-
由{row key, column( = +), version} 唯一确定的单元。cell中的数据是没有类型的,全部是字节码形式存贮。
HBase读写流程
HBase Shell
HBaseAPI
常用java类
java类 | HBase数据模型 |
---|---|
Admin / HBaseAdmin/ HBaseConfiguration | 数据库 |
HTable/HTableDescriptor | 表 |
HColumnDescriptor | 列簇 |
Put/Delete/Get/Scan/ResultScanner/ | 列 |
CellUtil | 存储单元 |
示例代码
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
public class Demo2API {
Configuration conf = null;
Connection conn = null;
@Before
public void init() {
conf = HBaseConfiguration.create();
// 其他配置参考http://hbase.apache.org/1.4/book.html#config.files
conf.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181");
try {
conn = ConnectionFactory.createConnection(conf);
} catch (IOException e) {
e.printStackTrace();
}
}
// 创建表
@Test
public void create_table() throws IOException {
Admin admin = conn.getAdmin();
// admin 相当于HBase的管理员
// 创建表 传入表名(TableName.valueOf())
HTableDescriptor tableName = new HTableDescriptor(TableName.valueOf("tableName"));
// 创建列簇 传入列簇名
HColumnDescriptor cf1 = new HColumnDescriptor("cf1");
// 对列簇进行一些配置
cf1.setMaxVersions(5); // 设置版本号
cf1.setTimeToLive(30); // 设置TTL时间
// 将创建好的列簇加入表
tableName.addFamily(cf1);
// 使用admin对象创建表
admin.createTable(tableName);
}
// 删除表
@Test
public void drop_table() throws IOException {
Admin admin = conn.getAdmin();
String tableName = "tableName";
admin.disableTable(TableName.valueOf(tableName));
admin.deleteTable(TableName.valueOf(tableName));
}
// 添加一条数据
@Test
public void put() throws IOException {
// 如果想要插入数据 查询数据 需要使用Table对象
// 如果需要对表进行修改,获取表的一些配置、结构 使用HTableDescriptor对象
Table testJavaAPI = conn.getTable(TableName.valueOf("testJavaAPI"));
Put put = new Put("00001".getBytes());
put.addColumn("cf1".getBytes(), "name".getBytes(), "zhangSan".getBytes());
testJavaAPI.put(put);
}
// 获取一条数据
@Test
public void get() throws IOException {
Table testJavaAPI = conn.getTable(TableName.valueOf("testJavaAPI"));
Get get = new Get("00001".getBytes());
Result rs = testJavaAPI.get(get);
byte[] value = rs.getValue("cf1".getBytes(), "name".getBytes());
System.out.println(Bytes.toString(value));
}
// 指定rowkey范围 扫描表
@Test
public void scan() throws IOException {
Table testJavaAPI = conn.getTable(TableName.valueOf("test1"));
Scan scan = new Scan();
// 包含startRow 不包含 endRow
scan.withStartRow("001".getBytes());
scan.withStopRow("007".getBytes());
ResultScanner scanner = testJavaAPI.getScanner(scan);
Result rs = scanner.next();
while (rs != null) {
byte[] row = rs.getRow();// 获取rowkey
String rk = Bytes.toString(row);
System.out.println();
if ("001".equals(rk)) {
byte[] value = rs.getValue("cf1".getBytes(), "name".getBytes());
System.out.println(Bytes.toString(value));
} else if ("002".equals(rk)) {
byte[] value = rs.getValue("cf1".getBytes(), "name0".getBytes());
System.out.println(Bytes.toString(value));
byte[] value1 = rs.getValue("cf1".getBytes(), "name1".getBytes());
System.out.println(Bytes.toString(value1));
byte[] value2 = rs.getValue("cf1".getBytes(), "name100".getBytes());
System.out.println(Bytes.toString(value2));
byte[] value3 = rs.getValue("cf1".getBytes(), "name2".getBytes());
System.out.println(Bytes.toString(value3));
byte[] value4 = rs.getValue("cf1".getBytes(), "name3".getBytes());
System.out.println(Bytes.toString(value4));
byte[] value5 = rs.getValue("cf1".getBytes(), "name4".getBytes());
System.out.println(Bytes.toString(value5));
byte[] value6 = rs.getValue("cf1".getBytes(), "name5".getBytes());
System.out.println(Bytes.toString(value6));
} else if ("007".equals(rk)) {
byte[] value6 = rs.getValue("cf1".getBytes(), "name".getBytes());
System.out.println(Bytes.toString(value6));
byte[] value7 = rs.getValue("cf1".getBytes(), "age1".getBytes());
System.out.println(Bytes.toString(value7));
}
rs = scanner.next();
}
}
@Test
public void cellUtil() throws IOException {
Table testJavaAPI = conn.getTable(TableName.valueOf("test1"));
Scan scan = new Scan();
// 包含startRow 不包含 endRow
scan.withStartRow("001".getBytes());
scan.withStopRow("007".getBytes());
ResultScanner scanner = testJavaAPI.getScanner(scan);
Result rs = scanner.next();
while (rs != null) {
for (Cell cell : rs.listCells()) {
byte[] rk = CellUtil.cloneRow(cell);
byte[] cf = CellUtil.cloneFamily(cell);
byte[] qualifier = CellUtil.cloneQualifier(cell);
byte[] value = CellUtil.cloneValue(cell);
System.out.println("rowkey:" + Bytes.toString(rk) + ",columnsFamily:" + Bytes.toString(cf) + ",qualifier:" + Bytes.toString(qualifier) + ",value:" + Bytes.toString(value));
}
rs = scanner.next();
}
}
// 读取文件并写入HBase
@Test
public void putAll() throws IOException {
Admin admin = conn.getAdmin();
// 判断表是否存在,不存在即创建
if (!admin.tableExists(TableName.valueOf("students"))) {
HTableDescriptor students = new HTableDescriptor(TableName.valueOf("students"));
HColumnDescriptor info = new HColumnDescriptor("info");
students.addFamily(info);
admin.createTable(students);
}
Table students = conn.getTable(TableName.valueOf("students"));
// 读取文件
BufferedReader br = new BufferedReader(new FileReader("data/students.txt"));
String line = br.readLine();
while (line != null) {
String[] splits = line.split(",");
String rk = splits[0];
String name = splits[1];
String age = splits[2];
String gender = splits[3];
String clazz = splits[4];
Put put = new Put(Bytes.toBytes(rk));
put.addColumn("info".getBytes(), "name".getBytes(), name.getBytes());
put.addColumn("info".getBytes(), "age".getBytes(), age.getBytes());
put.addColumn("info".getBytes(), "gender".getBytes(), gender.getBytes());
put.addColumn("info".getBytes(), "clazz".getBytes(), clazz.getBytes());
students.put(put);
line = br.readLine();
}
br.close();
}
@After
public void closeAll() throws IOException {
if (conn != null) {
conn.close();
}
}
}
HBase过滤器
作用
- 过滤器的作用是在服务端判断数据是否满足条件,然后只将满足条件的数据返回给客户端
- 过滤器的类型很多,但是可以分为两大类:
- 比较过滤器:可应用于rowkey、列簇、列、列值过滤器
- 专用过滤器:只能适用于特定的过滤器
比较过滤器
比较运算符
-
LESS <
-
LESS_OR_EQUAL <=
-
EQUAL =
-
NOT_EQUAL <>
-
GREATER_OR_EQUAL >=
-
GREATER >
-
NO_OP 排除所有
常见的六大比较过滤器
BinaryComparator
- 按字节索引顺序比较指定字节数组,采用Bytes.compareTo(byte[])
BinaryPrefixComparator
- 通BinaryComparator,只是比较左端前缀的数据是否相同
NullComparator
- 判断给定的是否为空
BitComparator
- 按位比较
RegexStringComparator
- 提供一个正则的比较器,仅支持 EQUAL 和非EQUAL
SubstringComparator
- 判断提供的子串是否出现在中
示例代码
rowKey过滤器:RowFilter
通过RowFilter与BinaryComparator过滤比rowKey 1500100010小的所有值出来
@Test
// 通过RowFilter过滤比rowKey 1500100010 小的所有值出来
public void BinaryComparatorFilter() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
BinaryComparator binaryComparator = new BinaryComparator(Bytes.toBytes(1500100010));
RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.LESS, binaryComparator);
Scan scan = new Scan();
scan.setFilter(rowFilter);
ResultScanner scanner = students.getScanner(scan);
Result rs = scanner.next();
while (rs != null) {
String id = Bytes.toString(rs.getRow());
String name = Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));
int age = Bytes.toInt(rs.getValue("info".getBytes(), "age".getBytes()));
String gender = Bytes.toString(rs.getValue("info".getBytes(), "gender".getBytes()));
String clazz = Bytes.toString(rs.getValue("info".getBytes(), "clazz".getBytes()));
System.out.println(id + "\t" + name + "\t" + age + "\t" + gender + "\t" + clazz + "\t");
rs = scanner.next();
}
}
列簇过滤器:FamilyFilter
通过FamilyFilter与SubstringComparator查询列簇名包含in的所有列簇下面的数据
@Test
// 通过FamilyFilter查询列簇名包含in的所有列簇下面的数据
public void SubstringComparatorFilter() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
SubstringComparator substringComparator = new SubstringComparator("in");
FamilyFilter familyFilter = new FamilyFilter(CompareFilter.CompareOp.EQUAL, substringComparator);
Scan scan = new Scan();
scan.setFilter(familyFilter);
ResultScanner scanner = students.getScanner(scan);
Result rs = scanner.next();
while (rs != null) {
String id = Bytes.toString(rs.getRow());
String name = Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));
int age = Bytes.toInt(rs.getValue("info".getBytes(), "age".getBytes()));
String gender = Bytes.toString(rs.getValue("info".getBytes(), "gender".getBytes()));
String clazz = Bytes.toString(rs.getValue("info".getBytes(), "clazz".getBytes()));
System.out.println(id + "\t" + name + "\t" + age + "\t" + gender + "\t" + clazz + "\t");
rs = scanner.next();
}
}
通过FamilyFilter与 BinaryPrefixComparator 过滤出列簇以info开头的列簇下的所有数据
// 通过FamilyFilter与 BinaryPrefixComparator 过滤出列簇以info开头的所有列簇下的所有数据
@Test
public void BinaryPrefixComparatorFilter() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
// 二进制前缀比较器
BinaryPrefixComparator binaryPrefixComparator = new BinaryPrefixComparator("info".getBytes());
// FamilyFilter 作用于列簇的过滤器
FamilyFilter familyFilter = new FamilyFilter(CompareFilter.CompareOp.EQUAL, binaryPrefixComparator);
Scan scan = new Scan();
scan.withStartRow("1500100001".getBytes());
scan.withStopRow("1500100011".getBytes());
// 通过setFilter方法设置过滤器
scan.setFilter(familyFilter);
ResultScanner scanner = students.getScanner(scan);
printRS(scanner);
}
列过滤器:QualifierFilter
通过QualifierFilter与SubstringComparator查询列名包含in的列的值
public void printRS(ResultScanner scanner) throws IOException {
for (Result rs : scanner) {
String rowkey = Bytes.toString(rs.getRow());
System.out.println("当前行的rowkey为:" + rowkey);
for (Cell cell : rs.listCells()) {
String family = Bytes.toString(CellUtil.cloneFamily(cell));
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
byte[] bytes = CellUtil.cloneValue(cell);
if ("age".equals(qualifier)) {
int value = Bytes.toInt(bytes);
System.out.println(family + ":" + qualifier + "的值为" + value);
} else {
String value = Bytes.toString(bytes);
System.out.println(family + ":" + qualifier + "的值为" + value);
}
}
}
}
@Test
// 通过FamilyFilter查询列簇名包含in的所有列簇下面的数据
public void SubstringComparatorFilter() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
SubstringComparator substringComparator = new SubstringComparator("in");
FamilyFilter familyFilter = new FamilyFilter(CompareFilter.CompareOp.EQUAL, substringComparator);
Scan scan = new Scan();
scan.setFilter(familyFilter);
ResultScanner scanner = students.getScanner(scan);
Result rs = scanner.next();
while (rs != null) {
String id = Bytes.toString(rs.getRow());
String name = Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));
int age = Bytes.toInt(rs.getValue("info".getBytes(), "age".getBytes()));
String gender = Bytes.toString(rs.getValue("info".getBytes(), "gender".getBytes()));
String clazz = Bytes.toString(rs.getValue("info".getBytes(), "clazz".getBytes()));
System.out.println(id + "\t" + name + "\t" + age + "\t" + gender + "\t" + clazz + "\t");
rs = scanner.next();
}
}
过滤出 列的名字 中 包含 “am” 所有的列 及列的值
// 过滤出 列的名字 中 包含 "am" 所有的列 及列的值
@Test
public void SubstringComparatorQualifierFilter() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
SubstringComparator substringComparator = new SubstringComparator("am");
// 作用在列名上的过滤器
QualifierFilter qualifierFilter = new QualifierFilter(CompareFilter.CompareOp.EQUAL, substringComparator);
Scan scan = new Scan();
scan.withStartRow("1500100001".getBytes());
scan.withStopRow("1500100011".getBytes());
// 通过setFilter方法设置过滤器
scan.setFilter(qualifierFilter);
ResultScanner scanner = students.getScanner(scan);
printRS(scanner);
}
列值过滤器:ValueFilter
通过ValueFilter与BinaryPrefixComparator过滤出所有的cell中值以 “张” 开头的学生
@Test
// 通过ValueFilter与BinaryPrefixComparator过滤出所有的cell中值以 "张" 开头的学生
public void BinaryPrefixComparatorFilter() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
BinaryPrefixComparator binaryPrefixComparator = new BinaryPrefixComparator("张".getBytes());
ValueFilter valueFilter = new ValueFilter(CompareFilter.CompareOp.EQUAL, binaryPrefixComparator);
Scan scan = new Scan();
scan.setFilter(valueFilter);
ResultScanner scanner = students.getScanner(scan);
printRS(scanner);
}
过滤出文科的学生,只会返回clazz列,其他列的数据不符合条件,不会返回
// 过滤出文科的学生
// 只会返回clazz列,其他列的数据不符合条件,不会返回
@Test
public void RegexStringComparatorFilter() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
// 使用正则表达式比较器
RegexStringComparator regexStringComparator = new RegexStringComparator("^文科.*");
// ValueFilter 会返回符合条件的cell,并不会返回整条数据
ValueFilter valueFilter = new ValueFilter(CompareFilter.CompareOp.EQUAL, regexStringComparator);
Scan scan = new Scan();
scan.withStartRow("1500100001".getBytes());
scan.withStopRow("1500100011".getBytes());
// 通过setFilter方法设置过滤器
scan.setFilter(valueFilter);
ResultScanner scanner = students.getScanner(scan);
printRS(scanner);
}
专用过滤器
单列值过滤器:SingleColumnValueFilter
- SingleColumnValueFilter会返回满足条件的cell所在行的所有cell的值(即会返回一行数据)
通过SingleColumnValueFilter与查询文科班所有学生信息
@Test
// 通过SingleColumnValueFilter与查询文科班所有学生信息
public void RegexStringComparatorFilter() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
"info".getBytes(),
"clazz".getBytes(),
CompareFilter.CompareOp.EQUAL,
new RegexStringComparator("^文科.*")
);
Scan scan = new Scan();
scan.setFilter(singleColumnValueFilter);
ResultScanner scanner = students.getScanner(scan);
Result rs = scanner.next();
while (rs != null) {
String id = Bytes.toString(rs.getRow());
String name = Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));
int age = Bytes.toInt(rs.getValue("info".getBytes(), "age".getBytes()));
String gender = Bytes.toString(rs.getValue("info".getBytes(), "gender".getBytes()));
String clazz = Bytes.toString(rs.getValue("info".getBytes(), "clazz".getBytes()));
System.out.println(id + "\t" + name + "\t" + age + "\t" + gender + "\t" + clazz + "\t");
rs = scanner.next();
}
}
列值排除过滤器:SingleColumnValueExcludeFilter
- 与SingleColumnValueFilter相反,会排除掉指定的列,其他的列全部返回
通过SingleColumnValueExcludeFilter与BinaryComparator查询文科一班所有学生信息,最终不返回clazz列
@Test
// 通过SingleColumnValueExcludeFilter与BinaryComparator查询文科一班所有学生信息,最终不返回clazz列
public void RegexStringComparatorExcludeFilter() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
SingleColumnValueExcludeFilter singleColumnValueExcludeFilter = new SingleColumnValueExcludeFilter(
"info".getBytes(),
"clazz".getBytes(),
CompareFilter.CompareOp.EQUAL,
new BinaryComparator("文科一班".getBytes())
);
Scan scan = new Scan();
scan.setFilter(singleColumnValueExcludeFilter);
ResultScanner scanner = students.getScanner(scan);
Result rs = scanner.next();
while (rs != null) {
String id = Bytes.toString(rs.getRow());
String name = Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));
int age = Bytes.toInt(rs.getValue("info".getBytes(), "age".getBytes()));
String gender = Bytes.toString(rs.getValue("info".getBytes(), "gender".getBytes()));
// clazz列为空
String clazz = Bytes.toString(rs.getValue("info".getBytes(), "clazz".getBytes()));
System.out.println(id + "\t" + name + "\t" + age + "\t" + gender + "\t" + clazz + "\t");
rs = scanner.next();
}
}
rowkey前缀过滤器:PrefixFilter
通过PrefixFilter查询以150010008开头的所有前缀的rowkey
@Test
// 通过PrefixFilter查询以150010008开头的所有前缀的rowkey
public void PrefixFilterFilter() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
PrefixFilter prefixFilter = new PrefixFilter("150010008".getBytes());
Scan scan = new Scan();
scan.setFilter(prefixFilter);
ResultScanner scanner = students.getScanner(scan);
Result rs = scanner.next();
while (rs != null) {
String id = Bytes.toString(rs.getRow());
String name = Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));
int age = Bytes.toInt(rs.getValue("info".getBytes(), "age".getBytes()));
String gender = Bytes.toString(rs.getValue("info".getBytes(), "gender".getBytes()));
// clazz列为空
String clazz = Bytes.toString(rs.getValue("info".getBytes(), "clazz".getBytes()));
System.out.println(id + "\t" + name + "\t" + age + "\t" + gender + "\t" + clazz + "\t");
rs = scanner.next();
}
}
分页过滤器PageFilter
- 通过PageFilter查询第三页的数据,每页10条
- 使用PageFilter分页效率比较低,每次都需要扫描前面的数据,直到扫描到所需要查的数据
- 可设计一个合理的rowkey来实现分页需求
@Test
// 通过PageFilter查询第三页的数据,每页10条
public void PageFilter() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
int PageNum = 3;
int PageSize = 10;
Scan scan = new Scan();
if (PageNum == 1) {
scan.withStartRow("".getBytes());
//使用分页过滤器,实现数据的分页
PageFilter pageFilter = new PageFilter(PageSize);
scan.setFilter(pageFilter);
ResultScanner scanner = students.getScanner(scan);
printRS(scanner);
} else {
String current_page_start_rows = "";
int scanDatas = (PageNum - 1) * PageSize + 1;
PageFilter pageFilter = new PageFilter(scanDatas);
scan.setFilter(pageFilter);
ResultScanner scanner = students.getScanner(scan);
for (Result rs : scanner) {
current_page_start_rows = Bytes.toString(rs.getRow());
}
scan.withStartRow(current_page_start_rows.getBytes());
PageFilter pageFilter1 = new PageFilter(PageSize);
scan.setFilter(pageFilter1);
ResultScanner scanner1 = students.getScanner(scan);
printRS(scanner1);
}
}
通过合理的设置rowkey来实现分页功能
@Test
// 通过合理的设置rowkey来实现分页功能,提高效率
public void PageFilterTest2() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
int PageSize = 10;
int PageNum = 3;
int baseId = 1500100000;
int start_row = baseId + (PageNum - 1) * PageSize + 1;
int end_row = start_row + PageSize;
Scan scan = new Scan();
scan.withStartRow(String.valueOf(start_row).getBytes());
scan.withStopRow(String.valueOf(end_row).getBytes());
ResultScanner scanner = students.getScanner(scan);
printRS(scanner);
}
多过滤器综合查询
查询文科班中的学生中学号以150010008开头并且年龄小于23的学生信息
@Test
// 查询文科班中的学生中学号以150010008开头并且年龄小于23的学生信息
public void FilterListFilter() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
Scan scan = new Scan();
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
"info".getBytes()
, "clazz".getBytes()
, CompareFilter.CompareOp.EQUAL
, new RegexStringComparator("^文科.*"));
PrefixFilter prefixFilter = new PrefixFilter("150010008".getBytes());
SingleColumnValueFilter singleColumnValueFilter1 = new SingleColumnValueFilter(
"info".getBytes()
, "age".getBytes()
, CompareFilter.CompareOp.LESS
, new BinaryComparator(Bytes.toBytes(23)));
FilterList filterList = new FilterList();
filterList.addFilter(singleColumnValueFilter);
filterList.addFilter(prefixFilter);
filterList.addFilter(singleColumnValueFilter1);
scan.setFilter(filterList);
ResultScanner scanner = students.getScanner(scan);
printRS(scanner);
}
Phoenix
Phoenix搭建
Phoenix 4.15 HBase 1.4.6 hadoop 2.7.6
关闭hbase集群,在master中执行
stop-hbase.sh
上传解压配置环境变量
解压
tar -xvf apache-phoenix-4.15.0-HBase-1.4-bin.tar.gz
改名
mv apache-phoenix-4.15.0-HBase-1.4-bin phoenix-4.15.0
将phoenix-4.15.0-HBase-1.4-server.jar复制到所有节点的hbase lib目录下
scp /usr/local/soft/phoenix-4.15.0/phoenix-4.15.0-HBase-1.4-server.jar master:/usr/local/soft/hbase-1.4.6/lib/
scp /usr/local/soft/phoenix-4.15.0/phoenix-4.15.0-HBase-1.4-server.jar node1:/usr/local/soft/hbase-1.4.6/lib/
scp /usr/local/soft/phoenix-4.15.0/phoenix-4.15.0-HBase-1.4-server.jar node2:/usr/local/soft/hbase-1.4.6/lib/
启动hbase , 在master中执行
start-hbase.sh
配置环境变量
vim /etc/profile
增加:export PHOENIX_HOME=/usr/local/soft/phoenix-4.15.0
path:$PHOENIX_HOME/bin
source /etc/profile
Phoenix使用
连接sqlline
sqlline.py master,node1,node2
# 出现
163/163 (100%) Done
Done
sqlline version 1.5.0
0: jdbc:phoenix:master,node1,node2>
常用命令
# 1、创建表
CREATE TABLE IF NOT EXISTS STUDENT (
id VARCHAR NOT NULL PRIMARY KEY,
name VARCHAR,
age BIGINT,
gender VARCHAR ,
clazz VARCHAR
);
# 2、显示所有表
!table
# 3、插入数据
upsert into STUDENT values('1500100004','葛德曜',24,'男','理科三班');
upsert into STUDENT values('1500100005','宣谷芹',24,'男','理科六班');
upsert into STUDENT values('1500100006','羿彦昌',24,'女','理科三班');
# 4、查询数据,支持大部分sql语法,
select * from STUDENT ;
select * from STUDENT where age=24;
select gender ,count(*) from STUDENT group by gender;
select * from student order by gender;
# 5、删除数据
delete from STUDENT where id='1500100004';
# 6、删除表
drop table STUDENT;
# 7、退出命令行
!quit
更多语法参照官网
https://phoenix.apache.org/language/index.html#upsert_select
phoenix映射
-
默认情况下,直接在hbase中创建的表,通过phoenix是查看不到的
-
如果需要在phoenix中操作直接在hbase中创建的表,则需要在phoenix中进行表的映射。映射方式有两种:视图映射和表映射
视图映射
Phoenix创建的视图是只读的,所以只能用来做查询,无法通过视图对源数据进行修改等操作
# hbase shell 进入hbase命令行
hbase shell
# 创建hbase表
create 'test','name','company'
# 插入数据
put 'test','001','name:firstname','zhangsan'
put 'test','001','name:lastname','zhangsan'
put 'test','001','company:name','数加'
put 'test','001','company:address','合肥'
# 在phoenix创建视图, primary key 对应到hbase中的rowkey
create view "test" (
empid varchar primary key,
"name"."firstname" varchar,
"name"."lastname" varchar,
"company"."name" varchar,
"company"."address" varchar
);
CREATE view "students" (
id VARCHAR NOT NULL PRIMARY KEY,
"info"."name" VARCHAR,
"info"."age" VARCHAR,
"info"."gender" VARCHAR ,
"info"."clazz" VARCHAR
) column_encoded_bytes=0;
# 在phoenix查询数据,表名通过双引号引起来
select * from "test";
# 删除视图
drop view "test";
表映射
使用Apache Phoenix创建对HBase的表映射,有两类:
1) 当HBase中已经存在表时,可以以类似创建视图的方式创建关联表,只需要将create view改为create table即可。
2)当HBase中不存在表时,可以直接使用create table指令创建需要的表,并且在创建指令中可以根据需要对HBase表结构进行显示的说明。
第1)种情况下,如在之前的基础上已经存在了test表,则表映射的语句如下:
create table "test" (
empid varchar primary key,
"name"."firstname" varchar,
"name"."lastname"varchar,
"company"."name" varchar,
"company"."address" varchar
)column_encoded_bytes=0;
upsert into "test" values('1','2','3','4','5');
CREATE table "students" (
id VARCHAR NOT NULL PRIMARY KEY,
"info"."name" VARCHAR,
"info"."age" VARCHAR,
"info"."gender" VARCHAR ,
"info"."clazz" VARCHAR
) column_encoded_bytes=0;
upsert into "students" values('1500110004','葛德曜','24','男','理科三班');
使用create table创建的关联表,如果对表进行了修改,源数据也会改变,同时如果关联表被删除,源表也会被删除。但是视图就不会,如果删除视图,源数据不会发生改变。
Phoenix二级索引
-
建立行键与列值的映射关系
-
对于Hbase,如果想精确定位到某行记录,唯一的办法就是通过rowkey查询。如果不通过rowkey查找数据,就必须逐行比较每一行的值,对于较大的表,全表扫描的代价是不可接受的。
开启索引支持
# 关闭hbase集群
stop-hbase.sh
# 在/usr/local/soft/hbase-1.4.6/conf/hbase-site.xml中增加如下配置
<property>
<name>hbase.regionserver.wal.codec</name>
<value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
</property>
<property>
<name>hbase.rpc.timeout</name>
<value>60000000</value>
</property>
<property>
<name>hbase.client.scanner.timeout.period</name>
<value>60000000</value>
</property>
<property>
<name>phoenix.query.timeoutMs</name>
<value>60000000</value>
</property>
# 同步到所有节点
scp hbase-site.xml node1:`pwd`
scp hbase-site.xml node2:`pwd`
# 修改phoenix目录下的bin目录中的hbase-site.xml
<property>
<name>hbase.rpc.timeout</name>
<value>60000000</value>
</property>
<property>
<name>hbase.client.scanner.timeout.period</name>
<value>60000000</value>
</property>
<property>
<name>phoenix.query.timeoutMs</name>
<value>60000000</value>
</property>
# 启动hbase
start-hbase.sh
# 重新进入phoenix客户端
sqlline.sql master,node1,node2
创建索引
全局索引
全局索引适合读多写少的场景。如果使用全局索引,读数据基本不损耗性能,所有的性能损耗都来源于写数据。数据表的添加、删除和修改都会更新相关的索引表(数据删除了,索引表中的数据也会删除;数据增加了,索引表的数据也会增加)
注意: 对于全局索引在默认情况下,在查询语句中检索的列如果不在索引表中,Phoenix不会使用索引表将,除非使用hint。
# 创建DIANXIN.sql
CREATE TABLE IF NOT EXISTS DIANXIN (
mdn VARCHAR ,
start_date VARCHAR ,
end_date VARCHAR ,
county VARCHAR,
x DOUBLE ,
y DOUBLE,
bsid VARCHAR,
grid_id VARCHAR,
biz_type VARCHAR,
event_type VARCHAR ,
data_source VARCHAR ,
CONSTRAINT PK PRIMARY KEY (mdn,start_date)
) column_encoded_bytes=0;
# 上传数据DIANXIN.csv
# 导入数据
psql.py master,node1,node2 DIANXIN.sql DIANXIN.csv
# 创建全局索引
CREATE INDEX DIANXIN_INDEX ON DIANXIN ( end_date );
# 查询数据 ( 索引未生效)
select * from DIANXIN where end_date = '20180503154014';
# 强制使用索引 (索引生效) hint
select /*+ INDEX(DIANXIN DIANXIN_INDEX) */ * from DIANXIN where end_date = '20180503154014';
select /*+ INDEX(DIANXIN DIANXIN_INDEX) */ * from DIANXIN where end_date = '20180503154014' and start_date = '20180503154614';
# 取索引列,(索引生效)
select end_date from DIANXIN where end_date = '20180503154014';
# 创建多列索引
CREATE INDEX DIANXIN_INDEX1 ON DIANXIN ( end_date,COUNTY );
# 多条件查询 (索引生效)
select end_date,MDN,COUNTY from DIANXIN where end_date = '20180503154014' and COUNTY = '8340104';
# 查询所有列 (索引未生效)
select * from DIANXIN where end_date = '20180503154014' and COUNTY = '8340104';
# 查询所有列 (索引生效)
select /*+ INDEX(DIANXIN DIANXIN_INDEX1) */ * from DIANXIN where end_date = '20180503154014' and COUNTY = '8340104';
# 单条件 (索引未生效)
select end_date from DIANXIN where COUNTY = '8340103';
本地索引
本地索引适合写多读少的场景,或者存储空间有限的场景。和全局索引一样,Phoenix也会在查询的时候自动选择是否使用本地索引。本地索引因为索引数据和原数据存储在同一台机器上,避免网络数据传输的开销,所以更适合写多的场景。由于无法提前确定数据在哪个Region上,所以在读数据的时候,需要检查每个Region上的数据从而带来一些性能损耗。
注意:对于本地索引,查询中无论是否指定hint或者是查询的列是否都在索引表中,都会使用索引表。
@ 创建本地索引
CREATE LOCAL INDEX DIANXIN_LOCAL_IDEX ON DIANXIN(grid_id);
# 索引生效
select grid_id from dianxin where grid_id='117285031820040';
# 索引生效
select * from dianxin where grid_id='117285031820040';
全局索引与本地索引区别
-
全局索引:读多写少, 会单独建立索引表
-
本地索引:读少写多, 索引数据和原数据保存在同一台机器上
覆盖索引
覆盖索引是把原数据存储在索引数据表中,这样在查询时不需要再去HBase的原表获取数据就,直接返回查询结果。
注意:查询是 select 的列和 where 的列都需要在索引中出现。
# 创建覆盖索引
CREATE INDEX DIANXIN_INDEX_COVER ON DIANXIN ( x,y ) INCLUDE ( county );
# 查询所有列 (索引未生效)
select * from dianxin where x=117.288 and y =31.822;
# 强制使用索引 (索引生效)
select /*+ INDEX(DIANXIN DIANXIN_INDEX_COVER) */ * from dianxin where x=117.288 and y =31.822;
# 查询索引中的列 (索引生效)
select mdn,x,y,county from dianxin where x=117.288 and y =31.822;
查询条件必须放在索引中 select 中的列可以放在INCLUDE (将数据保存在索引中)
select /*+ INDEX(DIANXIN DIANXIN_INDEX_COVER) */ x,y,count(*) from dianxin group by x,y;
PhoenixJDBC
# 导入依赖
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
<version>4.15.0-HBase-1.4</version>
</dependency>
Connection conn = DriverManager.getConnection("jdbc:phoenix:master,node1,node2:2181");
Statement stat = conn.createStatement();
ResultSet rs = stat.executeQuery("select * from dianxin limit 10");
while(rs.next()){
String mdn = rs.getString("mdn");
System.out.println(mdn);
}
stat.close();
conn.close();
HBase SQL与Hive SQL执行流程
HBase的MapReduce过程
#### 代码示例
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import java.io.IOException;
public class Demo7MapReduceReadAndWriteHBase {
// 读取HBase中的students表,统计每个班级的人数,将结果写回HBase clazz_num表
public static class ReadHBaseMapper extends TableMapper<Text, IntWritable> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
String rowkey = Bytes.toString(key.get());
String clazz = Bytes.toString(value.getValue("info".getBytes(), "clazz".getBytes()));
context.write(new Text(clazz), new IntWritable(1));
}
}
public static class WriteHBaseReducer extends TableReducer<Text, IntWritable, NullWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable i : values) {
sum += i.get();
}
// create 'clazz_num','cf1'
Put put = new Put(key.toString().getBytes());
put.addColumn("cf1".getBytes(), "num".getBytes(), Bytes.toBytes(sum));
context.write(NullWritable.get(), put);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "master,node1,node2:2181");
Job job = Job.getInstance(conf);
job.setJobName(Demo7MapReduceReadAndWriteHBase.class.getName());
job.setJarByClass(Demo7MapReduceReadAndWriteHBase.class);
// 配置map任务
// 使用HBase提供的TableMapReduceUtil工具进行配置
TableMapReduceUtil.initTableMapperJob(TableName.valueOf("students")
, new Scan()
, ReadHBaseMapper.class
, Text.class
, IntWritable.class
, job
);
// 配置reduce任务
// 使用HBase提供的TableMapReduceUtil工具进行配置
TableMapReduceUtil.initTableReducerJob("clazz_num", WriteHBaseReducer.class, job);
job.waitForCompletion(true);
/*
hadoop jar HBaseJavaAPI11-1.0-jar-with-dependencies.jar Demo7MapReduceReadAndWriteHBase
*/
}
}
Hive关联Hbase表
# Hive整合HBase
create external table students_hbase
(
id string,
name string,
age string,
gender string,
clazz string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties ("hbase.columns.mapping" = "
:key,
info:name,
info:age,
info:gender,
info:clazz
")
tblproperties("hbase.table.name" = "default:students");
HBaseHA
//在node1,node2中启用备用HMaster
hbase-daemon.sh start master
HBase调优
Pre-Creating Regions(预分区)
- 默认情况下,在创建HBase表的时候会自动创建一个region分区,当导入数据的时候, 所有的HBase客户端都向这一个region写数据,直到这个region足够大了才进行切分。 一种可以加快批量写入速度的方法是通过预先创建一些空的regions,这样当数据写入 HBase时,会按照region分区情况,在集群内做数据的负载均衡。
- 如果知道hbase数据表的key的分布情况,就可以在建表的时候对hbase进行region的预分区。这样做的好处是防止大数据量插入的热点问题,提高数据插入的效率。
//在phoenix中
CREATE TABLE IF NOT EXISTS STUDENT (
id VARCHAR NOT NULL PRIMARY KEY,
name VARCHAR,
age BIGINT,
gender VARCHAR ,
clazz VARCHAR
)split on('15001006|','15001007|','15001008|') ;
//在hbase中
create 'split_table_test', 'cf', SPLITS_FILE => 'region_split_info.txt'
create 'split_table_test', 'cf', SPLITS => ['a','e','r']
Rowkey设计
原则:
- 唯一原则
- 长度原则(10-100bytes 定长)
- 散列原则
常用方式
反转
202133 -> 331202
202134 -> 431202
202135 -> 531202
202136 -> 631202
202137 -> 731202
202138 -> 831202
202139 -> 931202
202140 -> 041202
hash
md5 sha1
202133 -> 41DDBBCED55669818B2A40F4FED46F56
202134 -> 19D329403F02E2DA265CFC05D41FD253
202135 -> F6D06AEC4FB72A04F9CD4020BEF5E10F
202136 -> 0B512404B0411E623F64EC8981F8AE21
加上随机前缀
随机散列
第一次:202133 -> 41DDBBCED55669818B2A40F4FED46F56
第二次:202133 -> D55669818B2A40F4
第二次:202133 -> 02E2DA265CFC05D4
需求:查看某个时刻的数据
- 倒序、hash、 但是不能用随机散列
需求:将最新的数据放到最前面
- 大数减小数
- 通常数据里有时间戳
- 时间戳实际上是跟Long类型非常相似 一个很大的数
- Long.MAX_VALUE - 值
大数:300000
202137 -> 300000 - 202137 = 97863
202138 -> 300000 - 202138 = 97862
202139 -> 300000 - 202139 = 97861
202140 -> 300000 - 202140 = 97860
加盐
-
会在rowkey前面加上一个随机的前缀,
-
优点:不需要知道rowkey的分步情况
-
缺点:不能再hbase中对数据进行查询和修改
CREATE TABLE IF NOT EXISTS STUDENT (
id VARCHAR NOT NULL PRIMARY KEY,
name VARCHAR,
age BIGINT,
gender VARCHAR ,
clazz VARCHAR
)salt_buckets=6;
inmemory
创建表的时候,可以通过HColumnDescriptor.setInMemory(true)将表放到 RegionServer的缓存中,保证在读取的时候被cache命中。
maxversion
创建表的时候,可以通过HColumnDescriptor.setMaxVersions(int maxVersions)设置 表中数据的最大版本,如果只需要保存最新版本的数据,那么可以设置 setMaxVersions(1)。
建立索引超时,查询超时
修改配置文件,hbase-site.xml
两个位置
/usr/local/soft/phoenix-4.15.0/bin
/usr/local/soft/hbase-1.4.6/conf/ 所有节点
增加配置
<property>
<name>hbase.rpc.timeout</name>
<value>60000000</value>
</property>
<property>
<name>hbase.client.scanner.timeout.period</name>
<value>60000000</value>
</property>
<property>
<name>phoenix.query.timeoutMs</name>
<value>60000000</value>
</property>
需要重启hbase
Compact & Split
Minor Compaction:
- 指选取一些小的、相邻的StoreFile将他们合并成一个更大的StoreFile,在这个过程中不会处理已经Deleted或Expired的Cell。一次 Minor Compaction 的结果是更少并且更大的StoreFile。
Major Compaction:
- 指将所有的StoreFile合并成一个StoreFile,这个过程会清理三类没有意义的数据:被删除的数据、TTL过期数据、版本号超过设定版本号的数据。另外,一般情况下,major compaction时间会持续比较长,整个过程会消耗大量系统资源,对上层业务有比较大的影响。因此线上业务都会将关闭自动触发major compaction功能,改为手动在业务低峰期触发。
RegionSplit
- region中存储的是一张表的数据,当region中的数据条数过多的时候,会直接影响查询效率。当region过大的时候,region会被 拆分为两个region,HMaster会将分裂的region分配到不同的regionserver上,这样可以让请求分散到不同的RegionServer 上,已达到负载均衡 , 这也是Hbase的一个优点 。
ConstantSizeRegionSplitPolicy
-
0.94版本前,HBase region的默认切分策略
-
当region中最大的store大小超过某个阈值(hbase.hregion.max.filesize=10G)之后就会触发切分,一个region等分为2个region。但是在生产线上这种切分策略却有相当大的弊端(切分策略对于大表和小表没有明显的区分):
-
阈值(hbase.hregion.max.filesize)设置较大对大表比较友好,但是小表就有可能不会触发分裂,极端情况下可能就1个,形成热点,这对业务来说并不是什么好事。
-
如果设置较小则对小表友好,但一个大表就会在整个集群产生大量的region,这对于集群的管理、资源使用、failover来说都不是一件好事。
IncreasingToUpperBoundRegionSplitPolicy
-
0.94版本~2.0版本默认切分策略
-
总体看和ConstantSizeRegionSplitPolicy思路相同,一个region中最大的store大小大于设置阈值就会触发切分。
但是这个阈值并不像ConstantSizeRegionSplitPolicy是一个固定的值,而是会在一定条件下不断调整,调整规则和region所属表在当前regionserver上的region个数有关系.
region split阈值的计算公式是:
-
设regioncount:是region所属表在当前regionserver上的region的个数
-
阈值 = regioncount^3 * 128M * 2,当然阈值并不会无限增长,最大不超过MaxRegionFileSize(10G),当region中最大的store的大小达到该阈值的时候进行region split
例如:
- 第一次split阈值 = 1^3 * 256 = 256MB
- 第二次split阈值 = 2^3 * 256 = 2048MB
- 第三次split阈值 = 3^3 * 256 = 6912MB
- 第四次split阈值 = 4^3 * 256 = 16384MB > 10GB,因此取较小的值10GB
- 后面每次split的size都是10GB了
特点
- 相比ConstantSizeRegionSplitPolicy,可以自适应大表、小表;
- 在集群规模比较大的情况下,对大表的表现比较优秀
- 对小表不友好,小表可能产生大量的小region,分散在各regionserver上
- 小表达不到多次切分条件,导致每个split都很小,所以分散在各个regionServer上
SteppingSplitPolicy
-
2.0版本默认切分策略
-
相比 IncreasingToUpperBoundRegionSplitPolicy 简单了一些
region切分的阈值依然和待分裂region所属表在当前regionserver上的region个数有关系 -
如果region个数等于1,切分阈值为flush size 128M * 2
-
否则为MaxRegionFileSize。
-
这种切分策略对于大集群中的大表、小表会比 IncreasingToUpperBoundRegionSplitPolicy 更加友好,小表不会再产生大量的小region,而是适可而止。
KeyPrefixRegionSplitPolicy
根据rowKey的前缀对数据进行分区,这里是指定rowKey的前多少位作为前缀,比如rowKey都是16位的,指定前5位是前缀,那么前5位相同的rowKey在相同的region中。
DelimitedKeyPrefixRegionSplitPolicy
- 保证相同前缀的数据在同一个region中,例如rowKey的格式为:userid_eventtype_eventid,指定的delimiter为 _ ,则split的的时候会确保userid相同的数据在同一个region中。
- 按照分隔符进行切分,而KeyPrefixRegionSplitPolicy是按照指定位数切分。
BusyRegionSplitPolicy
-
按照一定的策略判断Region是不是Busy状态,如果是即进行切分
-
如果你的系统常常会出现热点Region,而你对性能有很高的追求,那么这种策略可能会比较适合你。它会通过拆分热点Region来缓解热点Region的压力,但是根据热点来拆分Region也会带来很多不确定性因素,因为你也不知道下一个被拆分的Region是哪个。
DisabledRegionSplitPolicy
不启用自动拆分, 需要指定手动拆分
HBase BulkLoading
优点:
-
如果我们一次性入库hbase巨量数据,处理速度慢不说,还特别占用Region资源, 一个比较高效便捷的方法就是使用 “Bulk Loading”方法,即HBase提供的HFileOutputFormat类。
-
它是利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接生成这种hdfs内存储的数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合mapreduce完成,高效便捷,而且不占用region资源,增添负载。
限制:
-
仅适合初次数据导入,即表内数据为空,或者每次入库表内都无数据的情况。
-
HBase集群与Hadoop集群为同一集群,即HBase所基于的HDFS为生成HFile的MR的集群
代码
- 生成HFile部分
package com.shujia;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class Demo10BulkLoading {
public static class BulkLoadingMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] splits = value.toString().split(",");
String mdn = splits[0];
String start_time = splits[1];
// 经度
String longitude = splits[4];
// 维度
String latitude = splits[5];
String rowkey = mdn + "_" + start_time;
KeyValue lg = new KeyValue(rowkey.getBytes(), "info".getBytes(), "lg".getBytes(), longitude.getBytes());
KeyValue lt = new KeyValue(rowkey.getBytes(), "info".getBytes(), "lt".getBytes(), latitude.getBytes());
context.write(new ImmutableBytesWritable(rowkey.getBytes()), lg);
context.write(new ImmutableBytesWritable(rowkey.getBytes()), lt);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181");
// 创建Job实例
Job job = Job.getInstance(conf);
job.setJarByClass(Demo10BulkLoading.class);
job.setJobName("Demo10BulkLoading");
// 保证全局有序
job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
// 设置reduce个数
job.setNumReduceTasks(4);
// 配置map任务
job.setMapperClass(BulkLoadingMapper.class);
// 配置reduce任务
// KeyValueSortReducer 保证在每个Reduce有序
job.setReducerClass(KeyValueSortReducer.class);
// 输入输出路径
FileInputFormat.addInputPath(job, new Path("/data/DIANXIN/"));
FileOutputFormat.setOutputPath(job, new Path("/data/hfile"));
// 创建HBase连接
Connection conn = ConnectionFactory.createConnection(conf);
// create 'dianxin_bulk','info'
// 获取dianxin_bulk 表
Table dianxin_bulk = conn.getTable(TableName.valueOf("dianxin_bulk"));
// 获取dianxin_bulk 表 region定位器
RegionLocator regionLocator = conn.getRegionLocator(TableName.valueOf("dianxin_bulk"));
// 使用HFileOutputFormat2将输出的数据按照HFile的形式格式化
HFileOutputFormat2.configureIncrementalLoad(job, dianxin_bulk, regionLocator);
// 等到MapReduce任务执行完成
job.waitForCompletion(true);
// 加载HFile到 dianxin_bulk 中
LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
load.doBulkLoad(new Path("/data/hfile"), conn.getAdmin(), dianxin_bulk, regionLocator);
/**
* create 'dianxin_bulk','info'
* hadoop jar HBaseJavaAPI10-1.0-jar-with-dependencies.jar com.shujia.Demo10BulkLoading
*/
}
}
说明
-
最终输出结果,无论是map还是reduce,输出部分key和value的类型必须是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>。
-
最终输出部分,Value类型是KeyValue 或Put,对应的Sorter分别是KeyValueSortReducer或PutSortReducer。
-
MR例子中HFileOutputFormat2.configureIncrementalLoad(job, dianxin_bulk, regionLocator);自动对job进行配置。SimpleTotalOrderPartitioner是需要先对key进行整体排序,然后划分到每个reduce中,保证每一个reducer中的的key最小最大值区间范围,是不会有交集的。因为入库到HBase的时候,作为一个整体的Region,key是绝对有序的。
-
MR例子中最后生成HFile存储在HDFS上,输出路径下的子目录是各个列族。如果对HFile进行入库HBase,相当于move HFile到HBase的Region中,HFile子目录的列族内容没有了,但不能直接使用mv命令移动,因为直接移动不能更新HBase的元数据。
-
HFile入库到HBase通过HBase中 LoadIncrementalHFiles的doBulkLoad方法,对生成的HFile文件入库