es分页查询最新

import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ElasticsearchSearchAfterExample {

private final RestHighLevelClient client;

public ElasticsearchSearchAfterExample(RestHighLevelClient client) {
    this.client = client;
}

/**
 * 使用 search_after 实现分页查询,并每次都重新请求总文档数。
 *
 * @param indexName 索引名称
 * @param pageSize  每页大小
 * @param sortField 排序字段
 * @param pageNum   页码 (从1开始)
 * @param queryMap  查询条件映射
 * @return 包含当前页文档列表和总文档数的 Map
 */
public Map<String, Object> paginateWithSearchAfter(String indexName, int pageSize, String sortField, int pageNum, Map<String, Object> queryMap) throws IOException {
    List<Map<String, Object>> pageDocs = new ArrayList<>();
    Long totalHits = null;

    // 构建查询对象
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    sourceBuilder.query(buildQueryFromMap(queryMap));
    sourceBuilder.size(pageSize);
    sourceBuilder.sort(sortField, SortOrder.ASC);

    // 先获取总文档数
    SearchRequest countRequest = new SearchRequest(indexName);
    SearchSourceBuilder countSourceBuilder = new SearchSourceBuilder();
    countSourceBuilder.query(buildQueryFromMap(queryMap));
    countSourceBuilder.size(0); // 只关心总数,不需要返回具体文档
    countSourceBuilder.trackTotalHits(true); // 确保跟踪总命中数

    countRequest.source(countSourceBuilder);
    SearchResponse countResponse = client.search(countRequest, RequestOptions.DEFAULT);
    totalHits = countResponse.getHits().getTotalHits().value;

    // 构建分页查询请求
    SearchRequest searchRequest = new SearchRequest(indexName);
    searchRequest.source(sourceBuilder);

    // 如果不是第一页,则需要设置 search_after 参数
    if (pageNum > 1) {
        // 获取上一页最后一个文档的排序值,用于构建 search_after 请求
        // 注意:这里简化了处理,实际应用中需要根据具体情况实现
        // 这里假设有一个方法 getLastSortValues 来获取这些值
        List<Object> lastSortValues = getLastSortValues(pageNum - 1, indexName, pageSize, sortField, queryMap);
        if (lastSortValues != null && !lastSortValues.isEmpty()) {
            sourceBuilder.searchAfter(lastSortValues.toArray(new Object[0]));
        }
    }

    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

    for (SearchHit hit : searchResponse.getHits()) {
        pageDocs.add(hit.getSourceAsMap());
    }

    // 返回包含文档列表和总文档数的结果
    return Map.of("docs", pageDocs, "totalHits", totalHits);
}

/**
 * 根据查询条件映射构建查询对象。
 */
private org.elasticsearch.index.query.QueryBuilder buildQueryFromMap(Map<String, Object> queryMap) {
    // 示例:假设你有一个名为 "field" 的查询参数用于匹配特定字段
    if (queryMap == null || queryMap.isEmpty()) {
        return QueryBuilders.matchAllQuery();
    }
    return QueryBuilders.termQuery((String) queryMap.getOrDefault("field", ""), queryMap.getOrDefault("value", ""));
}

/**
 * 获取指定页码前一页最后一个文档的排序值。
 */
private List<Object> getLastSortValues(int pageNum, String indexName, int pageSize, String sortField, Map<String, Object> queryMap) throws IOException {
    // 这是一个简化的例子,实际应用中可能需要更复杂的逻辑来获取正确的排序值
    // 这里假设只需要取最后一条记录的排序值作为 search_after 的参数

    SearchRequest searchRequest = new SearchRequest(indexName);
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    sourceBuilder.query(buildQueryFromMap(queryMap));
    sourceBuilder.size(1);
    sourceBuilder.sort(sortField, SortOrder.DESC); // 按降序排列以获取最后一条记录

    if (pageNum > 1) {
        // 计算实际需要跳过的文档数量并遍历到正确的起始位置...
        // (这部分代码与之前的例子相同)
        int skipDocs = (pageNum - 1) * pageSize;
        sourceBuilder.from(skipDocs);
    }

    searchRequest.source(sourceBuilder);
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

    if (!searchResponse.getHits().getHits().isEmpty()) {
        SearchHit lastHit = searchResponse.getHits().getHits()[0];
        return List.of(lastHit.getSortValues());
    }

    return null;
}

public static void main(String[] args) throws IOException {
    try (RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("localhost", 9200, "http")))) {

        ElasticsearchSearchAfterExample example = new ElasticsearchSearchAfterExample(client);
        
        // 获取第1页的数据及总数,带查询参数
        Map<String, Object> queryMap = Map.of("field", "status", "value", "active");
        Map<String, Object> resultPage1 = example.paginateWithSearchAfter("my-index-000001", 10, "timestamp", 1, queryMap);
        System.out.println("Documents on Page 1: " + resultPage1.get("docs"));
        System.out.println("Total Hits: " + resultPage1.get("totalHits"));

        // 获取第2页的数据及总数,带相同的查询参数
        Map<String, Object> resultPage2 = example.paginateWithSearchAfter("my-index-000001", 10, "timestamp", 2, queryMap);
        System.out.println("Documents on Page 2: " + resultPage2.get("docs"));
        System.out.println("Total Hits: " + resultPage2.get("totalHits"));

        // 获取第1页的数据及总数,带不同查询参数
        Map<String, Object> queryMap2 = Map.of("field", "status", "value", "inactive");
        Map<String, Object> resultPage1Inactive = example.paginateWithSearchAfter("my-index-000001", 10, "timestamp", 1, queryMap2);
        System.out.println("Documents on Page 1 (Inactive): " + resultPage1Inactive.get("docs"));
        System.out.println("Total Hits (Inactive): " + resultPage1Inactive.get("totalHits"));
    }
}

}

上一篇:深入理解AQS之独占锁ReentrantLock源码分析-1.管程—Java同步的设计思想