Canal源码分析之启动时处理逻辑和主备切换机制

Canal源码分析之启动时处理逻辑和主备切换机制 canal主备切换机制架构图

 

 源码分析

Canal的版本是1.0.3,先找到程序的入口点 


/**
 * canal独立版本启动的入口类
 *
 */
public class CanalLauncher {
 

    ...

    public static void main(String[] args) {
        try {
             
            ...
            
            if (remoteConfigLoader != null) {
                remoteConfigLoader.startMonitor(new RemoteCanalConfigMonitor() {

                    @Override
                    public void onChange(Properties properties) {
                        try {
                            // 远程配置canal.properties修改重新加载整个应用
                            canalStater.destroy();
                            //从这里进入下面的类
                            canalStater.start(properties);
                        } catch (Throwable throwable) {
                            logger.error(throwable.getMessage(), throwable);
                        }
                    }
                });
            }

            ....

        } catch (Throwable e) {
            logger.error("## Something goes wrong when starting up the canal Server:", e);
        }
    }

 

}

可以看到启动时,通过配置文件判断是否需要将监听到的binlog数据写入mq 

/**
 * Canal server 启动类
 *
 */
public class CanalStater {
 

    private CanalController     controller      = null;
    private CanalMQProducer     canalMQProducer = null;
    private Thread              shutdownThread  = null;
    private CanalMQStarter      canalMQStarter  = null;

    /**
     * 启动方法
     *
     * @param properties canal.properties 配置
     * @throws Throwable
     */
    synchronized void start(Properties properties) throws Throwable {
        String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);

        //根据配置初始化mq的生产者
        if (serverMode.equalsIgnoreCase("kafka")) {
            canalMQProducer = new CanalKafkaProducer();
        } else if (serverMode.equalsIgnoreCase("rocketmq")) {
            canalMQProducer = new CanalRocketMQProducer();
        }

        if (canalMQProducer != null) {
             ...

            if ("true".equals(autoScan)) {
                String rootDir = CanalController.getProperty(properties, CanalConstants.CANAL_CONF_DIR);

                ...

            } else {
                String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
                System.setProperty(CanalConstants.CANAL_DESTINATIONS, destinations);
            }
        }

        logger.info("## start the canal server.");
        controller = new CanalController(properties);
        //在这里进入下一面的类
        controller.start();
       

        ...

        if (canalMQProducer != null) {
            canalMQStarter = new CanalMQStarter(canalMQProducer);
            MQProperties mqProperties = buildMQProperties(properties);
             //启动mq生产者
            canalMQStarter.start(mqProperties);
            controller.setCanalMQStarter(canalMQStarter);
        }
    }

     ...
}

下面看下这个类CanalController  

创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态

/**
 * canal调度控制器
 *
 */
public class CanalController {

 
    ....

    public void start() throws Throwable {
   
        ...

        for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
            final String destination = entry.getKey();
            InstanceConfig config = entry.getValue();
            // 创建destination的工作节点
            //创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状
            if (!embededCanalServer.isStart(destination)) {
                // HA机制启动
                ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
态
                if (!config.getLazy() && !runningMonitor.isStart()) {
                    //从这里进入下一个类
                    runningMonitor.start();
                }
            }

            if (autoScan) {
                instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
            }
        }
 

        // 启动网络接口
        if (canalServer != null) {
            canalServer.start();
        }
    }

   ...

}

上面现实了canal的程序入口,下面重点来分析下ServerRunningMonitor 这个类看下cancal如何现实主备切换

一旦zookeeper发现canal server A创建的节点消失后,立即通知其他的canal server再次进行尝试创建新的同名节点,重新选出一个canal server启动instance.

/**
 * 针对server的running节点控制
 * 
 */
public class ServerRunningMonitor extends AbstractCanalLifeCycle {

    private BooleanMutex               mutex        = new BooleanMutex(false);
    // 当前服务节点状态信息
    private ServerRunningData          serverData;
    // 当前实际运行的节点状态信息
    private volatile ServerRunningData activeData;

    ...

