MySQL系列:基于binlog的增量订阅与消费(一)

  在一些业务场景中,像在数据分析中我们有时候需要捕获数据变化(CDC);在数据审计中,我们也往往需要知道数据从这个点到另一个点的变化;同样在实时分析中,我们有时候需要看到某个值得实时变化等。

要解决以上问题,我们可以实时解析mysql binlog日志,下面两个工具可以很好的处理这个问题:

1. canal(阿里巴巴开源项目,纯java开发)

2. python-mysql-replication(python开发)

使用场景:

1. MySQL到NoSQL的数据同步

2. MySQL到搜索引擎的复制

3. 当数据发生变化是清除数据缓存

4. 数据库审计

5. 实时数据分析

本文主要说说python-mysql-replication。

介绍

python-mysql-replication是基于MySQL复制原理实现的,把自己伪装成一个slave不断的从MySQL数据库获取binlog并解析。

当前版本(0.15)环境支持:

1. MySQL 5.5/5.6/5.7

2. Python >=2.6

3. Python 3.3/3.4/3.5/3.6(3.2不支持)

MySQL复制实现:

MySQL系列:基于binlog的增量订阅与消费(一)

python-mysql-replication实现:

MySQL系列:基于binlog的增量订阅与消费(一)

配置安装

安装软件

[root@mha-maxscale- ~]# pip install mysql-replication

MySQL授权

 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator'@'192.168.3.%' IDENTIFIED BY '';

Binlog要满足如下条件

 MySQL>root@(none) 09:53:38>show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1 row in set (0.01 sec) MySQL>show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
1 row in set (0.00 sec) MySQL>show variables like 'binlog_row_image';
+------------------+-------+
| Variable_name | Value |
+------------------+-------+
| binlog_row_image | FULL |
+------------------+-------+
1 row in set (0.00 sec)

示例代码:

 [root@mha-maxscale-1 script]# cat mysql-replication.py
#!/usr/bin/env python
# -*- coding: utf-8 -*- from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent,
)
import sys
import json def main():
mysql_settings = {'host': '192.168.3.130',
'port': 3306, 'user': 'replicator', 'passwd': ''}
stream = BinLogStreamReader(
connection_settings=mysql_settings,
server_id=101,
blocking=True,
only_schemas=['zow'],
only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
resume_stream=True,
log_file='mysql-bin.000013', log_pos=6197) for binlogevent in stream:
for row in binlogevent.rows:
event = {"schema": binlogevent.schema, "table": binlogevent.table, "log_pos": binlogevent.packet.log_pos}
if isinstance(binlogevent, DeleteRowsEvent):
event["action"] = "delete"
event["values"] = dict(row["values"].items())
event = dict(event.items())
elif isinstance(binlogevent, UpdateRowsEvent):
event["action"] = "update"
event["before_values"] = dict(row["before_values"].items())
event["after_values"] = dict(row["after_values"].items())
event = dict(event.items())
elif isinstance(binlogevent, WriteRowsEvent):
event["action"] = "insert"
event["values"] = dict(row["values"].items())
event = dict(event.items())
print json.dumps(event)
sys.stdout.flush() stream.close() if __name__ == "__main__":
main()

运行结果:

 [root@mha-maxscale- script]# python mysql-replication.py
{"action": "insert", "table": "t2", "log_pos": , "values": {"tname": "hh", "id": }, "schema": "zow"}
{"log_pos": , "after_values": {"tname": "ii", "id": }, "action": "update", "table": "t2", "before_values": {"tname": "hh", "id": }, "schema": "zow"}
{"log_pos": , "after_values": {"tname": "ii", "id": }, "action": "update", "table": "t2", "before_values": {"tname": "hh", "id": }, "schema": "zow"}
{"action": "delete", "table": "t2", "log_pos": , "values": {"tname": "ii", "id": }, "schema": "zow"}
{"action": "delete", "table": "t2", "log_pos": , "values": {"tname": "ii", "id": }, "schema": "zow"}

更多例子见:https://github.com/noplay/python-mysql-replication/tree/master/examples

上一篇:Android横竖屏切换小结


下一篇:深入理解之 Android Handler