【Maxwell】01 安装及入门

 

官网地址:

https://maxwells-daemon.io/

 

下载地址(版本发行):

https://github.com/zendesk/maxwell/releases

 

参考教程自尚硅谷视频:

https://www.bilibili.com/video/BV1JQ4y1e7CN?spm_id_from=333.999.0.0

 

什么是Maxwell?

Zendesk 开源,用 Java 编写的 MySQL 实时抓取软件

功能:

实时监听MySQL的二进制日志文件binlog,生成JSON格式的消息
作为一个消息提供者发送给消息中间件,或者其他平台的应用程序

原理:

利用MySQL自身的主从配置特性,将自身伪装成从库对MySQL当成主库进行监听
记录主库的非查询操作

版本注意:

1、版本1.30开始全面放弃使用JDK8 改用JDK11版本
2、使用JDK8,最高版本 maxwell-1.29.2.tar.gz

 

一、安装Maxwell:

Linux平台 Java运行环境

1、解压maxwell-1.29.2.tar.gz 

tar -zxvf maxwell-1.29.2.tar.gz

2、(可选)复制解压出来的目录到/usr/local/中 

cp -r maxwell-1.29.2 /usr/local/

 

二、配置MySQL:

1、创建Maxwell库(库名自定义)

CREATE DATABASE `maxwell` CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_general_ci';

2、分配从库账号

CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL PRIVILEGES ON *.* TO 'maxwell'@'%' WITH GRANT OPTION;
FLUSH PRIVILEGES;

3、开启Binlog日志

[mysqld]
# 开启binlog日志
log-bin=mysql-bin

# (可选)设置binlog日志的格式,推荐row
binlog_format= statement|mixed|row

 

三、启动Maxwell

1、方式一,直接参数式启动

# 切换到maxwell目录
cd /usr/local/maxwell-1.29.2

# 启动maxwell
./bin/maxwell \
--user='maxwell' \
--password='123456' \
--host='192.168.2.225' \
--port='3308' \
--producer='stdout' \
--jdbc_options='useSSL=false&serverTimezone=Asia/Shanghai'

参数说明:

# 主库分配的从库账号
--user='maxwell' \ 

# 主库分配的从库账号密码
--password='123456' \ 

# 主库地址
--host='192.168.2.225' \ 

# 主库端口
--port='3308' \ 

# 消息生产模式,stdout控制台输出,kafka
--producer='stdout' \ 

# jdbc的时区参数
--jdbc_options='useSSL=false&serverTimezone=Asia/Shanghai'

 

2、加载配置文件启动

进入maxwell目录:

cd /usr/local/maxwell-1.29.2

备份配置文件:

cp config.properties.example config.properties

编辑关键参数

vim config.properties

参数项:

# mysql login info (MySQL账号信息)
host=192.168.2.225
user=maxwell
password=123456
port=3308

# 订阅信息(生产者模式:即输出的目标 stdout控制台, kafka kafka集群)
producer=stdout

 

3、启动命令:

cd /usr/local/maxwell-1.29.2/
./bin/maxwell --config ./config.properties --jdbc_options='useSSL=false&serverTimezone=Asia/Shanghai'

 

四、监听效果:

检查Maxwell是否能够监听?

在被监听的主库中执行非查询SQL

 

1、插入日志

INSERT INTO `test-db`.`day_sale` (`ID`, `PRODUCT`, `CHANNEL`, `AMOUNT`, `SALE_DATE`) VALUES (NULL, '苹果', '淘宝', 2497.0000, NOW());

终端日志:

