Elasticsearch安装DSL使用以及Canal&Logstash数据同步

Elasticsearch

ES的版本7.8.0

什么是Elasticsearch?

Elasticsearch 是一个分布式、可扩展、近乎实时搜索与数据分析的引擎。它和Solr一样是基于lucene进行封装的搜索引擎,能够进行近乎实时全文检索和结构化数据的实时统计。下面主要围绕全文检索这个功能来展开。

Elasticsearch安装

要想体验下Elasticsearch的功能则需要先在服务器上安装好软件,这里就简单说说安装的步骤,首先需要在服务器上面安装好JDK8以上的版本

  • 1、下载并解压
# 下载
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.8.0-linux-x86_64.tar.gz
# 解压tar包
tar -xzvf elasticsearch-7.8.0-linux-x86_64.tar.gz
  • 2、linux环境配置,增加文件句柄数
# 编辑文件
vim /etc/security/limits.conf

# 新增下面的参数,如果有的话需要修改,然后保存退出
* soft nofile 65536
* hard nofile 131072
* soft nproc 2048
* hard nproc 4096
  • 3、增加虚拟线程的数量
vim /etc/sysctl.conf
#新增
vm.max_map_count = 262144
# 保存文件推出并执行下面的命令加载参数
sysctl -p
  • 4、进入elasticsearch解压目录,然后打开config/elasticsearch.yml文件,配置下面的信息,并保存推出
# 集群名称,后续会用到这里先配置
cluster.name: es-frank
# 节点名称
node.name: es-node1
# 数据文件 index 的目录
path.data: /usr/local/app/elasticsearch-7.9.0/data
# 运行日志的目录
path.logs: /usr/local/app/elasticsearch-7.9.0/logs
# 绑定的ip,0.0.0.0监听所有的ip
network.host: 0.0.0.0
# http的端口
http.port: 9200
# 集群初始master节点,跟上面的 node.name保持一致即可
cluster.initial_master_nodes: ["es-node1"]
  • 5、配置jvm.opitions,修改启动时候内存配置,避免参数过大启动报错
  • 6、增加启动用户 esuser,因为ES不能用root用户启动,
