Canal监听MySQL Binarylog消费实践

背景

在MySQL作为如今最为主流使用的数据库背景下,除了常规的数据存储使用场景,还存在大量的使用需求,如:数据自动同步,数据更新监听等场景。由于数据库层面的增量数据变动无法依靠应用服务层面进行有效感知,因此,还是需要从数据库自身提供的机制入手进行实现处理。下面为将展示关于如何借助Canal实践解决场景的几个业务场景问题。

Canal简述

Github开源地址:https://github.com/alibaba/canal
Canal监听MySQL Binarylog消费实践

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)。
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)。
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据。

Canal工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议。
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )。
  • canal 解析 binary log 对象(原始为 byte 流)。

Canal服务范围

  • 当前Canal支持源端MYSQL的版本包括:5.1.x、5.5.x、5.6.x、5.7.x、8.0.x。
  • Canal直接支持的写入目标类型包括:MYSQL、Kafka、elasticsearch、Hbase、RocketMQ等。由于Datahub直接支持Kafka协议的写入,所以Canal服务也可以支持往Datahub中写入Binary Log数据。

Canal消费方式

Canal在伪装成为目标MySQL的一个Slave节点后,获取到来自主节点的BinaryLog日志内容。那么作为BinaryLog消费者该如何使用canal监听得到的内容呢。Canal为我们提供了两种类型的方式,直接消费和投递。直接消费即使用Canal配套提供的客户端程序,即时消费Canal的监听内容。投递是指配置指定的MQ类型以及对应信息,Canal将会按照BinaryLog的条目投递到指定的MQ下,再交由MQ为各种消费形式提供数据消费。

Canal客户端消费

Canal官方SDK提供地址:https://github.com/alibaba/canal/wiki/ClientExample

  • 消费使用代码摘要与简单说明:
// List<Entry>为一次消费包含的多条BinaryLog数据内容
private static void printEntry(List<Entry> entrys) {
    for (Entry entry : entrys) {
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
            continue;
        }

        RowChange rowChage = null;
        try {
            // 行变动数据存放对象,其中rowDatasList为更新前后数据内容
            rowChage = RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                                       e);
        }
        // 当前BinaryLog动作类型
        EventType eventType = rowChage.getEventType();
        
        // entry.getHeader()存放当前变动数据对应的Schema、Table信息
        System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                                         entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                                         entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                                         eventType));

        for (RowData rowData : rowChage.getRowDatasList()) {
            if (eventType == EventType.DELETE) {
                printColumn(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                printColumn(rowData.getAfterColumnsList());
            } else {
                System.out.println("-------&gt; before");
                printColumn(rowData.getBeforeColumnsList());
                System.out.println("-------&gt; after");
                printColumn(rowData.getAfterColumnsList());
            }
        }
    }
}

Canal投递MQ

Canal暂时提供的投递MQ类型包括:

投递配置

以投递到RocketMQ为样例提供Canal版本为1.1.4的配置样例(对比发现1.1.5开始配置内容有所改动,官网文档说明内容有所存疑,因为为大家提供可用1.1.4版本为配置样例参考),MQ使用大禹平台RocketMQ为样例。
  • canal.properties
# 改动部分一
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =${大禹账号AccessKey}
canal.aliyun.secretKey = ${大禹账号SecretKey}

#改动部分二
##################################################
#########              MQ              #############
##################################################
canal.mq.servers = ${大禹MQ访问地址}
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = ${大禹生产者Group}
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
  • instance.properties
# position info
canal.instance.master.address=${canal服务监听的MySQL地址}

canal.instance.dbUsername=${数据库访问用户名}
canal.instance.dbPassword=${数据库访问密码}

# table regex
canal.instance.filter.regex=${监听数据表匹配表达式}
# mq config
canal.mq.topic=${大禹MQ投递Topic名称}

投递内容数据结构展示

