maxwell实时同步mysql中binlog

 

概述

Maxwell是一个能实时读取MySQL二进制日志binlog,并生成 JSON 格式的消息,作为生产者发送给 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。它的常见应用场景有ETL、维护缓存、收集表级别的dml指标、增量到搜索引擎、数据分区迁移、切库binlog回滚方案等。

特征

  1.    支持 SELECT * FROM table 的方式进行全量数据初始化
  2.    支持在主库发生failover后,自动恢复binlog位置(GTID)
  3.    可以对数据进行分区,解决数据倾斜问题,发送到kafka的数据支持databasetablecolumn等级别的数据分区
  4.    工作方式是伪装为Slave,接收binlog events,然后根据schemas信息拼装,可以接受ddlxidrow等各种event

与canal 和mysql_streamer比较

除了Maxwell外,目前常用的MySQL Binlog解析工具主要有阿里的canal、mysql_streamer,三个工具对比如下:

maxwell实时同步mysql中binlog

canal 由Java开发,分为服务端和客户端,拥有众多的衍生应用,性能稳定,功能强大;canal 需要自己编写客户端来消费canal解析到的数据。

maxwell相对于canal的优势是使用简单,它直接将数据变更输出为json字符串,不需要再编写客户端。

安装地址

1.Maxwell官网地址   http://maxwells-daemon.io/

2.安装包下载地址    https://github.com/zendesk/maxwell

安装MaxWell

 安装Maxwell

(1)上传maxwell-1.22.4.tar.gz 并进行解压

   tar -zxvf maxwell-1.22.4.tar.gz -C /opt/module/

MySql配置

(1)修改my-default.cnf

[kris@hadoop101 maxwell-1.22.5]$ cd /usr/share/mysql/
[kris@hadoop101 mysql]$ sudo cp my-default.cnf /etc/my.cnf
[kris@hadoop101 mysql]$ sudo vim /etc/my.cnf

提示:如果是rpm的方式安装MySql的那么则没有my.cnf文件,则需要修改mysql-default.cnf然后复制到/etc/路径下,如果/etc/路径下有my.cnf则直接修改my.cnf即可。

(2)修改my.cnf,添加如下配置。

[kris@hadoop101 mysql]$ sudo vim /etc/my.cnf
[mysqld]
server_id=1
log-bin=master
binlog_format=row

sql_mode=NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES

 启动MySql设置参数

mysql> set global binlog_format=ROW;
mysql> set global binlog_row_image=FULL;

创建maxwell库(启动时候会自动创建,不需手动创建)和用户

mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

重启MySql服务
[kris@hadoop101 ~]$ sudo service mysql restart

 Maxwell 使用

需要首先启动 zookeeper和kafka

 输出信息到命令行

(1)启动MaxWell

kris@hadoop101 maxwell-1.22.5]$ bin/maxwell --user='maxwell' --password='123456' --host='hadoop101' --producer=stdout
Using kafka version: 1.0.0
17:20:11,351 WARN  MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured.
17:20:11,813 INFO  SchemaStoreSchema - Creating maxwell database
17:20:12,367 INFO  Maxwell - Maxwell v1.22.5 is booting (StdoutProducer), starting at Position[BinlogPosition[master.000001:3816], lastHeartbeat=0]
17:20:12,625 INFO  AbstractSchemaStore - Maxwell is capturing initial schema
17:20:13,733 INFO  BinlogConnectorReplicator - Setting initial binlog pos to: master.000001:3816
17:20:13,828 INFO  BinaryLogClient - Connected to hadoop101:3306 at master.000001/3816 (sid:6379, cid:5)
17:20:13,829 INFO  BinlogConnectorLifecycleListener - Binlog connected.

    maxwell实时同步mysql中binlog

 

对mysql的test测试库进行增删改操作:

INSERT INTO stu1 VALUES(1003,'kris',22); //增、删、改

查看MaxWell 阻塞窗口

    maxwell实时同步mysql中binlog

{"database":"test","table":"stu1","type":"delete","ts":1576747370,"xid":454,"commit":true,"data":{"id":1003,"name":"wangwu","age":23}}
{"database":"test","table":"stu1","type":"insert","ts":1576747481,"xid":622,"commit":true,"data":{"id":1003,"name":"kris","age":22}}
{"database":"test","table":"stu1","type":"update","ts":1576747522,"xid":695,"commit":true,"data":{"id":1002,"name":"lisi","age":30},"old":{"age":18}}

 输出信息到kafka

 启动kafka集群:

[kris@hadoop101 kafka]$ bin/kafka-server-start.sh config/server.properties &
[1] 3459

[kris@hadoop102 kafka]$ bin/kafka-server-start.sh config/server.properties &
[1] 6320

[kris@hadoop103 kafka]$ bin/kafka-server-start.sh config/server.properties &
[1] 5823

启动消费者:

[kris@hadoop101 bin]$ ./kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic maxwell  --from-beginning
[2019-12-19 18:59:14,209] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-12-19 19:03:38,595] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: maxwell-1. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)

 

maxwell实时同步mysql中binlog

 

 

 

 

 

上一篇:netty-Selector


下一篇:Mariadb Galera Cluster 群集 安装部署