官网地址:
https://maxwells-daemon.io/
下载地址(版本发行):
https://github.com/zendesk/maxwell/releases
参考教程自尚硅谷视频:
https://www.bilibili.com/video/BV1JQ4y1e7CN?spm_id_from=333.999.0.0
什么是Maxwell?
Zendesk 开源,用 Java 编写的 MySQL 实时抓取软件
功能:
实时监听MySQL的二进制日志文件binlog,生成JSON格式的消息
作为一个消息提供者发送给消息中间件,或者其他平台的应用程序
原理:
利用MySQL自身的主从配置特性,将自身伪装成从库对MySQL当成主库进行监听
记录主库的非查询操作
版本注意:
1、版本1.30开始全面放弃使用JDK8 改用JDK11版本
2、使用JDK8,最高版本 maxwell-1.29.2.tar.gz
一、安装Maxwell:
Linux平台 Java运行环境
1、解压maxwell-1.29.2.tar.gz
tar -zxvf maxwell-1.29.2.tar.gz
2、(可选)复制解压出来的目录到/usr/local/中
cp -r maxwell-1.29.2 /usr/local/
二、配置MySQL:
1、创建Maxwell库(库名自定义)
CREATE DATABASE `maxwell` CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_general_ci';
2、分配从库账号
CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456'; GRANT ALL PRIVILEGES ON *.* TO 'maxwell'@'%' WITH GRANT OPTION; FLUSH PRIVILEGES;
3、开启Binlog日志
[mysqld] # 开启binlog日志 log-bin=mysql-bin # (可选)设置binlog日志的格式,推荐row binlog_format= statement|mixed|row
三、启动Maxwell
1、方式一,直接参数式启动
# 切换到maxwell目录 cd /usr/local/maxwell-1.29.2 # 启动maxwell ./bin/maxwell \ --user='maxwell' \ --password='123456' \ --host='192.168.2.225' \ --port='3308' \ --producer='stdout' \ --jdbc_options='useSSL=false&serverTimezone=Asia/Shanghai'
参数说明:
# 主库分配的从库账号 --user='maxwell' \ # 主库分配的从库账号密码 --password='123456' \ # 主库地址 --host='192.168.2.225' \ # 主库端口 --port='3308' \ # 消息生产模式,stdout控制台输出,kafka --producer='stdout' \ # jdbc的时区参数 --jdbc_options='useSSL=false&serverTimezone=Asia/Shanghai'
2、加载配置文件启动
进入maxwell目录:
cd /usr/local/maxwell-1.29.2
备份配置文件:
cp config.properties.example config.properties
编辑关键参数
vim config.properties
参数项:
# mysql login info (MySQL账号信息) host=192.168.2.225 user=maxwell password=123456 port=3308 # 订阅信息(生产者模式:即输出的目标 stdout控制台, kafka kafka集群) producer=stdout
3、启动命令:
cd /usr/local/maxwell-1.29.2/ ./bin/maxwell --config ./config.properties --jdbc_options='useSSL=false&serverTimezone=Asia/Shanghai'
四、监听效果:
检查Maxwell是否能够监听?
在被监听的主库中执行非查询SQL
1、插入日志
INSERT INTO `test-db`.`day_sale` (`ID`, `PRODUCT`, `CHANNEL`, `AMOUNT`, `SALE_DATE`) VALUES (NULL, '苹果', '淘宝', 2497.0000, NOW());
终端日志:
[root@localhost maxwell-1.29.2]# ./bin/maxwell --config ./config.properties --jdbc_options='useSSL=false&serverTimezone=Asia/Shanghai' Using kafka version: 1.0.0 13:31:50,295 INFO Maxwell - Starting Maxwell. maxMemory: 247332864 bufferMemoryUsage: 0.25 13:31:50,534 INFO Maxwell - Maxwell v1.29.2 is booting (StdoutProducer), starting at Position[BinlogPosition[mysql-bin.000005:61414], lastHeartbeat=1642483666136] 13:31:50,757 INFO MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[mysql-bin.000005:16191], lastHeartbeat=0]) 13:31:50,931 INFO BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.000005:61414 13:31:50,963 INFO BinaryLogClient - Connected to 192.168.2.225:3308 at mysql-bin.000005/61414 (sid:6379, cid:173) 13:31:50,963 INFO BinlogConnectorReplicator - Binlog connected. {"database":"test-db","table":"day_sale","type":"insert","ts":1642484382,"xid":65636,"commit":true,"data":{"ID":163,"PRODUCT":"苹果","CHANNEL":"淘宝","AMOUNT":2497.0000,"SALE_DATE":"2022-01-18 13:39:42"}}
可以看到Maxwell将会返回一条信息,将其格式化处理:
{ "database": "test-db", -- 库名 "table": "day_sale", -- 表名 "type": "insert", -- 执行类型:插入 "ts": 1642484382, -- 操作时间 "xid": 65636, -- 操作ID? "commit": true, -- 提交状态 "data": { -- 数据信息: {字段: 值} "ID": 163, "PRODUCT": "苹果", "CHANNEL": "淘宝", "AMOUNT": 2497, "SALE_DATE": "2022-01-18 13:39:42" } }
如果是多个数据,maxwell还是根据记录一条条打印 (说明maxwell以一条记录为标准单位进行输出)
{"database":"test-db","table":"day_sale","type":"insert","ts":1642484928,"xid":67070,"xoffset":0,"data":{"ID":164,"PRODUCT":"产品A","CHANNEL":"拼多多","AMOUNT":2497.0000,"SALE_DATE":"2022-01-18 13:48:48"}} {"database":"test-db","table":"day_sale","type":"insert","ts":1642484928,"xid":67070,"xoffset":1,"data":{"ID":165,"PRODUCT":"产品B","CHANNEL":"京东","AMOUNT":2497.0000,"SALE_DATE":"2022-01-18 13:48:48"}} {"database":"test-db","table":"day_sale","type":"insert","ts":1642484928,"xid":67070,"commit":true,"data":{"ID":166,"PRODUCT":"产品C","CHANNEL":"淘宝","AMOUNT":2497.0000,"SALE_DATE":"2022-01-18 13:48:48"}}
2、修改记录:
UPDATE `test-db`.`day_sale` SET `CHANNEL` = '拼多多2' WHERE `ID` = 164
格式化信息:
{ "database": "test-db", "table": "day_sale", "type": "update", -- 操作类型:修改 "ts": 1642485002, "xid": 67425, "commit": true, "data": { "ID": 164, "PRODUCT": "产品A", "CHANNEL": "拼多多2", "AMOUNT": 2497, "SALE_DATE": "2022-01-18 13:48:48" }, "old": { -- maxwell会存储原纪录变更的信息 "CHANNEL": "拼多多" } }
3、删除记录
DELETE FROM `test-db`.`day_sale` WHERE `ID` = 165
格式化信息:
{ "database": "test-db", "table": "day_sale", "type": "delete", -- 操作类型,删除 "ts": 1642485422, "xid": 68499, "commit": true, "data": { -- maxwell 无论新增,删除,修改,都会对数据进行完整保留 "ID": 165, "PRODUCT": "产品B", "CHANNEL": "京东", "AMOUNT": 2497, "SALE_DATE": "2022-01-18 13:48:48" } }
上述的操作,是终端阻塞的,要保持后台执行maxwell 就需要将消息输出到其他中间件中去,如kafka
# 订阅信息(生产者模式:即输出的目标 stdout控制台, kafka kafka集群) producer=kafka