【Elastic Engineering】Elasticsearch: Reindex 接口

作者:刘晓国


在我们开发的过程中,我们有很多时候需要用到 Reindex 接口。它可以帮我们把数据从一个 index 到另外一个 index 进行重新 reindex。这个对于特别适用于我们在修改我们数据的 mapping 后,需要重新把数据从现有的 index 转到新的 index 建立新的索引,这是因为我们不能修改现有的 index 的 mapping 一旦已经定下来了。在接下来的介绍中,我们将学习如何使用 reindex 接口。


【Elastic Engineering】Elasticsearch: Reindex 接口


为了能够使用 reindex 接口,我们必须满足一下的条件:


_source 选项对所有的源 index 文档是启动的,也即源 index 的 source 是已经被存储的

reindex 不是帮我们尝试设置好目的地 index。它不拷贝源 index 的设置到目的地的 index 里去。你应该在做 reindex 之前把目的地的源的 index 设置好,这其中包括 mapping, shard 数目,replica 等

下面,我们来一个具体的例子,比如建立一个 blogs 的 index。

PUT twitter2/_doc/1
{
  "user" : "双榆树-张三",
  "message" : "今儿天气不错啊,出去转转去",
  "uid" : 2,
  "age" : 20,
  "city" : "北京",
  "province" : "北京",
  "country" : "中国",
  "address" : "中国北京市海淀区",
  "location" : {
    "lat" : "39.970718",
    "lon" : "116.325747"
  }
}

上面的命令让我们建立了一个叫做 twitter2 的 index,并同时帮我们生产了一个如下的 mapping:

GET /twitter2/_mapping

显示的结果是:

{
  "twitter2" : {
    "mappings" : {
      "properties" : {
        "address" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "age" : {
          "type" : "long"
        },
        "city" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "country" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "location" : {
          "properties" : {
            "lat" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "lon" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            }
          }
        },
        "message" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "province" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "uid" : {
          "type" : "long"
        },
        "user" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    }
  }
}

显然根据我们之前的练习 “开始使用Elasticsearch (2)”,系统帮我们生产的 location 数据类型是不对的,我们必须进行修改。一种办法是删除现有的 twitter2 索引,让后修改它的 mapping,再重新索引所有的数据。这对于一个两个文档还是可以的,但是如果已经有很多的数据了,这个方法并不可取。另外一种方式,是建立一个完全新的 index,使用新的 mapping 进行 reindex。下面我们展示如何使用这种方法。


创建一个新的 twitter3 的 index,使用如下的 mapping:

PUT twitter3
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "address": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "age": {
        "type": "long"
      },
      "city": {
        "type": "text"
      },
      "country": {
        "type": "text"
      },
      "location": {
        "type": "geo_point"
      },
      "message": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "province": {
        "type": "text"
      },
      "uid": {
        "type": "long"
      },
      "user": {
        "type": "text"
      }
    }
  }
}

这里我们我们修改了 location 及其它的一些数据项的数据类型。运行上面的指令,我们就可以创建一个完全新的 twitter3 的 index。我们可以通过如下的命令来进行 reindex:

POST _reindex
{
  "source": {
    "index": "twitter2"
  },
  "dest": {
    "index": "twitter3"
  }
}

显示的结果是:

{
  "took" : 52,
  "timed_out" : false,
  "total" : 1,
  "updated" : 0,
  "created" : 1,
  "deleted" : 0,
  "batches" : 1,
  "version_conflicts" : 0,
  "noops" : 0,
  "retries" : {
    "bulk" : 0,
    "search" : 0
  },
  "throttled_millis" : 0,
  "requests_per_second" : -1.0,
  "throttled_until_millis" : 0,
  "failures" : [ ]
}

我们可以通过如下的命令来检查我们的 twitter3 是否已经有新的数据:

GET /twitter3/_search

显示的结果是:

{
  "took" : 100,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "twitter3",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "user" : "双榆树-张三",
          "message" : "今儿天气不错啊,出去转转去",
          "uid" : 2,
          "age" : 20,
          "city" : "北京",
          "province" : "北京",
          "country" : "中国",
          "address" : "中国北京市海淀区",
          "location" : {
            "lat" : "39.970718",
            "lon" : "116.325747"
          }
        }
      }
    ]
  }
}

显然我们的数据已经从 twitter2 到 twitter3,并且它的数据类型已经是完全符合我们要求的数据类型。


在执行 reindex 的时候,为了提高效率,我们可以通过设置 slices 参数来并行处理。在通常的情况下,我们可以设置 slices为 auto Elasticsearch【Elastic Engineering】Elasticsearch: Reindex 接口 自动选择 slices 的数目来进行加速。Elasticsearch 会依据你电脑中的 core 的数量来进行决定。我们可以使用如下的格式:

POST _reindex?slices=auto
{
  "source": {
    "index": "twitter2"
  },
  "dest": {
    "index": "twitter3"
  }
}


Reindex 执行


Reindex 是一个时间点的副本

就像上面返回的结果显示的那样,它是以 batch(批量)的方式来执行的。默认的批量大小为 1000

你也可以只拷贝源 index 其中的一部分数据

        -  通过加入 query 到 source 中


        -  通过定义 max_docs 参数


比如:

POST _reindex
{
  "max_docs": 100,
  "source": {
    "index": "twitter2",
    "query": {
      "match": {
        "city": "北京"
      }
    }
  },
  "dest": {
    "index": "twitter3"
  }
}