    public ServerRunningMonitor(){
        // 创建父节点
        dataListener = new IZkDataListener() {

            public void handleDataChange(String dataPath, Object data) throws Exception {
                MDC.put("destination", destination);
                ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);
                if (!isMine(runningData.getAddress())) {
                    mutex.set(false);
                }

                if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active
                    release = true;
                    releaseRunning();// 彻底释放mainstem
                }

                activeData = (ServerRunningData) runningData;
            }

            public void handleDataDeleted(String dataPath) throws Exception {
                MDC.put("destination", destination);
                //利用AQS自旋锁实现多线程下的无锁阻塞
                mutex.set(false);
                if (!release && activeData != null && isMine(activeData.getAddress())) {
                    // 如果上一次active的状态就是本机,则即时触发一下active抢占
                    initRunning();
                } else {
                    // 否则就是等待delayTime,避免因网络闪断或者zk异常,导致出现频繁的切换操作
                    // 具体场景: canal server所在的网络出现闪断,导致zookeeper认为session失 
                    // 效, 释放了running节点,此时canal server对应的jvm并未退出,(一种假死状 
                    // 态,非常特殊的情况)
                    delayExector.schedule(new Runnable() {

                        public void run() {
                            initRunning();
                        }
                    }, delayTime, TimeUnit.SECONDS);
                }
            }

        };

    }

     ...

 
    private void initRunning() {
        if (!isStart()) {
            return;
        }

        String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
        // 序列化
        byte[] bytes = JsonUtils.marshalToByte(serverData);
        try {
            mutex.set(false);
            zkClient.create(path, bytes, CreateMode.EPHEMERAL);
            activeData = serverData;
            processActiveEnter();// 触发一下事件
            mutex.set(true);
        } catch (ZkNodeExistsException e) {
            bytes = zkClient.readData(path, true);
            if (bytes == null) {// 如果不存在节点,立即尝试一次
                initRunning();
            } else {
                activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
            }
        } catch (ZkNoNodeException e) {
            zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点
            initRunning();
        }
    }

    /**
     * 判断处于active的节点是不是本机实例
     * @param address
     * @return
     */
    private boolean isMine(String address) {
        return address.equals(serverData.getAddress());
    }


}

 

下面来做个下HA机制的测试

操作步骤

一.配置

把canal配置成HA的架构,配置如下

1.canal-master

canal.properties

canal.id = 2
canal.ip = 127.0.0.1
canal.port = 32121
canal.metrics.pull.port = 11112
canal.zkServers = 127.0.0.1:2181
#基于ZK记录解析位点,HA机制下必须启用
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
canal.mq.servers = 127.0.0.1:9092

Instace.properties

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.mq.topic=example
canal.mq.partition=0

2.canal-slave

canal.properties

canal.id = 3
canal.ip = 127.0.0.1
canal.port = 32122
canal.metrics.pull.port = 11113
canal.zkServers = 127.0.0.1:2181
#基于ZK记录解析位点,HA机制下必须启用
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
canal.mq.servers = 127.0.0.1:9092

Instace.properties

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.mq.topic=example
canal.mq.partition=0

两台机器上的instance目录的名字需要保证完全一致,HA模式是依赖于instance name进行管理,同时必须都选择default-instance.xml配置

高可用测试

1.分别启动canal-master和canal-slave

在zk里可以看到canal-master先创建running节点处于active,此时canal-master处于standby

Canal源码分析之启动时处理逻辑和主备切换机制

2.截断表t_test

Kafka tool里这是出现了一条数据type为TRUNCATE

 

{

    "data": null,

    "database": "trade",

    "es": 1555560679000,

    "id": 1,

    "isDdl": true,

    "mysqlType": null,

    "old": null,

    "pkNames": null,

    "sql": "TRUNCATE `t_test`",

    "sqlType": null,

    "table": "t_test",

    "ts": 1555560920238,

    "type": "TRUNCATE"

}

3.通过navicat往mysql里插入数据

直接执行t_test.sql文件,往t_test表里按照id编号插入8000条数据(注意不要用navicat的数据同步功能可能会造成数据写入数据不是按照id递增的顺序,对测试结果的观察不大友好)

