solr源码解读(转)

solr源码解读(转)原文地址:http://blog.csdn.net/duck_genuine/article/details/6962624

配置

solr 对一个搜索请求的的流程

在solrconfig.xml会配置一个handler。配置了前置处理组件preParams,还有后置处理组件filterResult,当然还有默认的组件

  1. <requestHandler name="standard" class="solr.SearchHandler" default="true">
  2. <arr name="first-components">
  3. <str>preParams</str>
  4. </arr>
  5. <lst name="defaults">
  6. <str name="echoParams">explicit</str>
  7. <int name="rows">10</int>
  8. <int name="start">0</int>
  9. <str name="q">*:*</str>
  10. </lst>
  11. <arr name="last-components">
  12. <str>filterResult</str>
  13. </arr>
  14. </requestHandler>

http请求控制器

当一个查询请求过来的时候,先到类SolrDispatchFilter,由这个分发器寻找对应的handler来处理。

  1. String qt = solrReq.getParams().get( CommonParams.QT );
  2. handler = core.getRequestHandler( qt );

---------------------------------------------------------------------------------------------------

  1. this.execute( req, handler, solrReq, solrRsp );
  2. HttpCacheHeaderUtil.checkHttpCachingVeto(solrRsp, resp, reqMethod);

-----------------------------------------------------------------------------------------------

从上面的代码里看出是由solrCore留下的接口来处理请求。从代码框架上,从此刻开始进入solr的核心代码。

  1. protected void execute( HttpServletRequest req, SolrRequestHandler handler, SolrQueryRequest sreq, SolrQueryResponse rsp) {
  2. sreq.getContext().put( "webapp", req.getContextPath() );
  3. sreq.getCore().execute( handler, sreq, rsp );
  4. }

看一下solrCore代码execute的方法 的主要代码

  1. public void execute(SolrRequestHandler handler, SolrQueryRequest req, SolrQueryResponse rsp) {
  2. 。。。。。
  3. handler.handleRequest(req,rsp);
  4. setResponseHeaderValues(handler,req,rsp);
  5. 。。。。。。。
  6. }

主要实现对请求的处理,并将请求结果的状态信息写到响应的头部

SolrRequestHandler 处理器

再看一下对请求的处理。。先看定义该请求处理器的接口,可以更好理解。只有两个方法,一个是初始化信息,主要是配置时的默认参数,另一个就是处理请求的接口。

  1. public interface SolrRequestHandler extends SolrInfoMBean {
  2. public void init(NamedList args);
  3. public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp);
  4. }

先看一下实现该接口的类RequestHandlerBase

  1. public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp) {
  2. numRequests++;
  3. try {
  4. SolrPluginUtils.setDefaults(req,defaults,appends,invariants);
  5. rsp.setHttpCaching(httpCaching);
  6. handleRequestBody( req, rsp );
  7. // count timeouts
  8. NamedList header = rsp.getResponseHeader();
  9. if(header != null) {
  10. Object partialResults = header.get("partialResults");
  11. boolean timedOut = partialResults == null ? false : (Boolean)partialResults;
  12. if( timedOut ) {
  13. numTimeouts++;
  14. rsp.setHttpCaching(false);
  15. }
  16. }
  17. } catch (Exception e) {
  18. SolrException.log(SolrCore.log,e);
  19. if (e instanceof ParseException) {
  20. e = new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
  21. }
  22. rsp.setException(e);
  23. numErrors++;
  24. }
  25. totalTime += rsp.getEndTime() - req.getStartTime();
  26. }

主要记录该请求处理的状态与处理时间记录。真正的实现方法交由各个子类      handleRequestBody( req, rsp );

现在看一下SearchHandler对于搜索处理的实现方法

