应用一:mysql数据增量同步到kafka
1 准备mysql测试表
mysql> create table test_sync(id int not null auto_increment, name varchar(32), description varchar(64), create_time timestamp, update_time timestamp, primary key(id));
Query OK, 0 rows affected (0.04 sec) mysql> insert into test_sync (name, description, create_time, update_time) values('test1', '', now(), now());
Query OK, 1 row affected (0.02 sec) mysql> select * from test_sync;
+----+-------+-------------+---------------------+---------------------+
| id | name | description | create_time | update_time |
+----+-------+-------------+---------------------+---------------------+
| 1 | test1 | 1 | 2019-03-13 10:45:49 | 2019-03-13 10:45:49 |
+----+-------+-------------+---------------------+---------------------+
1 row in set (0.00 sec)
2 准备kafka topic
# bin/kafka-topics.sh --zookeeper $zk:2181/kafka -create --topic test_sync --partitions 2 --replication-factor 2
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "test_sync".
# bin/kafka-topics.sh --zookeeper $zk:2181/kafka -describe --topic test_sync
Topic:test_sync PartitionCount:2 ReplicationFactor:2 Configs:
Topic: test_sync Partition: 0 Leader: 112 Replicas: 112,111 Isr: 112,111
Topic: test_sync Partition: 1 Leader: 110 Replicas: 110,112 Isr: 110,112
3 准备logstash conf
input {
jdbc {
jdbc_driver_library => "/path/to/mysql-connector-java-5.1.36-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/database"
jdbc_user => "DBUSERNAME"
jdbc_password => "DBPASSWORD"
jdbc_paging_enabled => "true"
jdbc_page_size => "10000"
jdbc_default_timezone => "UTC"
statement => "select * from TABLE where id > :sql_last_value"
tracking_column => id
tracking_column_type => "numeric"
use_column_value => true
record_last_run => true
clean_run => false
#I have scheduled this to run in every second
schedule => "* * * * * * Asia/Shanghai"
}
}
output {
kafka {
bootstrap_servers => "192.168.0.1:9092,192.168.0.2:9092"
topic_id => "test_sync"
codec => json
}
stdout {codec => json}
}
注意
1)必须要设置时区,注意解析后的时间是UTC(会比Asia/Shanghai的时间晚8小时),如果希望解析后的时间和mysql中的时间保持一致,需要设置jdbc_default_timezone => "UTC";
2)最好设置分页;
3)增量有两种方式,
- 一种是根据id,适用于数据创建之后不会修改的情形,对应 tracking_column_type => "numeric";
- 一种是根据update_time,适用于每次创建或修改之后都会修改update_time的情形,对应 tracking_column_type => "timestamp";
第二种示例
statement => "select * from table where update_time > :sql_last_value;"
tracking_column => "update_time"
tracking_column_type => "timestamp"
增量进度位于last_run_metadata_path,默认路径$HOME/.logstash_jdbc_last_run,如果一台机器上启动多个logstash需要每个单独配置
通过schedule控制多久执行一次增量同步;
4 启动logstash
$ logstash -f $conf
观察日志是否正常,日志中会显示执行的sql以及发送的消息;
[2019-03-13T22:43:00,312][INFO ][logstash.inputs.jdbc ] (0.000253s) SELECT version()
[2019-03-13T22:43:00,314][INFO ][logstash.inputs.jdbc ] (0.000564s) select * from test_sync where update_time > '2019-03-13 14:07:41';
{"update_time":"2019-03-13T06:42:40.000Z","id":1,"create_time":"2019-03-13T06:07:41.000Z","@version":"1","@timestamp":"2019-03-13T14:43:00.507Z","name":"test1","description":"2"}
进一步测试mysql修改和新增的情形,一切正常;
5 kafka确认
# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.1:9092 --topic test_sync --from-beginning
6 可能的问题
报错can't dup Fixnum
[2019-03-13T22:19:46,790][ERROR][logstash.pipeline ] Pipeline aborted due to error {:pipeline_id=>"main", :exception=>#<TypeError: can't dup Fixnum>, :backtrace=>["org/jruby/RubyKernel.java:1882:in `dup'", "uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/date/format.rb:838:in `_parse'", "uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/date.rb:1830:in `parse'"...
这有可能是因为tracking_column之前按照numeric运行,后来改成timestamp,检查last_run_metadata_path,默认路径$HOME/.logstash_jdbc_last_run,如果是因为以上原因,删除即可
参考:
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
https://www.elastic.co/guide/en/logstash/current/plugins-outputs-kafka.html
https://*.com/questions/31446405/incremental-mysql-reading-using-logstash