Java,python操作Hbase
python操作Hbase
由于Hbase是Java开发的,所以如需要用Python对Hbase进行操作,就需要借助Thrift等工具让语言透明化
安装Thrift之前所需准备
wget http://archive.apache.org/dist/thrift/0.8.0/thrift-0.8.0.tar.gz
tar xzf thrift-0.8.0.tar.gz
yum install automake libtool flex bison pkgconfig gcc-c++ boost-devel libevent-devel zlib-devel python-devel ruby-devel openssl-devel
yum install boost-devel.x86_64
yum install libevent-devel.x86_64
安装Thrift
进入Thrift解压目录
运行:
./configure --with-cpp=no --with-ruby=no
如图:运行:
make
运行:
make install
产生针对Python的Hbase的API
下载hbase源码:wget http://mirrors.hust.edu.cn/apache/hbase/0.98.24/hbase-0.98.24-src.tar.gz
进入源码目录并查找thrift对python的支持模块:
find . -name Hbase.thrift
,查找后地址为:./hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift
进入查找后的目录:
cd ./hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift/
运行命令:
thrift -gen py Hbase.thrift
,生成python对Hbase的模块
如图:进入
gen-py
目录,将hbase
目录拷贝到需要运行python脚本文件的同级目录中,命令:cp -raf gen-py/hbase/ /test/hbase_test
启动Thrift服务
命令:
hbase-daemon.sh start thrift
如图:检查端口是否被监听
命令:netstat -antup | grep 9090
执行python文件,对hbase进行操作
- 创建
create_table.py
文件,进行创建表操作
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from hbase import Hbase
from hbase.ttypes import *
transport = TSocket.TSocket('master', 9090)
transport = TTransport.TBufferedTransport(transport)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = Hbase.Client(protocol)
transport.open()
#==============================
base_info_contents = ColumnDescriptor(name='meta-data:', maxVersions=1)
other_info_contents = ColumnDescriptor(name='flags:', maxVersions=1)
client.createTable('new_music_table', [base_info_contents, other_info_contents])
print client.getTableNames()
运行python文件,命令:python create_table.py
- 创建
insert_data.py
文件,进行插入数据操作
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from hbase import Hbase
from hbase.ttypes import *

# Connect to the HBase Thrift gateway on host 'master', default port 9090.
transport = TSocket.TSocket('master', 9090)
transport = TTransport.TBufferedTransport(transport)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = Hbase.Client(protocol)
transport.open()
try:
    tableName = 'new_music_table'
    rowKey = '1100'
    # One Mutation per cell; all three are applied atomically to the row.
    mutations = [Mutation(column="meta-data:name", value="wangqingshui"), \
                 Mutation(column="meta-data:tag", value="pop"), \
                 Mutation(column="flags:is_valid", value="TRUE")]
    client.mutateRow(tableName, rowKey, mutations, None)
finally:
    # Always release the socket, even if mutateRow raises.
    transport.close()
- 创建
get_one_line.py
文件,进行获取数据操作
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from hbase import Hbase
from hbase.ttypes import *
transport = TSocket.TSocket('master', 9090)
transport = TTransport.TBufferedTransport(transport)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = Hbase.Client(protocol)
transport.open()
tableName = 'new_music_table'
rowKey = '1100'
result = client.getRow(tableName, rowKey, None)
for r in result:
print 'the row is ' , r.row
print 'the name is ' , r.columns.get('meta-data:name').value
print 'the flag is ' , r.columns.get('flags:is_valid').value
- 创建
scan_many_lines.py
文件,进行对hbase数据查询操作(扫描)
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from hbase import Hbase
from hbase.ttypes import *
transport = TSocket.TSocket('master', 9090)
transport = TTransport.TBufferedTransport(transport)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = Hbase.Client(protocol)
transport.open()
tableName = 'new_music_table'
scan = TScan()
id = client.scannerOpenWithScan(tableName, scan, None)
result = client.scannerGetList(id, 10)
for r in result:
print '======'
print 'the row is ' , r.row
for k, v in r.columns.items():
print "\t".join([k, v.value])
模块存放位置
hbase >> python
以及thrift >> python
Java操作Hbase
向Hbase中写记录
package com.cxqy.baseoperation;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * Writes single cells into an HBase table via the (0.98-era) HTable client.
 */
