安装就略过了,建议安装最近的就好,网上挺多教程的。两款比较好用的插件elasticsearch-head和kibana。安装好之后学习路线如下
第一阶段:Elasticsearch官方API文档(按每天8小时来算花费一周)
链接:
2.x版本中文
7.x版本英文
这两个官方api文档结合着看,虽然2.x和7.x的有些地方不一样,但是很多底层的东西,包括原理是一致的,而且2.x是中文版的好理解。重要的部分用Postman测测学学
第二阶段:Elasticsearch-py官方API文档(按每天8小时来算花费三天)
链接:7.x
这个是Python调用Elasticsearch的接口建议在Pycharm里测试一下
第三阶段:Elasticsearch-dsl官方API文档(按每天8小时来算花费三天)
链接:elasticsearch-dsl
这个模块能够简化Elasticsearch-py中有关查询的操作,同样建议在Pycharm里测试一下
第四阶段:Elasticsearch实践(按需花费)
案例:Elasticsearch实战 | 如何从数千万手机号中识别出情侣号?
这里附上将案例中的实现方式简单转换为Python实现方式的代码
"""
https://mp.weixin.qq.com/s?__biz=MzI2NDY1MTA3OQ==&mid=2247484728&idx=1&sn=eeb76ad84c98af16fc16d6dc5d5d11af#wechat_redirect"""
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Mapping
from loguru import logger
es = Elasticsearch()
def insert_data():
data = """
{"index": {"_id": 1}},
{"phone_number": "13511112222"},
{"index": {"_id": 2}},
{"phone_number": "13611112222"},
{"index": {"_id": 3}},
{"phone_number": "13711112222"},
{"index": {"_id": 4}},
{"phone_number": "13811112222"},
{"index": {"_id": 5}},
{"phone_number": "13844248474"},
{"index": {"_id": 6}},
{"phone_number": "13866113333"},
{"index": {"_id": 7}},
{"phone_number": "15766113333"}
"""
data = data.replace(',', '')
res = es.bulk(body=data, index="phone_index") # 批量操作
logger.info(res)
def prepare():
body = \
{
"description": "Adds insert_time timestamp to documents",
"processors": [
{
"set": {
"field": "_source.insert_time",
"value": "{{_ingest.timestamp}}"
}
},
{
"script": {
"lang": "painless",
"source": "ctx.last_eight_number = (ctx.phone_number.substring(3,11))"
}
}
]
}
# 创建一个管道
res = es.ingest.put_pipeline(id='initialize', body=body, ignore=400)
logger.info(res)
body = {
"index_patterns": 'phone_index',
"template": {
"settings": {
"number_of_replicas": 0,
"index.default_pipeline": 'initialize',
"index": {
"max_ngram_diff": "13",
"analysis": {
"analyzer": {
"ngram_analyzer": {
"tokenizer": "ngram_tokenizer"
}
},
"tokenizer": {
"ngram_tokenizer": {
"token_chars": [
"letter",
"digit"
],
"min_gram": "1",
"type": "ngram",
"max_gram": "11"
}
}
}
}
},
"mappings": {
"properties": {
"insert_time": {
"type": "date"
},
"last_eight_number": {
"type": "keyword"
},
"phone_number": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
},
"analyzer": "ngram_analyzer"
}
}
}
}
}
# 创建一个索引模板
res = es.indices.put_index_template(name="phone_template", body=body, ignore=400)
logger.info(res)
# 创建索引
res = es.indices.create(index="phone_index", ignore=400)
logger.info(res)
# res = es.indices.create(index="phone_couple_index", ignore=400)
# logger.info(res)
# 插入数据
insert_data()
def get_need_hits_list():
"""提取出情侣号(>=2)的手机号或对应id。"""
body = {
"size": 0,
"query": {
"range": {
"insert_time": {
"gte": 1629659503000,
"lte": 1629688618000
}
}
},
"aggs": {
"last_aggs": {
"terms": {
"field": "last_eight_number",
"min_doc_count": 2,
"size": 10,
"shard_size": 30
},
"aggs": {
"sub_top_hits_aggs": {
"top_hits": {
"size": 100,
"_source": {
"includes": "phone_number"
},
"sort": [
{
"phone_number.keyword": {
"order": "asc"
}
}
]
}
}
}
}
}
}
res = es.search(body=body, index='phone_index')
logger.info(res)
# 获取满足要的id
need_buckets_list = res['aggregations']['last_aggs']['buckets']
i = 0
while i < len(need_buckets_list):
yield need_buckets_list[i]['key'], need_buckets_list[i]['sub_top_hits_aggs']['hits']['hits']
i += 1
def create_couple_index_template():
"""给情侣号创建索引模板"""
body = {
"index_patterns": "phone_couple_[0-9]{8}",
"template": {
"settings": {
"number_of_replicas": 0,
"index": {
"max_ngram_diff": "13",
"analysis": {
"analyzer": {
"ngram_analyzer": {
"tokenizer": "ngram_tokenizer"
}
},
"tokenizer": {
"ngram_tokenizer": {
"token_chars": [
"letter",
"digit"
],
"min_gram": "1",
"type": "ngram",
"max_gram": "11"
}
}
}
}
},
"mappings": {
"properties": {
"phone_number": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
},
"analyzer": "ngram_analyzer"
}
}
}
}
}
res = es.indices.put_index_template(name="phone_couple_template", body=body, ignore=400)
logger.info(res)
def reindex():
"""取出的满足条件的id进行跨索引迁移。"""
g = get_need_hits_list()
while True:
try:
index_key, hits_list = next(g)
ids_list = [hit['_id'] for hit in hits_list]
# 创建一个新的索引
res = es.indices.create(index=f"phone_couple_{index_key}_index", ignore=400)
logger.info(res)
# 索引迁移
body = {
"source": {
"index": "phone_index",
"query": {
"terms": {
"_id": ids_list
}
}
},
"dest": {
"index": f"phone_couple_{index_key}_index"
}
}
res = es.reindex(body=body)
logger.info(res)
except StopIteration:
break
if __name__ == '__main__':
prepare()
create_couple_index_template()
reindex()