这里,我们定义最多不超过 100 个文档,同时,我们只拷贝来自“北京”的 twitter 记录。


设置 op_type to create 将导致 _reindex 仅在目标索引中创建缺少的文档。 所有现有文档都会导致版本冲突,比如:

POST _reindex
{
  "source": {
    "index": "twitter2"
  },
  "dest": {
    "index": "twitter3",
    "op_type": "create"
  }
}

如果我们之前已经做过 reindex,那么我们可以看到如下的结果:

{
  "took": 2,
  "timed_out": false,
  "total": 1,
  "updated": 0,
  "created": 0,
  "deleted": 0,
  "batches": 1,
  "version_conflicts": 1,
  "noops": 0,
  "retries": {
    "bulk": 0,
    "search": 0
  },
  "throttled_millis": 0,
  "requests_per_second": -1,
  "throttled_until_millis": 0,
  "failures": [
    {
      "index": "twitter3",
      "type": "_doc",
      "id": "1",
      "cause": {
        "type": "version_conflict_engine_exception",
        "reason": "[1]: version conflict, document already exists (current version [5])",
        "index_uuid": "ffz2LNIIQqqDx211R5f4fQ",
        "shard": "0",
        "index": "twitter3"
      },
      "status": 409
    }
  ]
}

它表明我们之前的文档 id 为1的有版本上的冲突。


默认情况下,版本冲突会中止 _reindex 进程。 “conflict” 请求 body 参数可用于指示 _reindex 继续处理版本冲突的下一个文档。 请务必注意,其他错误类型的处理不受 “conflict” 参数的影响。 当 “conflict”:在请求正文中设置 “proceed” 时, _reindex 进程将继续发生版本冲突并返回遇到的版本冲突计数:

POST _reindex
{
  "conflicts": "proceed",
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "op_type": "create"
  }
}


Throttling


重新索引大量文档可能会使你的群集泛滥甚至崩溃。requests_per_second 限制索引操作速率。

POST _reindex?requests_per_second=500 
{
  "source": {
    "index": "blogs",
    "size": 500
  },
  "dest": {
    "index": "blogs_fixed"
  }
}


运用 index 别名来进行 reindex


我们可以通过如下的方法来实现从一个 index 到另外一个 index 的数据转移:


PUT test     
PUT test_2   
POST /_aliases
{
    "actions" : [
        { "add":  { "index": "test_2", "alias": "test" } },
        { "remove_index": { "index": "test" } }  
    ]
}

在上面的例子中,假如我们地添加了一个叫做 test 的 index,而 test_2 是我们想要的。我们直接可以通过上面的方法吧 test 中的数据交换到 test_2 中,并同时把 test 索引删除。


从远处进行 reindex


_reindex 也支持从一个远处的 Elasticsearch 的服务器进行 reindex,它的语法为:

POST _reindex
{
  "source": {
    "index": "blogs",
    "remote": {
      "host": "http://remote_cluster_node1:9200",
      "username": "USERNAME",
      "password": "PASSWORD"
    }
  },
  "dest": {
    "index": "blogs"
  }
}

这里显示它从一个在 http://remote_cluster_node1:9200 的服务器来拷贝文件从一个 index 到另外一个 index。我们必须注意的是:remote hosts 必须很清楚地在 elasticsearch.yml 这个配置文件中的 reindex.remote.whitelist 中进行定义。


Update by Query


虽然这个不在我们的 reindex 介绍范围,但是在有些情况下,我们可以可以通过 _update_by_query API 来让我们轻松地更新一个字段的值:

POST blogs_fixed/_update_by_query
{
  "query": {
    "match": {
      "category.keyword": ""
    }
  },
  "script": {
    "source": """
       ctx._source['category'] = "None"
     """
  }
}

在上面,把 category.keyword 项为空的所有文档的 category 通过脚本设置为默认的 "None" 字符串。它和 reindex 的作用相似。


为 mapping 添加新的 mulit-field


假设我们要向 twitter_new 索引的 mapping 添加一个多字段(multi-field)


具体来说,假设我们要用新的方法分析 “content” 字段

PUT new_new/_mapping
{
  "properties": {
    "content": {
      "type": "text",
      "fields": {
        "english": {
          "type": "text",
          "analyzer": "english"
        }
      }
    }
  }
}

在上面我们为 content 字段添加了一个新的 english 字段,并且它的 analyzer 为 english。


由于 mapping 已经发生改变,但是索引中已经有的文档没有这个新的字段 english,如果这个时候我们进行如下的搜索,将不会找到任何的结果:

GET twitter_new/_search
{
  "query": {
    "match": {
      "content.english": "performance tips"
    }
  }
}

那么我们该如何使得索引中现有的文档都有 content.english 这个字段呢?运行 _update_by_query 以拥有现有文档选择新的 “content.english” 字段:

POST twitter_new/_update_by_query

当我们完成上面的请求后,然后再执行如下的操作,将会在 twitter_new 索引中搜索到想要的文档:

GET twitter_new/_search
{
  "query": {
    "match": {
      "content.english": "performance tips"
    }
  }
}


参考:


【1】https://www.elastic.co/guide/en/elasticsearch/reference/7.3/docs-reindex.html


上一篇:cocos2d-x学习记录第一篇-环境变量配置


下一篇:为什么logstash进程的CPU使用率100%?