easy-es使用以及Es和MySQL同步

Easy-Es使用

介绍

官方地址Easy-Es,它主要就是简化了ES相关的API, 使用起来像MP一样舒服

SpringBoot接入Easy-Es

相关依赖

已进入Es和Easy-Es依赖

<properties>
    <java.version>11</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <spring-boot.version>2.6.13</spring-boot.version>
    <es.vsersion>7.12.0</es.vsersion>
    <easy_es.vsersion>2.0.0</easy_es.vsersion>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <!-- es依赖 -->
    <!-- 排除springboot中内置的es依赖,以防和easy-es中的依赖冲突-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-high-level-client</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>${es.vsersion}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>${es.vsersion}</version>
    </dependency>

    <!-- easy-es -->
    <dependency>
        <groupId>org.dromara.easy-es</groupId>
        <artifactId>easy-es-boot-starter</artifactId>
        <version>${easy_es.vsersion}</version>
    </dependency>

    <!-- hutool -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.8.32</version>
    </dependency>
</dependencies>

SQL初始化

-- 创建whitebrocade数据库
DROP DATABASE IF EXISTS whitebrocade;
CREATE DATABASE whitebrocade;
USER whitebrocade;
-- 创建student表
CREATE TABLE `student` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `description` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4

-- 插入数据
INSERT INTO `whitebrocade`.`student`(`id`, `name`, `description`) VALUES (1, '小牛马', '我是小牛马');
INSERT INTO `whitebrocade`.`student`(`id`, `name`, `description`) VALUES (2, '中牛马', '我是中牛马');

application.yaml文件

你只需要更改address更改成你自己的ES地址和端口即可, 如果有设置账号密码的, 那么可以见username和password注释打开,进行填写

# 应用服务 WEB 访问端口
server:
  port: 9999

easy-es:
  # 动态数据源配置
  dynamic:
    datasource:
      # 默认数据源名称
      master:
        #填你的es连接地址
        address: localhost:9200
  enable: true # 默认为true,若为false时,则认为不启用本框架
  # 如果是https, 那就改成https
  schema: http
  # address: localhost:9200
  # username: 有设置才填写,非必须
  # password: 有设置才填写,非必须
  # 不打印banner
  banner: false
  keep-alive-millis: 30000 # 心跳策略时间 单位:ms
  connect-timeout: 5000 # 连接超时时间 单位:ms
  socket-timeout: 600000 # 通信超时时间 单位:ms
  connection-request-timeout: 5000 # 连接请求超时时间 单位:ms
  max-conn-total: 100 # 最大连接数 单位:个
  max-conn-per-route: 100 # 最大连接路由数 单位:个
  global-config:
    # 是否开启小黑子模式,默认关闭, 开启后日志将更有趣,提升编码乐趣,仅供娱乐,切勿用于其它任何用途
    i-kun-mode: true
    # 开启控制台打印通过本框架生成的DSL语句,默认为开启,测试稳定后的生产环境建议关闭,以提升少量性能
    print-dsl: true
    # 当前项目是否分布式项目,默认为true,在非手动托管索引模式下,若为分布式项目则会获取分布式锁,非分布式项目只需synchronized锁.
    distributed: false
    # 重建索引超时时间 单位小时,默认72H 可根据ES中存储的数据量调整
    reindexTimeOutHours: 72
    # 异步处理索引是否阻塞主线程 默认阻塞 数据量过大时调整为非阻塞异步进行 项目启动更快
    async-process-index-blocking: true
    # 分布式环境下,平滑模式,当前客户端激活最新索引最大重试次数,若数据量过大,重建索引数据迁移时间超过4320/60=72H,可调大此参数值,此参数值决定最大重试次数,超出此次数后仍未成功,则终止重试并记录异常日志
    active-release-index-max-retry: 4320
    # 分布式环境下,平滑模式,当前客户端激活最新索引最大重试次数 分布式环境下,平滑模式,当前客户端激活最新索引重试时间间隔 若您期望最终一致性的时效性更高,可调小此值,但会牺牲一些性能
    active-release-index-fixed-delay: 60
    db-config:
      # 是否开启下划线转驼峰 默认为false
      map-underscore-to-camel-case: true
      # 索引前缀,可用于区分环境  默认为空 用法和MP的tablePrefix一样的作用和用法
      # index-prefix:
      # id生成策略 none由ES自动生成,是默认的配置,无需您额外配置 推荐
      id-type: none
      # 字段更新策略 默认为not_null
      field-strategy: not_empty
      # 默认开启,开启后查询所有匹配数据,若不开启,会导致无法获取数据总条数,其它功能不受影响,若查询数量突破1W条时,需要同步调整@IndexName注解中的maxResultWindow也大于1w,并重建索引后方可在后续查询中生效(不推荐,建议分页查询).
      enable-track-total-hits: true
      # 数据刷新策略,默认为不刷新,若对数据时效性要求比较高,可以调整为immediate,但性能损耗高,
      # 也可以调整为折中的wait_until, 等待请求提交数据后,等待数据完成刷新(1s),再结束请求 性能损耗适中
      refresh-policy: wait_until
      # 批量更新接口的阈值 默认值为1万,突破此值需要同步调整enable-track-total-hits=true,@IndexName.maxResultWindow > 1w,并重建索引
      batch-update-threshold: 10000
      # 是否智能为字段添加.keyword后缀 默认开启,开启后会根据当前字段的索引类型及当前查询类型自动推断本次查询是否需要拼接.keyword后缀
      # 是否自动拼接.keyword后缀是基于自动推断的,如果你当前实体类的字段类型是String,并且其字段类型未指定或指定为keyword_text双类型时,才会有自动拼接后缀,并且是否拼接取决于查询本身,如果是match查询时不会拼接后缀的,拼接会违背初衷
      smartAddKeywordSuffix: true