[root@localhost maxwell-1.29.2]# ./bin/maxwell --config ./config.properties --jdbc_options='useSSL=false&serverTimezone=Asia/Shanghai'
Using kafka version: 1.0.0
13:31:50,295 INFO  Maxwell - Starting Maxwell. maxMemory: 247332864 bufferMemoryUsage: 0.25
13:31:50,534 INFO  Maxwell - Maxwell v1.29.2 is booting (StdoutProducer), starting at Position[BinlogPosition[mysql-bin.000005:61414], lastHeartbeat=1642483666136]
13:31:50,757 INFO  MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[mysql-bin.000005:16191], lastHeartbeat=0])
13:31:50,931 INFO  BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.000005:61414
13:31:50,963 INFO  BinaryLogClient - Connected to 192.168.2.225:3308 at mysql-bin.000005/61414 (sid:6379, cid:173)
13:31:50,963 INFO  BinlogConnectorReplicator - Binlog connected.
{"database":"test-db","table":"day_sale","type":"insert","ts":1642484382,"xid":65636,"commit":true,"data":{"ID":163,"PRODUCT":"苹果","CHANNEL":"淘宝","AMOUNT":2497.0000,"SALE_DATE":"2022-01-18 13:39:42"}}

 

可以看到Maxwell将会返回一条信息,将其格式化处理:

{
    "database": "test-db", -- 库名
    "table": "day_sale", -- 表名
    "type": "insert", -- 执行类型:插入
    "ts": 1642484382, -- 操作时间
    "xid": 65636,  -- 操作ID?
    "commit": true, -- 提交状态
    "data": { -- 数据信息: {字段: 值}
        "ID": 163,
        "PRODUCT": "苹果",
        "CHANNEL": "淘宝",
        "AMOUNT": 2497,
        "SALE_DATE": "2022-01-18 13:39:42"
    }
}

如果是多个数据,maxwell还是根据记录一条条打印 (说明maxwell以一条记录为标准单位进行输出)

{"database":"test-db","table":"day_sale","type":"insert","ts":1642484928,"xid":67070,"xoffset":0,"data":{"ID":164,"PRODUCT":"产品A","CHANNEL":"拼多多","AMOUNT":2497.0000,"SALE_DATE":"2022-01-18 13:48:48"}}
{"database":"test-db","table":"day_sale","type":"insert","ts":1642484928,"xid":67070,"xoffset":1,"data":{"ID":165,"PRODUCT":"产品B","CHANNEL":"京东","AMOUNT":2497.0000,"SALE_DATE":"2022-01-18 13:48:48"}}
{"database":"test-db","table":"day_sale","type":"insert","ts":1642484928,"xid":67070,"commit":true,"data":{"ID":166,"PRODUCT":"产品C","CHANNEL":"淘宝","AMOUNT":2497.0000,"SALE_DATE":"2022-01-18 13:48:48"}}

2、修改记录:

UPDATE `test-db`.`day_sale` SET `CHANNEL` = '拼多多2' WHERE `ID` = 164

格式化信息:

{
    "database": "test-db",
    "table": "day_sale",
    "type": "update", -- 操作类型:修改
    "ts": 1642485002, 
    "xid": 67425,
    "commit": true,
    "data": {
        "ID": 164,
        "PRODUCT": "产品A",
        "CHANNEL": "拼多多2",
        "AMOUNT": 2497,
        "SALE_DATE": "2022-01-18 13:48:48"
    },
    "old": { -- maxwell会存储原纪录变更的信息
        "CHANNEL": "拼多多"
    }
}

 

3、删除记录

DELETE FROM `test-db`.`day_sale` WHERE `ID` = 165

格式化信息:

{
    "database": "test-db",
    "table": "day_sale",
    "type": "delete", -- 操作类型,删除
    "ts": 1642485422,
    "xid": 68499,
    "commit": true,
    "data": { -- maxwell 无论新增,删除,修改,都会对数据进行完整保留
        "ID": 165,
        "PRODUCT": "产品B",
        "CHANNEL": "京东",
        "AMOUNT": 2497,
        "SALE_DATE": "2022-01-18 13:48:48"
    }
}

 

上述的操作,是终端阻塞的,要保持后台执行maxwell 就需要将消息输出到其他中间件中去,如kafka

# 订阅信息(生产者模式:即输出的目标 stdout控制台, kafka kafka集群)
producer=kafka

 

上一篇:Sqlite


下一篇:SQL手工注入MongoDB数据库