Elasticsearch python操作实践(elasticsearch 、Elasticsearch DSL)
关于Elasticsearch
安装方式
文档
中文文档
Elasticsearch
Elasticsearch DSL操作文档
本文通过python模块对elasticsearch进行简单的增删改查。
本文测试脚本仓库地址:https://github.com/YangJunJ/studyElasticsearch.git
环境
- Python 3.6.2
- elasticsearch 7.13.1
- elasticsearch-dsl 7.3.0
- elasticsearch 7.13.1
安装下面两个python模块
sudo pip3 install elasticsearch
sudo pip3 install elasticsearch_dsl
建立连接
创建文件elasticsearch_obj.py
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, UpdateByQuery
class ElasticSearchObj(object):
def __init__(self):
self.user_key = "user_index"
self.friend_key = "friend_index"
self.test_count_key = "count_test_key"
# 初始化一个Elasticsearch实例对象
self.client = Elasticsearch(hosts=[{'host': "127.0.0.1", "port": 9200}])
def search(self, index):
# 返回索引对象结果级
return Search(using=self.client, index=index)
def update(self, index):
# ubq = UpdateByQuery(index=index).using(self.client)
# or
ubq = UpdateByQuery(index=index, using=self.client)
return ubq
创建或者更新文档
Elasticsearch 使用 JSON作为文档的序列化格式,用起来有点项nosql,文档的字段数量也可以像mongodb文档一样,不一致不相同的形式存储
index插入
create插入
创建文件study_insert.py
import random
from elasticsearch_obj import ElasticSearchObj
class CreateNewRecord(ElasticSearchObj):
def index(self, index, doc_id=None, doc_type="_doc", **kwargs):
"""
如果索引不存在,则会创建索引并写入文档
self.client.index(index, body, doc_type=None, id=None, params=None, headers=None)
http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html
:param index: 文档索引名
:param doc_id: 当doc_id=None时,通过该方法会向文档中插入一条记录,doc_id!=None时,如果doc_id已经存在则会更新,不存在则插入
:param doc_type: 文档类型,默认为_doc,相同index下的记录值必须一致,如果不相等,则会触发异常
:param kwargs: 文档内容
:return:
"""
return self.client.index(index=index, body=kwargs, doc_type=doc_type, id=doc_id)
def create(self, index, doc_id=None, doc_type="_doc", **kwargs):
"""
http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html
用法同index,不同的是create必须传递文档Id,否则会触发异常错误
:param index:
:param doc_id:
:param doc_type:
:param kwargs:
:return:
"""
return self.client.create(index=index, id=doc_id, doc_type=doc_type, body=kwargs)
def test_insert(self):
"""
插入一些记录,方便等会查询时使用
:return:
"""
hobby = ["唱", "跳", "rap", "唱跳rap", "游泳", "打球", "其他"]
for i in range(100):
response = self.index(
index=self.user_key,
id=i,
name="".join(random.sample('zyxwvutsrqponmlkjihgfedcba', random.randint(3, 8))),
age=random.randint(18, 30),
hobby=random.choice(hobby),
height=random.randint(155, 185),
sex=random.choice(["男", "女", "不详"])
)
print(f"{self.user_key}插入{i}条数据, 返回:{response}")
for i in range(100):
response = self.index(
index=self.friend_key,
user_id=i,
friend_id=random.randint(0, 99),
word=random.sample(["拧螺丝", "撸代码", "CRUD", "划水", "偷偷写下bug"], random.randint(1, 3)),
other={"work_place": random.choice(["第一排", "第二排"]), "performance": random.choice(["好", "不好"])}
)
print(f"{self.friend_key}插入{i}条数据, 返回:{response}")
for i in range(20000):
response = self.index(
index=self.test_count_key,
user_id=i,
friend_id=random.randint(0, 99),
weight=random.randint(0, 10000)
)
print(f"{self.test_count_key}插入{i}条数据, 返回:{response}")
if __name__ == "__main__":
test = CreateNewRecord()
test.test_insert()
查询
页查询子句
- 模糊查询:match, 返回与提供的文本、数字、日期或布尔值匹配的文档,包括用于模糊匹配
- 精确查询:term, 该术语必须与字段值完全匹配,包括空格和大写
-
范围查询:range, 返回包含提供范围内的术语的文档
- gt: 大于(可选)
- gte: 大于或等于(可选)
- lt: 小于(可选)
- lte:小于或等于(可选)
- format:用于转换date查询中值的日期格式(可选,字符串)
用法可以查看下面脚本
创建文件study_search.py
from elasticsearch_obj import ElasticSearchObj
class Search(ElasticSearchObj):
"""
示例 elasticsearch_dsl.Search 基本操作
查询的字段值中如果包含中文、下划线等其他特殊字符, 字段需要需要加上 __keyword,否则无法匹配, 用法详见示例
介绍两个查询中需要使用到的方法:
elasticsearch_dsl.Search.scan(): 将搜索条件扫描索引,返回所有与条件匹配文档生成器
elasticsearch_dsl.Search.to_dict(): 将hit对象系列化成python字典记录
"""
def get_all_record(self):
"""
示例获取self.key下的所有文档
:return:
"""
# all_record 是 elasticsearch_dsl.search.Search对象,可以通过scan() 方法遍历self.key索引内的所有文档
all_record = self.search(self.user_key)
for record in all_record.scan():
# record 是 elasticsearch_dsl.response.hit.Hit 对象
# 可以通过.属性获取字段值, 比如: record.name
# 如果字段名不存在,则会触发'Hit' object has no attribute 错误
print(f"record.id:{record.id}, record.name:{record.name}")
# 上面的是其中一种方法,它也可以通过to_dict()方法,将记录通过python字典的形式返回
print(record.to_dict())
def print_all_record(self, explain, all_record, max_count=3):
i = 1
for record in all_record.scan():
print(explain, record.to_dict())
if i >= max_count:
print(explain, f"查询数量大于{i},后面的记录不打印了")
break
i += 1
self.cross_line()
@staticmethod
def cross_line():
print('=================================================')
def get_record_by_query(self):
"""
示例query的常用用法
:return:
"""
all_record = self.search(self.user_key)
# 例如要找到爱好中包含rap的用户信息
match_rap = all_record.query("match", hobby="rap")
self.print_all_record("查看所有包含爱好rap的用户:", match_rap)
# 查找爱好中含有rap,并且性别为女的用户信息
match_rap_female = all_record.query("match", hobby="rap").query("term", sex__keyword="女")
# match_rap_female = match_rap.query("term", sex__keyword="女")
self.print_all_record("查看爱好含有rap的所有女用户:", match_rap_female)
# 查找身高大于175的男用户,并且爱好只有唱的用户
man_sing_gt_175 = all_record.query("range", height={"gt": 175}).query(
"term", sex__keyword="男").query(
"term", hobby__keyword="唱")
self.print_all_record("查找身高大于175的男用户,并且爱好只有唱的用户", man_sing_gt_175)
def get_record_by_filter(self):
"""
示例filter的用法
filter 与 query用法一样,这里只举一个简单例子
:return:
"""
all_record = self.search(self.user_key)
# 查找年龄大于等于25的男用户,并且爱好是唱跳rap
all_record = all_record.filter("range", age={"gte": 25}).filter(
"term", sex__keyword="男").filter(
"term", hobby__keyword="唱跳rap")
self.print_all_record("查找年龄大于等于25的男用户,并且爱好是唱跳rap:", all_record)
def get_record_by_query_and_filter(self):
"""
query、filter可以混用
举一个例子
:return:
"""
all_record = self.search(self.user_key)
# 查找年龄在20-25的男性用户
all_record = all_record.query("range", age={"gte": 20, "lte": 25}).filter("term", sex__keyword="男")
self.print_all_record("查找年龄在20-25的男性用户:", all_record)
def get_record_by_exclude(self):
"""
非查询
:return:
"""
all_record = self.search(self.user_key)
# 查找性别不是男生,并且爱好不是唱的用户
records = all_record.exclude("term", sex__keyword="男").exclude("term", hobby__keyword="唱")
self.print_all_record("查找性别不是男生,并且爱好不是唱的用户:", records, 6)
# 查找男性用户中,爱好不是唱且身高大于175的用户
records = all_record.filter("term", sex__keyword="男").exclude(
"term", hobby__keyword="唱").query(
"range", height={"gt": 175})
self.print_all_record("查找男性用户中,爱好不是唱且身高大于175的用户:", records, 6)
def other_search(self):
"""
其他查询:
获取文档数量:.count()
查询结果排序:sort(field)
:return:
"""
all_record = self.search(self.user_key)
# 获取id>=90的文档数量
gte_90 = all_record.filter("range", id={"gte": 90})
print(f"id>=90的文档 存在,数量为:{gte_90.count()}") # id>=90的文档 存在,数量为:10
# 获取id>=300的文档数量
gte_300 = all_record.filter("range", id={"gte": 300})
print(f"id>=300的文档 存在,数量为:{gte_300.count()}") # id>=300的文档 存在,数量为:0
# 排序 获取排序结果后,不能使用san(), 比如:self.search(self.key).sort("-height").scan()
# 使用scan()不会以任何预先确定的顺序返回结果, 详见:
# https://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.scan
# 不使用san() 获取记录会有数量10000条的限制,如果查询文档数量大于10000,不使用san()方法便会出现异常
# 下面查询会触发异常:
# all_record = self.search(self.test_count_key)
# for record in all_record[0: 15000]:
# print(record.to_dict())
# 因为test_count_key下数量已经达到20000条,当取15000条数据时会触发:elasticsearch.exceptions.RequestError异常
# 查询id>90, 且按玩家身高排序从大到小排序
gte_90_sort = all_record.sort("-height")
# self.print_all_record("查询id>90, 且按玩家身高排序从大到小排序:", gte_90_sort, 10)
for record in gte_90_sort:
print(record.to_dict())
self.cross_line()
# 查询id>90, 且按玩家身高排序从小到大排序, 如果身高相同,则按id降序排序
gte_90_sort = all_record.filter("range", id={"gte": 90}).sort("height", "-id")
for record in gte_90_sort:
print(record.to_dict())
self.cross_line()
# 分页
# 查询用户用户数据,跳过前五十条后20条
skip_50_search_20 = all_record[50: 70]
i = 1
for record in skip_50_search_20:
print(f"第{i}条数据, 用户id:{record.to_dict()['id']}")
i += 1
if __name__ == "__main__":
search = Search()
# search.get_all_record()
# search.get_record_by_query()
# search.get_record_by_filter()
# search.get_record_by_query_and_filter()
# search.get_record_by_exclude()
search.other_search()
更新
创建文件study_search.py
详见示例:update_example
import time
from elasticsearch_obj import ElasticSearchObj
class Update(ElasticSearchObj):
def refresh(self, index):
self.client.indices.refresh(index=index)
@staticmethod
def cross_line():
print('=================================================')
def print_all_record(self, explain, all_record):
for record in all_record.scan():
print(explain, record.to_dict())
self.cross_line()
def update_example(self):
# 将user_key id为90记录,name改为”我是来测试的“, height增加50
# 查找user_key id为90更新前的记录
update_before = self.search(self.user_key).filter("term", id=90)
self.print_all_record("查找user_key id为90更新前的记录:", update_before)
update_obj = self.update(self.user_key).filter(
"term", id=90).script(
source="ctx._source.name=params.name;ctx._source.height+=params.height;",
params={
"name": "我是来测试的",
"height": 50
})
update_response = update_obj.execute() # 请求执行并返回执行结果
print("user_key id为90记录,name改为”我是来测试的“, height增加50 更新返回结果:", update_response)
update_after = self.search(self.user_key).filter("term", id=90)
self.print_all_record("查找user_key id为90更新后的记录1:", update_after)
time.sleep(1)
update_after = self.search(self.user_key).filter("term", id=90)
self.print_all_record("查找user_key id为90更新后的记录2:", update_after)
# 执行结果就不沾出来了,但有执行的可以看出
# 更新后记录1:是更新前的结果; 而更新后记录2:是更新后的结果
# 造成这一现象是因为上一个请求更新在内部并未完全完成,所以在sleep 一秒后便能获取更新后的记录
print(end="\n\n\n")
# 如果需要在更新后立即获取最新文档,又不知道多久能够完成更新, 可以在执行更新后刷新文档
# 看下面示例
# 将user_key 下id为70的文档,name改为”试试刷新index能否更新后获取最新数据“,height增加一百
print("user_key 下id为70的文档,name改为”试试刷新index能否更新后获取最新数据“,height增加一百")
update_before = self.search(self.user_key).filter("term", id=70)
self.print_all_record("查找user_key id为80更新前的记录:", update_before)
update_obj = self.update(self.user_key).filter(
"term", id=70).script(
source="ctx._source.name=params.name;ctx._source.height+=params.height;",
params={
"name": "试试刷新index能否更新后获取最新数据",
"height": 100
})
update_response = update_obj.execute() # 请求执行并返回执行结果
print("user_key id为70记录,name改为”试试刷新index能否更新后获取最新数据“, height增加100 更新返回结果:", update_response)
self.refresh(self.user_key) # 刷新
update_after = self.search(self.user_key).filter("term", id=70)
self.print_all_record("查找user_key id为70更新后的记录1:", update_after)
# 默认情况下,Elasticsearch 每秒都会定期刷新索引, 如果并不需要获取更新后的文档,尽量就不要手动刷新了
# 可以通过更新响应的total跟updated数量是否一致判断记录是否更新成功
# 查询更新会更新所有匹配的文档,查询条件跟上面介绍的查询用法一致
# 例如:将user_key所有age增加一岁
response = self.update(self.user_key).script(source="ctx._source.age+=1;").execute()
print("将user_key所有age增加一岁:", response["total"], response["updated"], "response=", response)
# 此处增加刷新,是因为上一个执行是更新整个user_key,如果还未自动刷新,执行下面示例,或造成并发异常,
# 导致elasticsearch.exceptions.ConflictError异常
self.refresh(self.user_key)
print(end="\n\n\n")
# 特别注意的是,如果script定义的字段,查询的文档存在则会更新,不存在则会在文档中插入字段
print("将user_key下id为1的文档增加一个字段test_field,值为:[1,2,3]")
update_before = self.search(self.user_key).filter("term", id=1)
self.print_all_record("查找user_key id为1更新前的记录:", update_before)
update_obj = self.update(self.user_key).filter(
"term", id=1).script(
source='ctx._source.test_field=params.test_field',
params={
"test_field": [1, 2, 3],
})
response = update_obj.execute()
print("将user_key下id为1的文档增加一个字段test_field:", response)
self.refresh(self.user_key) # 刷新
update_after = self.search(self.user_key).filter("term", id=1)
self.print_all_record("查找user_key 将user_key下id为1的文档增加一个字段test_field,更新后的记录:", update_after)
# 删除字段
print("将user_key下id为1的文档增加的test_field字段移除")
update_before = self.search(self.user_key).filter("term", id=1)
self.print_all_record("查找user_key id为1更新前的记录:", update_before)
response = self.update(self.user_key).filter(
"term", id=1).script(
source='ctx._source.remove("test_field")').execute()
print("将user_key下id为1的文档增加的test_field字段移除:", response)
self.refresh(self.user_key) # 刷新
update_after = self.search(self.user_key).filter("term", id=1)
self.print_all_record("将user_key下id为1的文档增加的test_field字段移除,更新后的记录:", update_after)
if __name__ == "__main__":
update = Update()
update.update_example()
删除
删除索引
根据查询条件删除文档
创建文件study_delete.py
from elasticsearch_obj import ElasticSearchObj
class StudyDelete(ElasticSearchObj):
def delete_index(self, index):
"""
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/indices-delete-index.html
删除整个索引
:param index:
:return:
"""
try:
self.client.indices.delete(index=index)
except Exception:
print(f"需要删除的索引:{index}不存在")
def delete_by_query(self):
"""
示例根据查询条件删除文档
:return:
"""
# 将user_key下id大于90的玩家删除
all_record = self.search(self.user_key)
print("查看删除前文档的数量:", all_record.count())
all_record.filter("range", id={"gt": 90}).delete()
self.client.indices.refresh(index=self.user_key)
all_record = self.search(self.user_key)
print(f"查看删除后文档的数量:{all_record.count()}")
if __name__ == "__main__":
study_delete = StudyDelete()
study_delete.delete_index(study_delete.friend_key) # 删除索引friend_key
study_delete.delete_by_query()