Castled 的connector利用了schema 以及中间表进行数据存储(包含的已经提交的,未提交的)
官方使用了一个属于snapshot(快照),对于数据的处理,官方使用了excep sql 函数,基于不同
时间的snapshot 利用excep 就可以知道数据的变动
几个提供的中间表
主要在ConnectorExecutionConstants 中定义
public class ConnectorExecutionConstants {
public static final String CASTLED_CONTAINER = "castled";
public static final String UNCOMMITTED_SNAPSHOT = "uncommitted_snapshot";
public static final String COMMITTED_SNAPSHOT = "committed_snapshot";
public static final String COMMITTED_SNAPSHOT_BACKUP = "committed_snapshot_bkp";
public static final String FAILED_RECORDS = "failed_records";
public static final Path WAREHOUSE_UNLOAD_DIR_PATH = Paths.get("warehouse_unloads");
public static final Path FAILURE_RECORDS_DIR = Paths.get("pipeline_failed_records");
public static final Path APP_UPLOADS_PATH = Paths.get("app_uploads");
public static String getQualifiedCommittedSnapshot(String uuid) {
return String.format("%s.%s_%s", CASTLED_CONTAINER, uuid, COMMITTED_SNAPSHOT);
}
public static String getFailedRecordsTable(String uuid) {
return String.format("%s_%s", uuid, FAILED_RECORDS);
}
public static String getQualifiedUncommittedSnapshot(String uuid) {
return String.format("%s.%s_%s", CASTLED_CONTAINER, uuid, UNCOMMITTED_SNAPSHOT);
}
public static String getCommittedSnapshot(String uuid) {
return String.format("%s_%s", uuid, COMMITTED_SNAPSHOT);
}
public static String getUncommittedSnapshot(String uuid) {
return String.format("%s_%s", uuid, UNCOMMITTED_SNAPSHOT);
}
public static String getQualifiedCommittedSnapshotBkp(String uuid) {
return String.format("%s.%s_%s", CASTLED_CONTAINER, uuid, COMMITTED_SNAPSHOT_BACKUP);
}
public static String getCommittedSnapshotBackup(String uuid) {
return String.format("%s_%s", uuid, COMMITTED_SNAPSHOT_BACKUP);
}
}
说明
以上几个定义的中间表是比较重要的,如果运行Castled的话,就会看到类似的表在datawarehouse中,而且不同poller就是利用上边的定义进行数据处理的
参考资料
https://github.com/castledio/castled