与客户端直接消费不同,canal投递上MQ的消息内容为文本内容,下面为大家展示MQ中一条BinaryLog的格式内容,直观地感受可以使用的相关信息。

  • 样例数据
{
    "data": [
        {
            "id": "4971874",
            "code": "45ffb86",
            "name": "组织1",
            "display_name": "组织1",
            "simple_name": "组织1",
            "parent_code": "da43c609",
            "type": "R",
            "sub_type": null,
            "comment": null,
            "level": "2",
            "region_id": null,
            "dutyuser_id": null,
            "ou_code": null,
            "cry": null,
            "line": null,
            "manager": null,
            "address": null,
            "create_by": null,
            "create_on": null,
            "last_modify_by": "20443",
            "last_modify_on": "2021-12-09 06:00:03"
        }
    ],
    "database": "test",
    "es": 1617179083000,
    "id": 7,
    "isDdl": false,
    "mysqlType": {
        "id": "bigint(20)",
        "code": "varchar(255)",
        "name": "varchar(255)",
        "display_name": "varchar(255)",
        "simple_name": "varchar(128)",
        "parent_code": "varchar(255)",
        "type": "varchar(32)",
        "sub_type": "varchar(32)",
        "comment": "varchar(2040)",
        "level": "int(11)",
        "region_id": "varchar(128)",
        "dutyuser_id": "varchar(32)",
        "ou_code": "varchar(32)",
        "cry": "varchar(32)",
        "line": "varchar(32)",
        "manager": "varchar(255)",
        "address": "varchar(255)",
        "create_by": "bigint(20)",
        "create_on": "datetime",
        "last_modify_by": "bigint(20)",
        "last_modify_on": "datetime"
    },
    "old": [
        {
            "last_modify_by": null
        }
    ],
    "pkNames": [
        "id"
    ],
    "sql": "",
    "sqlType": {
        "id": -5,
        "code": 12,
        "name": 12,
        "display_name": 12,
        "simple_name": 12,
        "parent_code": 12,
        "type": 12,
        "sub_type": 12,
        "comment": 12,
        "level": 4,
        "region_id": 12,
        "dutyuser_id": 12,
        "ou_code": 12,
        "cry": 12,
        "line": 12,
        "manager": 12,
        "address": 12,
        "create_by": -5,
        "create_on": 93,
        "last_modify_by": -5,
        "last_modify_on": 93
    },
    "table": "test2",
    "ts": 1617179084459,
    "type": "UPDATE"
}

上述展示的内容为更新其中一条数据中的"last_modify_by"字段的内容。借助上面的BinaryLog样例,可以关注到一下内容:

  • 库表信息:字段"database","table"分别展示了数据库名称以及库表名称。
  • 数据表结构:mysqlType中囊括了当前BinaryLog中操作的数据表的具体字段结构,包括字段名称、字段类型;
  • 操作类型:字段"type"显示了该BinaryLog内容对应的操作类型。主要包括:INSERT、UPDATE、DELETE。
  • 更新前后的数据内容:"data"中可以看到组成为数组List,说明一条BinaryLog数据可能包括多条更新后的数据内容。同样地,从"old"字段中也可以看到更新前的数据内容,也同为数据List结构。

应用实践

数据增量同步

  • 实践背景

    在某项目交付过程中,客户希望解决一个数据同步问题,要求实时性较高,性能影响尽可能少地减少。同时,由于来源数据库为一个外部维护的数据库,且无法直接使用到新建设的业务系统中作为业务数据库。旧的解决方案进行每日同步的频次执行,在业务量较少的时间段进行一次大规模的数据查询再插入的动作。旧的方案会导致数据同步滞后时间为一天,且全量的查询动作对外部数据库产生较大的使用影响,遭到外部维护方的查询限制,同步执行时间也比较长。

  • 数据同步实时化

    为了使数据能够进行实时同步,决定使用Canal接入到外部数据库,然后把Canal监听的BinaryLog接入到新建设的MySQL库中,使得两边的数据库数据同步延迟仅有秒级差异。Canal的接入也使得每日执行的同步任务得以取消,减少了额外的系统维护工作。而且BinaryLog的监听推送对外部数据库性能来说影响较少。

  • 增量数据投递消费

    此外,Canal投递消费能力能够拓展数据增量改动的体现形式。Canal把感知到的数据库变动内容投递到指定的MQ Topic,为后续的消费途径提供多样性。如:Canal订阅指定数据表的变动数据投递到Datahub中,投递的内容就如上面的数据结构展示。允许借助Blink计算平台对数据进行感知整合,实现业务场景的下聚合统计等实时计算诉求;也能够开放Datahub的Topic订阅权限,把增量数据的变动开发到指定使用者,提供实时数据变动推送。

Canal监听MySQL Binarylog消费实践

监听字段更新

  • 实践背景

    在大屏项目建设过程中,大量的指标数据维护同一指标表中是常见的处理手段。但指标数据的更新来源比较复杂,有数据开发同学进行写入,有外部系统进行推送。客户要求对大屏指标的实时性与有效性进行保证,但大量的指标更新情况无法有效监控。

  • 解决方案

    使用Canal对指定的指标表进行监听,对指标表的更新数据BinaryLog进行解析,然后以日志形式记录。针对每一条数据内容能够识别到具体的指标,把当前更新的数据信息记录到库表中。再按照对应的指标更新要求,感知更新日志表的数据库,就能够确保及时知道指标的更新频次是否符合预期,指标数据每次更新的数据内容,做到更新频次可监控,更新数据变动可追溯。

Canal监听MySQL Binarylog消费实践

上一篇:使用 Windows Vault 导入导出身份凭证


下一篇:Firefox 52 将停止支持所有 NPAPI 插件 Flash 除外