首先是将solrconfig.xml上配置的各个处理组件按一定顺序组装起来,先是first-Component,默认的component,last-component.这些处理组件会按照它们的顺序来执行,以下是searchHandler的实现主体。方法handleRequestBody

  1. @Override
  2. public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception, ParseException, InstantiationException, IllegalAccessException
  3. {
  4. // int sleep = req.getParams().getInt("sleep",0);
  5. // if (sleep > 0) {log.error("SLEEPING for " + sleep);  Thread.sleep(sleep);}
  6. ResponseBuilder rb = new ResponseBuilder();
  7. rb.req = req;
  8. rb.rsp = rsp;
  9. rb.components = components;
  10. rb.setDebug(req.getParams().getBool(CommonParams.DEBUG_QUERY, false));
  11. final RTimer timer = rb.isDebug() ? new RTimer() : null;
  12. if (timer == null) {
  13. // non-debugging prepare phase
  14. for( SearchComponent c : components ) {
  15. c.prepare(rb);
  16. }
  17. } else {
  18. // debugging prepare phase
  19. RTimer subt = timer.sub( "prepare" );
  20. for( SearchComponent c : components ) {
  21. rb.setTimer( subt.sub( c.getName() ) );
  22. c.prepare(rb);
  23. rb.getTimer().stop();
  24. }
  25. subt.stop()<span style="color:#FF0000;">;</span>
  26. }
  27. //单机版
  28. if (rb.shards == null) {
  29. // a normal non-distributed request
  30. // The semantics of debugging vs not debugging are different enough that
  31. // it makes sense to have two control loops
  32. if(!rb.isDebug()) {
  33. // Process
  34. for( SearchComponent c : components ) {
  35. c.process(rb);
  36. }
  37. }
  38. else {
  39. // Process
  40. RTimer subt = timer.sub( "process" );
  41. for( SearchComponent c : components ) {
  42. rb.setTimer( subt.sub( c.getName() ) );
  43. c.process(rb);
  44. rb.getTimer().stop();
  45. }
  46. subt.stop();
  47. timer.stop();
  48. // add the timing info
  49. if( rb.getDebugInfo() == null ) {
  50. rb.setDebugInfo( new SimpleOrderedMap<Object>() );
  51. }
  52. rb.getDebugInfo().add( "timing", timer.asNamedList() );
  53. }
  54. } else {//分布式请求
  55. // a distributed request
  56. HttpCommComponent comm = new HttpCommComponent();
  57. if (rb.outgoing == null) {
  58. rb.outgoing = new LinkedList<ShardRequest>();
  59. }
  60. rb.finished = new ArrayList<ShardRequest>();
  61. //起始状态为0,结束状态为整数的最大值
  62. int nextStage = 0;
  63. do {
  64. rb.stage = nextStage;
  65. nextStage = ResponseBuilder.STAGE_DONE;
  66. // call all components
  67. for( SearchComponent c : components ) {
  68. //得到所有组件运行后返回的下一个状态,并取最小值
  69. nextStage = Math.min(nextStage, c.distributedProcess(rb));
  70. }
  71. // 如果有需要向子机发送请求
  72. while (rb.outgoing.size() > 0) {
  73. // submit all current request tasks at once
  74. while (rb.outgoing.size() > 0) {
  75. ShardRequest sreq = rb.outgoing.remove(0);
  76. sreq.actualShards = sreq.shards;
  77. if (sreq.actualShards==ShardRequest.ALL_SHARDS) {
  78. sreq.actualShards = rb.shards;
  79. }
  80. sreq.responses = new ArrayList<ShardResponse>();
  81. // 向各个子机发送请求
  82. for (String shard : sreq.actualShards) {
  83. ModifiableSolrParams params = new ModifiableSolrParams(sreq.params);
  84. params.remove(ShardParams.SHARDS);      // not a top-level request
  85. params.remove("indent");
  86. params.remove(CommonParams.HEADER_ECHO_PARAMS);
  87. params.set(ShardParams.IS_SHARD, true);  // a sub (shard) request
  88. String shardHandler = req.getParams().get(ShardParams.SHARDS_QT);
  89. if (shardHandler == null) {
  90. params.remove(CommonParams.QT);
  91. } else {
  92. params.set(CommonParams.QT, shardHandler);
  93. }
  94. //提交子请求
  95. comm.submit(sreq, shard, params);
  96. }
  97. }
  98. // now wait for replies, but if anyone puts more requests on
  99. // the outgoing queue, send them out immediately (by exiting
  100. // this loop)
  101. while (rb.outgoing.size() == 0) {
  102. ShardResponse srsp = comm.takeCompletedOrError();
  103. if (srsp == null) break;  // no more requests to wait for
  104. // Was there an exception?  If so, abort everything and
  105. // rethrow
  106. if (srsp.getException() != null) {
  107. comm.cancelAll();
  108. if (srsp.getException() instanceof SolrException) {
  109. throw (SolrException)srsp.getException();
  110. } else {
  111. throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, srsp.getException());
  112. }
  113. }
  114. rb.finished.add(srsp.getShardRequest());
  115. //每个组件都对于返回的数据处理
  116. for(SearchComponent c : components) {
  117. c.handleResponses(rb, srsp.getShardRequest());
  118. }
  119. }
  120. }//请求队列结束
  121. //再对该轮请求进行收尾工作
  122. for(SearchComponent c : components) {
  123. c.finishStage(rb);
  124. }
  125. //如果状态未到结束,则继续循环
  126. } while (nextStage != Integer.MAX_VALUE);
  127. }
  128. }

首先运行的是各个组件的方法prepare

  1. for( SearchComponent c : components ) {
  2. c.prepare(rb);
  3. }

