1 package com.aliyun.dts.connect; 2 3 import com.alibaba.fastjson.JSONObject; 4 import com.aliyun.drc.client.message.DataMessage; 5 import com.aliyun.drc.clusterclient.ClusterClient; 6 import com.aliyun.drc.clusterclient.ClusterListener; 7 import com.aliyun.drc.clusterclient.DefaultClusterClient; 8 import com.aliyun.drc.clusterclient.RegionContext; 9 import com.aliyun.drc.clusterclient.message.ClusterMessage; 10 11 import java.io.UnsupportedEncodingException; 12 import java.util.List; 13 14 public class DTSReader { 15 16 private final ClusterClient dts_client; 17 18 public DTSReader(String accessKey, String accessSecret, String subscribeInstanceID) throws Exception { 19 this.dts_client = initClusterClient(accessKey, accessSecret, subscribeInstanceID); 20 } 21 22 23 private ClusterClient initClusterClient(String accessKey, String accessSecret, String subscribeInstanceID) 24 throws Exception { 25 // 创建一个context,仅是属性设置 26 RegionContext context = new RegionContext(); 27 // 运行SDK的服务器是否使用公网IP连接DTS(若使用内网IP访问,需要设置false) 28 // 在订阅任务启动时,需要网络连接时需要关注该设置项 29 context.setUsePublicIp(true); 30 // 设置用户accessKey secret 31 context.setAccessKey(accessKey); 32 context.setSecret(accessSecret); 33 34 // 创建消费者 35 ClusterClient client = new DefaultClusterClient(context); 36 ClusterListener listener = new ClusterListener() { 37 // @Override 38 public void noException(Exception e) { 39 // TODO Auto-generated method stub 40 e.printStackTrace(); 41 } 42 43 44 45 // @Override 46 public void notify(List<ClusterMessage> messages) throws Exception { 47 //处理订阅任务收到的消息 48 for (ClusterMessage message : messages) { 49 replicateMessage(message); 50 } 51 } 52 }; 53 54 client.askForGUID(subscribeInstanceID); 55 client.addConcurrentListener(listener); 56 57 return client; 58 } 59 60 private void replicateMessage(final ClusterMessage message) { 61 // 处理消息 62 try { 63 // 转换消息格式并处理 64 convertRecord(message); 65 // 确认消息以消费 66 message.ackAsConsumed(); 67 } catch (Exception e) { 68 // TODO 根据业务需求进行必要的重试 69 e.printStackTrace(); 70 } 71 } 72 73 private void convertRecord(ClusterMessage message) throws UnsupportedEncodingException { 74 DataMessage.Record record = message.getRecord(); 75 System.out.println("Record Op type:" + record.getOpt().toString()); 76 JSONObject jsonRecord; 77 String key = null; 78 switch (record.getOpt()) { 79 case INSERT: // 数据插入 80 jsonRecord = convertFields(record, 0, 1); 81 key = record.getPrimaryKeys(); 82 System.out.println("Record Insert:Json format:" + jsonRecord.toJSONString()); 83 break; 84 case UPDATE:// 数据更新 85 case REPLACE:// replace操作 86 JSONObject oldJsonRecord = convertFields(record, 0, 2); 87 System.out.println("Record Update Before:Json format:" + oldJsonRecord.toJSONString()); 88 jsonRecord = convertFields(record, 1, 2); 89 System.out.println("Record Update Before:Json format:" + jsonRecord.toJSONString()); 90 key = record.getPrimaryKeys(); 91 break; 92 case DELETE:// 数据删除 93 jsonRecord = convertFields(record, 0, 1); 94 System.out.println("Record Delete:Json format:" + jsonRecord.toJSONString()); 95 key = record.getPrimaryKeys(); 96 break; 97 default: 98 return; 99 } 100 //数据表中对主Key列名 101 System.out.println("PrimaryKey Column Name:" + key); 102 //drds中物理数据库名和物理数据表名 103 System.out.println("Record DB Name:"+record.getDbname()+",Table Name:"+record.getTablename()); 104 //drds中逻辑数据库名和逻辑表名 105 System.out.println("Record Logical DB Name:"+record.getLogicalDbname()+",Table Name:"+record.getLogicalTablename()); 106 107 } 108 109 // 将消息组成JSON格式输出 110 private JSONObject convertFields(DataMessage.Record record, int start, int step) 111 throws UnsupportedEncodingException { 112 List<DataMessage.Record.Field> fields = record.getFieldList(); 113 JSONObject ret = new JSONObject(); 114 for (int i = start; i < fields.size(); i += step) { 115 DataMessage.Record.Field field = fields.get(i); 116 JSONObject object = new JSONObject(); 117 object.put("type", field.getType().toString()); 118 object.put("encoding", field.getEncoding()); 119 if (field.getValue() != null) { 120 object.put("value", field.getValue().toString(field.getEncoding())); 121 } else { 122 object.put("value", null); 123 } 124 ret.put(field.getFieldname(), object); 125 } 126 return ret; 127 } 128 129 public void start() throws Exception { 130 System.out.println("Start DTS subscription client..."); 131 dts_client.start(); 132 } 133 134 public void stop() throws Exception { 135 System.out.println("Stop DTS Subscription Client..."); 136 dts_client.stop(); 137 } 138 139 140 }