使用中间件logstash监控数据库同步到es

1.关于下载(linux版本)要和es版本保持一致,避免bug

https://www.elastic.co/cn/downloads/logstash

因为logstash要连数据库 我们要准备 mysql-connector-java-8.0.13.jar 驱动

2.Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地

同步方式: 

  1.  主键的新增方式

  2.   update_time方式( 这里使用update_time方式,需要数据库有这种更新字段)

比较:

使用 logstash-input-jdbc 插件读取 mysql 的数据,这个插件的工作原理比较简单,就是定时执行一个 sql,然后将 sql 执行的结果写入到流中,增量获取的方式没有通过 binlog 方式同步,而是用一个递增字段作为条件去查询,每次都记录当前查询的位置,由于递增的特性,只需要查询比当前大的记录即可获取这段时间内的全部增量,一般的递增字段有两种,AUTO_INCREMENT 的主键 id 和 ON UPDATE CURRENT_TIMESTAMP 的 update_time 字段,id 字段只适用于那种只有插入没有更新的表,update_time 更加通用一些,建议在 mysql 表设计的时候都增加一个 update_time 字段。

下面进一步详细说配置:

cd /usr/local/logstash-7.15.1/config/mysql.conf

input {
  jdbc{
    jdbc_driver_library => "/usr/local/logstash-7.15.1/bin/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46.jar"
     # 8.0以上版本:一定要把serverTimezone=UTC天加上  
    jdbc_driver_class => "com.mysql.jdbc.Driver"   
    jdbc_connection_string => "jdbc:mysql://localhost:3306/nsbd?characterEncoding=utf8&useSSL=false&rewriteBatchedStatements=true&serverTimezone=UTC"
    jdbc_user => "root"
    jdbc_password => "root"
    #schedule: 执行 sql 时机,类似 crontab 的调度schedule: 执行 sql 时机,类似 crontab 的调度
    #https://cron.qqe2.com/ 这个网站执行通配符
     schedule => "*/5 * * * * *"  
    #这里使用update_time方式,需要数据库有这种更新字段
    statement => "SELECT * FROM user WHERE update_time >= :sql_last_value"
    #使用递增列的值
    use_column_value => true
    #递增字段的类型,numeric 表示数值类型, timestamp 表示时间戳类型
    tracking_column_type => "timestamp"
    #递增字段的名称,这里使用 update_time 这一列,这列的类型是 timestamp
    tracking_column => "update_time"
    #同步点文件,这个文件记录了上次的同步点,重启时会读取这个文件,这个文件可以手动修改
    last_run_metadata_path => "syncpoint_table"
  }
}
output {
    elasticsearch {
        # ES的IP地址及端口
        hosts => ["192.168.126.128:9200"]
        # 索引名称 可自定义
        index => "user"
        # 需要关联的数据库中有有一个id字段,对应类型中的id
        document_id => "%{id}"
        document_type => "user"
    }
    stdout {
        # JSON格式输出
        codec => json_lines
    }
}

报错:

 Could not fetch all the sources {:exception=>LogStash::ConfigLoadingError, :message=>"The following config files contains non-ascii characters but are not UTF-8 encoded

使用中间件logstash监控数据库同步到es

 编码问题,转换一下编码utf8 ,我使用文本转换的 

配置ok! 然后启动一下 

cd  /usr/local/logstash-7.15.1/bin/

执行 有点慢 看打印的日志

 ./logstash  -f ../config/mysql.conf

现在打开es head 查询就同步过来了啊
使用中间件logstash监控数据库同步到es

 

上一篇:修饰符.sync


下一篇:brew update 失败