再则如果不是分布式搜索,则比较简单的运行

  1. for( SearchComponent c : components ) {
  2. c.process(rb);
  3. }

就结束!

如果是分布式搜索,过程会比较复杂些,对于每个组件处理都会返回一个状态,对于以下几个方法循环执行,直到状态结束 。

在类ResponseBuilder定义了几个状态。

  1. public static int STAGE_START           = 0;
  2. public static int STAGE_PARSE_QUERY     = 1000;
  3. public static int STAGE_EXECUTE_QUERY   = 2000;
  4. public static int STAGE_GET_FIELDS      = 3000;
  5. public static int STAGE_DONE            = Integer.MAX_VALUE;

从STAGE_START---->STAGE_PARSE_QUERY------>STAGE_EXECUTE_QUERY--------------->STAGE_GET_FIELDS------------>STAGE_DONE

从这些状态名称可以猜得出整个对应的过程。

每个组件先调用方法distributeProcess,并返回下一个状态

  1. for( SearchComponent c : components ) {
  2. // the next stage is the minimum of what all components report
  3. nextStage = Math.min(nextStage, c.distributedProcess(rb));
  4. }

而方法handleResponse主要处理返回来的数据

  1. for(SearchComponent c : components) {
  2. c.handleResponses(rb, srsp.getShardRequest());
  3. }

然后交由finishStage方法来对每一个状态的过程作结束动作。

------------------------------

  1. for(SearchComponent c : components) {
  2. c.finishStage(rb);
  3. }

-----------------------------

了解这个流程有助于扩展solr。比如有个业务是要我对搜索的自然结果排序进行干预,而这个干预只针对前几页结果,所以我不得不做个组件来对其中结果进行处理。

所以我想可以添加一个组件放在最后-------------》

1)如果是分布式搜索:

这个组件可以在重写finsihStage做处理。算是对最终结果的排序处理即可。

2)如果只是单机:

这个组件可以在重写process做处理

组件

现在看一下其中一个主要的组件QueryComponent

prepare

对于QueryComponent主要解析用户传送的语法解析参数defType,以及过滤查询fq,返回字段集fl.排序字段Sort

单机处理

process

分布式搜索过程中的某一步,这里应该是主机要合并文档,取出对应的文档的过程,

