#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
/**************************************************************
**************************************************************/
获取ES中所有的文档数据
filename data_es.py
python3
"""
import sys
import json
import requests
# Python 3 strings are already Unicode, so no default-encoding setup is needed.
# Elasticsearch endpoint (address and HTTP port) used to build every request URL
# in this script.
host = "10.233.54.21"
port = 9200
def dump_es_ids(scroll_name, scroll_id, timeout=60):
    """
    Fetch the next batch of an open scroll search.

    Posts ``scroll_name`` and ``scroll_id`` as a JSON body to the
    ``/_search/scroll`` endpoint built from the module-level ``host`` and
    ``port``.

    :param scroll_name: value sent as the ``scroll`` field of the request body
    :param scroll_id: value sent as the ``scroll_id`` field of the request body
    :param timeout: seconds passed to ``requests.post`` as its ``timeout``;
        without one, a stalled server would block the export indefinitely
    :return: the decoded JSON response body
    :raises requests.HTTPError: if the server answers with an error status
    :raises requests.Timeout: if the server does not answer within ``timeout``
    """
    url = f"http://{host}:{port}/_search/scroll"
    data = {
        "scroll": scroll_name,
        "scroll_id": scroll_id,
    }
    header = {"Content-Type": "application/json"}
    response = requests.post(url, json=data, headers=header, timeout=timeout)
    # Fail loudly on a non-success HTTP status instead of parsing an error body.
    response.raise_for_status()
    return response.json()
def get_scroll(size, scroll_name, index):
    """
    Start a scroll search over ``index`` and return its first response.

    Sends a ``match_all`` query to ``/{index}/_search`` with ``scroll_name``
    as the ``scroll`` query-string parameter, using the module-level ``host``
    and ``port``.

    :param size: value sent as the ``size`` field of the request body
    :param scroll_name: value of the ``scroll`` query-string parameter
    :param index: index name placed in the request path
    :return: the decoded JSON response body
    :raises requests.HTTPError: if the server answers with an error status
    :raises requests.Timeout: if the server does not answer in time
    """
    url = f"http://{host}:{port}/{index}/_search?scroll={scroll_name}"
    data = {
        "size": size,
        "query": {
            "match_all": {},
        },
    }
    header = {"Content-Type": "application/json"}
    # A timeout keeps a stalled server from blocking the export indefinitely.
    response = requests.post(url, json=data, headers=header, timeout=60)
    # Fail loudly on a non-success HTTP status instead of parsing an error body.
    response.raise_for_status()
    return response.json()
def has_more(result_obj, index):
    """
    Report whether a search response still contains hits.

    :param result_obj: decoded response; ``result_obj["hits"]["hits"]`` is
        inspected
    :param index: index name, used only in the progress message
    :return: ``True`` when the hit list is non-empty; ``False`` when it is
        empty or when the response does not have the expected structure
    """
    try:
        hit_count = len(result_obj["hits"]["hits"])
    except (KeyError, TypeError) as e:
        # A response without a usable hits list is reported and treated as
        # "no more data". Only the errors a malformed response can raise are
        # caught, so unrelated programming errors are not hidden.
        print(f"[ERROR] got error {e}")
        return False
    if hit_count == 0:
        return False
    print(f"[INFO] index {index} has more data")
    return True
def get_id_list(result_obj):
    """
    Collect the ``_source`` value of every hit in a search response.

    :param result_obj: decoded response; ``result_obj["hits"]["hits"]`` is
        iterated
    :return: list holding each hit's ``_source`` value, in response order
    """
    sources = []
    for hit in result_obj["hits"]["hits"]:
        sources.append(hit["_source"])
    return sources
def main():
    """
    Export every document of one index to a file, one JSON object per line.

    Command line: ``<script> <index> <out_file>``. The index is read with a
    scroll search in batches of 1000 and each hit's ``_source`` is written to
    ``out_file`` as a single JSON line.

    :return: None
    :raises SystemExit: if the two command-line arguments are missing
    """
    if len(sys.argv) < 3:
        raise SystemExit(f"usage: {sys.argv[0]} <index> <out_file>")
    index = sys.argv[1]
    out_file = sys.argv[2]
    scroll_name = "5m"
    size = 1000
    scroll_obj = get_scroll(size, scroll_name, index)
    scroll_id = scroll_obj["_scroll_id"]
    print(f"[INFO] scroll_id is {scroll_id}")
    result_obj = scroll_obj
    counter = 0
    # ensure_ascii=False emits raw non-ASCII characters, so the file encoding
    # is pinned to UTF-8 instead of depending on the platform default.
    with open(out_file, "w", encoding="utf-8") as out:
        while has_more(result_obj, index):
            result_obj_list = get_id_list(result_obj)
            counter += len(result_obj_list)
            print(f"[INFO] index {index} get data length {len(result_obj_list)}")
            # Write the current batch before requesting the next one, so a
            # failed request does not lose data that was already fetched.
            for obj in result_obj_list:
                out.write(f"{json.dumps(obj, ensure_ascii=False)}\n")
            result_obj = dump_es_ids(scroll_name, scroll_id)
            # The scroll id may change between responses; always continue
            # with the most recently returned one.
            scroll_id = result_obj.get("_scroll_id", scroll_id)
            print(f"[INFO] index {index} list total length {len(result_obj_list)}")
            print(f"[INFO] index {index} now total logs {counter}")
# Script entry point: run the export only when this file is executed directly,
# not when it is imported.
if __name__ == "__main__":
    main()
host = "10.92.204.60" # 使用前将脚本开头的 host 修改成对应 elasticsearch-master svc 的 IP,port = 9200
# 找到svc
kubectl get svc -n mpks | grep elasticsearch
# 先查看全文索引(导出的索引需要跟rd和drd确认)
curl http://ELASTICSEARCH_MASTER_IP:9200/_cat/indices | grep "fulltext"
备份执行(对文档数大于 0 的每个 fulltext 索引运行导出脚本,输出文件名与索引名相同): curl http://10.233.54.21:9200/_cat/indices | grep "fulltext" | awk -F" " '{if($7>0)print $3}'|awk '{print "touch ",$1," && python3 data_es.py ",$1," ",$1 }' |bash
常见问题:未找到 requests 库
# 方式1:pip install requests -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn
# 方式2:去官网下载 https://pypi.org/project/requests/#files ,然后到解压后的目录去执行:
python setup.py install