import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.elasticsearch.monitor.jvm.JvmStats;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Minimal Canal client (canal version 1.1.5) that subscribes to every table of a
 * Canal destination and prints each row change to standard output.
 *
 * <p>Sample table used while exercising this client:
 *
 * <pre>
 * -- userdb.users definition
 *
 * CREATE TABLE `users` (
 *   `id` int(11) NOT NULL AUTO_INCREMENT,
 *   `nick` varchar(100) DEFAULT NULL,
 *   `phone` varchar(100) NOT NULL,
 *   `password` varchar(100) DEFAULT NULL,
 *   `email` varchar(100) DEFAULT NULL,
 *   `account` varchar(100) DEFAULT NULL,
 *   PRIMARY KEY (`id`)
 * ) ENGINE=InnoDB AUTO_INCREMENT=16 DEFAULT CHARSET=utf8mb4;
 *
 * insert into users (phone) values (56);
 * </pre>
 */
public class CanalClient {
    private static final String SERVER_ADDRESS = "192.168.9.99";
    private static final int PORT = 11111;
    private static final String DESTINATION = "example";
    private static final String USERNAME = "";
    private static final String PASSWORD = "";

    /** Maximum number of entries requested from the server per fetch. */
    private static final int BATCH_SIZE = 100;

    /** Pause between polls when the server has nothing to deliver, in milliseconds. */
    private static final long IDLE_SLEEP_MILLIS = 1000L;

