前提:HBase 版本是 1.2.6(注意:下文 Maven 依赖中 hbase-client 的版本为 2.2.3),安装详情参见博客《windows安装hbase》(作者 Fullmark0608)。
1、添加maven
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.2.3</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
2、添加配置
# hbase 数据库配置
hbase.config.hbase.zookeeper.quorum=127.0.0.1
hbase.config.hbase.zookeeper.property.clientPort=2181
3、编写代码
/**
 * Holder for settings bound from properties that start with {@code hbase}.
 *
 * <p>Entries under {@code hbase.config.*} end up in {@link #getConfig()},
 * keyed by the remainder of the property name.
 */
@ConfigurationProperties(prefix = "hbase")
public class HbaseProperties {

    /** Raw HBase client settings (key to value). */
    private Map<String, String> config;

    /**
     * @return the bound settings map, exactly as it was last set
     */
    public Map<String, String> getConfig() {
        return this.config;
    }

    /**
     * @param config the settings map to store
     */
    public void setConfig(final Map<String, String> config) {
        this.config = config;
    }
}
/**
 * Spring configuration that exposes the HBase client {@link Configuration}
 * and a {@link Connection} built from it.
 */
@EnableConfigurationProperties(HbaseProperties.class)
@org.springframework.context.annotation.Configuration
public class HbaseConfig {

    private final HbaseProperties prop;

    /**
     * @param properties bound {@code hbase.*} properties
     */
    public HbaseConfig(HbaseProperties properties) {
        this.prop = properties;
    }

    /**
     * Builds the HBase client configuration: the defaults from
     * {@code HBaseConfiguration.create()} overlaid with every entry of
     * {@link HbaseProperties#getConfig()}.
     *
     * @return the HBase configuration
     */
    @Bean
    public Configuration configuration() {
        Configuration configuration = HBaseConfiguration.create();
        Map<String, String> config = prop.getConfig();
        // The map is null when no hbase.config.* property is defined; fall back
        // to the HBase defaults instead of failing with a NullPointerException.
        if (config != null) {
            config.forEach(configuration::set);
        }
        return configuration;
    }

    /**
     * Creates an HBase connection from {@link #configuration()}.
     *
     * @return a new connection
     * @throws IOException if the connection cannot be created
     */
    @Bean
    public Connection getConnection() throws IOException {
        return ConnectionFactory.createConnection(configuration());
    }
}
package com.catt.service;
import com.catt.config.HbaseConfig;
import com.catt.tools.system.JacksonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.*;
/**
 * Convenience wrapper around the HBase client API: table creation/deletion,
 * single-row puts, deletes, gets and a row-key substring scan.
 *
 * <p>All strings (row keys, families, qualifiers, values) are encoded with
 * {@link Bytes#toBytes(String)} so that reads, writes and deletes agree on
 * the byte representation.
 *
 * @author fanghuanbiao
 */
@Service
@Slf4j
public class HbaseService {

    @Autowired
    private HbaseConfig config;

    /** Connection shared by all operations; created once in {@link #init()}. */
    private static Connection connection = null;

    /** Admin handle obtained from {@link #connection}. */
    private static Admin admin = null;

    /**
     * Opens the shared connection and admin handle. A failure is logged and not
     * rethrown; later operations then fail with an {@link IOException}.
     */
    @PostConstruct
    private void init() {
        if (connection != null) {
            return;
        }
        try {
            connection = ConnectionFactory.createConnection(config.configuration());
            admin = connection.getAdmin();
        } catch (IOException e) {
            log.error("HBase create connection failed:", e);
        }
    }

    /**
     * Closes the admin handle and the shared connection when the bean is destroyed.
     */
    @PreDestroy
    private void destroy() {
        try {
            if (admin != null) {
                admin.close();
            }
        } catch (IOException e) {
            log.warn("HBase close admin failed:", e);
        } finally {
            admin = null;
        }
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (IOException e) {
            log.warn("HBase close connection failed:", e);
        } finally {
            connection = null;
        }
    }

    /**
     * Returns a table handle from the shared connection. Callers must close it.
     *
     * @param tableName table name
     * @return an open table handle
     * @throws IOException if the connection was never established or the table cannot be opened
     */
    private Table openTable(String tableName) throws IOException {
        if (connection == null) {
            throw new IOException("HBase connection is not initialized");
        }
        return connection.getTable(TableName.valueOf(tableName));
    }

    /**
     * Returns the shared admin handle.
     *
     * @return the admin handle
     * @throws IOException if the connection was never established
     */
    private Admin requireAdmin() throws IOException {
        if (admin == null) {
            throw new IOException("HBase admin is not initialized");
        }
        return admin;
    }

