




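1. Download the logstash-5.6.1 package (download path: logstash-5.6.1), then extract it into the same parent directory as ES for easier management;
  2. Edit logstash.yml in the config directory. The configuration used here is shown below (it is not exhaustive; add further settings yourself as needed):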

# Settings file in YAML
# Settings can be specified either in hierarchical form, e.g.:
#   pipeline:
#     batch:
#       size: 125
#       delay: 5
# Or as flat keys:
#   pipeline.batch.size: 125
#   pipeline.batch.delay: 5
# ------------  Node identity ------------
# Use a descriptive name for the node:
# node.name: test
# If omitted the node name will default to the machine's host name
# ------------ Data path ------------------
# Which directory should be used by logstash and its plugins
# for any persistent needs. Defaults to LOGSTASH_HOME/data
# Directory where the UUID file is stored: /data/es/logstash-5.6.1
# ------------ Pipeline Settings --------------
# Set the number of workers that will, in parallel, execute the filters+outputs
# stage of the pipeline.
# This defaults to the number of the host's CPU cores.
pipeline.workers: 10
# How many workers should be used per output plugin instance
pipeline.output.workers: 10
# How many events to retrieve from inputs before sending to filters+workers
pipeline.batch.size: 3000
# How long to wait before dispatching an undersized batch to filters+workers
# Value is in milliseconds.
pipeline.batch.delay: 100
# Force Logstash to exit during shutdown even if there are still inflight
# events in memory. By default, logstash will refuse to quit until all
# received events have been pushed to the outputs.
# WARNING: enabling this can lead to data loss during shutdown
# pipeline.unsafe_shutdown: false
# ------------ Pipeline Configuration Settings --------------
# Where to fetch the pipeline configuration for the main pipeline
path.config: /data/es/logstash-5.6.1/config/logstash.conf
# Pipeline configuration string for the main pipeline
# config.string:
# At startup, test if the configuration is valid and exit (dry run)
# config.test_and_exit: false
# Periodically check if the configuration has changed and reload the pipeline
# This can also be triggered manually through the SIGHUP signal
# config.reload.automatic: false
# How often to check if the pipeline configuration has changed (in seconds)
# config.reload.interval: 3
# Show fully compiled configuration as debug log message
# NOTE: --log.level must be 'debug'
# config.debug: false
# When enabled, process escaped characters such as \n and \" in strings in the
# pipeline configuration files.
# config.support_escapes: false
# ------------ Module Settings ---------------
# Define modules here.  Modules definitions must be defined as an array.
# The simple way to see this is to prepend each `name` with a `-`, and keep
# all associated variables under the `name` they are associated with, and 
# above the next, like this:
# modules:
#   - name: MODULE_NAME
# Module variable names must be in the format of 
# modules:
# ------------ Queuing Settings --------------
# Internal queuing model, "memory" for legacy in-memory based queuing and
# "persisted" for disk-based acked queueing. Defaults is memory
# queue.type: memory
# If using queue.type: persisted, the directory path where the data files will be stored.
# Default is
path.queue: /data/es/logstash-5.6.1/data/queue
# If using queue.type: persisted, the page data files size. The queue data consists of
# append-only data files separated into pages. Default is 250mb
# queue.page_capacity: 250mb
# If using queue.type: persisted, the maximum number of unread events in the queue.
# Default is 0 (unlimited)
# queue.max_events: 0
# If using queue.type: persisted, the total capacity of the queue in number of bytes.
# If you would like more unacked events to be buffered in Logstash, you can increase the
# capacity using this setting. Please make sure your disk drive has capacity greater than
# the size specified here. If both max_bytes and max_events are specified, Logstash will pick
# whichever criteria is reached first
# Default is 1024mb or 1gb
# queue.max_bytes: 1024mb
# If using queue.type: persisted, the maximum number of acked events before forcing a checkpoint
# Default is 1024, 0 for unlimited
# queue.checkpoint.acks: 1024
# If using queue.type: persisted, the maximum number of written events before forcing a checkpoint
# Default is 1024, 0 for unlimited
# queue.checkpoint.writes: 1024
# If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
# Default is 1000, 0 for no periodic checkpoint.
# queue.checkpoint.interval: 1000
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
# dead_letter_queue.enable: false

# If using dead_letter_queue.enable: true, the maximum size of each dead letter queue. Entries
# will be dropped if they would increase the size of the dead letter queue beyond this setting.
# Default is 1024mb
# dead_letter_queue.max_bytes: 1024mb

# If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
# Default is
path.dead_letter_queue: /data/es/logstash-5.6.1/data/dead_letter_queue
# ------------ Metrics Settings --------------
# Bind address for the metrics REST endpoint
# http.host: ""
# Bind port for the metrics REST endpoint, this option also accept a range
# (9600-9700) and logstash will pick up the first available ports.
# http.port: 9600-9700
# ------------ Debugging Settings --------------
# Options for log.level:
#   * fatal
#   * error
#   * warn
#   * info (default)
#   * debug
#   * trace
# log.level: info
# ------------ Other Settings --------------
# Where to find custom plugins
# path.plugins: []
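
The comments above note that `pipeline.workers` defaults to the number of CPU cores on the host, while this configuration sets it to 10 explicitly. A minimal sketch for checking the core count before choosing a value, assuming a Linux or macOS host (`cpu_cores` is a hypothetical helper name, not part of Logstash):

```shell
# Print the number of online CPU cores, which is the documented
# default for pipeline.workers when the setting is omitted.
cpu_cores() {
    getconf _NPROCESSORS_ONLN 2>/dev/null || nproc
}

echo "CPU cores on this host: $(cpu_cores)"
```

Comparing this number with the configured value is only a starting point; the right worker count depends on how much time the filters and outputs spend waiting on I/O.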




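# logstash.conf (the file referenced by path.config above), Greenplum source
input {
    stdin {
    }
    jdbc {
        # NOTE: the jdbc input also requires jdbc_connection_string (the JDBC URL of the database)
        jdbc_driver_library => "../lib/greenplum-1.0.jar"
        jdbc_driver_class => "com.pivotal.jdbc.GreenplumDriver"
        jdbc_user => "root"
        jdbc_password => "1234"
        jdbc_paging_enabled => "false"
        jdbc_page_size => "1000"
        # File containing the SQL statement to run
        statement_filepath => "/data/es/logstash-5.6.1/config/jdbc.sql"
        # Cron-style schedule: run every minute
        schedule => "* * * * *"
        clean_run => false
        record_last_run => true
        last_run_metadata_path => "/data/es/logstash-5.6.1/data/jdbc.lastrun"

        # Track the value of the inputtime column instead of the last run time
        use_column_value => true
        tracking_column => "inputtime"
    }
}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
    elasticsearch {
        hosts => ""
        # Elasticsearch index names must be lowercase
        index => "testindex"
        document_type => "test"
        #document_id => "%{id}"
        flush_size => 1000
        idle_flush_time => 15
    }
    stdout {
        codec => json_lines
    }
}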

# Input section for an Oracle source
input {
    stdin {
    }
    jdbc {
        jdbc_driver_library => "../lib/ojdbc14-"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        jdbc_connection_string => "jdbc:oracle:thin:root/1234@//"
        jdbc_user => "root"
        jdbc_password => "1234"
        jdbc_paging_enabled => "false"
        jdbc_page_size => "1000"
        statement_filepath => "/data/es/logstash-5.6.1/config/sql/test.sql"
        schedule => "* * * * *"
        clean_run => false
        record_last_run => true
        last_run_metadata_path => "/data/es/logstash-5.6.1/data/test.lastrun"
        use_column_value => true
        tracking_column => "inputtime"
        type => "test"
    }
}

# Input section for a SQL Server source
input {
    stdin {
    }
    jdbc {
        jdbc_driver_library => "../lib/sqljdbc4.jar"
        jdbc_driver_class => ""
        jdbc_connection_string => "jdbc:sqlserver://;databaseName=TESTDB"
        jdbc_user => "root"
        jdbc_password => "1234"
        jdbc_paging_enabled => "false"
        jdbc_page_size => "1000"
        statement_filepath => "/data/es/logstash-5.6.1/config/sql/test.sql"
        schedule => "* * * * *"
        clean_run => false
        record_last_run => true
        last_run_metadata_path => "/data/es/logstash-5.6.1/data/test.lastrun"
        use_column_value => true
        tracking_column => "inputtime"
        type => "test"
    }
}

# Input section for a MySQL source
input {
    stdin {
    }
    jdbc {
        jdbc_driver_library => "../lib/mysql-connector-java-6.0.5.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql:// TESTDB "
        jdbc_user => "root"
        jdbc_password => "1234"
        jdbc_paging_enabled => "false"
        jdbc_page_size => "1000"
        statement_filepath => "/data/es/logstash-5.6.1/config/sql/test.sql"
        schedule => "* * * * *"
        clean_run => false
        record_last_run => true
        last_run_metadata_path => "/data/es/logstash-5.6.1/data/test.lastrun"
        use_column_value => true
        tracking_column => "inputtime"
        type => "test"
    }
}
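
Because a pipeline file with a syntax error stops Logstash from starting, it can help to check the file first. The sketch below wraps the `--config.test_and_exit` command-line option (the same setting listed as `config.test_and_exit` in logstash.yml) in a small function. The name `check_config` is hypothetical, and the install path is assumed from the paths used in this article:

```shell
# Assumed install location, taken from the paths used in this article.
LOGSTASH_HOME="${LOGSTASH_HOME:-/data/es/logstash-5.6.1}"

# Parse the given pipeline file and exit without starting the pipeline.
# $1 = path to the pipeline configuration file
check_config() {
    "$LOGSTASH_HOME/bin/logstash" -f "$1" --config.test_and_exit
}

# Usage: check_config "$LOGSTASH_HOME/config/logstash.conf"
```

This only validates the configuration syntax; it does not connect to the database or to Elasticsearch.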

    Run ./logstash & in the bin directory to start Logstash in the background;
    To stop it, run ps aux|grep logstash to find the process, then kill that process.
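
The start and stop steps can be sketched as a pair of shell functions. This is a minimal sketch rather than an official service script: the pid file, the log file name, and the function names are all hypothetical, and the install path is assumed from the paths used in this article. Recording the process ID at startup avoids searching the `ps` output later.

```shell
# Assumed install location, taken from the paths used in this article.
LOGSTASH_HOME="${LOGSTASH_HOME:-/data/es/logstash-5.6.1}"
# Hypothetical file used to remember the process ID.
PID_FILE="${PID_FILE:-$LOGSTASH_HOME/logstash.pid}"

start_logstash() {
    # nohup keeps the process running after the terminal is closed;
    # console output goes to a hypothetical logstash.out file.
    nohup "$LOGSTASH_HOME/bin/logstash" > "$LOGSTASH_HOME/logstash.out" 2>&1 &
    echo $! > "$PID_FILE"
}

stop_logstash() {
    [ -f "$PID_FILE" ] || { echo "no pid file: $PID_FILE" >&2; return 1; }
    # kill sends SIGTERM by default, which asks Logstash to shut down
    # gracefully instead of dropping in-flight events.
    kill "$(cat "$PID_FILE")" && rm -f "$PID_FILE"
}
```

Avoid `kill -9` unless the process refuses to stop, since a forced kill can lose events that are still in memory.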
## Notes ##

    export JAVA_HOME=/usr/local/jdk1.8.0_121
    export PATH=$JAVA_HOME/bin:$PATH
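  2. For incremental imports, the last-update value sql_last_value is a timestamp by default. To track a custom column instead, edit the stored sql_last_value by hand (this is needed only once) and name the column to track (record_last_run => true, tracking_column => stringField, together with use_column_value => true as in the configurations above). Logstash then overwrites the stored value with that column's value from the last imported record and uses it as the starting point of the next run;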


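    Select * from tableName where inputtime>:sql_last_value
    --- 2017-12-26 00:00:00
    Note: the first line is the SQL statement, and the second line is the content of the last-run file (last_run_metadata_path) that holds sql_last_value. If the incremental column is a time type, write the time of the first import in the format shown above. If the incremental column is a string type, for example "20171226000000", the line must be written as --- '20171226000000'; otherwise the incremental import does not take effect.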
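
The one-time edit of the stored value can also be scripted. The sketch below writes the initial `sql_last_value` into the file that `last_run_metadata_path` points to, as a single line starting with the YAML document marker `---`. `seed_last_run` is a hypothetical helper name, and the exact format that the plugin accepts should be confirmed against a last-run file that Logstash has written itself.

```shell
# Write an initial sql_last_value into a last-run file.
# $1 = last-run file, $2 = initial value, $3 = "time" (default) or "string"
seed_last_run() {
    if [ "${3:-time}" = "string" ]; then
        # String columns need the value wrapped in single quotes.
        printf '%s\n' "--- '$2'" > "$1"
    else
        printf '%s\n' "--- $2" > "$1"
    fi
}

# Usage (paths taken from the configurations above):
#   seed_last_run /data/es/logstash-5.6.1/data/jdbc.lastrun "2017-12-26 00:00:00"
#   seed_last_run /data/es/logstash-5.6.1/data/test.lastrun 20171226000000 string
```

Run it while Logstash is stopped, because the plugin rewrites this file after every scheduled query.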

