Elasticsearch,分布式搜索引擎
1. Elasticsearch入门
- Elasticsearch简介
- 一个分布式的、Restful风格的搜索引擎
- 支持对各种类型的数据的检索
- 搜索速度快、可以提供实时的搜索服务
- 便于扩展、每秒可以处理PB级海量数据
- Elasticsearch术语
- 索引、类型、文档、字段
- 集群、节点、分片、副本
1.1 术语解释
- 索引:相当于MySQL数据库中的database(数据库)
- 类型:相当于MySQL数据库中的table(表)
- 文档:相当于MySQL数据库中的table的一行数据,数据结构为JSON
- 字段:相当于MySQL数据库中的table的一列
Elasticsearch 6.0 以后开始逐步废除类型的概念,索引的含义中也包括了类型
- 集群:分布式部署,提高性能
- 节点:集群中的每一台服务器
- 分片:对一个索引的进一步划分存储,提高并发处理能力
- 副本:对分片的备份,提高可用性
Elasticsearch连接:官网
1.2 Elasticsearch配置
为了与使用的SpringBoot版本相兼容,Elasticsearch的版本选择为6.4.3
文件目录结构为:
修改config目录下elasticsearch.yml文件
2. 配置环境变量
1.3 安装中文分词插件
- github 上搜索
- 解压到指定的目录下
ik目录下的目录结构
在config目录下IkAnalyzer.cfg.xml中可以自己配置新词
其内容为:
1.4 安装Postman
Postman相关连接:官网
按部就班安装即可。
1.5 使用命令行操作Elasticsearch
- 启动Elasticsearch
双击bin目录下elasticsearch.bat
- 常用命令介绍
- 查看节点健康状态
curl -X GET "localhost:9200/_cat/health?v"
- 查看节点具体信息
curl -X GET "localhost:9200/_cat/nodes?v"
- 查看索引相关信息
curl -X GET "localhost:9200/_cat/indices?v"
查看状态显示yellow是因为没有分片(备份)。 - 删除索引
curl -X DELETE "localhost:9200/test" //test就是索引名字
再次查询索引相关信息,没有了test
- 创建索引
curl -X PUT "localhost:9200/test" //test就是索引名字
再次查询索引相关信息,又有了test
1.6 使用Postman访问Elasticsearch
- 查索引
- 删除索引
再次查询索引相关信息,没有了test
- 新建索引
再次查询索引相关信息,又有了test
- 提交数据
//test:索引 _doc:固定格式 1:id号 然后在请求body中写数据
PUT localhost:9200/test/_doc/1
- 查数据
- 改数据
和添加数据操作相同,底层会先删除再添加 - 删除数据
- 搜索功能演示
添加几组数据
搜索演示:
搜索时,也可以先分词后搜索,并不一定完全匹配
2. Spring整合Elasticsearch
- 引入依赖
- spring-boot-starter-data-elasticsearch
- 配置Elasticsearch
- cluster-name、cluster-nodes
- Spring Data Elasticsearch
- ElasticsearchTemplate
- ElasticsearchRepository
2.1 引入依赖
<!--elasticsearch-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
2.2 Elasticsearch相关配置
- Spring整合Elasticsearch,在application.properties文件中配置即可
# elasticsearch
# ElasticsearchProperties
# 在安装目录config/elasticserch.yml中找到
spring.data.elasticsearch.cluster-name=my-application
# 9200http访问的端口,9300tcp端口
spring.data.elasticsearch.cluster-nodes=127.0.0.1:9300
- 解决Netty冲突问题
问题原因:Redis底层使用了Netty,Elasticsearch也用了Netty,当被注册两次就会报错
解决思路:Elasticsearch中注册Netty前会判断有无一个参数,如果有就不注册
具体解决方法:
- 给帖子DiscussPost类加上相关注解
package com.ateam.community.entity;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import java.util.Date;
// indexName:索引名,type:固定为_doc,shards:分片,replis:备份
@Document(indexName = "discusspost", type = "_doc", shards = 6, replicas = 3)
public class DiscussPost {
@Id
private int id;
@Field(type = FieldType.Integer)
private int userId;
//analyzer:存储时的解析器,
/*
举例
存入:互联网校招--->建立最大的索引(就是各种拆分)
本质上是根据存入的内容,提取出关键词,然后用关键词关联存储的内容,后来我们搜索的时候,可以根据关键词找到存入的内容;
所以在保存的时候,应该拆分出尽可能多的关键词,以增加后续被搜索到的几率和范围
*/
//searchAnalyzer:存储时的解析器, 拆分尽可能少的,但满足你意图的分词器
/*
ik_max_word、ik_smart都是分词器的名字
*/
@Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
private String title;
@Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
private String content;
@Field(type = FieldType.Integer)
private int discussType;
@Field(type = FieldType.Integer)
private int status;
@Field(type = FieldType.Date)
private Date createTime;
@Field(type = FieldType.Integer)
private int commentCount;
@Field(type = FieldType.Double)
private double score;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getUserId() {
return userId;
}
public void setUserId(int userId) {
this.userId = userId;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public int getDiscussType() {
return discussType;
}
public void setDiscussType(int discussType) {
this.discussType = discussType;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public int getCommentCount() {
return commentCount;
}
public void setCommentCount(int commentCount) {
this.commentCount = commentCount;
}
public double getScore() {
return score;
}
public void setScore(double score) {
this.score = score;
}
@Override
public String toString() {
return "DiscussPost{" +
"id=" + id +
", userId=" + userId +
", title='" + title + '\'' +
", content='" + content + '\'' +
", discussType=" + discussType +
", status=" + status +
", createTime=" + createTime +
", commentCount=" + commentCount +
", score=" + score +
'}';
}
}
2.3 数据层
在dao下新建一个elasticsearch包,并创建DiscussPostRepository接口
/*
ElasticsearchRepository<DiscussPost,Integer>
ElasticsearchRepository<处理的实体类,实体类中的主键类型>
ElasticsearchRepository:该接口中已经有对es服务层访问的增删改查等方法,我们可以直接调用
*/
@Repository // spring 提供
public interface DiscussPostRepository extends ElasticsearchRepository<DiscussPost,Integer> {
}
2.4 测试
在test包下,新建一个类来测试es
package com.ateam.community;
@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)//配置类
public class ElasticsearchTests {
@Resource
private DiscussPostMapper discussPostMapper;
@Autowired
private DiscussPostRepository discussPostRepository;
@Autowired
private ElasticsearchTemplate elasticsearchTemplate;
@Test
public void testInert(){
discussPostRepository.save(discussPostMapper.selectDiscussPostById(241));
discussPostRepository.save(discussPostMapper.selectDiscussPostById(242));
discussPostRepository.save(discussPostMapper.selectDiscussPostById(243));
}
@Test
public void testInsetList(){
discussPostRepository.saveAll(discussPostMapper.selectDiscussPosts(101,0,100,0));
discussPostRepository.saveAll(discussPostMapper.selectDiscussPosts(102,0,100,0));
discussPostRepository.saveAll(discussPostMapper.selectDiscussPosts(103,0,100,0));
discussPostRepository.saveAll(discussPostMapper.selectDiscussPosts(111,0,100,0));
discussPostRepository.saveAll(discussPostMapper.selectDiscussPosts(131,0,100,0));
discussPostRepository.saveAll(discussPostMapper.selectDiscussPosts(132,0,100,0));
discussPostRepository.saveAll(discussPostMapper.selectDiscussPosts(133,0,100,0));
discussPostRepository.saveAll(discussPostMapper.selectDiscussPosts(134,0,100,0));
}
@Test
public void testUpdate(){
DiscussPost post = discussPostMapper.selectDiscussPostById(231);
post.setContent("我是新人,使劲灌水!!!!");
discussPostRepository.save(post);
}
@Test
public void testDelete(){
discussPostRepository.deleteById(231);
}
@Test
public void testSearchByRepository(){
SearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.multiMatchQuery("互联网寒冬","title","content"))
.withSort(SortBuilders.fieldSort("discussType").order(SortOrder.DESC))
.withSort(SortBuilders.fieldSort("score").order(SortOrder.DESC))
.withSort(SortBuilders.fieldSort("createTime").order(SortOrder.DESC))
.withPageable(PageRequest.of(0,10))
.withHighlightFields(
new HighlightBuilder.Field("title").preTags("<em>").postTags("</em>"),
new HighlightBuilder.Field("content").preTags("<em>").postTags("</em>")
).build();
//elasticsearchTemplate.queryForPage(searchQuery,class,SearchResultMapper)
// 底层获取到了高亮显示的值,但是没有返回
Page<DiscussPost> page = discussPostRepository.search(searchQuery);
System.out.println(page.getTotalElements()); // 总的数目
System.out.println(page.getTotalPages()); // 总的页数
System.out.println(page.getNumber()); // 当前第几页
System.out.println(page.getSize()); // 一页多少条数据
for (DiscussPost post : page) {
System.out.println(post);
}
}
@Test
public void testSearchByTemplate(){
SearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.multiMatchQuery("互联网寒冬","title","content"))
.withSort(SortBuilders.fieldSort("discussType").order(SortOrder.DESC))
.withSort(SortBuilders.fieldSort("score").order(SortOrder.DESC))
.withSort(SortBuilders.fieldSort("createTime").order(SortOrder.DESC))
.withPageable(PageRequest.of(0,10))
.withHighlightFields(
new HighlightBuilder.Field("title").preTags("<em>").postTags("</em>"),
new HighlightBuilder.Field("content").preTags("<em>").postTags("</em>")
).build();
Page<DiscussPost> page = elasticsearchTemplate.queryForPage(searchQuery, DiscussPost.class, new SearchResultMapper() {
@Override
public <T> AggregatedPage<T> mapResults(SearchResponse searchResponse, Class<T> aClass, Pageable pageable) {
SearchHits hits = searchResponse.getHits();
if (hits.totalHits <= 0) {
return null;
}
ArrayList<DiscussPost> list = new ArrayList<>();
for (SearchHit hit : hits) {
DiscussPost post = new DiscussPost();
String id = hit.getSourceAsMap().get("id").toString();
post.setId(Integer.valueOf(id));
String userId = hit.getSourceAsMap().get("userId").toString();
post.setUserId(Integer.valueOf(userId));
String title = hit.getSourceAsMap().get("title").toString();
post.setTitle(title);
String content = hit.getSourceAsMap().get("content").toString();
post.setContent(content);
String discussType = hit.getSourceAsMap().get("discussType").toString();
post.setDiscussType(Integer.valueOf(discussType));
String status = hit.getSourceAsMap().get("status").toString();
post.setStatus(Integer.valueOf(status));
String createTime = hit.getSourceAsMap().get("createTime").toString();
post.setCreateTime(new Date(Long.valueOf(createTime)));
String commentCount = hit.getSourceAsMap().get("commentCount").toString();
post.setCommentCount(Integer.valueOf(commentCount));
String score = hit.getSourceAsMap().get("score").toString();
post.setScore(Double.valueOf(score));
// 处理高亮显示的结果
HighlightField titleField = hit.getHighlightFields().get("title");
if (titleField != null) {
// titleField.getFragments() 返回的是一个数组
post.setTitle(titleField.getFragments()[0].toString());
}
HighlightField contentField = hit.getHighlightFields().get("content");
if (contentField != null) {
// titleField.getFragments() 返回的是一个数组
post.setContent(contentField.getFragments()[0].toString());
}
list.add(post);
}
return new AggregatedPageImpl(list, pageable, hits.getTotalHits(),searchResponse.getAggregations(),
searchResponse.getScrollId(),hits.getMaxScore());
}
});
System.out.println(page.getTotalElements()); // 总的数目
System.out.println(page.getTotalPages()); // 总的页数
System.out.println(page.getNumber()); // 当前第几页
System.out.println(page.getSize()); // 一页多少条数据
for (DiscussPost post : page) {
System.out.println(post);
}
}
}
3. 开发社区搜索功能
- 搜索服务
- 将帖子保存至Elasticsearch服务器
- 从Elasticsearch服务器删除帖子
- 从Elasticsearch服务器搜索帖子
- 发布事件
- 发布帖子时,将帖子异步的提交到Elasticsearch服务器
- 增加评论时,将帖子异步的提交到Elasticsearch服务器
- 在消费组件中增加一个方法,消费帖子发布事件
- 显示结果
- 在控制器中处理搜索请求,在HTML上显示搜索结果
- 在控制器中处理搜索请求,在HTML上显示搜索结果
3.1 搜索服务
在service包下,新建一个类ElasticsearchService
@Service
public class ElasticsearchService {
@Autowired
private DiscussPostRepository discussPostRepository;
@Autowired
private ElasticsearchTemplate elasticsearchTemplate;
// 添加、修改
public void saveDiscussPost(DiscussPost post) {
discussPostRepository.save(post);
}
// 删除
public void deleteDiscussPost(int id) {
discussPostRepository.deleteById(id);
}
// 查询
public Page<DiscussPost> searchDiscussPost(String keyword, int current, int limit) {
SearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.multiMatchQuery(keyword,"title","content"))
.withSort(SortBuilders.fieldSort("discussType").order(SortOrder.DESC))
.withSort(SortBuilders.fieldSort("score").order(SortOrder.DESC))
.withSort(SortBuilders.fieldSort("createTime").order(SortOrder.DESC))
.withPageable(PageRequest.of(current,limit))
.withHighlightFields(
new HighlightBuilder.Field("title").preTags("<em>").postTags("</em>"),
new HighlightBuilder.Field("content").preTags("<em>").postTags("</em>")
).build();
return elasticsearchTemplate.queryForPage(searchQuery, DiscussPost.class, new SearchResultMapper() {
@Override
public <T> AggregatedPage<T> mapResults(SearchResponse searchResponse, Class<T> aClass, Pageable pageable) {
SearchHits hits = searchResponse.getHits();
if (hits.totalHits <= 0) {
return null;
}
ArrayList<DiscussPost> list = new ArrayList<>();
for (SearchHit hit : hits) {
DiscussPost post = new DiscussPost();
String id = hit.getSourceAsMap().get("id").toString();
post.setId(Integer.valueOf(id));
String userId = hit.getSourceAsMap().get("userId").toString();
post.setUserId(Integer.valueOf(userId));
String title = hit.getSourceAsMap().get("title").toString();
post.setTitle(title);
String content = hit.getSourceAsMap().get("content").toString();
post.setContent(content);
String discussType = hit.getSourceAsMap().get("discussType").toString();
post.setDiscussType(Integer.valueOf(discussType));
String status = hit.getSourceAsMap().get("status").toString();
post.setStatus(Integer.valueOf(status));
String createTime = hit.getSourceAsMap().get("createTime").toString();
post.setCreateTime(new Date(Long.valueOf(createTime)));
String commentCount = hit.getSourceAsMap().get("commentCount").toString();
post.setCommentCount(Integer.valueOf(commentCount));
String score = hit.getSourceAsMap().get("score").toString();
post.setScore(Double.valueOf(score));
// 处理高亮显示的结果
HighlightField titleField = hit.getHighlightFields().get("title");
if (titleField != null) {
// titleField.getFragments() 返回的是一个数组
post.setTitle(titleField.getFragments()[0].toString());
}
HighlightField contentField = hit.getHighlightFields().get("content");
if (contentField != null) {
// titleField.getFragments() 返回的是一个数组
post.setContent(contentField.getFragments()[0].toString());
}
list.add(post);
}
return new AggregatedPageImpl(list, pageable, hits.getTotalHits(),searchResponse.getAggregations(),
searchResponse.getScrollId(),hits.getMaxScore());
}
});
}
}
3.2 发布事件
3.2.1 发布帖子时,将帖子异步的提交到Elasticsearch服务器
修改DiscussPostController类中的addDiscussPost方法
3.2.2 增加评论时,将帖子异步的提交到Elasticsearch服务器
修改CommentController类中的addComment方法
3.2.3 在消费组件中增加一个方法,消费帖子发布事件
在event包下,EventConsumer类中新增一个消费帖子发布事件的方法
// 消费发帖事件
@KafkaListener(topics = {TOPIC_PUBLISH})
public void handlePublishMessage(ConsumerRecord record) {
if (record == null || record.value() == null) {
logger.error("消息的内容为空!");
return;
}
// 利用fastjson将json字符串转化为Event对象
Event event = JSONObject.parseObject(record.value().toString(), Event.class);
if (event == null) {
logger.error("消息格式错误!");
return;
}
// 查询这个帖子
DiscussPost post = discussPostService.findDiscussPostById(event.getEntityId());
// 将这个帖子存储到es服务器中
elasticsearchService.saveDiscussPost(post);
}
3.3 显示结果
- 在controller包下新建一个searchController类
package com.ateam.community.controller;
@Controller
public class SearchController implements CommunityConstant {
@Autowired
private ElasticsearchService elasticsearchService;
@Autowired
private UserService userService;
@Autowired
private LikeService likeService;
// search?keyword=xxx
@RequestMapping(value = "/search", method = RequestMethod.GET)
public String search(String keyword, Page page, Model model){
// 搜索帖子
org.springframework.data.domain.Page<DiscussPost> searchResult =
elasticsearchService.searchDiscussPost(keyword,page.getCurrent() - 1, page.getLimit());
// 聚合数据
List<Map<String, Object>> discussPosts = new ArrayList<>();
if (searchResult != null) {
for (DiscussPost post : searchResult) {
HashMap<String, Object> map = new HashMap<>();
// 帖子
map.put("post",post);
// 作者信息
map.put("user",userService.findUserById(post.getUserId()));
// 点赞数量
map.put("likeCount",likeService.findEntityLikeCount(ENTITY_TYPE_POST,post.getId()));
discussPosts.add(map);
}
}
model.addAttribute("discussPosts",discussPosts);
model.addAttribute("keyword",keyword);
// 设置分页信息
page.setPath("/search?keyword=" + keyword);
page.setRows(searchResult == null ? 0 : (int) searchResult.getTotalElements());
return "/site/search";
}
}
- 修改index.html页面中搜索框
- 处理search.html页面