启动类

这里需要关注的是EsMapperScan注解, 发现是和MP的注解很像的, 如果项目中也引入的MP, 那么需要注意mapper的包下要进行区分,

mapper

– ee 这里存放easy-es的mapper

– mp 这里存放mp的mapper

如果不这么做会有问题的, 详细可以看避坑指南 | Easy-Es](https://www.easy-es.cn/pages/4c01d7/#项目中同时使用mybatis-plus和easy-es), 这里详细说明如果项目中同时引入Easy-Es和MP的如何应对

/**
 * @author whiteBrocade
 * @description: 启动类
 */
// 这里替换成你项目的ES所在的Mapper
@EsMapperScan("com.whitebrocade.easy_es.mapper.ee")
@SpringBootApplication
public class EasyEsApplication {

    public static void main(String[] args) {
        SpringApplication.run(EasyEsApplication.class, args);
    }

}

model模型

domain
ES相关domain
/**
 * @author whiteBrocade
 * @description: Es索引模型, 所有的Es索引都要继承这个基类
 */
@Data
public class BaseEsEntity {
    /**
     * es中的唯一id, 此时id值将由es自动生成
     */
    @IndexId(type= IdType.NONE)
    private String id;
}
/**
 * @author whiteBrocade
 * @description: 学生ES模型
 */
@Data
@EqualsAndHashCode(callSuper = true)
// @IndexName("studentesentity")
// 当您想直接把类名当作索引名,且并不需要对索引进行其它配置时,可省略此注解, 索引名规则如下,必须全部小写
// 如果类名为StudentEsEntity这种, 那么就是studentesentity
public class StudentEsEntity extends BaseEsEntity {

    /**
     * 对应mysql中主键Id
     */
    private Long mysqlId;

    /**
     * 学生姓名
     */
    @HighLight // 高亮注解
    @IndexField(analyzer = Analyzer.IK_SMART)
    private String name;

    /**
     * 描述
     */
    @HighLight // 高亮注解
    // 高亮只对text类型字段有效,高亮是对分词的高亮,keyword类型不会有高亮的,这是es的规则,非框架
    // 是否自动拼接.keyword后缀是基于自动推断的,如果你当前实体类的字段类型是String,并且其字段类型未指定或指定为keyword_text双类型时,才会有自动拼接后缀,并且是否拼接取决于查询本身,如果是match查询时不会拼接后缀的,拼接会违背初衷
    // 见easy-es的issues: https://gitee.com/dromara/easy-es/issues/I73IXA
    // https://gitee.com/dromara/easy-es/issues/I5J86T
    @IndexField(analyzer = Analyzer.IK_SMART)
    private String description;
}
mysql相关domain
/**
 * @author whiteBrocade
 * @version 1.0
 * @description 学生类
 */
@Data
public class Student {
    /**
     * id
     */
    private Long id;

    /**
     * 姓名
     */
    private String name;

    /**
     * 描述
     */
    private String description;
}

DTO
Es相关DTO
/**
 * @author whiteBrocade
 * @description: 学生Es DTO
 */
@Data
public class StudentEsDTO {
    /**
     * 姓名
     */
    private String name;

    /**
     * 描述
     */
    private String description;
}
Query
Es相关Query
/**
 * @author whiteBrocade
 * @description: 学生Es Query
 */
@Data
public class StudentEsQuery {
    /**
     * 学生ID, 这里用String类型代替Long类型, 防止精度丢失问题
     */
    private String id;

    /**
     * 姓名
     */
    private String name;

    /**
     * 描述
     */
    private String description;
}
VO
Es相关VO
/**
 * @author whiteBrocade
 * @description: 学生Es VO模型
 */
@Data
public class StudentEsVO {
    /**
     * 学生Id
     */
    private Long id;

    /**
     * 学生姓名
     */
    private String name;

    /**
     * 描述
     */
    private String description;
}

StudentEsMapper

这里和MP的很像, 不同之处就是多个一个Es开头

/**
 * @author whiteBrocade
 * @description: 学生ES EsMapper
 */
@Component
public interface StudentEsMapper extends BaseEsMapper<StudentEsEntity> {
}

Controller

/**
 * @author whiteBrocade
 * @description: TestUseEeController
 */
@Slf4j
@RestController
@RequestMapping("/es")
@RequiredArgsConstructor
public class TestUseEeController {
    private final StudentEsMapper studentEsMapper;

    /**
     * 创建索引(相当于mysql中的表)
     */
    @PostMapping("/createIndex")
    public Boolean createIndex() {
        Class<StudentEsEntity> studentClass = studentEsMapper.getEntityClass();
        // 类名小写作为索引名称
        String indexName = studentClass.getSimpleName().toLowerCase();
        Boolean existsIndex = studentEsMapper.existsIndex(indexName);
        Boolean createIndex = null;
        if (! existsIndex) {
            log.info("{}索引不存在, 准备创建索引", indexName);
            createIndex = studentEsMapper.createIndex();
            log.info("是否成功创建{}索引: {}", indexName, createIndex);
        } else {
            throw new RuntimeException(StrUtil.format("索引已经存在: {}", indexName));
        }
        return createIndex;
    }

    /**
     * 插入数据
     */
    @PostMapping("/insert")
    public Integer insert(@RequestBody StudentEsDTO dto) {
        // 2.初始化-> 新增数据
        StudentEsEntity studentEsEntity = new StudentEsEntity();
        BeanUtil.copyProperties(dto, studentEsEntity);
        // 雪花ID
        studentEsEntity.setMysqlId(IdUtil.getSnowflakeNextId());

        return studentEsMapper.insert(studentEsEntity);
    }

    /**
     * 搜索数据
     */
    @GetMapping("/search")
    public List<StudentEsVO> search(@RequestBody StudentEsQuery query) {
        // ES条件查询
        List<StudentEsEntity> esEntityList = EsWrappers.lambdaChainQuery(studentEsMapper)
                // 注意, 用的是mysqlId
                .eq(StrUtil.isNotBlank(query.getId()), StudentEsEntity::getMysqlId, Long.parseLong(query.getId()))
                .like(StrUtil.isNotBlank(query.getName()), StudentEsEntity::getName, query.getName())
                .like(StrUtil.isNotBlank(query.getDescription()), StudentEsEntity::getDescription, query.getDescription())
                // 根据score排序, score高的在前面
                .sortByScore()
                .list();

        List<StudentEsVO> esVOList = new ArrayList<>(esEntityList.size());
        // 为空直接返回
        if (CollUtil.isEmpty(esEntityList)) {
            return esVOList;
        }

        // 进行转换
        for (StudentEsEntity esEntity : esEntityList) {
            StudentEsVO esVO = new StudentEsVO();
            // 这里的ID跳过, 因为类型不兼容
            BeanUtil.copyProperties(esEntity, esVO, "id");
            esVO.setId(esEntity.getMysqlId());
            esVOList.add(esVO);
        }

        return esVOList;
    }
}

ES和MySQL同步

分为两种

  • Flink-CDC监听MySQL直接写入ES
  • Flink-CDC监听MySQL写入ActiveMQ, MQ写入到ES

Flink-CDC内容详细见博主另外一个篇文章SpringBoot集成Flink CDC实现binlog监听

直接写入

相关依赖

在上一个依赖上引入Flink-CDC

<properties>
    <flink.version>1.19.0</flink.version>
</properties>
<!-- Flink CDC依赖 start-->
<!-- Flink核心依赖, 提供了Flink的核心API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<!--  Flink流处理Java API依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Flink客户端工具依赖, 包含命令行界面和实用函数 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Flink连接器基础包, 包含连接器公共功能 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Flink Kafka连接器, 用于和Apache Kafka集成, 这里不需要集成, 所以注释掉, 代码可以使用其它的MQ代替 -->
<!--<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>3.2.0-1.19</version>
    </dependency>-->
<!-- Flink Table Planner, 用于Table API和SQL的执行计划生成 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Flink Table API桥接器, 连接DataStream API和Table API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Flink JSON格式化数据依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- 开启Web UI支持, 端口为8081, 默认为不开启-->
<!--<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web</artifactId>
    <version>1.19.1</version>
    </dependency>-->

<!-- MySQL CDC依赖
    org.apache.flink的适用MySQL 8.0

    具体参照这篇博客 https://blog.****.net/kakaweb/article/details/129441408
-->
上一篇:富格林:安全操作方式稳健出金


下一篇:Linux第一个小程序-进度条