大数据架构-使用HBase和Solr将存储与索引放在不同的机器上
摘要HBase可以通过协处理器Coprocessor的方式向Solr发出请求Solr对于接收到的数据可以做相关的同步增、删、改索引的操作这样就可以同时使用HBase存储量大和Solr检索性能高的优点了更何况HBase和Solr都可以集群。这对海量数据存储、检索提供了一种方式将存储与索引放在不同的机器上是大数据架构的必须品。
关键词HBase, Solr, Coprocessor, 大数据, 架构
正如我的之前的博客“Solr与HBase架构设计”http://http://www.cnblogs.com/wgp13x/p/a8bb8ccd469c96917652201007ad3c50.html中所述HBase和Solr可以通过协处理器Coprocessor的方式向Solr发出请求Solr对于接收到的数据可以做相关的同步增、删、改索引的操作。将存储与索引放在不同的机器上这是大数据架构的必须品但目前还有很多不懂得此道的同学他们对于这种思想感到很新奇不过这绝对是好的方向所以不懂得抓紧学习吧。
有个朋友给我的那篇博客留言说CDH也可以做这样的事情我还没有试过他还问我要与此相关的代码于是我就稍微整理了一下作为本篇文章的主要内容。关于CDH的事我会尽快尝试有知道的同学可以给我留言。
下面我主要讲述一下我测试对HBase和Solr的性能时使用HBase协处理器向HBase添加数据所编写的相关代码及解释说明。
一、编写HBase协处理器Coprocessor
一旦有数据postPut就立即对Solr里相应的Core更新。这里使用了ConcurrentUpdateSolrServer它是Solr速率性能的保证使用它不要忘记在Solr里面配置autoCommit哟。
/*
*版权王安琪
*描述监视HBase一有数据postPut就向Solr发送本类要作为触发器添加到HBase
*修改时间2014-05-27
*修改内容新增
*/
package solrHbase.test;
import java.io.UnsupportedEncodingException;
import ***;
public class SorlIndexCoprocessorObserver extends BaseRegionObserver {
private static final Logger LOG = LoggerFactory
.getLogger(SorlIndexCoprocessorObserver.class);
private static final String solrUrl = “http://192.1.11.108:80/solr/core1″;
private static final SolrServer solrServer = new ConcurrentUpdateSolrServer(
solrUrl, 10000, 20);
/**
* 建立solr索引
*
* @throws UnsupportedEncodingException
*/
@Override
public void postPut(final ObserverContext<RegionCoprocessorEnvironment> e,
final Put put, final WALEdit edit, final boolean writeToWAL)
throws UnsupportedEncodingException {
inputSolr(put);
}
public void inputSolr(Put put) {
try {
solrServer.add(TestSolrMain.getInputDoc(put));
} catch (Exception ex) {
LOG.error(ex.getMessage());
}
}
}
|
注意getInputDoc是这个HBase协处理器Coprocessor的精髓所在它可以把HBase内的Put里的内容转化成Solr需要的值。其中String fieldName = key.substring(key.indexOf(columnFamily) + 3, key.indexOf(“我在这“)).trim();这里有一个乱码字符在这里看不到请大家注意一下。
public static SolrInputDocument getInputDoc(Put put) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField(“test_ID”, Bytes.toString(put.getRow()));
for (KeyValue c : put.getFamilyMap().get(Bytes.toBytes(columnFamily))) {
String key = Bytes.toString(c.getKey());
String value = Bytes.toString(c.getValue());
if (value.isEmpty()) {
continue;
}
String fieldName = key.substring(key.indexOf(columnFamily) + 3,
key.indexOf(“”)).trim();
doc.addField(fieldName, value);
}
return doc;
} |
二、编写测试程序入口代码main
这段代码向HBase请求建了一张表并将模拟的数据向HBase连续地提交数据内容在HBase中不断地插入数据同时记录时间测试插入性能。
/*
*版权王安琪
*描述测试HBaseInsertHBase插入性能
*修改时间2014-05-27
*修改内容新增
*/
package solrHbase.test;
import hbaseInput.HbaseInsert;
import ***;
public class TestHBaseMain {
private static Configuration config;
private static String tableName = “angelHbase”;
private static HTable table = null;
private static final String columnFamily = “wanganqi”;
/**
* @param args
*/
public static void main(String[] args) {
config = HBaseConfiguration.create();
config.set(“hbase.zookeeper.quorum”, “192.103.101.104”);
HbaseInsert.createTable(config, tableName, columnFamily);
try {
table = new HTable(config, Bytes.toBytes(tableName));
for (int k = 0; k < 1; k++) {
Thread t = new Thread() {
public void run() {
for (int i = 0; i < 100000; i++) {
HbaseInsert.inputData(table,
PutCreater.createPuts(1000, columnFamily));
Calendar c = Calendar.getInstance();
String dateTime = c.get(Calendar.YEAR) + “-“
+ c.get(Calendar.MONTH) + “-“
+ c.get(Calendar.DATE) + “T”
+ c.get(Calendar.HOUR) + “:”
+ c.get(Calendar.MINUTE) + “:”
+ c.get(Calendar.SECOND) + “:”
+ c.get(Calendar.MILLISECOND) + “Z 写入: “
+ i * 1000;
System.out.println(dateTime);
}
}
};
t.start();
}
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
|
下面的是与HBase相关的操作把它封装到一个类中这里就只有建表与插入数据的相关代码。
/*
*版权王安琪
*描述与HBase相关操作建表与插入数据
*修改时间2014-05-27
*修改内容新增
*/
package hbaseInput;
import ***;
import org.apache.hadoop.hbase.client.Put;
public class HbaseInsert {
public static void createTable(Configuration config, String tableName,
String columnFamily) {
HBaseAdmin hBaseAdmin;
try {
hBaseAdmin = new HBaseAdmin(config);
if (hBaseAdmin.tableExists(tableName)) {
return;
}
HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
tableDescriptor.addFamily(new HColumnDescriptor(columnFamily));
hBaseAdmin.createTable(tableDescriptor);
hBaseAdmin.close();
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void inputData(HTable table, ArrayList<Put> puts) {
try {
table.put(puts);
table.flushCommits();
puts.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
}
|
三、编写模拟数据Put
向HBase中写入数据需要构造Put下面是我构造模拟数据Put的方式有字符串的生成我是由mmseg提供的词典words.dic中随机读取一些词语连接起来生成一句字符串的下面的代码没有体现不过很easy你自己造你自己想要的数据就OK了。
public static Put createPut(String columnFamily) {
String ss = getSentence();
byte[] family = Bytes.toBytes(columnFamily);
byte[] rowKey = Bytes.toBytes(“” + Math.abs(r.nextLong()));
Put put = new Put(rowKey);
put.add(family, Bytes.toBytes(“DeviceID”),
Bytes.toBytes(“” + Math.abs(r.nextInt())));
******
put.add(family, Bytes.toBytes(“Company_mmsegsm“), Bytes.toBytes(“ss”));
return put;
} |
当然在运行上面这个程序之前需要先在Solr里面配置好你需要的列信息HBase、Solr安装与配置它们的基础使用方法将会在之后的文章中介绍。在这里Solr的列配置就跟你使用createPut生成的Put搞成一样的列名就行了当然也可以使用动态列的形式。
四、直接对Solr性能测试
如果你不想对HBase与Solr的相结合进行测试只想单独对Solr的性能进行测试这就更简单了完全可以利用上面的代码段来测试稍微组装一下就可以了。
private static void sendConcurrentUpdateSolrServer(final String url,
final int count) throws SolrServerException, IOException {
SolrServer solrServer = new ConcurrentUpdateSolrServer(url, 10000, 20);
for (int i = 0; i < count; i++) {
solrServer.add(getInputDoc(PutCreater.createPut(columnFamily)));
}
} |
希望可以帮助到你规格严格-功夫到家。这次的文章代码又偏多了点但代码是解释思想的最好的语言我的提倡就是尽可能的减少代码的注释尽力简化你的代码使你的代码足够的清晰易懂甚至于相似于伪代码了这也是《重构》这本书里所提倡的。