在数据全文检索需求越来越大的今天,很多公司都在关系数据库数据的基础上,加上了Elastic Search,来进行数据快速全文检索,所以ElasticSearch与关系型数据库数据进行数据同步就变的尤为重要。本文主要介绍PostgreSQL数据库通过Logstash-JDBC插件与ElasticSearch进行数据的近实时同步。
系统配置
- ElasticSearch: 7.6.2
- PostgreSQL: 10+190ubuntu
- kibana: 7.4.0
- Logstash: 7.6.2
posgresql 数据库结构
此数据结构是临时想的,在正常的项目中是不会存在这种结构的,大家见谅。一张学生表,一张老师表,通过课程进行关联。(这种关联关系显示生活中不会这样设计的,但是能说明问题就行一切从简)
create table student(
id bigint primary key NOT NULL,
name varchar(5) not null,
sex char(1) default ‘男‘ ,
age int check(age>1),
courses varchar(20),
create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
create table teacher(
id bigint primary key NOT NULL ,
name varchar(5) not null ,
sex char(1) default ‘男‘,
courses varchar(20) default ‘语文‘,
create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
初始化数据
()
logstash docker-compose配置
其中logstash.conf 这个文件是logstash指定的配置文件,在下面会有所展示
version: ‘3.7‘
services:
logstash_test:
image:
logstash:7.6.2
container_name:
logstash_test
volumes:
- ‘./services/logstash_test/data/:/usr/data/‘
- ‘./services/logstash_test/config/logstash.yml:/usr/share/logstash/config/logstash.yml‘
- ‘./services/logstash_test/config/pipelines.yml:/usr/share/logstash/config/pipelines.yml‘
- ‘./services/logstash_test/config/logstash_test.conf:/usr/share/logstash/pipeline/logstash.conf‘
pipelines.yml 指定路径要包含logstash.conf文件
- pipeline.id: student_test_xhh
path.config: "/usr/share/logstash/pipeline/*.conf"
pipeline.batch.size: 500
pipeline.batch.delay: 200
pipeline.workers: 1
logstash.yml
config:
reload:
automatic: true
interval: 3s
logstash配置文件
使用jdbc进行同步,
input {
jdbc {
jdbc_connection_string => "jdbc:postgresql://host:port/db_name"
jdbc_driver_library => "/usr/data/postgresql-42.2.14.jar"
jdbc_driver_class => "org.postgresql.Driver"
jdbc_user => "db_user"
jdbc_password => "db_password"
jdbc_paging_enabled => true
jdbc_page_size => 50
statement_filepath => "/usr/data/jdbc_test.sql"
record_last_run => true
clean_run => true
tracking_column_type => "timestamp"
tracking_column => "unix_ts_in_secs"
use_column_value => true
last_run_metadata_path => "/usr/share/logstash/config/student"
schedule => "2 * * * * *"
}
}
filter {
json {
source => "teacher"
target => "teacher"
}
mutate {
add_field => {"[@metadata][doc_id]" => "%{id}"}
}
mutate {
remove_field => ["[teacher][id]", "[id]"]
}
}
output {
stdout {
codec => rubydebug
}
elasticsearch {
hosts => "http://elasticserver:9200"
user => "elasticuser"
password => "elasticpassword"
document_id => "%{[@metadata][doc_id]}"
index => "student_test_xhh"
}
}
着重几个参数介绍:
- jdbc_connection_string: 连接数据库url, 此字段是说明需要连接的关系型数据库。
- jdbc_driver_library :此字段是jdbc连接关系型数据库的驱动文件,本文中logstash是在docker中运行的,所以填写的是docker中这个驱动的文件位置。 postgresql jdbc驱动下载 其他关系型数据库下载可以在网上搜一下
- jdbc_driver_class: 驱动程序类,与上面驱动保持致
- jdbc_user:连接该数据库的用户名
- jdbc_password:连接该数据库的密码
- jdbc_paging_enabled:是否启用分页,启用的话会自动在执行的sql语句上进行加上分页
- jdbc_page_size:分页条数
- statement_filepath: sql命令的文件地址,比较复杂的sql语句可以写在一个sql文件中,这个参数指定到这个文件的路径就可以进行执行。
- tracking_column:此字段指定 unix_ts_in_secs,用于标记 Logstash 从数据库读取的最后一个文档,存储在last_run_metadata_path指定的文件下,该值将会用于确定 Logstash 在其轮询循环的下一次迭代中所请求文档的起始值。这样就可以达到增量更新的效果,降低负载,提高效率
- tracking_column_type: 标注上面unix_ts_in_secs字段的类型,这里使用的是timestamp类型,还支持numeric类型
- use_column_value: 使用该列的值
- last_run_metadata_path:此字段指定 unix_ts_in_secs,用于标记 Logstash 从数据库读取的最后一个文档,存储在last_run_metadata_path指定的文件下,该值将会用于确定 Logstash 在其轮询循环的下一次迭代中所请求文档的起始值。
- schedule:其会使用 cron 语法来指定 Logstash 应当以什么频率对 RDS 进行轮询以查找变更。这里所指定的 "2 " 会告诉 Logstash 每 两分钟联系一次表。
- sql_last_value:这是一个内置参数,包括 Logstash 轮询循环中当前迭代的起始点,上面 JDBC 输入配置中的 SELECT 语句便会引用这一参数。该字段会设置为 “unix_ts_in_secs”的最新值。在 Logstash 轮询循环内所执行的查询中,其会用作所返回文档的起点。通过在查询中加入这一变量,能够确保不会将之前传播到 Elasticsearch 的插入或更新内容重新发送到 Elasticsearch。该参数在sql语句中使用
jdbc_test.sql 脚本
select s.id,
s.name,
s.sex,
s.age,
s.courses,
s.create_time,
s.update_time,
cast(jsonb_build_object(
‘id‘, t.id,
‘name‘, t.name,
‘sex‘, t.sex,
‘courses‘, t.courses
) as varchar ) as teacher,
s.update_time AS unix_ts_in_secs
from student s left join teacher t on s.courses = t.courses
where (s.update_time > cast(:sql_last_value as timestamp ) and s.update_time < NOW())
group by s.id, t.id
order by s.update_time
更新时间 记录到unix_ts_in_secs中
sql_last_value: logstash内置参数,若第一次运行unix_ts_in_secs中没有记录,则sql_last_value会从时间戳为0的时间算起。
执行完成es中数据展示
在elk中执行
GET /student_test_xhh/_search
会显示下面数据
{
"took" : 0,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 4,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "student_test_xhh",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"create_time" : "2020-08-06T11:42:59.454Z",
"update_time" : "2020-08-06T11:42:59.454Z",
"teacher" : {
"sex" : "男",
"name" : "张老师",
"courses" : "语文"
},
"sex" : "男",
"unix_ts_in_secs" : "2020-08-06T11:42:59.454Z",
"courses" : "语文",
"name" : "李青",
"age" : 12,
"@timestamp" : "2020-08-06T12:10:02.849Z",
"@version" : "1"
}
},
{
"_index" : "student_test_xhh",
"_type" : "_doc",
"_id" : "2",
"_score" : 1.0,
"_source" : {
"create_time" : "2020-08-06T11:42:59.454Z",
"update_time" : "2020-08-06T11:42:59.454Z",
"teacher" : {
"sex" : "女",
"name" : "王老师",
"courses" : "数学"
},
"sex" : "女",
"unix_ts_in_secs" : "2020-08-06T11:42:59.454Z",
"courses" : "数学",
"name" : "艾欧尼亚",
"age" : 11,
"@timestamp" : "2020-08-06T12:10:02.857Z",
"@version" : "1"
}
},
{
"_index" : "student_test_xhh",
"_type" : "_doc",
"_id" : "3",
"_score" : 1.0,
"_source" : {
"create_time" : "2020-08-06T11:42:59.454Z",
"update_time" : "2020-08-06T11:42:59.454Z",
"teacher" : {
"sex" : "女",
"name" : "李老师",
"courses" : "历史"
},
"sex" : "男",
"unix_ts_in_secs" : "2020-08-06T11:42:59.454Z",
"courses" : "历史",
"name" : "巨龙之巢",
"age" : 10,
"@timestamp" : "2020-08-06T12:10:02.860Z",
"@version" : "1"
}
},
{
"_index" : "student_test_xhh",
"_type" : "_doc",
"_id" : "4",
"_score" : 1.0,
"_source" : {
"create_time" : "2020-08-06T11:42:59.454Z",
"update_time" : "2020-08-06T11:42:59.454Z",
"teacher" : {
"sex" : null,
"name" : null,
"courses" : null
},
"sex" : "男",
"unix_ts_in_secs" : "2020-08-06T11:42:59.454Z",
"courses" : "英语",
"name" : "暗影岛",
"age" : 9,
"@timestamp" : "2020-08-06T12:10:02.862Z",
"@version" : "1"
}
}
]
}
}
数据同步成功!
-
原文作者:xaohuihui
- 参考文献:
https://developer.aliyun.com/article/762059