dts reader

  1 package com.aliyun.dts.connect;
  2 
  3 import com.alibaba.fastjson.JSONObject;
  4 import com.aliyun.drc.client.message.DataMessage;
  5 import com.aliyun.drc.clusterclient.ClusterClient;
  6 import com.aliyun.drc.clusterclient.ClusterListener;
  7 import com.aliyun.drc.clusterclient.DefaultClusterClient;
  8 import com.aliyun.drc.clusterclient.RegionContext;
  9 import com.aliyun.drc.clusterclient.message.ClusterMessage;
 10 
 11 import java.io.UnsupportedEncodingException;
 12 import java.util.List;
 13 
 14 public class DTSReader {
 15 
 16     private final ClusterClient dts_client;
 17 
 18     public DTSReader(String accessKey, String accessSecret, String subscribeInstanceID) throws  Exception {
 19         this.dts_client = initClusterClient(accessKey, accessSecret, subscribeInstanceID);
 20     }
 21 
 22 
 23     private ClusterClient initClusterClient(String accessKey, String accessSecret, String subscribeInstanceID)
 24             throws Exception {
 25         // 创建一个context,仅是属性设置
 26         RegionContext context = new RegionContext();
 27         // 运行SDK的服务器是否使用公网IP连接DTS(若使用内网IP访问,需要设置false)
 28         // 在订阅任务启动时,需要网络连接时需要关注该设置项
 29         context.setUsePublicIp(true);
 30         // 设置用户accessKey secret
 31         context.setAccessKey(accessKey);
 32         context.setSecret(accessSecret);
 33 
 34         // 创建消费者
 35         ClusterClient client = new DefaultClusterClient(context);
 36         ClusterListener listener = new ClusterListener() {
 37             // @Override
 38             public void noException(Exception e) {
 39                 // TODO Auto-generated method stub
 40                 e.printStackTrace();
 41             }
 42 
 43 
 44 
 45             // @Override
 46             public void notify(List<ClusterMessage> messages) throws Exception {
 47                 //处理订阅任务收到的消息
 48                 for (ClusterMessage message : messages) {
 49                     replicateMessage(message);
 50                 }
 51             }
 52         };
 53 
 54         client.askForGUID(subscribeInstanceID);
 55         client.addConcurrentListener(listener);
 56 
 57         return client;
 58     }
 59 
 60     private void replicateMessage(final ClusterMessage message) {
 61         // 处理消息
 62         try {
 63             // 转换消息格式并处理
 64             convertRecord(message);
 65             // 确认消息以消费
 66             message.ackAsConsumed();
 67         } catch (Exception e) {
 68             // TODO 根据业务需求进行必要的重试
 69             e.printStackTrace();
 70         }
 71     }
 72 
 73     private void convertRecord(ClusterMessage message) throws UnsupportedEncodingException {
 74         DataMessage.Record record = message.getRecord();
 75         System.out.println("Record Op type:" + record.getOpt().toString());
 76         JSONObject jsonRecord;
 77         String key = null;
 78         switch (record.getOpt()) {
 79             case INSERT: // 数据插入
 80                 jsonRecord = convertFields(record, 0, 1);
 81                 key = record.getPrimaryKeys();
 82                 System.out.println("Record Insert:Json format:" + jsonRecord.toJSONString());
 83                 break;
 84             case UPDATE:// 数据更新
 85             case REPLACE:// replace操作
 86                 JSONObject oldJsonRecord = convertFields(record, 0, 2);
 87                 System.out.println("Record Update Before:Json format:" + oldJsonRecord.toJSONString());
 88                 jsonRecord = convertFields(record, 1, 2);
 89                 System.out.println("Record Update Before:Json format:" + jsonRecord.toJSONString());
 90                 key = record.getPrimaryKeys();
 91                 break;
 92             case DELETE:// 数据删除
 93                 jsonRecord = convertFields(record, 0, 1);
 94                 System.out.println("Record Delete:Json format:" + jsonRecord.toJSONString());
 95                 key = record.getPrimaryKeys();
 96                 break;
 97             default:
 98                 return;
 99         }
100         //数据表中对主Key列名
101         System.out.println("PrimaryKey Column Name:" + key);
102         //drds中物理数据库名和物理数据表名
103         System.out.println("Record DB Name:"+record.getDbname()+",Table Name:"+record.getTablename());
104         //drds中逻辑数据库名和逻辑表名
105         System.out.println("Record Logical DB Name:"+record.getLogicalDbname()+",Table Name:"+record.getLogicalTablename());
106 
107     }
108 
109     // 将消息组成JSON格式输出
110     private JSONObject convertFields(DataMessage.Record record, int start, int step)
111             throws UnsupportedEncodingException {
112         List<DataMessage.Record.Field> fields = record.getFieldList();
113         JSONObject ret = new JSONObject();
114         for (int i = start; i < fields.size(); i += step) {
115             DataMessage.Record.Field field = fields.get(i);
116             JSONObject object = new JSONObject();
117             object.put("type", field.getType().toString());
118             object.put("encoding", field.getEncoding());
119             if (field.getValue() != null) {
120                 object.put("value", field.getValue().toString(field.getEncoding()));
121             } else {
122                 object.put("value", null);
123             }
124             ret.put(field.getFieldname(), object);
125         }
126         return ret;
127     }
128 
129     public void start() throws Exception {
130         System.out.println("Start DTS subscription client...");
131         dts_client.start();
132     }
133 
134     public void stop() throws Exception {
135         System.out.println("Stop DTS Subscription Client...");
136         dts_client.stop();
137     }
138 
139 
140 }

 

上一篇:dtc 编译命令


下一篇:dtb和dtc文件浅析