mysqlsmom 文档: https://mysqlsmom.readthedocs.io/en/latest/hello.html
github: https://github.com/m358807551/mysqlsmom
环境要求:
1、python2.7 2、redis 3、Mysql 配置 binlog-format=row
安装
pip install mysqlsmom
全量同步
# 创建全量同步配置文件 $ mom new test_mom/init_config.py -t init --force # 编辑配置文件 $ vim ./test_mom/init_config.py # 按注释提示修改配置 # 开始同步 $ mom run -c ./test_mom/init_config.py
增量同步
配置三个文件
test_mom ├── binlog_config.py # 配置文件 ├── my_filters.py # 过滤器 配置于 watched └── my_handlers.py # 处理器 配置于 pipeline
新建配置
mom new test_mom/binlog_config.py -t binlog --force
1、binlog_config.py
# coding=utf-8 STREAM = "BINLOG" # "BINLOG" or "INIT" SERVER_ID = 99 SLAVE_UUID = __name__ # 一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1 BULK_SIZE = 1 BINLOG_CONNECTION = { 'host': '127.0.0.1', 'port': 3306, 'user': 'root', 'passwd': '123456' } # redis存储上次同步位置等信息 REDIS = { "host": "127.0.0.1", "port": 6379, "db": 0, # "password": "password", # 不需要密码则注释或删掉该行 } NODES = [{"host": "127.0.0.1", "port": 9200}] TASKS = [ { "stream": { "database": "demo", "table": "student" }, "jobs": [{ "actions": ["insert", "update"], "watched": { "filter_display": {} }, "pipeline": [ {"only_fields": {"fields": ["id", "name", "age"]}}, {"change_name": {"key": "name", "prefix": "hot-"}}, {"set_id": {"field": "id"}} ], "dest": { "es": { "action": "upsert", "index": "demo", "type": "student", "nodes": NODES } } }, { "actions": ["delete"], "pipeline": [ # {"only_fields": {"fields": ["id", "name", "age"]}}, {"set_id": {"field": "id"}} ], "dest": { "es": { "action": "delete", "index": "demo", "type": "student", "nodes": NODES } } } ] } ] CUSTOM_ROW_HANDLERS = "./my_handlers.py" CUSTOM_ROW_FILTERS = "./my_filters.py"
自定义处理器 my_handlers.py
# -*- coding: utf-8 -*- import copy def change_name(row, key, prefix): new_row = copy.deepcopy(row) new_row[key] = "{}{}".format(prefix, row[key]) # 返回数据字典,下一工序继续处理 return new_row
自定义过滤器 my_filters.py
# -*- coding: utf-8 -*- def filter_display(event): # 返回True 或 False,使用或丢弃 return event["values"]["display"] == 1
启动
mom run -c test_mom/binlog_config.py