Github: https://github.com/noplay/python-mysql-replication
设置同步账号权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator'@'%' IDENTIFIED BY '123456';
# 刷新权限
flush privileges;
参考
利用Python my-replication读取mysql的binlog
安装
pip install mysql-replication
代码示例
# -*- coding: utf-8 -*-
"""Stream MySQL binlog row events and print every changed row as JSON.

Each printed line has the following shape:

    {
        "schema": "demo",        # database name
        "table": "student",      # table name
        "action": "update",      # one of: insert, delete, update
        "data": {                # the row, containing every column
            "id": 26,
            "name": "haha",
            "age": 34,
            "update_time": "2019-06-06 16:59:06",
            "display": 0
        }
    }
"""
import datetime
import json

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)


class DateEncoder(json.JSONEncoder):
    """JSON encoder that renders ``datetime`` and ``date`` values as strings.

    Works around:
    ``TypeError: Object of type 'datetime' is not JSON serializable``.
    """

    def default(self, obj):
        """Return a JSON-serializable stand-in for *obj*.

        ``datetime`` becomes ``'%Y-%m-%d %H:%M:%S'`` and ``date`` becomes
        ``'%Y-%m-%d'``; anything else is delegated to the base class, which
        raises ``TypeError``.
        """
        # datetime must be tested first because it is a subclass of date.
        if isinstance(obj, datetime.datetime):
            return obj.strftime('%Y-%m-%d %H:%M:%S')
        if isinstance(obj, datetime.date):
            return obj.strftime("%Y-%m-%d")
        return json.JSONEncoder.default(self, obj)


# Database connection settings.
mysql_settings = {
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'root',
    'passwd': '123456'
}


def main():
    """Follow the binlog and print one JSON document per changed row."""
    # Create the binlog stream reader.
    stream = BinLogStreamReader(
        connection_settings=mysql_settings,
        server_id=100,  # replica identifier; must be unique
        blocking=True,  # block and wait for subsequent events
        # Only watch write operations: insert, delete, update.
        only_events=[
            DeleteRowsEvent,
            UpdateRowsEvent,
            WriteRowsEvent
        ]
    )

    try:
        for binlogevent in stream:
            for row in binlogevent.rows:
                # Every record carries the database and table names.
                event = {
                    "schema": binlogevent.schema,
                    "table": binlogevent.table,
                }

                if isinstance(binlogevent, DeleteRowsEvent):
                    event["action"] = "delete"
                    event["data"] = row["values"]
                elif isinstance(binlogevent, UpdateRowsEvent):
                    event["action"] = "update"
                    # Update rows expose "after_values", not "values".
                    event["data"] = row["after_values"]
                elif isinstance(binlogevent, WriteRowsEvent):
                    event["action"] = "insert"
                    event["data"] = row["values"]

                print(json.dumps(event, cls=DateEncoder))
    finally:
        # In blocking mode the loop only ends through an exception or an
        # interrupt; close the stream so the connection is always released.
        stream.close()


if __name__ == '__main__':
    main()