input { jdbc {
#数据库地址,用户,密码
jdbc_connection_string => "jdbc:mysql://ip:3306/test"
jdbc_user => "root"
jdbc_password => "root"
#驱动位置配置
#此处的路径最好是绝对路径,行对路径取决与允许命令的目录
jdbc_driver_library => "sync-conf/mysql-connector-java-6.0.6.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
#开启分页查询
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#决定增量同步的关键
#是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
clean_run=> false
#是否需要记录某个column 的值,如果 record_last_run 为真,可以自定义我们需要表的字段名称
,使用其它字段追踪,而不是用时间,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
use_column_value=> true
#如果 use_column_value 为真,需配置此参数. 这个参数就是数据库给出的一个字段名称。当然该字段必须是递增的,可以是 数据库的数据时间这类的
tracking_column=> create_time
#只有两种选择 numeric,timestamp
tracking_column_type=>"timestamp"
#是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
record_last_run=> true
#在 SQL 语句中 WHERE MY_ID > :last_sql_value 即可. 其中 :sql_last_value 取得就是该文件中的值
last_run_metadata_path=> "/etc/logstash/run_metadata.d/my_info"
#是否将字段名称转小写。
#这里有个小的提示,如果你这前就处理过一次数据,并且在Kibana中有对应的搜索需求的话,还是改为true,
#因为默认是true,并且Kibana是大小写区分的。准确的说应该是ES大小写区分
lowercase_column_names=> false
#你的SQL的位置,当然,你的SQL也可以直接写在这里。
#statement=> SELECT * FROM tabeName t WHERE t.creat_time > :sql_last_value order by creat_time asc
statement_filepath=> "/etc/logstash/statement_file.d/my_info.sql" #sql 文件执行路径,最好绝对路径。
#注意:外载的SQL文件就是一个文本文件就可以了,还有需要注意的是,一个jdbc{}插件就只能处理一个SQL语句,
#如果你有多个SQL需要处理的话,只能在重新建立一个jdbc{}插件。 }
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
#用于区分输入的数据源,可以在output中区分输出到哪里
type => "user"
}
}
output {
#设置窗口日志输出
stdout {
codec => json_lines
}
if[type]=="user"{
elasticsearch {
hosts=> ["127.0.0.1:9200"]
#注意index的值不支持大写字母
index=> "user"
#document_type自行设置,不设置时,默认为doc
#document_type=> ""
#此处的值来自查询sql中的列名称,根据需要自行配置
document_id=> "%{id}"
}
}
}