摘要: 大家在使用HBase和Solr搭建系统中经常遇到的一个问题就是:“我通过SOLR得到了RowKeys后,该怎样去HBase上取数据”。使用现有的Filter性能差劲,网上也没有现成的自定义Filter解决方案,我在这里把这个问题的解决办法分享给大家,抛砖引玉一下。先讲一下,虽然使用自定义过滤器来达到取数据的目的,但它其实并不是一个好的解决办法,因为它的性能是有问题的,具体分析还要看我的博客HBase 高性能获取数据 - 多线程批量式解决办法:http://www.cnblogs.com/wgp13x/p/4245182.html
Solr和HBase专辑
1、“关于Solr的使用总结的心得体会”(http://www.cnblogs.com/wgp13x/p/3742653.html)
2、“中文分词器性能比较”(http://www.cnblogs.com/wgp13x/p/3748764.html)
3、“Solr与HBase架构设计”(http://www.cnblogs.com/wgp13x/p/a8bb8ccd469c96917652201007ad3c50.html)
4、 “大数据架构: 使用HBase和Solr将存储与索引放在不同的机器上”(http://www.cnblogs.com/wgp13x/p/3927979.html)
5、“一个自定义 HBase Filter -通过RowKeys来高性能获取数据”(http://www.cnblogs.com/wgp13x/p/4196466.html)
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.util.Bytes;
/**
* @description 自定义过滤器,用来读取大量离散行
* @author 王安琪
* @time 2014年11月8日上午10:47:17
* @className RowKeyFilter
*/
public class RowKeyFilter extends FilterBase
{
private byte[] value = null;
private boolean filterRow = true;
/**
* map中存放需要读的行RowKey
*/
public Map<Object, Object> map = new HashMap<Object, Object>();
public RowKeyFilter()
{
super();
}
public RowKeyFilter(byte[] value)
{
this.value = value;
}
@Override
public ReturnCode filterKeyValue(KeyValue ignored)
{
if (this.filterRow == false)
return ReturnCode.INCLUDE;
else
return ReturnCode.NEXT_ROW;
}
/**
* 行过滤,查询该行RowKey是否在Map中
*/
@Override
public boolean filterRowKey(byte[] buffer, int offset, int length)
{
byte[] rowKey = Arrays.copyOfRange(buffer, offset, offset + length);
String str = new String(rowKey);
if (map.containsKey(str))
{ // 在0(1)时间内返回,效率较高
this.filterRow = false; // false表示包括这一行
}
return this.filterRow;
}
@Override
public void reset()
{
this.filterRow = true;
}
@Override
public boolean filterRow()
{
return filterRow;
}
/**
* 将Map中的数据以Byte[]形式传给服务器
*/
@Override
public void write(DataOutput dataOutput) throws IOException
{
Bytes.writeByteArray(dataOutput, this.value);
}
/**
* 服务器读取Byte[]数据,再将数据存储到Map中 不同的RowKey以","分割
*/
@Override
public void readFields(DataInput dataInput) throws IOException
{
this.value = Bytes.readByteArray(dataInput);
String string = new String(this.value);
String[] strs = string.split(",");
for (String str : strs)
{
map.put(str, str);
}
}
} |
/**
* 根据rowKeys获取数据
*
* @param rowKeys:每个rowkey之间使用逗号分隔符
* @param filterColumn:表示过滤的列,如果为空表示所有列的数据都返回
* @param isContiansRowkeys:设置为true,表示返回结果集中包含rowkeys;否则返回结果集中不包含rowkeys
* @return
*/
/* @Override */
public Datas getDatasFromHbase(String rowKeys, List<String> filterColumn,
boolean isContiansRowkeys)
{
Datas datas = new Datas();
HTableInterface hTableInterface = getTable(tableName);
Scan scan = new Scan();
if (filterColumn != null)
{
for (String column : filterColumn)
{
scan.addColumn(columnFamilyName.getBytes(), column.getBytes());
}
}
if (rowKeys != null && rowKeys.length() > 0)
{
RowKeyFilter rowKeyFilter = new RowKeyFilter(rowKeys.getBytes());
scan.setFilter(rowKeyFilter);
}
ResultScanner resultScanner = null;
List<Data> listData = new ArrayList<Data>();
try
{
resultScanner = hTableInterface.getScanner(scan);
for (Result result : resultScanner)
{
Data data = new Data();
if (isContiansRowkeys)
{
data.setRowkey(new String(result.getRow()));
}
Map<String, String> map = new HashMap<String, String>();
List<String> content = new ArrayList<String>();
String[] temp = null;
if (filterColumn != null)
{
temp = new String[filterColumn.size()];
}
for (KeyValue keyValue : result.raw())
{
if (filterColumn == null)
{
content.add(new String(keyValue.getValue()));
}
else if (filterColumn != null)
{
String qualifier = new String(keyValue.getQualifier());
String value = new String(keyValue.getValue());
if (filterColumn.contains(qualifier))
{
int index = filterColumn.indexOf(qualifier);
temp[index] = value;
}
}
}
if (temp != null)
{
for (int i = 0; i < temp.length; i++)
{
content.add(temp[i]);
}
}
data.setContent(content);
listData.add(data);
}
datas.setDatas(listData);
}
catch (IOException e)
{
e.printStackTrace();
}
finally
{
resultScanner.close();
try
{
hTableInterface.close();
}
catch (IOException e)
{
e.printStackTrace();
}
}
return datas;
} |