ES索引备份

#!/usr/bin/env python
# -*- coding:utf-8 -*-

"""
/**************************************************************
**************************************************************/
获取ES中所有的文档数据
filename data_es.py
python3
"""

import sys
import json
import requests

# Python 3 默认字符串类型已经是Unicode,不需要设置默认编码

host = "10.233.54.21"
port = 9200

def dump_es_ids(scroll_name, scroll_id):
    """
    循环发送请求获取所有es
    :return:
    """
    url = f"http://{host}:{port}/_search/scroll"
    data = {
        "scroll": scroll_name,
        "scroll_id": scroll_id
    }

    header = {"Content-Type": "application/json"}
    response = requests.post(url, json=data, headers=header)
    response.raise_for_status()  # 检查HTTP请求是否成功
    return response.json()

def get_scroll(size, scroll_name, index):
    """
    获取第一批数据和scroll
    :return:
    """
    url = f"http://{host}:{port}/{index}/_search?scroll={scroll_name}"
    data = {
        "size": size,
        "query": {
            "match_all": {}
        }
    }

    header = {"Content-Type": "application/json"}
    response = requests.post(url, json=data, headers=header)
    response.raise_for_status()  # 检查HTTP请求是否成功
    return response.json()

def has_more(result_obj, index):
    """
    查看是否还有更多数据
    :param result_obj:
    :return:
    """
    try:
        obj_list = result_obj["hits"]["hits"]
        if len(obj_list) > 0:
            print(f"[INFO] index {index} has more data")
            return True
        else:
            return False
    except Exception as e:
        print(f"[ERROR] got error {e}")
        return False

def get_id_list(result_obj):
    """
    从结果集中获取id列表
    :param result_obj:
    :return:
    """
    obj_list = result_obj["hits"]["hits"]
    id_list = [item["_source"] for item in obj_list]
    return id_list

def main():
    """
    获取文档的所有id
    :return:
    """
    index = sys.argv[1]
    out_file = sys.argv[2]
    scroll_name = "5m"
    size = 1000
    scroll_obj = get_scroll(size, scroll_name, index)
    scroll_id = scroll_obj["_scroll_id"]
    print(f"[INFO] scroll_id is {scroll_id}")
    result_obj = scroll_obj
    counter = 0
    with open(out_file, "w") as out:
        while has_more(result_obj, index):
            counter += len(result_obj["hits"]["hits"])
            result_obj_list = get_id_list(result_obj)
            print(f"[INFO] index {index} get data length {len(result_obj_list)}")
            result_obj = dump_es_ids(scroll_name, scroll_id)
            print(f"[INFO] index {index} list total length {len(result_obj_list)}")
            print(f"[INFO] index {index} now total logs {counter}")

            for obj in result_obj_list:
                out.write(f"{json.dumps(obj, ensure_ascii=False)}\n")

if __name__ == '__main__':
    main()




host = "10.92.204.60" # 修改成对应elasticsearch-master svc的IP   ,  port = 9200
# 找到svc
kubectl get svc -n mpks | grep elasticsearch

# 先查看全文索引(导出的索引需要跟rd和drd确认)
curl http://ELASTICSEARCH_MASTER_IP:9200/_cat/indices | grep "fulltext" 

备份执行  curl http://10.233.54.21:9200/_cat/indices | grep "fulltext" | awk -F" " '{if($7>0)print $3}'|awk '{print "touch ",$1," && python3 data_es.py ",$1," ",$1 }' |bash

常见问题:未找到request库
# 方式1pip install request -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn

#方式2  去官网下载https://pypi.org/project/requests/#files 然后到解压后的目录去执行:

python setup.py install
上一篇:MobaXterm基本使用 -- 服务器状态、批量操作、显示/切换中文字体、修复zsh按键失灵


下一篇:怎么在FTP服务器上配置SSL/TLS?