Canal源码分析之启动时处理逻辑和主备切换机制

 

{
    "data": null,
    "database": "trade",
    "es": 1555560679000,	
    "id": 1,
    "isDdl": true,
    "mysqlType": null,
    "old": null,
    "pkNames": null,
    "sql": "TRUNCATE `t_test`",
    "sqlType": null,
    "table": "t_test",
    "ts": 1555560920238,
    "type": "TRUNCATE"
}

4.模拟 canal server对应的jvm异常crash的情况

把canal-master对应得java进程强制kill掉,过了段时间可以看到running节点cid变成了3,就是canal-slave的id,说明canal完成了主从切换。

Canal源码分析之启动时处理逻辑和主备切换机制

5.检查kafka里数据的正确性

 

Canal源码分析之启动时处理逻辑和主备切换机制

通过新running(ctime = 2019-04-18 12:33:07) 节点的创建时间查找时间为2019-04-18 12:33:07附近的数据可以发现id为1392-1394的数据插入重复发到kafka里了

Canal源码分析之启动时处理逻辑和主备切换机制

原因分析:canal的HA机制是通过把从mysql binlg拉取的位点信息异步写入zk里的,发生主从切换的时候canal-slave节点会zk里把该位点信息读取出来,并且从这个地方开始解析mysql的binlog日志。canal-master可能有些数据已经发到kafka里但是还来不及更新zk就挂掉了,所以就会出现数据重复发到kafka的情况

写入zk的cursor位点信息如下:

Canal源码分析之启动时处理逻辑和主备切换机制

 

建议解决方案:HA机制可以保证拉取binlog的数据不丢失,但是会出现重复发送到kafka的情况,DELETE和UPDATE事件的记录消息并没影响,而INSERT事件需要根据原始数据的id进行幂等性判断

 

三.功能测试

1.执行批量更新的操作

UPDATE `t_test`   a
SET `net_balance` = '123.0000000000',
 `sma_committed` = '123.0000000000',
 `high_water_mark` = '2359539.0000000000',
 `version` = '2018-11-01'
WHERE
 a.create_time >= '2018-10-11 14:42:22'
AND  a.create_time <= '2019-02-13 04:28:23'

2.查看kafka里的数

Canal源码分析之启动时处理逻辑和主备切换机制

offset=0的数据详情

Canal源码分析之启动时处理逻辑和主备切换机制

结论:批量操作的DML语句是多条数据(具体数量并不确定)合在一起发一条kafka数据的

四.数据格式

Kafka收到canal解析的DML binlog日志格式

{
    "data": [
        {
            //修改,删除,新增后的数据
            "external_id": "20180522032329",
            "complete_time": "2018-05-22 20:20:12",
            "return_time": "2018-05-24 11:34:12"
        }
    ],
    "database": "数据库名",
    "es": 1555489722000,
    "id": 主键值,  
    "isDdl": false,

"mysqlType": {

        //字段格式
        "external_id": "varchar(32)",
        "complete_time": "datetime",
        "return_time": "datetime"

    },

    "old": [
        {
            //老的数据
            "complete_time": "2018-05-22 20:20:22",
            "return_time": "2018-05-24 11:34:10"
        }
    ],
"pkNames": [

        //主键值
        "external_id"
    ],
    "sql": "",
    "sqlType": {

        "external_id": 12,
        "complete_time": 93,
        "return_time": 93
},
    //表名
    "table": "return_time",
"ts": 1555489722742,
    //操作类型
    "type": "UPDATE"
}

Kafka收到canal解析的DDL binlog日志格式

{
    "data": null,
    "database": "trade",
    "es": 1555575860000,
    "id": 2832,
    "isDdl": true,
    "mysqlType": null,
    "old": null,
    "pkNames": null,
    "sql": "ALTER TABLE `t_order`\r\nADD COLUMN `test`  varchar(255) NULL AFTER `is_bo_modified`",
    "sqlType": null,
    "table": "t_order",
    "ts": 1555575861188,
    "type": "ALTER"
}

 

上一篇:flink-sql解析canal-json实现实时同步


下一篇:Canal部署过程中的错误