public class HbasePutOneRecord {
    public static final String TableName = "user_action_table";
    public static final String ColumnFamily = "action_log";
    public static Configuration conf = HBaseConfiguration.create();
    private static HTable table;

    /**
     * Insert one cell (family:qualifier=value) into the given row.
     *
     * @throws IOException if the table cannot be reached or the put fails
     */
    public static void addOneRecord(String tableName, String rowKey, String family, String qualifier, String value)
            throws IOException {
        table = new HTable(conf, tableName);
        try {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value));
            table.put(put);
            System.out.println("insert record " + rowKey + " to table " + tableName + " success");
        } finally {
            // Release the table's connection resources even if the put fails;
            // previously the HTable leaked on every call.
            table.close();
        }
    }

    public static void main(String[] args) throws IOException {
        conf.set("hbase.master", "192.168.87.200:60000");
        conf.set("hbase.zookeeper.quorum", "192.168.87.200,192.168.87.201,192.168.87.202");
        try {
            addOneRecord(TableName, "ip=192.168.87.200-001", ColumnFamily, "ip", "192.168.87.101");
            addOneRecord(TableName, "ip=192.168.87.200-001", ColumnFamily, "userid", "1100");
            addOneRecord(TableName, "ip=192.168.87.200-002", ColumnFamily, "ip", "192.168.1.201");
            addOneRecord(TableName, "ip=192.168.87.200-002", ColumnFamily, "userid", "1200");
            addOneRecord(TableName, "ip=192.168.87.200-003", ColumnFamily, "ip", "192.168.3.201");
            addOneRecord(TableName, "ip=192.168.87.200-003", ColumnFamily, "userid", "1300");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
从Hbase中读记录
package com.cxqy.baseoperation;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * Reads a single row from an HBase table and prints every cell.
 */
public class HbaseGetOneRecord {
    public static final String TableName = "user_action_table";
    public static final String ColumnFamily = "action_log";
    public static Configuration conf = HBaseConfiguration.create();
    private static HTable table;

    /**
     * Fetch one row by key and print each cell's family, qualifier and value.
     *
     * @throws IOException if the table cannot be reached or the get fails
     */
    public static void selectRowKey(String tablename, String rowKey) throws IOException {
        table = new HTable(conf, tablename);
        try {
            Get g = new Get(rowKey.getBytes());
            Result rs = table.get(g);
            // NOTE(review): rs.getRow() is null when the row does not exist;
            // new String(null) would NPE — assumes the row is present.
            System.out.println("==> " + new String(rs.getRow()));
            for (Cell kv : rs.rawCells()) {
                // getRow/getFamily/... on Cell are deprecated 0.94-era
                // accessors that still work in 0.98.
                System.out.println("--------------------" + new String(kv.getRow()) + "----------------------------");
                System.out.println("Column Family: " + new String(kv.getFamily()));
                System.out.println("Column :" + new String(kv.getQualifier()));
                System.out.println("value : " + new String(kv.getValue()));
            }
        } finally {
            // Release the table's connection resources; previously leaked.
            table.close();
        }
    }

    public static void main(String[] args) throws IOException {
        conf.set("hbase.master", "192.168.87.200:60000");
        conf.set("hbase.zookeeper.quorum", "192.168.87.200,192.168.87.201,192.168.87.202");
        try {
            selectRowKey(TableName, "ip=192.168.87.200-003");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
在Hbase中删除某个记录
package com.cxqy.baseoperation;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * Deletes a single row from an HBase table by row key.
 */
public class HbaseDelOneRecord {
    public static final String TableName = "user_action_table";
    public static final String ColumnFamily = "action_log";
    public static Configuration conf = HBaseConfiguration.create();
    private static HTable table;

    /**
     * Delete the entire row identified by rowKey.
     *
     * @throws IOException if the table cannot be reached or the delete fails
     */
    public static void delOneRecord(String tableName, String rowKey) throws IOException {
        table = new HTable(conf, tableName);
        try {
            List<Delete> list = new ArrayList<Delete>();
            Delete delete = new Delete(rowKey.getBytes());
            list.add(delete);
            table.delete(list);
            System.out.println("delete record " + rowKey + " success!");
        } finally {
            // Release the table's connection resources; previously leaked.
            table.close();
        }
    }

    public static void main(String[] args) throws IOException {
        conf.set("hbase.master", "192.168.87.200:60000");
        conf.set("hbase.zookeeper.quorum", "192.168.87.200,192.168.87.201,192.168.87.202");
        try {
            delOneRecord(TableName, "ip=192.168.87.200-002");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
从Hbase中批量读记录
package com.cxqy.baseoperation;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * Scans an HBase table: full scan, single-row-key filter, and a whitelist of
 * row keys combined with MUST_PASS_ONE (logical OR).
 */
public class HbaseScanManyRecords {
    public static final String TableName = "user_action_table";
    public static final String ColumnFamily = "action_log";
    public static Configuration conf = HBaseConfiguration.create();
    private static HTable table;

    /**
     * Full-table scan; print every cell of every row.
     *
     * @throws IOException if the table cannot be reached or the scan fails
     */
    public static void getManyRecords(String tableName) throws IOException {
        table = new HTable(conf, tableName);
        try {
            Scan scan = new Scan();
            ResultScanner scanner = table.getScanner(scan);
            try {
                for (Result result : scanner) {
                    printResult(result);
                }
            } finally {
                // Close the scanner so its RegionServer lease is released;
                // previously leaked until lease timeout.
                scanner.close();
            }
        } finally {
            // Release the table's connection resources; previously leaked.
            table.close();
        }
    }

    /**
     * Scan restricted to a single row key via a RowFilter.
     *
     * @throws IOException if the table cannot be reached or the scan fails
     */
    public static void getManyRecordsWithFilter(String tableName, String rowKey) throws IOException {
        table = new HTable(conf, tableName);
        try {
            Scan scan = new Scan();
            Filter filter = new RowFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(rowKey)));
            scan.setFilter(filter);
            ResultScanner scanner = table.getScanner(scan);
            try {
                for (Result result : scanner) {
                    printResult(result);
                }
            } finally {
                scanner.close();
            }
        } finally {
            table.close();
        }
    }

    /**
     * Scan restricted to a whitelist of row keys; MUST_PASS_ONE ORs the
     * per-key RowFilters together.
     *
     * @throws IOException if the table cannot be reached or the scan fails
     */
    public static void getManyRecordsWithFilter(String tableName, ArrayList<String> rowKeyList) throws IOException {
        table = new HTable(conf, tableName);
        try {
            Scan scan = new Scan();
            List<Filter> filters = new ArrayList<Filter>();
            for (int i = 0; i < rowKeyList.size(); i++) {
                filters.add(new RowFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(rowKeyList.get(i)))));
            }
            FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE, filters);
            scan.setFilter(filterList);
            ResultScanner scanner = table.getScanner(scan);
            try {
                for (Result result : scanner) {
                    System.out.println("===============");
                    printResult(result);
                }
            } finally {
                scanner.close();
            }
        } finally {
            table.close();
        }
    }

    /** Print every cell of one Result as "row family:qualifier ts value". */
    private static void printResult(Result result) {
        for (KeyValue kv : result.raw()) {
            System.out.print(new String(kv.getRow()) + " ");
            System.out.print(new String(kv.getFamily()) + ":");
            System.out.print(new String(kv.getQualifier()) + " ");
            System.out.print(kv.getTimestamp() + " ");
            System.out.println(new String(kv.getValue()));
        }
    }

    public static void main(String[] args) throws IOException {
        conf.set("hbase.master", "192.168.159.30:60000");
        conf.set("hbase.zookeeper.quorum", "192.168.159.30,192.168.159.31,192.168.159.32");
        try {
            ArrayList<String> whiteRowKeyList = new ArrayList<>();
            whiteRowKeyList.add("ip=192.168.87.200-001");
            whiteRowKeyList.add("ip=192.168.87.200-003");
            getManyRecordsWithFilter(TableName, whiteRowKeyList);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}