[Original] Big Data Basics: Logstash (2) Application: MySQL to Kafka

Use case 1: incremental sync of MySQL data to Kafka

1 Prepare a MySQL test table

mysql> create table test_sync(id int not null auto_increment, name varchar(32), description varchar(64), create_time timestamp, update_time timestamp, primary key(id));
Query OK, 0 rows affected (0.04 sec)

mysql> insert into test_sync (name, description, create_time, update_time) values('test1', '', now(), now());
Query OK, 1 row affected (0.02 sec)

mysql> select * from test_sync;
+----+-------+-------------+---------------------+---------------------+
| id | name  | description | create_time         | update_time         |
+----+-------+-------------+---------------------+---------------------+
|  1 | test1 | 1           | 2019-03-13 10:45:49 | 2019-03-13 10:45:49 |
+----+-------+-------------+---------------------+---------------------+
1 row in set (0.00 sec)
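The notes in section 3 below describe an incremental mode that tracks update_time; for that mode the column must be refreshed on every modification. A minimal sketch of how to set that up (assuming MySQL 5.6+, where ON UPDATE CURRENT_TIMESTAMP can be combined freely with other timestamp columns):

mysql> alter table test_sync modify update_time timestamp not null default current_timestamp on update current_timestamp;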

2 Prepare the Kafka topic

# bin/kafka-topics.sh --zookeeper $zk:2181/kafka --create --topic test_sync --partitions 2 --replication-factor 2
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "test_sync".
# bin/kafka-topics.sh --zookeeper $zk:2181/kafka --describe --topic test_sync
Topic:test_sync PartitionCount:2 ReplicationFactor:2 Configs:
Topic: test_sync Partition: 0 Leader: 112 Replicas: 112,111 Isr: 112,111
Topic: test_sync Partition: 1 Leader: 110 Replicas: 110,112 Isr: 110,112
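To quickly confirm that the topic is visible, listing topics works too (a sketch, reusing the same $zk variable):

# bin/kafka-topics.sh --zookeeper $zk:2181/kafka --list | grep test_sync
test_sync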

3 Prepare the Logstash conf

input {
  jdbc {
    jdbc_driver_library => "/path/to/mysql-connector-java-5.1.36-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/database"
    jdbc_user => "DBUSERNAME"
    jdbc_password => "DBPASSWORD"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "10000"
    jdbc_default_timezone => "UTC"
    statement => "select * from TABLE where id > :sql_last_value"
    tracking_column => "id"
    tracking_column_type => "numeric"
    use_column_value => true
    record_last_run => true
    clean_run => false
    # scheduled to run every second
    schedule => "* * * * * * Asia/Shanghai"
  }
}

output {
  kafka {
    bootstrap_servers => "192.168.0.1:9092,192.168.0.2:9092"
    topic_id => "test_sync"
    codec => json
  }
  stdout {codec => json}
}
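Before starting, the conf can be syntax-checked without actually running the pipeline (a sketch; $conf is the path to the file above):

$ logstash -f $conf --config.test_and_exit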

Notes

1) The timezone must be set. Note that the parsed time is UTC (8 hours behind Asia/Shanghai); if you want the parsed time to stay consistent with the time stored in MySQL, set jdbc_default_timezone => "UTC".
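Alternatively, if you want events to carry the true UTC instant (shifted by 8 hours) rather than the unshifted wall-clock value, the plugin docs suggest setting jdbc_default_timezone to the timezone the database actually uses; a sketch:

jdbc_default_timezone => "Asia/Shanghai"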

2) It is best to enable paging (jdbc_paging_enabled / jdbc_page_size, as in the conf above).

3) There are two incremental modes:

  • one tracks id, suitable when rows are never modified after creation; it corresponds to tracking_column_type => "numeric";
  • one tracks update_time, suitable when update_time is refreshed on every insert or update; it corresponds to tracking_column_type => "timestamp".

An example of the second mode:

  statement => "select * from table where update_time > :sql_last_value"
  tracking_column => "update_time"
  tracking_column_type => "timestamp"
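When paging is enabled, it is safer to also order by the tracking column, so that page boundaries and :sql_last_value advance consistently; a sketch (table name as in the test above):

  statement => "select * from test_sync where update_time > :sql_last_value order by update_time asc"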

The incremental progress is stored at last_run_metadata_path, which defaults to $HOME/.logstash_jdbc_last_run; if you start multiple Logstash instances on one machine, each one needs its own path.
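For example, to give one instance its own progress file (the path here is hypothetical):

last_run_metadata_path => "/data/logstash/.logstash_jdbc_last_run_test_sync"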

The schedule setting controls how often the incremental sync runs.
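schedule uses rufus-scheduler cron syntax; the 6-field form in the conf above has a leading seconds field. To sync every 5 minutes instead of every second, for example:

schedule => "*/5 * * * *"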

4 Start Logstash

$ logstash -f $conf

Check the logs to confirm everything is normal; they show the SQL that was executed and the messages that were sent:

[2019-03-13T22:43:00,312][INFO ][logstash.inputs.jdbc ] (0.000253s) SELECT version()
[2019-03-13T22:43:00,314][INFO ][logstash.inputs.jdbc ] (0.000564s) select * from test_sync where update_time > '2019-03-13 14:07:41';
{"update_time":"2019-03-13T06:42:40.000Z","id":1,"create_time":"2019-03-13T06:07:41.000Z","@version":"1","@timestamp":"2019-03-13T14:43:00.507Z","name":"test1","description":"2"}

Further tests of MySQL updates and inserts all behave as expected.

5 Verify in Kafka

# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.1:9092 --topic test_sync --from-beginning
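To see how many messages have landed on each partition, Kafka's GetOffsetShell is handy (a sketch; --time -1 requests the latest offsets):

# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.0.1:9092,192.168.0.2:9092 --topic test_sync --time -1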

6 Possible problems

Error: can't dup Fixnum

[2019-03-13T22:19:46,790][ERROR][logstash.pipeline ] Pipeline aborted due to error {:pipeline_id=>"main", :exception=>#<TypeError: can't dup Fixnum>, :backtrace=>["org/jruby/RubyKernel.java:1882:in `dup'", "uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/date/format.rb:838:in `_parse'", "uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/date.rb:1830:in `parse'"...

This is likely because tracking_column previously ran as numeric and was later switched to timestamp. Check last_run_metadata_path (default: $HOME/.logstash_jdbc_last_run); if that is the cause, deleting the file fixes it.
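A sketch of the fix (stop Logstash before deleting the file, then restart):

$ rm $HOME/.logstash_jdbc_last_run
$ logstash -f $conf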

References:

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html

https://www.elastic.co/guide/en/logstash/current/plugins-outputs-kafka.html

https://stackoverflow.com/questions/31446405/incremental-mysql-reading-using-logstash
