18 Rest高级客户端实践(五):ReIndex

文章目录

1 文档的ReIndex

当从一个或者更多的索引中复制相关的文档到一个新的索引中需要重建索引。需要指出的是不会尝试自动设置目标索引的相关设置,不会复制源索引的设置。因此用户需要在reIndex之前,设置目标索引,包括设置映射,分片数量,副本数量。

  1. 构建ReIndexRequest:核心就是设置源索引和目标索引,还有一下其他可选配置
  2. 执行请求
  3. 解析响应

先去创建一个索引:

PUT books_copy
{
  "settings": {
    "number_of_shards": 3, 
    "number_of_replicas": 1
  }, 
  "mappings": {
      "properties": {
        "id":{
          "type": "long"
        },
        "title":{
          "type": "text",
          "analyzer": "ik_max_word"
        },
        "language":{
          "type": "keyword"
        },
        "author":{
          "type": "keyword"
        },
        "price":{
          "type": "double"
        },
        "year":{
          "type": "date",
          "format": "yyyy-MM-dd"
        },
        "description":{
          "type": "text",
          "analyzer": "ik_max_word"
        }
      }
    }
  
}
package study.wyy.esclient.high.index;

import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.ScrollableHitSource;
import org.junit.Test;
import study.wyy.esclient.high.BaseTest;

import javax.lang.model.element.VariableElement;
import java.io.IOException;
import java.util.List;

/**
 * @author wyaoyao
 * @description
 * @date 2021/1/12 14:04
 */
@Slf4j
public class ReIndexTest extends BaseTest {

    @Test
    public void test() throws IOException {
        // 1 构建ReIndexRequest
        ReindexRequest request = buildReindexRequest();
        // 2 执行
        try {
            BulkByScrollResponse response = client.reindex(request, RequestOptions.DEFAULT);
            // 3 解析
            // 获取总耗时
            TimeValue took = response.getTook();
            log.info("总耗时: {}", took.getMillis());
            // 请求是否超时
            boolean timedOut = response.isTimedOut();
            log.info("请求是否超时: {}", timedOut);
            // 获取已经处理的文档数量
            long total = response.getTotal();
            log.info("处理的文档数量: {}", total);
            // 获取更新的文档数量
            long updated = response.getUpdated();
            log.info("更新的文档数量: {}", updated);
            // 获取创建的文档数量
            long created = response.getCreated();
            log.info("创建的文档数量: {}", created);
            // 获取删除的文档数量
            long deleted = response.getDeleted();
            log.info("删除的文档数量: {}", deleted);
            // 获取执行的批次
            int batches = response.getBatches();
            log.info("执行的批次数量: {}", batches);
            // 获取跳过的文档数量
            long noops = response.getNoops();
            log.info("跳过的文档数量: {}", noops);
            // 获取版本冲突数量
            long versionConflicts = response.getVersionConflicts();
            log.info("版本冲突数量: {}", versionConflicts);
            // 重试批量索引的次数
            long bulkRetries = response.getBulkRetries();
            log.info("重试批量索引的次数: {}", bulkRetries);
            // 重试搜索操作的次数
            long searchRetries = response.getSearchRetries();
            log.info("重试搜索操作的次数: {}", searchRetries);
            // 请求阻塞的总时间,不包括当前处于休眠状态的限制时间
            TimeValue throttled = response.getStatus().getThrottled();
            log.info("请求阻塞的总时间,不包括当前处于休眠状态的限制时间: {}", throttled.getMillis());
            // 获取查询失败
            List<ScrollableHitSource.SearchFailure> searchFailures = response.getSearchFailures();
            log.info("查询失败的数量: {}", searchFailures != null ? searchFailures.size() : 0);
            // 获取批量操作失败
            List<BulkItemResponse.Failure> bulkFailures = response.getBulkFailures();
            log.info("批量操作失败的数量: {}", bulkFailures != null ? bulkFailures.size() : 0);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            client.close();
        }

    }


    public ReindexRequest buildReindexRequest() {
        // 1 构建ReIndexRequest
        ReindexRequest request = new ReindexRequest();
        // 1.1 设置源索引,接收的是一个可变参数,可设置多个索引
        request.setSourceIndices("books");
        // 1.2 设置目标索引: 注意先去将该索引的映射,分片等信息设置好
        request.setDestIndex("books_copy");
        // 1.3 其他可选参数
        // 设置目标索引的版本类型
        // request.setDestVersionType(VersionType.EXTERNAL);
        // 设置目标索引的操作类型
        // request.setDestOpType("create");
        // 默认情况下,版本冲突会中止重新索引进程
        // request.setConflicts("proceed");
        // 通过添加查询限制文档,比如这里就是只对language字段词条是包括java的进行操作
        // 简单了来说就是进行文档的过滤
        request.setSourceQuery(new TermQueryBuilder("language", "java"));
        // 默认情况下是1000
        // request.setSourceBatchSize(100);
        // 设置超时时间
        request.setTimeout(TimeValue.timeValueMinutes(10));
        // reIndex之后刷新索引
        request.setRefresh(true);
        return request;
    }
}

输出:

总耗时: 319
请求是否超时: false
处理的文档数量: 2
更新的文档数量: 0
创建的文档数量: 2
删除的文档数量: 0
执行的批次数量: 1
跳过的文档数量: 0
版本冲突数量: 0
重试批量索引的次数: 0
重试搜索操作的次数: 0
请求阻塞的总时间,不包括当前处于休眠状态的限制时间: 0
查询失败的数量: 0
批量操作失败的数量: 0

查询一下:

GET books_copy/_search
{
  "query": {
    "match_all": {}
  }
}

结果: 只有language为java的文档被复制过来了

{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 3,
    "successful" : 3,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "books_copy",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "author" : "葛一鸣",
          "price" : 45.6,
          "publish_time" : "2021-01-11T02:47:27.580Z",
          "description" : "让你的Java程序更快、更稳定。深入剖析软件设计层面、代码层面、JVM虚拟机层面的优化方法",
          "language" : "java",
          "id" : 2,
          "title" : "Java程序性能优化"
        }
      },
      {
        "_index" : "books_copy",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "id" : "1",
          "title" : "Java编程思想",
          "language" : "java",
          "author" : "Bruce Eckel",
          "price" : 70.2,
          "publish_time" : "2007-10-01",
          "description" : "Java学习必读经典,殿堂级著作!赢得了全球程序员的广泛赞誉。"
        }
      }
    ]
  }
}

关于VersionType的说明:

version type 可以有一下几种:

  • internal: 仅在给定版本与所存储文档的版本相同时才对文档建立索引。
  • external or external_gt: 仅当给定版本严格高于存储文档的版本或没有现有文档时,才对文档建立索引。 给定的版本将用作新版本,并将与新文档一起存储。 提供的版本必须为非负长整数。
    external_gte: 仅当给定版本等于或高于存储文档的版本时,才对文档建立索引。 如果没有现有文档,该操作也将成功。 给定的版本将用作新版本,并将与新文档一起存储。 提供的版本必须为非负长整数。
  • external_gte版本类型用于特殊用例,应谨慎使用。 如果使用不当,可能会导致数据丢失。 还有另一种选择,即force,它已被弃用,因为它可能导致主分片和副本分片分开。

在reIndex时,如果将其设置为internal或者是缺省设置,就会导致es盲目的将文档转存到目标索引中。如果设置为external,则es就会保留i源文档中的版本,并更新目标索引中版本比源文档旧的版本

上一篇:reindex简单使用学习总结,总结了在一些场景中使用reindex做Elasticsearch数据迁移的方式。


下一篇:ElasticSearch如何修改索引字段