    /**
     * Connects to the Canal server, subscribes to all schemas and tables, and then
     * polls forever, printing every received batch.
     *
     * <p>Each batch is acknowledged after it has been printed. If printing fails, the
     * batch is rolled back before the exception propagates. The connector is
     * disconnected on every exit path.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting for new data
     * @throws InvalidProtocolBufferException retained for signature compatibility;
     *         parse failures surface as {@link RuntimeException}
     */
    public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(SERVER_ADDRESS, PORT), DESTINATION, USERNAME, PASSWORD);
        canalConnector.connect();
        try {
            // Subscribe to every table of every schema.
            canalConnector.subscribe(".*\\..*");
            // Return to the last position that was not acknowledged, so the next fetch
            // resumes from where the previous session stopped.
            canalConnector.rollback();
            for (;;) {
                // getWithoutAck fetches up to BATCH_SIZE entries without confirming them.
                Message message = canalConnector.getWithoutAck(BATCH_SIZE);
                long id = message.getId();
                if (id == -1) {
                    // Empty batch: wait instead of spinning on the server at full speed.
                    TimeUnit.MILLISECONDS.sleep(IDLE_SLEEP_MILLIS);
                    continue;
                }
                try {
                    System.out.println("id-->" + id);
                    printEntity(message.getEntries());
                } catch (RuntimeException e) {
                    // Processing failed: hand the batch back so it is delivered again.
                    canalConnector.rollback(id);
                    throw e;
                }
                // Processing succeeded: confirm so the server can release the batch.
                canalConnector.ack(id);
            }
        } finally {
            canalConnector.disconnect();
        }
    }

    /**
     * Prints the header and the row data of every entry in the batch, skipping
     * transaction begin/end markers.
     *
     * @param entrys entries of one fetched batch
     * @throws RuntimeException if an entry's store value cannot be parsed as a RowChange
     */
    private static void printEntity(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            CanalEntry.EntryType entryType = entry.getEntryType();
            if (entryType == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entryType == CanalEntry.EntryType.TRANSACTIONEND) {
                // Transaction begin/end markers are skipped.
                continue;
            }
            // The RowChange message is read below for its event type, DDL flag, SQL text
            // and per-row before/after columns.
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (InvalidProtocolBufferException e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            // Event type reported for this entry (e.g. INSERT / UPDATE / DELETE).
            CanalEntry.EventType eventType = rowChange.getEventType();
            // Print the binlog position and the schema/table the change belongs to.
            CanalEntry.Header header = entry.getHeader();
            System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    header.getLogfileName(), header.getLogfileOffset(),
                    header.getSchemaName(), header.getTableName(),
                    eventType));
            // For DDL statements also print the SQL text.
            if (rowChange.getIsDdl()) {
                System.out.println("================》;isDdl: true,sql:" + rowChange.getSql());
            }
            // Print every changed row carried by this RowChange.
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    // DELETE: print the before-image columns.
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    // INSERT: print the after-image columns.
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    // Every other event type is printed as an update: before, then after.
                    System.out.println("------->; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("------->; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    /**
     * Prints name, value and updated flag of each column, one column per line.
     *
     * @param columns columns of a single row image
     */
    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
输出内容:
"C:\Program Files\Java\jdk1.8.0_202\bin\java.exe" -javaagent:C:\Users\thomas\AppData\Local\JetBrains\Toolbox\apps\IDEA-U\ch-0\211.7142.45\lib\idea_rt.jar=64596:C:\Users\thomas\AppData\Local\JetBrains\Toolbox\apps\IDEA-U\ch-0\211.7142.45\bin -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_202\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\resources.jar;C:\Program 
Files\Java\jdk1.8.0_202\jre\lib\rt.jar;C:\yy\coding_project\yy\cannalTest\target\classes;C:\Users\thomas\.m2\repository\com\alibaba\otter\canal.client\1.1.5\canal.client-1.1.5.jar;C:\Users\thomas\.m2\repository\com\google\protobuf\protobuf-java\3.6.1\protobuf-java-3.6.1.jar;C:\Users\thomas\.m2\repository\io\netty\netty-all\4.1.6.Final\netty-all-4.1.6.Final.jar;C:\Users\thomas\.m2\repository\org\apache\zookeeper\zookeeper\3.4.5\zookeeper-3.4.5.jar;C:\Users\thomas\.m2\repository\org\jboss\netty\netty\3.2.2.Final\netty-3.2.2.Final.jar;C:\Users\thomas\.m2\repository\com\101tec\zkclient\0.10\zkclient-0.10.jar;C:\Users\thomas\.m2\repository\commons-io\commons-io\2.4\commons-io-2.4.jar;C:\Users\thomas\.m2\repository\commons-lang\commons-lang\2.6\commons-lang-2.6.jar;C:\Users\thomas\.m2\repository\commons-codec\commons-codec\1.9\commons-codec-1.9.jar;C:\Users\thomas\.m2\repository\com\alibaba\fastjson\1.2.58.sec06\fastjson-1.2.58.sec06.jar;C:\Users\thomas\.m2\repository\com\google\guava\guava\22.0\guava-22.0.jar;C:\Users\thomas\.m2\repository\com\google\errorprone\error_prone_annotations\2.0.18\error_prone_annotations-2.0.18.jar;C:\Users\thomas\.m2\repository\com\google\j2objc\j2objc-annotations\1.1\j2objc-annotations-1.1.jar;C:\Users\thomas\.m2\repository\org\codehaus\mojo\animal-sniffer-annotations\1.14\animal-sniffer-annotations-1.14.jar;C:\Users\thomas\.m2\repository\ch\qos\logback\logback-core\1.1.3\logback-core-1.1.3.jar;C:\Users\thomas\.m2\repository\ch\qos\logback\logback-classic\1.1.3\logback-classic-1.1.3.jar;C:\Users\thomas\.m2\repository\org\slf4j\jcl-over-slf4j\1.7.12\jcl-over-slf4j-1.7.12.jar;C:\Users\thomas\.m2\repository\org\slf4j\slf4j-api\1.7.12\slf4j-api-1.7.12.jar;C:\Users\thomas\.m2\repository\org\springframework\spring-core\5.0.5.RELEASE\spring-core-5.0.5.RELEASE.jar;C:\Users\thomas\.m2\repository\org\springframework\spring-jcl\5.0.5.RELEASE\spring-jcl-5.0.5.RELEASE.jar;C:\Users\thomas\.m2\repository\org\springframework\spring-aop\5.0.5.RELEASE\spring-
aop-5.0.5.RELEASE.jar;C:\Users\thomas\.m2\repository\org\springframework\spring-beans\5.0.5.RELEASE\spring-beans-5.0.5.RELEASE.jar;C:\Users\thomas\.m2\repository\org\springframework\spring-context\5.0.5.RELEASE\spring-context-5.0.5.RELEASE.jar;C:\Users\thomas\.m2\repository\org\springframework\spring-expression\5.0.5.RELEASE\spring-expression-5.0.5.RELEASE.jar;C:\Users\thomas\.m2\repository\org\springframework\spring-jdbc\5.0.5.RELEASE\spring-jdbc-5.0.5.RELEASE.jar;C:\Users\thomas\.m2\repository\org\springframework\spring-tx\5.0.5.RELEASE\spring-tx-5.0.5.RELEASE.jar;C:\Users\thomas\.m2\repository\org\springframework\spring-orm\5.0.5.RELEASE\spring-orm-5.0.5.RELEASE.jar;C:\Users\thomas\.m2\repository\com\alibaba\otter\canal.protocol\1.1.5\canal.protocol-1.1.5.jar;C:\Users\thomas\.m2\repository\com\alibaba\otter\canal.common\1.1.5\canal.common-1.1.5.jar;C:\Users\thomas\.m2\repository\org\scala-lang\scala-library\2.12.13\scala-library-2.12.13.jar;C:\Users\thomas\.m2\repository\org\scala-lang\scala-compiler\2.12.13\scala-compiler-2.12.13.jar;C:\Users\thomas\.m2\repository\org\scala-lang\modules\scala-xml_2.12\1.0.6\scala-xml_2.12-1.0.6.jar;C:\Users\thomas\.m2\repository\org\scala-lang\scala-reflect\2.12.13\scala-reflect-2.12.13.jar;C:\Users\thomas\.m2\repository\log4j\log4j\1.2.12\log4j-1.2.12.jar;C:\Users\thomas\.m2\repository\com\google\collections\google-collections\1.0\google-collections-1.0.jar;C:\Users\thomas\.m2\repository\com\facebook\presto\presto-jdbc\0.240\presto-jdbc-0.240.jar;C:\Users\thomas\.m2\repository\org\clapper\grizzled-slf4j_2.12\1.3.4\grizzled-slf4j_2.12-1.3.4.jar;C:\Users\thomas\.m2\repository\com\github\nscala-time\nscala-time_2.12\2.26.0\nscala-time_2.12-2.26.0.jar;C:\Users\thomas\.m2\repository\joda-time\joda-time\2.10.8\joda-time-2.10.8.jar;C:\Users\thomas\.m2\repository\org\joda\joda-convert\2.2.1\joda-convert-2.2.1.jar;C:\Users\thomas\.m2\repository\com\sksamuel\elastic4s\elastic4s-client-esjava_2.13\7.12.1\elastic4s-client-esjava_2.13-7.12.
1.jar;C:\Users\thomas\.m2\repository\com\sksamuel\elastic4s\elastic4s-core_2.13\7.12.1\elastic4s-core_2.13-7.12.1.jar;C:\Users\thomas\.m2\repository\com\sksamuel\elastic4s\elastic4s-domain_2.13\7.12.1\elastic4s-domain_2.13-7.12.1.jar;C:\Users\thomas\.m2\repository\com\sksamuel\elastic4s\elastic4s-handlers_2.13\7.12.1\elastic4s-handlers_2.13-7.12.1.jar;C:\Users\thomas\.m2\repository\com\sksamuel\elastic4s\elastic4s-json-builder_2.13\7.12.1\elastic4s-json-builder_2.13-7.12.1.jar;C:\Users\thomas\.m2\repository\com\sksamuel\exts\exts_2.13\1.61.1\exts_2.13-1.61.1.jar;C:\Users\thomas\.m2\repository\com\typesafe\config\1.3.0\config-1.3.0.jar;C:\Users\thomas\.m2\repository\org\elasticsearch\client\elasticsearch-rest-client\7.12.0\elasticsearch-rest-client-7.12.0.jar;C:\Users\thomas\.m2\repository\org\apache\httpcomponents\httpclient\4.5.10\httpclient-4.5.10.jar;C:\Users\thomas\.m2\repository\org\apache\httpcomponents\httpcore\4.4.12\httpcore-4.4.12.jar;C:\Users\thomas\.m2\repository\org\apache\httpcomponents\httpasyncclient\4.1.4\httpasyncclient-4.1.4.jar;C:\Users\thomas\.m2\repository\org\apache\httpcomponents\httpcore-nio\4.4.12\httpcore-nio-4.4.12.jar;C:\Users\thomas\.m2\repository\commons-logging\commons-logging\1.1.3\commons-logging-1.1.3.jar;C:\Users\thomas\.m2\repository\com\fasterxml\jackson\core\jackson-databind\2.12.3\jackson-databind-2.12.3.jar;C:\Users\thomas\.m2\repository\com\fasterxml\jackson\module\jackson-module-scala_2.13\2.12.3\jackson-module-scala_2.13-2.12.3.jar;C:\Users\thomas\.m2\repository\com\thoughtworks\paranamer\paranamer\2.8\paranamer-2.8.jar;C:\Users\thomas\.m2\repository\org\elasticsearch\elasticsearch\7.12.1\elasticsearch-7.12.1.jar;C:\Users\thomas\.m2\repository\org\elasticsearch\elasticsearch-core\7.12.1\elasticsearch-core-7.12.1.jar;C:\Users\thomas\.m2\repository\org\elasticsearch\elasticsearch-secure-sm\7.12.1\elasticsearch-secure-sm-7.12.1.jar;C:\Users\thomas\.m2\repository\org\elasticsearch\elasticsearch-x-content\7.12.1\elasticsearch-x
-content-7.12.1.jar;C:\Users\thomas\.m2\repository\org\yaml\snakeyaml\1.26\snakeyaml-1.26.jar;C:\Users\thomas\.m2\repository\com\fasterxml\jackson\dataformat\jackson-dataformat-smile\2.10.4\jackson-dataformat-smile-2.10.4.jar;C:\Users\thomas\.m2\repository\com\fasterxml\jackson\dataformat\jackson-dataformat-yaml\2.10.4\jackson-dataformat-yaml-2.10.4.jar;C:\Users\thomas\.m2\repository\com\fasterxml\jackson\dataformat\jackson-dataformat-cbor\2.10.4\jackson-dataformat-cbor-2.10.4.jar;C:\Users\thomas\.m2\repository\org\elasticsearch\elasticsearch-geo\7.12.1\elasticsearch-geo-7.12.1.jar;C:\Users\thomas\.m2\repository\org\apache\lucene\lucene-core\8.8.0\lucene-core-8.8.0.jar;C:\Users\thomas\.m2\repository\org\apache\lucene\lucene-analyzers-common\8.8.0\lucene-analyzers-common-8.8.0.jar;C:\Users\thomas\.m2\repository\org\apache\lucene\lucene-backward-codecs\8.8.0\lucene-backward-codecs-8.8.0.jar;C:\Users\thomas\.m2\repository\org\apache\lucene\lucene-grouping\8.8.0\lucene-grouping-8.8.0.jar;C:\Users\thomas\.m2\repository\org\apache\lucene\lucene-highlighter\8.8.0\lucene-highlighter-8.8.0.jar;C:\Users\thomas\.m2\repository\org\apache\lucene\lucene-join\8.8.0\lucene-join-8.8.0.jar;C:\Users\thomas\.m2\repository\org\apache\lucene\lucene-memory\8.8.0\lucene-memory-8.8.0.jar;C:\Users\thomas\.m2\repository\org\apache\lucene\lucene-misc\8.8.0\lucene-misc-8.8.0.jar;C:\Users\thomas\.m2\repository\org\apache\lucene\lucene-queries\8.8.0\lucene-queries-8.8.0.jar;C:\Users\thomas\.m2\repository\org\apache\lucene\lucene-queryparser\8.8.0\lucene-queryparser-8.8.0.jar;C:\Users\thomas\.m2\repository\org\apache\lucene\lucene-sandbox\8.8.0\lucene-sandbox-8.8.0.jar;C:\Users\thomas\.m2\repository\org\apache\lucene\lucene-spatial-extras\8.8.0\lucene-spatial-extras-8.8.0.jar;C:\Users\thomas\.m2\repository\org\apache\lucene\lucene-spatial3d\8.8.0\lucene-spatial3d-8.8.0.jar;C:\Users\thomas\.m2\repository\org\apache\lucene\lucene-suggest\8.8.0\lucene-suggest-8.8.0.jar;C:\Users\thomas\.m2\repository\
org\elasticsearch\elasticsearch-cli\7.12.1\elasticsearch-cli-7.12.1.jar;C:\Users\thomas\.m2\repository\net\sf\jopt-simple\jopt-simple\5.0.2\jopt-simple-5.0.2.jar;C:\Users\thomas\.m2\repository\com\carrotsearch\hppc\0.8.1\hppc-0.8.1.jar;C:\Users\thomas\.m2\repository\com\tdunning\t-digest\3.2\t-digest-3.2.jar;C:\Users\thomas\.m2\repository\org\hdrhistogram\HdrHistogram\2.1.9\HdrHistogram-2.1.9.jar;C:\Users\thomas\.m2\repository\org\elasticsearch\jna\5.7.0-1\jna-5.7.0-1.jar;C:\Users\thomas\.m2\repository\org\elasticsearch\elasticsearch-plugin-classloader\7.12.1\elasticsearch-plugin-classloader-7.12.1.jar;C:\Users\thomas\.m2\repository\org\elasticsearch\client\elasticsearch-rest-high-level-client\7.12.1\elasticsearch-rest-high-level-client-7.12.1.jar;C:\Users\thomas\.m2\repository\org\elasticsearch\plugin\mapper-extras-client\7.12.1\mapper-extras-client-7.12.1.jar;C:\Users\thomas\.m2\repository\org\elasticsearch\plugin\parent-join-client\7.12.1\parent-join-client-7.12.1.jar;C:\Users\thomas\.m2\repository\org\elasticsearch\plugin\aggs-matrix-stats-client\7.12.1\aggs-matrix-stats-client-7.12.1.jar;C:\Users\thomas\.m2\repository\org\elasticsearch\plugin\rank-eval-client\7.12.1\rank-eval-client-7.12.1.jar;C:\Users\thomas\.m2\repository\org\elasticsearch\plugin\lang-mustache-client\7.12.1\lang-mustache-client-7.12.1.jar;C:\Users\thomas\.m2\repository\com\github\spullara\mustache\java\compiler\0.9.6\compiler-0.9.6.jar;C:\Users\thomas\.m2\repository\com\fasterxml\jackson\core\jackson-core\2.12.3\jackson-core-2.12.3.jar;C:\Users\thomas\.m2\repository\org\apache\logging\log4j\log4j-api\2.8.2\log4j-api-2.8.2.jar;C:\Users\thomas\.m2\repository\org\apache\logging\log4j\log4j-core\2.8.2\log4j-core-2.8.2.jar;C:\Users\thomas\.m2\repository\com\typesafe\play\play-json_2.13\2.10.0-RC2\play-json_2.13-2.10.0-RC2.jar;C:\Users\thomas\.m2\repository\com\typesafe\play\play-functional_2.13\2.10.0-RC2\play-functional_2.13-2.10.0-RC2.jar;C:\Users\thomas\.m2\repository\com\fasterxml\jackson\data
type\jackson-datatype-jdk8\2.10.5\jackson-datatype-jdk8-2.10.5.jar;C:\Users\thomas\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.10.5\jackson-datatype-jsr310-2.10.5.jar;C:\Users\thomas\.m2\repository\io\circe\circe-core_2.12\0.14.0-M7\circe-core_2.12-0.14.0-M7.jar;C:\Users\thomas\.m2\repository\io\circe\circe-numbers_2.12\0.14.0-M7\circe-numbers_2.12-0.14.0-M7.jar;C:\Users\thomas\.m2\repository\org\typelevel\cats-core_2.12\2.6.1\cats-core_2.12-2.6.1.jar;C:\Users\thomas\.m2\repository\org\typelevel\cats-kernel_2.12\2.6.1\cats-kernel_2.12-2.6.1.jar;C:\Users\thomas\.m2\repository\org\typelevel\simulacrum-scalafix-annotations_2.12\0.5.4\simulacrum-scalafix-annotations_2.12-0.5.4.jar;C:\Users\thomas\.m2\repository\io\circe\circe-generic_2.12\0.14.0-M7\circe-generic_2.12-0.14.0-M7.jar;C:\Users\thomas\.m2\repository\com\chuusai\shapeless_2.12\2.3.3\shapeless_2.12-2.3.3.jar;C:\Users\thomas\.m2\repository\org\typelevel\macro-compat_2.12\1.1.1\macro-compat_2.12-1.1.1.jar;C:\Users\thomas\.m2\repository\io\circe\circe-parser_2.12\0.14.0-M7\circe-parser_2.12-0.14.0-M7.jar;C:\Users\thomas\.m2\repository\io\circe\circe-jawn_2.12\0.14.0-M7\circe-jawn_2.12-0.14.0-M7.jar;C:\Users\thomas\.m2\repository\org\typelevel\jawn-parser_2.12\1.1.2\jawn-parser_2.12-1.1.2.jar;C:\Users\thomas\.m2\repository\com\fasterxml\jackson\core\jackson-annotations\2.12.3\jackson-annotations-2.12.3.jar;C:\Users\thomas\.m2\repository\org\apache\flink\flink-runtime_2.12\1.13.1\flink-runtime_2.12-1.13.1.jar;C:\Users\thomas\.m2\repository\org\apache\flink\flink-queryable-state-client-java\1.13.1\flink-queryable-state-client-java-1.13.1.jar;C:\Users\thomas\.m2\repository\org\apache\flink\flink-hadoop-fs\1.13.1\flink-hadoop-fs-1.13.1.jar;C:\Users\thomas\.m2\repository\org\apache\flink\flink-shaded-netty\4.1.49.Final-13.0\flink-shaded-netty-4.1.49.Final-13.0.jar;C:\Users\thomas\.m2\repository\org\apache\flink\flink-shaded-jackson\2.12.1-13.0\flink-shaded-jackson-2.12.1-13.0.jar;C:\Users\th
omas\.m2\repository\org\apache\flink\flink-shaded-zookeeper-3\3.4.14-13.0\flink-shaded-zookeeper-3-3.4.14-13.0.jar;C:\Users\thomas\.m2\repository\org\javassist\javassist\3.24.0-GA\javassist-3.24.0-GA.jar;C:\Users\thomas\.m2\repository\com\typesafe\akka\akka-actor_2.12\2.5.21\akka-actor_2.12-2.5.21.jar;C:\Users\thomas\.m2\repository\org\scala-lang\modules\scala-java8-compat_2.12\0.8.0\scala-java8-compat_2.12-0.8.0.jar;C:\Users\thomas\.m2\repository\com\typesafe\akka\akka-stream_2.12\2.5.21\akka-stream_2.12-2.5.21.jar;C:\Users\thomas\.m2\repository\org\reactivestreams\reactive-streams\1.0.2\reactive-streams-1.0.2.jar;C:\Users\thomas\.m2\repository\com\typesafe\ssl-config-core_2.12\0.3.7\ssl-config-core_2.12-0.3.7.jar;C:\Users\thomas\.m2\repository\org\scala-lang\modules\scala-parser-combinators_2.12\1.1.1\scala-parser-combinators_2.12-1.1.1.jar;C:\Users\thomas\.m2\repository\com\typesafe\akka\akka-protobuf_2.12\2.5.21\akka-protobuf_2.12-2.5.21.jar;C:\Users\thomas\.m2\repository\com\typesafe\akka\akka-slf4j_2.12\2.5.21\akka-slf4j_2.12-2.5.21.jar;C:\Users\thomas\.m2\repository\com\github\scopt\scopt_2.12\3.5.0\scopt_2.12-3.5.0.jar;C:\Users\thomas\.m2\repository\org\xerial\snappy\snappy-java\1.1.8.3\snappy-java-1.1.8.3.jar;C:\Users\thomas\.m2\repository\com\twitter\chill_2.12\0.7.6\chill_2.12-0.7.6.jar;C:\Users\thomas\.m2\repository\com\twitter\chill-java\0.7.6\chill-java-0.7.6.jar;C:\Users\thomas\.m2\repository\org\lz4\lz4-java\1.6.0\lz4-java-1.6.0.jar;C:\Users\thomas\.m2\repository\org\apache\flink\flink-java\1.13.1\flink-java-1.13.1.jar;C:\Users\thomas\.m2\repository\org\apache\flink\flink-shaded-guava\18.0-13.0\flink-shaded-guava-18.0-13.0.jar;C:\Users\thomas\.m2\repository\org\apache\commons\commons-math3\3.5\commons-math3-3.5.jar;C:\Users\thomas\.m2\repository\com\google\code\findbugs\jsr305\1.3.9\jsr305-1.3.9.jar;C:\Users\thomas\.m2\repository\org\apache\flink\force-shading\1.13.1\force-shading-1.13.1.jar;C:\Users\thomas\.m2\repository\org\apache\flink\flink-core\
1.13.1\flink-core-1.13.1.jar;C:\Users\thomas\.m2\repository\org\apache\flink\flink-annotations\1.13.1\flink-annotations-1.13.1.jar;C:\Users\thomas\.m2\repository\org\apache\flink\flink-metrics-core\1.13.1\flink-metrics-core-1.13.1.jar;C:\Users\thomas\.m2\repository\org\apache\flink\flink-shaded-asm-7\7.1-13.0\flink-shaded-asm-7-7.1-13.0.jar;C:\Users\thomas\.m2\repository\org\apache\commons\commons-lang3\3.3.2\commons-lang3-3.3.2.jar;C:\Users\thomas\.m2\repository\com\esotericsoftware\kryo\kryo\2.24.0\kryo-2.24.0.jar;C:\Users\thomas\.m2\repository\com\esotericsoftware\minlog\minlog\1.2\minlog-1.2.jar;C:\Users\thomas\.m2\repository\org\objenesis\objenesis\2.1\objenesis-2.1.jar;C:\Users\thomas\.m2\repository\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;C:\Users\thomas\.m2\repository\org\apache\commons\commons-compress\1.20\commons-compress-1.20.jar;C:\Users\thomas\.m2\repository\org\apache\flink\flink-clients_2.12\1.13.1\flink-clients_2.12-1.13.1.jar;C:\Users\thomas\.m2\repository\org\apache\flink\flink-optimizer_2.12\1.13.1\flink-optimizer_2.12-1.13.1.jar;C:\Users\thomas\.m2\repository\commons-cli\commons-cli\1.3.1\commons-cli-1.3.1.jar" CanalClient
id-->31
================》; binlog[mysql-bin.000001:4543] , name[userdb,users] , eventType : INSERT
id : 13 update=true
nick : update=true
phone : 4 update=true
password : update=true
email : update=true
account : update=true
================》; binlog[mysql-bin.000001:4821] , name[userdb,users] , eventType : INSERT
id : 14 update=true
nick : update=true
phone : 5 update=true
password : update=true
email : update=true
account : update=true
================》; binlog[mysql-bin.000001:5099] , name[userdb,users] , eventType : INSERT
id : 15 update=true
nick : update=true
phone : 56 update=true
password : update=true
email : update=true
account : update=true
================》; binlog[mysql-bin.000001:5372] , name[userdb,users] , eventType : DELETE
id : 10 update=false
nick : update=false
phone : 1 update=false
password : update=false
email : update=false
account : update=false
================》; binlog[mysql-bin.000001:5644] , name[userdb,users] , eventType : DELETE
id : 15 update=false
nick : update=false
phone : 56 update=false
password : update=false
email : update=false
account : update=false
================》; binlog[mysql-bin.000001:5917] , name[userdb,users] , eventType : UPDATE
------->; before
id : 12 update=false
nick : update=false
phone : 3 update=false
password : update=false
email : update=false
account : update=false
------->; after
id : 12 update=false
nick : update=false
phone : 33 update=true
password : update=false
email : update=false
account : update=false