Source Code Analysis
The Canal version analyzed here is 1.1.3. Let's start from the program's entry point.
/**
 * Entry class for starting Canal in standalone mode
 */
public class CanalLauncher {
    ...
    public static void main(String[] args) {
        try {
            ...
            if (remoteConfigLoader != null) {
                remoteConfigLoader.startMonitor(new RemoteCanalConfigMonitor() {
                    @Override
                    public void onChange(Properties properties) {
                        try {
                            // When the remote canal.properties changes, reload the whole application
                            canalStater.destroy();
                            // From here we enter the class below
                            canalStater.start(properties);
                        } catch (Throwable throwable) {
                            logger.error(throwable.getMessage(), throwable);
                        }
                    }
                });
            }
            ...
        } catch (Throwable e) {
            logger.error("## Something goes wrong when starting up the canal Server:", e);
        }
    }
}
As we can see, at startup the configuration file determines whether the captured binlog data should be written to an MQ.
/**
 * Canal server startup class
 */
public class CanalStater {

    private CanalController controller = null;
    private CanalMQProducer canalMQProducer = null;
    private Thread shutdownThread = null;
    private CanalMQStarter canalMQStarter = null;

    /**
     * Start method
     *
     * @param properties the canal.properties configuration
     * @throws Throwable
     */
    synchronized void start(Properties properties) throws Throwable {
        String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
        // Initialize the MQ producer according to the configuration
        if (serverMode.equalsIgnoreCase("kafka")) {
            canalMQProducer = new CanalKafkaProducer();
        } else if (serverMode.equalsIgnoreCase("rocketmq")) {
            canalMQProducer = new CanalRocketMQProducer();
        }

        if (canalMQProducer != null) {
            ...
            if ("true".equals(autoScan)) {
                String rootDir = CanalController.getProperty(properties, CanalConstants.CANAL_CONF_DIR);
                ...
            } else {
                String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
                System.setProperty(CanalConstants.CANAL_DESTINATIONS, destinations);
            }
        }

        logger.info("## start the canal server.");
        controller = new CanalController(properties);
        // From here we enter the class below
        controller.start();
        ...
        if (canalMQProducer != null) {
            canalMQStarter = new CanalMQStarter(canalMQProducer);
            MQProperties mqProperties = buildMQProperties(properties);
            // Start the MQ producer
            canalMQStarter.start(mqProperties);
            controller.setCanalMQStarter(canalMQStarter);
        }
    }
    ...
}
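The keys read in start() correspond roughly to the following canal.properties entries (the values here are illustrative, not taken from a real setup):
canal.serverMode = kafka
canal.destinations = example
canal.auto.scan = true
canal.conf.dir = ../conf
canal.mq.servers = 127.0.0.1:9092
With canal.serverMode left as tcp, no MQ producer is created and clients pull data through canal's own client protocol instead.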
Next, let's look at the CanalController class.
Once a canal server successfully creates the ZooKeeper node, it starts the corresponding canal instance; the servers that fail to create the node keep their instance in standby mode.
/**
 * canal scheduling controller
 */
public class CanalController {
    ...
    public void start() throws Throwable {
        ...
        for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
            final String destination = entry.getKey();
            InstanceConfig config = entry.getValue();
            // Create the working node for this destination.
            // The server that manages to create the ZooKeeper node starts the corresponding
            // canal instance; the servers that fail keep their instance in standby mode.
            if (!embededCanalServer.isStart(destination)) {
                // Start through the HA mechanism
                ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
                if (!config.getLazy() && !runningMonitor.isStart()) {
                    // From here we enter the next class
                    runningMonitor.start();
                }
            }

            if (autoScan) {
                instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
            }
        }

        // Start the network interface
        if (canalServer != null) {
            canalServer.start();
        }
    }
    ...
}
The code above covers canal's startup path. Now let's focus on the ServerRunningMonitor class to see how canal implements master/standby switching.
Once ZooKeeper detects that the node created by canal server A has disappeared, it immediately notifies the other canal servers to try to create a node with the same name again, and a new canal server is elected to start the instance.
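For reference, the running node lives under a path of the form /otter/canal/destinations/{destination}/running and stores a serialized ServerRunningData. An illustrative payload (not captured from the test environment) looks like:
{
    "active": true,
    "address": "127.0.0.1:32121",
    "cid": 2
}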
/**
 * Controls the running node for a server
 */
public class ServerRunningMonitor extends AbstractCanalLifeCycle {

    private BooleanMutex mutex = new BooleanMutex(false);
    // State of the current server node
    private ServerRunningData serverData;
    // State of the node that is actually running (active)
    private volatile ServerRunningData activeData;
    ...

    public ServerRunningMonitor(){
        // Create the parent node
        dataListener = new IZkDataListener() {

            public void handleDataChange(String dataPath, Object data) throws Exception {
                MDC.put("destination", destination);
                ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);
                if (!isMine(runningData.getAddress())) {
                    mutex.set(false);
                }

                if (!runningData.isActive() && isMine(runningData.getAddress())) { // the running role was released voluntarily, and this machine was previously active
                    release = true;
                    releaseRunning();// fully release the mainstem
                }

                activeData = (ServerRunningData) runningData;
            }

            public void handleDataDeleted(String dataPath) throws Exception {
                MDC.put("destination", destination);
                // BooleanMutex uses AQS to block waiting threads without explicit locking
                mutex.set(false);
                if (!release && activeData != null && isMine(activeData.getAddress())) {
                    // If this machine was active last time, immediately try to grab the active role again
                    initRunning();
                } else {
                    // Otherwise wait delayTime, to avoid frequent switches caused by network flapping or zk hiccups.
                    // Typical scenario: the network where the canal server lives flaps briefly, ZooKeeper considers
                    // the session expired and releases the running node, but the canal server's JVM has not exited
                    // (a very special "half dead" state)
                    delayExector.schedule(new Runnable() {

                        public void run() {
                            initRunning();
                        }
                    }, delayTime, TimeUnit.SECONDS);
                }
            }

        };
    }
    ...

    private void initRunning() {
        if (!isStart()) {
            return;
        }

        String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
        // Serialize
        byte[] bytes = JsonUtils.marshalToByte(serverData);
        try {
            mutex.set(false);
            zkClient.create(path, bytes, CreateMode.EPHEMERAL);
            activeData = serverData;
            processActiveEnter();// trigger the "active" event
            mutex.set(true);
        } catch (ZkNodeExistsException e) {
            bytes = zkClient.readData(path, true);
            if (bytes == null) {// the node no longer exists, retry immediately
                initRunning();
            } else {
                activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
            }
        } catch (ZkNoNodeException e) {
            zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // try to create the parent node
            initRunning();
        }
    }

    /**
     * Check whether the active node is this instance
     * @param address
     * @return
     */
    private boolean isMine(String address) {
        return address.equals(serverData.getAddress());
    }
}
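The mutex used above is canal's BooleanMutex. As the comment notes, it blocks waiting threads by building on AQS rather than explicit locks. Below is a minimal sketch of that idea, assuming only the JDK's AbstractQueuedSynchronizer; it illustrates the mechanism and is not canal's actual implementation (com.alibaba.otter.canal.common.utils.BooleanMutex).
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * Minimal boolean gate built on AQS: get() blocks while the flag is false,
 * set(true) releases every waiting thread at once.
 */
public class SimpleBooleanMutex {

    private final Sync sync;

    public SimpleBooleanMutex(boolean initState) {
        sync = new Sync(initState);
    }

    // Block until the flag becomes true (returns immediately if it already is)
    public void get() throws InterruptedException {
        sync.acquireSharedInterruptibly(0);
    }

    // Flip the flag; switching it to true wakes up all waiting threads
    public void set(boolean value) {
        if (value) {
            sync.releaseShared(0);
        } else {
            sync.setFalse();
        }
    }

    private static final class Sync extends AbstractQueuedSynchronizer {

        Sync(boolean initState) {
            setState(initState ? 1 : 0);
        }

        // A shared acquire succeeds only when the flag is true
        protected int tryAcquireShared(int ignore) {
            return getState() == 1 ? 1 : -1;
        }

        // A shared release sets the flag to true and propagates to all waiters
        protected boolean tryReleaseShared(int ignore) {
            setState(1);
            return true;
        }

        void setFalse() {
            setState(0);
        }
    }
}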
Now let's run a test of the HA mechanism.
Test procedure
I. Configuration
Configure canal as an HA setup with the following settings.
1. canal-master
canal.properties
canal.id = 2
canal.ip = 127.0.0.1
canal.port = 32121
canal.metrics.pull.port = 11112
canal.zkServers = 127.0.0.1:2181
# Record the parse position in ZK; required for the HA mechanism
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
canal.mq.servers = 127.0.0.1:9092
instance.properties
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.mq.topic=example
canal.mq.partition=0
2. canal-slave
canal.properties
canal.id = 3
canal.ip = 127.0.0.1
canal.port = 32122
canal.metrics.pull.port = 11113
canal.zkServers = 127.0.0.1:2181
# Record the parse position in ZK; required for the HA mechanism
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
canal.mq.servers = 127.0.0.1:9092
instance.properties
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.mq.topic=example
canal.mq.partition=0
The instance directory names on the two machines must be exactly the same, because HA mode manages instances by instance name; both servers must also use the default-instance.xml configuration.
II. High Availability Test
1. Start canal-master and canal-slave separately
In zk we can see that canal-master creates the running node first and becomes active, while canal-slave stays in standby.
2. Truncate table t_test
In Kafka Tool a message with type TRUNCATE now appears:
{
    "data": null,
    "database": "trade",
    "es": 1555560679000,
    "id": 1,
    "isDdl": true,
    "mysqlType": null,
    "old": null,
    "pkNames": null,
    "sql": "TRUNCATE `t_test`",
    "sqlType": null,
    "table": "t_test",
    "ts": 1555560920238,
    "type": "TRUNCATE"
}
3. Insert data into MySQL via Navicat
Execute the t_test.sql file directly to insert 8,000 rows into t_test in ascending id order (do not use Navicat's data synchronization feature: it may write the rows out of id order, which makes the test results harder to observe).
4. Simulate an abnormal crash of the canal server's JVM
Force kill the Java process of canal-master. After a while the cid of the running node changes to 3, which is canal-slave's id, showing that canal has completed the master/standby switch.
5. Verify the correctness of the data in Kafka
Using the creation time of the new running node (ctime = 2019-04-18 12:33:07), look at the Kafka messages around 2019-04-18 12:33:07: the rows with id 1392-1394 were sent to Kafka twice.
Root cause analysis: canal's HA mechanism writes the binlog position pulled from MySQL to zk asynchronously. During a failover, the canal-slave node reads that position back from zk and starts parsing the MySQL binlog from there. canal-master may have already sent some rows to Kafka but crashed before it could update zk, which is why those rows are sent to Kafka again.
The cursor position written to zk records the binlog file name and offset that canal has parsed up to.
Suggested solution: the HA mechanism guarantees that binlog data is not lost, but messages can be sent to Kafka more than once. Replayed DELETE and UPDATE events do no harm, but INSERT events need an idempotence check based on the original row's id.
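A sketch of that idempotence check on the consumer side (the class and method names here are made up for illustration; they are not canal or Kafka APIs):
import java.util.List;
import java.util.Map;

/**
 * Illustration of the suggested idempotence check: when replaying messages that
 * may have been re-sent after a failover, only apply an INSERT if the row's
 * primary key is not already present in the target store.
 */
public class IdempotentInsertHandler {

    public void handle(String type, List<Map<String, String>> rows) {
        if (!"INSERT".equalsIgnoreCase(type)) {
            applyDirectly(type, rows); // replaying DELETE/UPDATE has no harmful effect here
            return;
        }
        for (Map<String, String> row : rows) {
            String id = row.get("id"); // primary key column of t_test in this test
            if (!rowExists(id)) {      // skip rows that were already written before the switch
                insertRow(row);
            }
        }
    }

    private void applyDirectly(String type, List<Map<String, String>> rows) {
        // apply DELETE/UPDATE rows to the target store
    }

    private boolean rowExists(String id) {
        // look up the target store by primary key
        return false;
    }

    private void insertRow(Map<String, String> row) {
        // write the new row to the target store
    }
}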
III. Functional Test
1. Run a batch update
UPDATE `t_test` a
SET `net_balance` = '123.0000000000',
`sma_committed` = '123.0000000000',
`high_water_mark` = '2359539.0000000000',
`version` = '2018-11-01'
WHERE
a.create_time >= '2018-10-11 14:42:22'
AND a.create_time <= '2019-02-13 04:28:23'
2. Check the data in Kafka
Details of the message at offset=0
Conclusion: a batch DML statement bundles multiple rows (the exact number is not fixed) into a single Kafka message.
IV. Data Format
Format of the DML binlog messages that Kafka receives from canal:
{
    "data": [
        {
            // the row data after the insert/update/delete
            "external_id": "20180522032329",
            "complete_time": "2018-05-22 20:20:12",
            "return_time": "2018-05-24 11:34:12"
        }
    ],
    "database": "database name",
    "es": 1555489722000,
    "id": <message id>,
    "isDdl": false,
    "mysqlType": {
        // column types
        "external_id": "varchar(32)",
        "complete_time": "datetime",
        "return_time": "datetime"
    },
    "old": [
        {
            // the values before the change
            "complete_time": "2018-05-22 20:20:22",
            "return_time": "2018-05-24 11:34:10"
        }
    ],
    "pkNames": [
        // primary key column name
        "external_id"
    ],
    "sql": "",
    "sqlType": {
        "external_id": 12,
        "complete_time": 93,
        "return_time": 93
    },
    // table name
    "table": "return_time",
    "ts": 1555489722742,
    // operation type
    "type": "UPDATE"
}
Format of the DDL binlog messages that Kafka receives from canal:
{
    "data": null,
    "database": "trade",
    "es": 1555575860000,
    "id": 2832,
    "isDdl": true,
    "mysqlType": null,
    "old": null,
    "pkNames": null,
    "sql": "ALTER TABLE `t_order`\r\nADD COLUMN `test` varchar(255) NULL AFTER `is_bo_modified`",
    "sqlType": null,
    "table": "t_order",
    "ts": 1555575861188,
    "type": "ALTER"
}
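A small consumer-side sketch, assuming the message body follows the formats above and using fastjson (which canal itself depends on) for parsing; the class name and the sample string are illustrative:
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class FlatMessageParser {

    public static void main(String[] args) {
        // A trimmed-down DML message in the format shown above
        String json = "{\"data\":[{\"external_id\":\"20180522032329\"}],"
                      + "\"database\":\"trade\",\"isDdl\":false,\"table\":\"return_time\",\"type\":\"UPDATE\"}";

        JSONObject msg = JSON.parseObject(json);
        if (msg.getBooleanValue("isDdl")) {
            // DDL messages carry the statement itself in "sql"
            System.out.println("DDL on " + msg.getString("table") + ": " + msg.getString("sql"));
        } else {
            // "data" holds one entry per affected row; a batch UPDATE can carry many rows in one message
            msg.getJSONArray("data").forEach(row ->
                System.out.println(msg.getString("type") + " on " + msg.getString("table") + ": " + row));
        }
    }
}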