input {
jdbc {
#jdbc驱动包位置
jdbc_driver_library => "D:\tools\elk\logstash-7.6.1\ojdbc8-12.2.0.1.jar"
#jdbc驱动类
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
# 数据库相关配置
jdbc_connection_string => "jdbc:oracle:thin:callcard/huawei123@//callcardsitoracle01.beta.hic.cloud:1521/callcardsit_srv"
jdbc_user => "callcard"
jdbc_password => "huawei123"
# 是否清除sql_last_value的记录,需要增量同步时此字段必须为false;
clean_run => true
# 同步频率(分 时 天 月 年),默认每分钟同步一次;
schedule => "*/10 * * * * *"
use_column_value => true
tracking_column_type => timestamp
tracking_column => updated_at
# 同步SQL
statement =>"select *
from user n
where updated_at>:sql_last_value and updated_at<sysdate order by updated_at desc"
# 索引类型,不需要指定type,否则会在同步ES后生成type字段
#type => "user"
# 设置时区
#jdbc_default_timezone =>"Asia/Shanghai"
}
}
filter {
# 删除无用字段
mutate {
remove_field => "@timestamp"
remove_field => "@version"
}
# 时间+8个时区,思想是找临时变量,最后+8后替换
ruby {
code => "event.set(‘@created_at‘, event.get(‘created_at‘).time.localtime + 8*60*60)"
}
ruby {
code => "event.set(‘created_at‘,event.get(‘@created_at‘))"
}
mutate {
remove_field => ["@created_at"]
}
}
output {
stdout {
codec => rubydebug
}
#if [type]=="user" {
elasticsearch {
# ES host:port
hosts => ["127.0.0.1:9200"]
#将mysql数据加入blog索引下,会自动创建
index => "user"
# 自增ID 需要关联的数据库中有有一个id字段,对应索引的id号_id
document_id => "%{id}"
}
# }
}
Logstash Oracle同步设置