# 创建用户
useradd esuser
# 在es安装目录,给所有的目录授予esuser用户的读写执行 rwx 的权限
chown -R esuser *
  • 7、(这一步可以在后面需要配置中文分词的时候再来配置)配置开源IK分词器,因为默认情况ES对英文是按照单词来分词,中文是按照一个字符来分词,而且不能定制自己的词汇。具体的配置可以参考IK中文分词,需要注意要按照本地的ES的版本来按照分词器
  • 8、切换到esuser用户 su esuser,cd到ES的安装目录,第一次执行./bin/elasticsearch观察控制台的日志看是否有报错,后续启动可以使用./bin/elasticsearch -d让进程在后台运行。启动完成后在浏览器访问应用http://ip:9200/其中ip是服务器的IP,根据自己的IP配置即可,如果能够返回类似下面的信息则说明启动成功
{
  "name" : "es-node2",
  "cluster_name" : "frank-es-cluster",
  "cluster_uuid" : "Vkb9d3KIRVKYny7y3mHuAw",
  "version" : {
    "number" : "7.8.0",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "757314695644ea9a1dc2fecd26d1a43856725e65",
    "build_date" : "2020-06-14T19:35:50.234439Z",
    "build_snapshot" : false,
    "lucene_version" : "8.5.1",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}
  • 关闭服务可以通过jps获取进程号,然后kill
  • 至此万里长征取得了一步小小的胜利,接下来就来体验下ES的检索的功能吧!!

Elasticsearch API使用

本着好读书不求甚解的原则,这里先来探索下API的使用,后面再来大致下解析下索引的结构和原理,这里的一些例子只是点到为止,如果大家要深入学习需要结合官网和平常在实践过程中的不断积累来对各个API有更加深入的理解,废话不多说操起Postman开拔。

  • 创建索引同时创建映射Mapping(我这里安装了IK分词器),下面单独当然也可以单独的创建索引和创建映射
// 单独创建的方式
// 单独创建创建索引
PUT http://localhost:9202/product1
// 单独创建创建映射Mapping
PUT http://localhost:9202/product1/_mapping

// 同时创建索引和映射
PUT http://localhost:9202/product
{
    "settings": {
        "index": {
            "number_of_shards": 3,
            "number_of_replicas": 1
        }
    },
    "mappings": {
        "properties": {
            "id": {
                "type": "long"
            },
            "prod_id": {
                "type": "long"
            },
            "prod_name": {
                "type": "text",
                "analyzer": "ik_max_word"
            },
            "price": {
                "type": "float"
            },
            "prod_desc": {
                "type": "text",
                "analyzer": "ik_max_word"
            },
            "stock": {
            	"type": "integer"	
            },
            "create_date": {
            	"type": "date"
            }
        }
    }
}
  • 创建文档,url后面的1是指定document的_id为1,下面展示的创建一条数据,可以多创建几条数据。
POST http://localhost:9202/product/_doc/1
{
	"id":1,
	"prod_id":1001,
	"prod_name":"真我Q2",
	"prod_desc":"realme 真我Q2 4800万像素 120Hz畅速屏 双5G天玑800U 冲浪蓝孩 6GB+128GB 30W闪充 手机 OPPO提供售后支持",
	"price":1238,
	"stock":1000,
	"create_date":"2021-03-10"
}
  • 更新文档,更新指定_id的属性
// 更新文档为5的记录
POST http://localhost:9202/product/_doc/5/_update
{
	"doc":{
		"stock":1701
	}
}
  • 根据id删除文档
// 删除 _id 为5的记录
DELETE http://localhost:9202/product/_doc/5
  • bool查询,关键字must(多个条件同时满足,相当于and),should(多个条件相当于用or连接),must_not(多个条件都不能满足),filter(过滤掉不符合条件的数据多个条件),前面的关键字都是数组查询。通过指定boost来提高排名,使得某个查询结果能够展示在结果的前面(跟竞价排名类似)。mustmulti_match区别:must是每个查询条件必须同时满足,multi_match是多个字段中有个满足即可查询出结果
# must+boost查询示例
POST /product/_search
{
	"query":{
		"bool":{
			"should":[
				{
					"match":{
						"prod_name": {
							"query": "小米Note9",
							"boost": "50"
						}
					}
				},
				{
					"match":{
						"prod_name": {
							"query": "小米9A",
							"boost": "30"
						}
					}
				}
			]
		}
	}
}
  • 分页排序:默认分页不能超过10000条记录,分页过大影响ES的影响性能。可以通过scroll来优化分页,scroll通过生成镜像的方式来支持大数据量的查询处理,可用于导出查询
# 分页+排序+返回特定的字段。查询条件可以自己指定
POST /product/_search
{
	"query":{
		"match_all":{
		}
	},
	"sort":{
		"price":"asc",
		"stock":"asc"
	},
	"from": 0,
	"size": 10,
	"_source":["id", "prod_name", "price"]
}

# scroll查询,第一次查询生成镜像数据,后面根据生成的数据继续查询,注意后面的scroll_id是上次查询你返回的scroll_id,相当于一个游标,每次都要携带

# 第一次查询通过scroll的方式查询,3m表示镜像保存三分钟,body中的size表示每个镜像的大小
POST /product/_search?scroll=3m
{
	"query":{
		"match_all":{}
	},
	"size": 2
}

# 第二次查询通过 scroll 查询,携带上次的scroll_id,这样性能上有很大的提升
POST /_search/scroll
{
	"scroll_id": "FGluY2x1ZGVfY29udGV4dF91dWlkDnF1ZXJ5VGhlbkZldGNoAxRxRTRKSUhnQkEwWldSRjk2b0lxcgAAAAAAAABnFlNMWnlNVzVWVG91NjcyMW5aTlZOUFEUcVU0SklIZ0JBMFpXUkY5Nm9JcXMAAAAAAAAAaBZTTFp5TVc1VlRvdTY3MjFuWk5WTlBRFHFrNEpJSGdCQTBaV1JGOTZvSXF0AAAAAAAAAGkWU0xaeU1XNVZUb3U2NzIxblpOVk5QUQ==",
	"scroll": "3m"
}

  • 字段匹配match_phrase,其中slop是间隔的个数至少间隔4个分词,少于4个查询不到数据
POST http://localhost:9201/product/_search
{
	"query":{
		"match_phrase":{
			"prod_name":{
				"query": "小米9A",
				"slop": 4
			}
		}
	}
}
  • 聚合操作,在ES中做聚合操作和大数据量的分页是要尽量避免的,因为针对ES的分片结构,需要冲每个分片中获取最大的数据量来进行排序然后得出结果,这样需要大量的内存和计算,影响ES的性能,聚合操作尽量在生成Index的时候就写好
# 查询平均数据
POST /product/_search
{
	"aggs":{
		"avg_price": {
			"avg": {
				"field": "price"
			}
		}
	}
}

Elasticsearch索引简析

  • 通过上面简单的使用已经对Index有了一个大致的印象了,那Index到底是什么结果呢?
    • Index相当于数据库的表,一个索引相当于一个表,
    • Type,7.x版本这个已经弱化了,有一个默认的_doc,后面的版本会删掉这个功能
    • Mapping,定义了Index的结果,各个属性的类型,是否是关键字,分词类型,相当于表结构
    • Field,Index中的每一个字段,相当于表的字段
    • Document,一个文档就是一条记录,相当于数据库的一条记录
  • Elasticsearch是基于lucene,它的索引创建是基于倒排的方式,通过针对需要分词的字段进行倒排关联到指定的记录的id,然后找到对应的记录,下面的图大致展示了倒排索引的形成过程,开始三条记录,进行分词索引之后形成了中间的索引信息。
    Elasticsearch安装DSL使用以及Canal&Logstash数据同步

使用Canal数据同步

Canal提供增量数据的订阅和消费,代码开源在github,详细的介绍可以查看github。总共有三个组件canal.deployercanal.adaptercanal.admin

canal.developer监听数据库binlog

  • 开启数据库binlog
#开启 binlog
log-bin=mysql-bin
# 选择行模式
binlog-format=ROW
# 服务器的id,不能和下面的salve_id重复
server_id=1
  • 查看binlog是否开启
# 结果是不是on
show variabes like 'log_bin';
# 看binary log的文件信息
show binary logs;
  • 创建同步用户canal1
# 创建用户canal1
create user canal1 identified by 'Canal1@123';
# 给canal1授权访问
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal1'@'%';
# 刷新权限
flush privileges;
  • 配置canal.developer需要同步的实例属性
# 同步的数据库地址
canal.instance.master.address=127.0.0.1:3306
# 同步的数据库名称密码
canal.instance.dbUsername=canal1
canal.instance.dbPassword=Canal1@123
canal.instance.connectionCharset = UTF-8
# 同步product库的所有表,这里使用了正则表达式
canal.instance.filter.regex=product\\..*
  • 最后启动canal.developer,查看logs目录下面的服务器日志和example目录下的实例日志看是否启动成功
./bin/startup.sh
  • java客户端验证同步是否生效,github上的java客户端,注意代码中的监听是针对所有的库的connector.subscribe(".*\\..*");,可以改成指定的数据库,例如我们指定的数据库connector.subscribe("product\\..*");,运行之后出现下面的日志,说明同步成功
================>; binlog[mysql-bin.000001:9841] , name[product,es_product] , eventType : INSERT
id : 4    update=true
prod_id : 2004    update=true
prod_name : 测试数据    update=true
prod_desc : 测试数据描述    update=true
price : 3500.0    update=true
stock : 1000    update=true
create_date : 2021-03-16 08:21:50    update=true

canal.adapter同步数据到es

  • 由于canal.adapter需要使用zookeeper所以需要先开启zookeeper,可以自行下载zookeeper并启动。需要注意,adapter中配置默认-Xms2g,修改配置在bin目录的启动脚本startup.sh
  • 如果要同步MySQL中的数据到Elasticsearch去,还需要修改canal.adapter中两个地方的配置文件,如下所示,一个是application.yml,另外一个新增product.yml,对应在elasticsearch中创建的product索引

conf/
├── application.yml
├── es7
│ └── product.yml

  • 启动adapter的文件application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111 # canal通讯地址
    canal.tcp.zookeeper.hosts: 127.0.0.1:2181
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/product?useUnicode=true&serverTimezone=GMT
      username: root
      password: Root@123
  canalAdapters:
  - instance: example # canal 中实例名称
    groups:
    - groupId: g1
      outerAdapters:
      - name: es7 # es7版本
        key: productKey
        hosts: 81.68.176.190:9201 # canal的ip和端口
        properties:
          mode: rest # or rest
          cluster.name: frank-es-cluster # es集群名称
  • 在es7中增加对应的索引的yml配置文件
dataSourceKey: defaultDS
outerAdapterKey: productKey     # 对应application.yml中es配置的key
destination: example
groupId: g1
esMapping:
  _index: product
  _id: _id
  upsert: true
  sql: "select id as _id,id,prod_id,prod_name,prod_desc,stock,price,create_date from es_product"
  commitBatch: 3000
  • 注意要设置_id映射,把表中的id映射到_id。不然在同步数据的数据的时候会报错
  • 最后启动adapter:./bin/startup.sh,可以在logs下查看日志看是否启动成功,这个时候可以增加或者修改数据的时候会触发同步操作

使用Logstash数据同步

官方推荐使用Logstash来同步数据,可以把MySQL、日志数据同步到Elasticsearch。如果内存不够的,配置的时候注意要修改conf目录下的jvm.options,接下来就是要创建同步的文件了,这里使用Input JDBC的方式来同步MySQL的数据到ES,前提是先创建好product索引

  • 在logstash目录下创建sync目录,并创建logstash-sync.conf文件,内容如下
input {
    jdbc {
        jdbc_connection_string => "jdbc:mysql://localhost:3306/product?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC"
        jdbc_user => "root"
        jdbc_password => "xxx"
        jdbc_driver_library => "/usr/local/app/logstash-7.8.0/sync/mysql-connector-java-8.0.18.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "10000"
        # 脚本执行路径,在sync目录中配置,注意里面的条件updated_at := sql_last_value
        statement_filepath => "/usr/local/app/logstash-7.8.0/sync/product.sql"
        schedule => "* * * * *"
        # 索引类型
        type => "_doc"
        # 开启记录追踪,上次执行的时间,这个记录会保存到下面指定的文件中去
        use_column_value => true
        # 跟踪文件的路径,记录最后一次同步 create_date的时间
        last_run_metadata_path => "/usr/local/app/logstash-7.8.0/sync/trace_time"
        #跟踪的列
        tracking_column => "create_date"
        tracking_column_type => "timestamp"
        #是否清除 last_run_metadata_path 中的内容,不需要清空
        clean_run => false
        lowercase_column_names => false
    }
}
output {
    elasticsearch {
        # 多个需要写多个地址
        hosts => ["127.0.0.1:9200"]
        # 索引名称
        index => "product"
        # 设置document 的id和数据的id一致
        document_id => "%{id}"
    }
    stdout {
        codec => json_lines
    }
}
  • 由于上面的logstash-sync.conf指定了SQL文件,所以这里需要创建statement_filepathSQL文件,:sql_last_value跟上面的tracetime中的一致
select id,prod_id,prod_name,prod_desc,stock,price,create_date from es_product where create_date > :sql_last_value
  • 最后需要把mysql-connector-java-8.0.18.jar复制到logstash-sync.conf文件中指定的地方
  • 通过./bin/logstash -f sync/logstash-sync.conf启动同步,-f参数是指定同步的配置文件,可以观察日志查看同步的情况

总结

  • 本篇简单讲述了Elasticsearch的安装,特别注意文件句柄的调整和jvm.options文件中内存参数的调整,接下来简述了几个比较重要的API的使用分页排序排名DSL查询等等,接下来有Elasticsearch和SpringBoot结合后API的使用以及集群,欢迎大家继续关注
上一篇:ES_DSL语法练习_


下一篇:python的tuple()元组数据类型的使用方法以及案例