spring cloud集成canal

前提

已在 Windows 上启动并运行 canal 服务端

 

加入canal依赖

1 <dependency>
2     <groupId>com.alibaba.otter</groupId>
3     <artifactId>canal.client</artifactId>
4     <version>1.1.3</version>
5 </dependency>

 

把ip、端口、监听表名做成配置文件

spring cloud集成canal

 

 

 

代码实现

  1 package com.frame.modules.dabis.archives.thread;
  2 
  3 import com.alibaba.fastjson.JSONObject;
  4 import com.alibaba.otter.canal.client.CanalConnector;
  5 import com.alibaba.otter.canal.client.CanalConnectors;
  6 import com.alibaba.otter.canal.protocol.CanalEntry;
  7 import com.alibaba.otter.canal.protocol.Message;
  8 import com.frame.solr.em.SolrCode;
  9 import com.frame.utils.PropertiesLoader;
 10 import org.apache.commons.logging.Log;
 11 import org.apache.commons.logging.LogFactory;
 12 
 13 import java.net.InetSocketAddress;
 14 import java.util.HashMap;
 15 import java.util.List;
 16 import java.util.Map;
 17 
 18 /**
 19  * @author liwei
 20  * @date 2019/8/2 14:39
 21  * @desc Created with IntelliJ IDEA.
 22  */
 23 public class CanalThread implements Runnable {
 24 
 25     Log log = LogFactory.getLog(CanalThread.class);
 26 
 27     private String solrName = SolrCode.ARCHIVES.getValue();
 28 
 29 
 30     @Override
 31     public void run() {
 32         PropertiesLoader loader = new PropertiesLoader("solrConfig.properties");
 33         listener(loader.getProperty("canalHost"), loader.getProperty("canalPort"), loader.getProperty("canalTable"));
 34     }
 35 
 36 
 37     public void listener(String canalHost, String canalPort, String table) {
 38         // 创建链接
 39         CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalHost, Integer.valueOf(canalPort)), "example", "", "");
 40         int batchSize = 1000;
 41         try {
 42             // 连接
 43             connector.connect();
 44             // 监听表
 45             connector.subscribe(table);
 46             connector.rollback();
 47             // 一直循环监听
 48             while (true) {
 49                 // 获取指定数量的数据
 50                 Message message = connector.getWithoutAck(batchSize);
 51                 long batchId = message.getId();
 52                 if(-1 != batchId && 0 != message.getEntries().size()) {
 53                     printEntry(message.getEntries());
 54                 }
 55                 // 提交确认
 56                 connector.ack(batchId);
 57             }
 58         } finally {
 59             connector.disconnect();
 60         }
 61     }
 62 
 63     /**
 64      * 打印具体变化
 65      * @param entrys
 66      */
 67     private void printEntry(List<CanalEntry.Entry> entrys) {
 68         for (CanalEntry.Entry entry : entrys) {
 69             if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType()) || CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {
 70                 continue;
 71             }
 72 
 73             CanalEntry.RowChange rowChage = null;
 74             try {
 75                 rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
 76             } catch (Exception e) {
 77                 throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
 78                         e);
 79             }
 80 
 81             CanalEntry.EventType eventType = rowChage.getEventType();
 82             System.out.println(String.format("================> binlog[%s:%s] , 数据库:%s,表名%s , 类型: %s",
 83                     entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
 84                     entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
 85                     eventType));
 86 
 87             for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
 88                 if (eventType == CanalEntry.EventType.DELETE) {
 89                     printColumn(rowData.getBeforeColumnsList());
 90                 } else if (eventType == CanalEntry.EventType.INSERT) {
 91                     printColumn(rowData.getAfterColumnsList());
 92                 } else {
 93                     System.out.println("-------修改之前");
 94                     printColumn(rowData.getBeforeColumnsList());
 95                     System.out.println("-------修改之后");
 96                     printColumn(rowData.getAfterColumnsList());
 97                 }
 98             }
 99         }
100     }
101 
102     private void printColumn(List<CanalEntry.Column> columns) {
103         Map<String,Object> aaMap = new HashMap<>();
104         for (CanalEntry.Column column : columns) {
105             aaMap.put(column.getName(), column.getValue());
106         }
107         System.out.println( new JSONObject(aaMap).toJSONString());
108     }
109 }

 

新增

spring cloud集成canal

 

 spring cloud集成canal

 

 

 

修改

spring cloud集成canal

 

 spring cloud集成canal

 

 spring cloud集成canal

 

 

删除

spring cloud集成canal

 

 spring cloud集成canal

 

 spring cloud集成canal

 

 

注意:拿到的值都是字符串,建议拿到id反查数据库,拿到对象再同步到自己的缓存。

 

上一篇:Flink解析kafka canal未压平数据为message报错


下一篇:canal 入门(基于docker)