springboot集成Hbase

前提:HBase 版本是 1.2.6,安装步骤详见《windows安装hbase》(Fullmark0608 的 CSDN 博客)。

1、添加maven

  <!--
    HBase client library. The slf4j-log4j12 binding is excluded from its transitive
    dependencies (presumably to avoid clashing with the application's own SLF4J binding).
    NOTE(review): the prerequisite above mentions HBase 1.2.6 while this client is 2.2.3;
    confirm the client and server versions are compatible.
  -->
  <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>2.2.3</version>
      <exclusions>
          <exclusion>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-log4j12</artifactId>
          </exclusion>
      </exclusions>
  </dependency>

2、添加配置

# HBase database configuration.
# Entries under the "hbase.config." prefix are bound into HbaseProperties.config and
# copied onto the HBase client Configuration by HbaseConfig.configuration().
# ZooKeeper quorum host(s) the HBase client connects to.
hbase.config.hbase.zookeeper.quorum=127.0.0.1
# ZooKeeper client port.
hbase.config.hbase.zookeeper.property.clientPort=2181

3、编写代码

/**
 * Holds the HBase client settings bound from application properties under the
 * {@code hbase} prefix.
 *
 * <p>Entries below {@code hbase.config} are collected into {@link #getConfig()}.
 */
@ConfigurationProperties(prefix = "hbase")
public class HbaseProperties {
    /**
     * Raw client settings. Defaults to an empty map so that consumers can iterate
     * it safely even when no {@code hbase.config.*} entry is defined.
     */
    private Map<String, String> config = new java.util.HashMap<>();

    /**
     * @return the configured settings; never {@code null}
     */
    public Map<String, String> getConfig() {
        return config;
    }

    /**
     * @param config the settings to use; {@code null} is treated as "no settings"
     */
    public void setConfig(Map<String, String> config) {
        this.config = (config != null) ? config : new java.util.HashMap<>();
    }
}
/**
 * Spring configuration that exposes the HBase client {@code Configuration} and a
 * {@code Connection} built from it as beans.
 *
 * <p>The Spring {@code @Configuration} annotation is written fully qualified because the
 * simple name {@code Configuration} refers to the HBase/Hadoop configuration type here.
 */
@EnableConfigurationProperties(HbaseProperties.class)
@org.springframework.context.annotation.Configuration
public class HbaseConfig {
    private final HbaseProperties prop;

    /**
     * @param properties the bound {@code hbase.*} properties
     */
    public HbaseConfig(HbaseProperties properties) {
        this.prop = properties;
    }

    /**
     * Builds the HBase client configuration: the defaults from
     * {@code HBaseConfiguration.create()} overlaid with every entry of
     * {@link HbaseProperties#getConfig()}.
     *
     * @return the HBase client configuration
     */
    @Bean
    public Configuration configuration() {
        Configuration configuration = HBaseConfiguration.create();
        Map<String, String> config = prop.getConfig();
        // No hbase.config.* entries leaves the map unset; fall back to the defaults.
        if (config != null) {
            config.forEach(configuration::set);
        }
        return configuration;
    }

    /**
     * Creates an HBase connection from {@link #configuration()}.
     *
     * @return a new HBase connection
     * @throws IOException if the connection cannot be created
     */
    @Bean
    public Connection getConnection() throws IOException {
        return ConnectionFactory.createConnection(configuration());
    }
}
package com.catt.service;

import com.catt.config.HbaseConfig;
import com.catt.tools.system.JacksonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.*;

/**
 * @author fanghuanbiao
 */
@Service
@Slf4j
public class HbaseService {
    @Autowired
    private HbaseConfig config;

    private static Connection connection = null;
    private static Admin admin = null;

    @PostConstruct
    private void init() {
        if (connection != null) {
            return;
        }
        try {
            connection = ConnectionFactory.createConnection(config.configuration());
            admin = connection.getAdmin();
        } catch (IOException e) {
            log.error("HBase create connection failed:", e);
        }
    }

    /**
     * create 'tableName','[Column Family 1]','[Column Family 2]'
     *
     * @param tableName      表名
     * @param columnFamilies 列族名
     * @throws IOException 异常
     */
    public void createTable(String tableName, String... columnFamilies) throws IOException {
        TableName name = TableName.valueOf(tableName);
        boolean isExists = this.tableExists(tableName);
        if (isExists) {
            throw new TableExistsException(tableName + "is exists!");
        }
        TableDescriptorBuilder descriptorBuilder = TableDescriptorBuilder.newBuilder(name);
        List<ColumnFamilyDescriptor> columnFamilyList = new ArrayList<>();
        for (String columnFamily : columnFamilies) {
            ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
                    .newBuilder(columnFamily.getBytes()).build();
            columnFamilyList.add(columnFamilyDescriptor);
        }
        descriptorBuilder.setColumnFamilies(columnFamilyList);
        TableDescriptor tableDescriptor = descriptorBuilder.build();
        admin.createTable(tableDescriptor);
    }

    /**
     * put <tableName>,<rowKey>,<family:column>,<value>,<timestamp>
     *
     * @param tableName    表名
     * @param rowKey       行主键
     * @param columnFamily 列族
     * @param column       列
     * @param value        值
     * @throws IOException
     */
    public void insertOrUpdate(String tableName, String rowKey, String columnFamily, String column, String value)
            throws IOException {
        this.insertOrUpdate(tableName, rowKey, columnFamily, new String[]{column}, new String[]{value});
    }

    /**
     * put <tableName>,<rowKey>,<family:column>,<value>,<timestamp>
     *
     * @param tableName    表名
     * @param rowKey       行主键
     * @param columnFamily 列族
     * @param columns      多个列
     * @param values       多个值
     * @throws IOException
     */
    public void insertOrUpdate(String tableName, String rowKey, String columnFamily, String[] columns, String[] values)
            throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Put put = new Put(Bytes.toBytes(rowKey));
        for (int i = 0; i < columns.length; i++) {
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
            table.put(put);
        }
    }

    /**
     * @param tableName 表名
     * @param rowKey    行主键
     * @throws IOException 异常
     */
    public void deleteRow(String tableName, String rowKey) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(rowKey.getBytes());
        table.delete(delete);
    }

    /**
     * @param tableName    表名
     * @param rowKey       行主键
     * @param columnFamily 列族
     * @throws IOException 异常
     */
    public void deleteColumnFamily(String tableName, String rowKey, String columnFamily) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(rowKey.getBytes());
        delete.addFamily(Bytes.toBytes(columnFamily));
        table.delete(delete);
    }

    /**
     * delete 'tableName','rowKey','columnFamily:column'
     *
     * @param tableName    表名
     * @param rowKey       行主键
     * @param columnFamily 列族
     * @param column       列
     * @throws IOException 异常
     */
    public void deleteColumn(String tableName, String rowKey, String columnFamily, String column) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(rowKey.getBytes());
        delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
        table.delete(delete);
    }

    /**
     * disable 'tableName' 之后 drop 'tableName'
     *
     * @param tableName 表名
     * @throws IOException 异常
     */
    public void deleteTable(String tableName) throws IOException {
        boolean isExists = this.tableExists(tableName);
        if (!isExists) {
            return;
        }
        TableName name = TableName.valueOf(tableName);
        admin.disableTable(name);
        admin.deleteTable(name);
    }

    /**
     * get 'tableName','rowkey','family:column'
     *
     * @param tableName 表名
     * @param rowkey    行主键
     * @param family    列族
     * @param column    列
     * @return
     */
    public String getValue(String tableName, String rowkey, String family, String column) {
        Table table = null;
        String value = "";
        if (StringUtils.isBlank(tableName) || StringUtils.isBlank(family) || StringUtils.isBlank(rowkey) || StringUtils
                .isBlank(column)) {
            return null;
        }
        try {
            table = connection.getTable(TableName.valueOf(tableName));
            Get g = new Get(rowkey.getBytes());
            g.addColumn(family.getBytes(), column.getBytes());
            Result result = table.get(g);
            List<Cell> ceList = result.listCells();
            if (ceList != null && ceList.size() > 0) {
                for (Cell cell : ceList) {
                    value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return value;
    }

    /**
     * get 'tableName','rowKey'
     *
     * @param tableName 表名
     * @param rowKey    行主键
     * @return 行值
     * @throws IOException 异常
     */
    public String selectOneRow(String tableName, String rowKey) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Get get = new Get(rowKey.getBytes());
        Result result = table.get(get);
        NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map = result.getMap();
        for (Cell cell : result.rawCells()) {
            String row = Bytes.toString(cell.getRowArray());
            String columnFamily = Bytes.toString(cell.getFamilyArray());
            String column = Bytes.toString(cell.getQualifierArray());
            String value = Bytes.toString(cell.getValueArray());
            // 可以通过反射封装成对象(列名和Java属性保持一致)
            System.out.println(row);
            System.out.println(columnFamily);
            System.out.println(column);
            System.out.println(value);
        }
        return null;
    }

    /**
     * scan 't1',{FILTER=>"PrefixFilter('2015')"}
     *
     * @param tableName    表名
     * @param rowKeyFilter 行主键前缀
     * @return 主键前缀的所有行
     * @throws IOException 异常
     */
    public String scanTable(String tableName, String rowKeyFilter) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        if (!StringUtils.isEmpty(rowKeyFilter)) {
            RowFilter rowFilter = new RowFilter(CompareOperator.EQUAL, new SubstringComparator(rowKeyFilter));
            scan.setFilter(rowFilter);
        }
        ResultScanner scanner = table.getScanner(scan);
        try {
            for (Result result : scanner) {
                System.out.println(Bytes.toString(result.getRow()));
                for (Cell cell : result.rawCells()) {
                    System.out.println(cell);
                }
            }
        } finally {
            if (scanner != null) {
                scanner.close();
            }
        }
        return null;
    }


    /**
     * 判断表是否已经存在,这里使用间接的方式来实现
     *
     * @param tableName 表名
     * @return 真or假
     * @throws IOException 异常
     */
    public boolean tableExists(String tableName) throws IOException {
        TableName[] tableNames = admin.listTableNames();
        if (tableNames != null && tableNames.length > 0) {
            for (int i = 0; i < tableNames.length; i++) {
                if (tableName.equals(tableNames[i].getNameAsString())) {
                    return true;
                }
            }
        }
        return false;
    }
}

4、测试

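示例:Student 数据表

| 行键 | StuInfo:Name | StuInfo:Age | StuInfo:Sex | StuInfo:Class | Grades:BigData | Grades:Computer | Grades:Math | 时间戳 |
|------|--------------|-------------|-------------|---------------|----------------|-----------------|-------------|--------|
| 0001 | Tom Green    | 18          | Male        |               | 80             | 90              | 85          | T2     |
| 0002 | Amy          | 19          |             | 01            | 95             |                 | 89          | T1     |
| 0003 | Allen        | 19          | Male        | 02            | 90             |                 | 88          | T1     |
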
/**
 * Demo entry point: starts the Spring application, reads a few sample cells and rows
 * from the {@code Student} table through {@code HbaseService}, then shuts the context down.
 *
 * @param args command-line arguments passed on to Spring Boot
 */
public static void main(String[] args) {
    // try-with-resources closes the context even if the bean lookup or a demo call fails.
    try (ConfigurableApplicationContext context = new SpringApplicationBuilder(HbaseFzpGxydApplication.class)
            .run(args)) {
        // Look the bean up by type: no cast and no dependency on the generated bean name.
        HbaseService hbaseService = context.getBean(HbaseService.class);
        try {
            // hbaseService.createTable("Student", "StuInfo", "Grades");
            // hbaseService.insertOrUpdate("Student", "0001", "StuInfo", "name", "Tom Green");
            // hbaseService.insertOrUpdate("Student", "0001", "StuInfo", "Age", "18");
            System.out.println("=================" + hbaseService.getValue("Student", "0001", "StuInfo", "name"));
            System.out.println("=================" + hbaseService.getValue("Student", "0001", "StuInfo", "Age"));
            System.out.println("=================" + hbaseService.selectOneRow("Student", "0001"));
            System.out.println("=================" + hbaseService.selectOneRow("Student", "0002"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

上一篇:java异常分类和处理


下一篇:使用java自定义生成二维码