    /**
     * Shell equivalent: {@code create 'tableName','[Column Family 1]','[Column Family 2]'}.
     *
     * @param tableName      table name
     * @param columnFamilies column family names
     * @throws TableExistsException if a table with that name already exists
     * @throws IOException          on any other HBase failure
     */
    public void createTable(String tableName, String... columnFamilies) throws IOException {
        TableName name = TableName.valueOf(tableName);
        if (this.tableExists(tableName)) {
            throw new TableExistsException(tableName + " is exists!");
        }
        List<ColumnFamilyDescriptor> columnFamilyList = new ArrayList<>(columnFamilies.length);
        for (String columnFamily : columnFamilies) {
            columnFamilyList.add(
                    ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily)).build());
        }
        TableDescriptorBuilder descriptorBuilder = TableDescriptorBuilder.newBuilder(name);
        descriptorBuilder.setColumnFamilies(columnFamilyList);
        requireAdmin().createTable(descriptorBuilder.build());
    }

    /**
     * Shell equivalent: {@code put <tableName>,<rowKey>,<family:column>,<value>}.
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family
     * @param column       column qualifier
     * @param value        value to store
     * @throws IOException on HBase failure
     */
    public void insertOrUpdate(String tableName, String rowKey, String columnFamily, String column, String value)
            throws IOException {
        this.insertOrUpdate(tableName, rowKey, columnFamily, new String[]{column}, new String[]{value});
    }

    /**
     * Writes several columns of one family into one row with a single {@code Put}.
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family
     * @param columns      column qualifiers
     * @param values       values, positionally matching {@code columns}
     * @throws IllegalArgumentException if {@code columns} and {@code values} are null or differ in length
     * @throws IOException              on HBase failure
     */
    public void insertOrUpdate(String tableName, String rowKey, String columnFamily, String[] columns, String[] values)
            throws IOException {
        if (columns == null || values == null || columns.length != values.length) {
            throw new IllegalArgumentException("columns and values must be non-null and of equal length");
        }
        if (columns.length == 0) {
            // Nothing to write.
            return;
        }
        byte[] family = Bytes.toBytes(columnFamily);
        Put put = new Put(Bytes.toBytes(rowKey));
        for (int i = 0; i < columns.length; i++) {
            put.addColumn(family, Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
        }
        try (Table table = openTable(tableName)) {
            // Send the fully assembled Put once instead of once per column.
            table.put(put);
        }
    }

    /**
     * Deletes an entire row.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @throws IOException on HBase failure
     */
    public void deleteRow(String tableName, String rowKey) throws IOException {
        try (Table table = openTable(tableName)) {
            table.delete(new Delete(Bytes.toBytes(rowKey)));
        }
    }

    /**
     * Deletes one column family of a row.
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family
     * @throws IOException on HBase failure
     */
    public void deleteColumnFamily(String tableName, String rowKey, String columnFamily) throws IOException {
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addFamily(Bytes.toBytes(columnFamily));
        try (Table table = openTable(tableName)) {
            table.delete(delete);
        }
    }

    /**
     * Shell equivalent: {@code delete 'tableName','rowKey','columnFamily:column'}.
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family
     * @param column       column qualifier
     * @throws IOException on HBase failure
     */
    public void deleteColumn(String tableName, String rowKey, String columnFamily, String column) throws IOException {
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
        try (Table table = openTable(tableName)) {
            table.delete(delete);
        }
    }

    /**
     * Shell equivalent: {@code disable 'tableName'} followed by {@code drop 'tableName'}.
     * Does nothing when the table does not exist.
     *
     * @param tableName table name
     * @throws IOException on HBase failure
     */
    public void deleteTable(String tableName) throws IOException {
        if (!this.tableExists(tableName)) {
            return;
        }
        TableName name = TableName.valueOf(tableName);
        Admin hbaseAdmin = requireAdmin();
        // Only disable when needed, so an already-disabled table can still be dropped.
        if (hbaseAdmin.isTableEnabled(name)) {
            hbaseAdmin.disableTable(name);
        }
        hbaseAdmin.deleteTable(name);
    }

    /**
     * Shell equivalent: {@code get 'tableName','rowkey','family:column'}.
     *
     * @param tableName table name
     * @param rowkey    row key
     * @param family    column family
     * @param column    column qualifier
     * @return {@code null} if any argument is blank; the cell value if found;
     *         an empty string if the cell is absent or the lookup failed
     */
    public String getValue(String tableName, String rowkey, String family, String column) {
        if (StringUtils.isBlank(tableName) || StringUtils.isBlank(family) || StringUtils.isBlank(rowkey)
                || StringUtils.isBlank(column)) {
            return null;
        }
        String value = "";
        try (Table table = openTable(tableName)) {
            Get get = new Get(Bytes.toBytes(rowkey));
            get.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
            List<Cell> cells = table.get(get).listCells();
            if (cells != null) {
                // Keep the last returned cell, as the original loop did.
                for (Cell cell : cells) {
                    value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                }
            }
        } catch (IOException e) {
            log.error("HBase get value failed: table={}, rowkey={}, column={}:{}",
                    tableName, rowkey, family, column, e);
        }
        return value;
    }

    /**
     * Shell equivalent: {@code get 'tableName','rowKey'}. Prints row key, family,
     * qualifier and value of every cell of the row to standard output.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @return always {@code null}; the row content is only printed
     * @throws IOException on HBase failure
     */
    public String selectOneRow(String tableName, String rowKey) throws IOException {
        try (Table table = openTable(tableName)) {
            Result result = table.get(new Get(Bytes.toBytes(rowKey)));
            if (result.isEmpty()) {
                // Row not found: there are no cells to iterate.
                return null;
            }
            for (Cell cell : result.rawCells()) {
                // A cell's arrays are shared backing buffers; decode only the
                // slice described by the matching offset and length.
                String row = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
                String columnFamily = Bytes.toString(
                        cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
                String column = Bytes.toString(
                        cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                // Could be mapped onto an object via reflection (column names matching Java properties).
                System.out.println(row);
                System.out.println(columnFamily);
                System.out.println(column);
                System.out.println(value);
            }
        }
        return null;
    }

    /**
     * Scans a table and prints every matching row key and its cells to standard output.
     * When {@code rowKeyFilter} is non-empty, only rows whose key contains it as a
     * substring are returned (a {@link RowFilter} with a {@link SubstringComparator}).
     *
     * @param tableName    table name
     * @param rowKeyFilter substring the row key must contain; null or empty scans all rows
     * @return always {@code null}; the rows are only printed
     * @throws IOException on HBase failure
     */
    public String scanTable(String tableName, String rowKeyFilter) throws IOException {
        Scan scan = new Scan();
        if (!StringUtils.isEmpty(rowKeyFilter)) {
            scan.setFilter(new RowFilter(CompareOperator.EQUAL, new SubstringComparator(rowKeyFilter)));
        }
        try (Table table = openTable(tableName);
             ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
                System.out.println(Bytes.toString(result.getRow()));
                Cell[] cells = result.rawCells();
                if (cells == null) {
                    continue;
                }
                for (Cell cell : cells) {
                    System.out.println(cell);
                }
            }
        }
        return null;
    }

    /**
     * Checks whether a table exists by looking it up in the list of all table
     * names (an indirect check).
     *
     * @param tableName table name
     * @return {@code true} if a table with exactly that name is listed
     * @throws IOException on HBase failure
     */
    public boolean tableExists(String tableName) throws IOException {
        TableName[] tableNames = requireAdmin().listTableNames();
        if (tableNames == null) {
            return false;
        }
        for (TableName existing : tableNames) {
            if (tableName.equals(existing.getNameAsString())) {
                return true;
            }
        }
        return false;
    }
}
4、测试
| 行键 | StuInfo:Name | StuInfo:Age | StuInfo:Sex | StuInfo:Class | Grades:BigData | Grades:Computer | Grades:Math | 时间戳 |
|---|---|---|---|---|---|---|---|---|
| 0001 | Tom Green | 18 | Male | | 80 | 90 | 85 | T2 |
| 0002 | Amy | 19 | | 01 | 95 | | 89 | T1 |
| 0003 | Allen | 19 | Male | 02 | 90 | | 88 | T1 |

(注:原表的合并表头与空单元格在转载时丢失,以上空单元格的位置为推测。)
/**
 * Demo entry point: starts the Spring context, reads a few cells and rows of
 * the {@code Student} table through {@code HbaseService}, then shuts down.
 *
 * @param args command-line arguments passed on to Spring Boot
 */
public static void main(String[] args) {
    // try-with-resources closes the context on every exit path, including Errors.
    try (ConfigurableApplicationContext context =
                 new SpringApplicationBuilder(HbaseFzpGxydApplication.class).run(args)) {
        // Typed lookup by bean name; no unchecked cast needed.
        HbaseService hbaseService = context.getBean("hbaseService", HbaseService.class);
        try {
            // hbaseService.createTable("Student", "StuInfo", "Grades");
            // hbaseService.insertOrUpdate("Student", "0001", "StuInfo", "name", "Tom Green");
            // hbaseService.insertOrUpdate("Student", "0001", "StuInfo", "Age", "18");
            System.out.println("=================" + hbaseService.getValue("Student", "0001", "StuInfo", "name"));
            System.out.println("=================" + hbaseService.getValue("Student", "0001", "StuInfo", "Age"));
            System.out.println("=================" + hbaseService.selectOneRow("Student", "0001"));
            System.out.println("=================" + hbaseService.selectOneRow("Student", "0002"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}