主机发出指定的solr主键ids来取文档集,首先取出对应的lucene的内部id集。如果某些文档已不在则弃掉。

  1. String ids = params.get(ShardParams.IDS);
  2. if (ids != null) {//将传过来的ids,放进结果集中,并在后面取出对应的结果文档
  3. SchemaField idField = req.getSchema().getUniqueKeyField();
  4. List<String> idArr = StrUtils.splitSmart(ids, ",", true);
  5. int[] luceneIds = new int[idArr.size()];
  6. int docs = 0;
  7. for (int i=0; i<idArr.size(); i++) {
  8. //solr主键id对应的文档lucene内部的id
  9. int id = req.getSearcher().getFirstMatch(
  10. new Term(idField.getName(), idField.getType().toInternal(idArr.get(i))));
  11. if (id >= 0)
  12. luceneIds[docs++] = id;
  13. }
  14. DocListAndSet res = new DocListAndSet();
  15. //这里并没有传入scores[]
  16. res.docList = new DocSlice(0, docs, luceneIds, null, docs, 0);
  17. //需要另一种doc集合处理。
  18. if (rb.isNeedDocSet()) {
  19. List<Query> queries = new ArrayList<Query>();
  20. queries.add(rb.getQuery());
  21. List<Query> filters = rb.getFilters();
  22. if (filters != null)
  23. queries.addAll(filters);
  24. res.docSet = searcher.getDocSet(queries);
  25. }
  26. rb.setResults(res);
  27. rsp.add("response",rb.getResults().docList);
  28. return;
  29. }
    1. <pre name="code" class="java">  //封装搜索值对象与封装结果值对象
    2. SolrIndexSearcher.QueryCommand cmd = rb.getQueryCommand();
    3. //设置超时最大值
    4. cmd.setTimeAllowed(timeAllowed);
    5. SolrIndexSearcher.QueryResult result = new SolrIndexSearcher.QueryResult();
    6. //搜索
    7. searcher.search(result,cmd);
    8. //设置搜索结果
    9. rb.setResult( result );
    10. rsp.add("response",rb.getResults().docList);
    11. rsp.getToLog().add("hits", rb.getResults().docList.matches());
    12. //对含有字段排序处理
    13. doFieldSortValues(rb, searcher);
    14. //非分布查询过程,且搜索结果数小于50,进行缓存
    15. doPrefetch(rb);
    16. <pre name="code" class="java"><p>目前看到真实获取文档内容的是在</p><p>QueryResponseWriter</p><p>例如xml的输出格式类XMLWriter</p></pre><p></p>
    17. <pre></pre>
    18. <pre></pre>
    19. <br>
    20. <p></p>
    21. <h2><a name="t10"></a>分布式处理<br>
    22. </h2>
    23. <h3><a name="t11"></a>1)distributedProcess</h3>
    24. <p></p><pre name="code" class="java">  @Override
    25. public int distributedProcess(ResponseBuilder rb) throws IOException {
    26. if (rb.stage < ResponseBuilder.STAGE_PARSE_QUERY)
    27. return ResponseBuilder.STAGE_PARSE_QUERY;
    28. if (rb.stage == ResponseBuilder.STAGE_PARSE_QUERY) {
    29. createDistributedIdf(rb);
    30. return ResponseBuilder.STAGE_EXECUTE_QUERY;
    31. }
    32. if (rb.stage < ResponseBuilder.STAGE_EXECUTE_QUERY) return ResponseBuilder.STAGE_EXECUTE_QUERY;
    33. if (rb.stage == ResponseBuilder.STAGE_EXECUTE_QUERY) {
    34. //分布式查询
    35. createMainQuery(rb);
    36. return ResponseBuilder.STAGE_GET_FIELDS;
    37. }
    38. if (rb.stage < ResponseBuilder.STAGE_GET_FIELDS) return ResponseBuilder.STAGE_GET_FIELDS;
    39. if (rb.stage == ResponseBuilder.STAGE_GET_FIELDS) {
    40. //这里就会去对应的主机拿取需要的字段,封装请求字段的参数,放进请求队列里,可以由外部的searchHandler提交该请求,最后结果放在ShardResponse类里。
    41. createRetrieveDocs(rb);
    42. return ResponseBuilder.STAGE_DONE;
    43. }
    44. return ResponseBuilder.STAGE_DONE;
    45. }</pre><br>
    46. <br>
    47. <p></p>
    48. <p>   <br>
    49. </p>
    50. <p><br>
    51. </p>
    52. <h3><a name="t12"></a> 2) handleResponses<br>
    53. </h3>
    54. <pre name="code" class="java"> public void handleResponses(ResponseBuilder rb, ShardRequest sreq) {
    55. if ((sreq.purpose & ShardRequest.PURPOSE_GET_TOP_IDS) != 0) {
    56. //合并ids
    57. mergeIds(rb, sreq);
    58. //合并groupCount
    59. mergeGroupCounts(rb, sreq);
    60. }
    61. if ((sreq.purpose & ShardRequest.PURPOSE_GET_FIELDS) != 0) {
    62. //获取文档的字段,并将结题组装起来放到最终结果列表对应的位置里
    63. returnFields(rb, sreq);
    64. return;
    65. }
    66. }</pre><br>
    67. <br>
    68. <h3><a name="t13"></a>   3)  finishStage</h3>
    69. <p><br>
    70. </p>
    71. <p> </p><pre name="code" class="java"> @Override
    72. public void finishStage(ResponseBuilder rb) {
    73. //这里说是==获取文档内容的值,在
    74. if (rb.stage == ResponseBuilder.STAGE_GET_FIELDS) {
    75. //有些文档可能已不存在了,则忽略掉
    76. for (Iterator<SolrDocument> iter = rb._responseDocs.iterator(); iter.hasNext();) {
    77. if (iter.next() == null) {
    78. iter.remove();
    79. rb._responseDocs.setNumFound(rb._responseDocs.getNumFound()-1);
    80. }
    81. }
    82. rb.rsp.add("response", rb._responseDocs);
    83. }
    84. }
    85. </pre><br>
    86. <p></p>
    87. <p><span style="color:#FF0000"><br>
    88. </span></p>
    89. <p><span style="color:#FF0000">同样最后的结果是保存在<br>
    90. <br>
    91. ResponseBuilder <br>
    92. <br>
    93. ResponseBuilder <br>
    94. NamedList values = new SimpleOrderedMap();<br>
    95. <br>
    96. 这个字段里,以键为"response",单机存储的是lucene 的内部id列表<br>
    97. 如果是分布式,则存储的是SolrDocumentList,不用再去索引拿出对应的存储字段,<br>
    98. 这个在QueryResponseWriter里有对应的处理</span><br>
    99. </p>
    100. <p></p>
    101. <p><br>
    102. </p>
    103. <p><br>
    104. </p>
    105. <p><br>
    106. </p>
    107. <p><br>
    108. </p>
    109. <p><br>
    110. </p>
    111. <p><br>
    112. </p>
    113. <p><br>
    114. </p>
    115. <p><br>
    116. </p>
    117. <p></p>
    118. </pre>
上一篇:JDK容器类List,Set,Queue源码解读


下一篇:LinkedList 源码解读