publicclassWorker{
privatestaticfinalLoggerlogger=LoggerFactory.getLogger(Worker.class);
privatestaticJdbcTemplatejdbcTemplate;
privatefinalObjectMappermapper=newObjectMapper();
privateZKConnectorzkClient=null;
privateTransportClientclient=null;
privateTimestampcurrentTimestamp=null;
privateTimestamppreviousTimestamp=null;
privatestaticfinalStringoggSql="select*fromt_ordert0leftjoint_order_attachedinfot1ont0.order_id=t1.order_idwhere";
privateStringsql;
publicStringgetSql(){
returnsql;
}
publicvoidsetSql(Stringsql){
this.sql=sql;
}
privateTransportClientgetClient(){
Settingssettings=Settings.settingsBuilder().put("cluster.name",Constant.CLUSTER).build();
TransportClientclient=TransportClient.builder().settings(settings).build();
try{
client.addTransportAddress(newInetSocketTransportAddress(InetAddress.getByName(Constant.ESHOST),Constant.ESPORT));
}catch(UnknownHostExceptione){
e.printStackTrace();
}
returnclient;
}
/**
 * Creates a worker: resolves the JDBC template from the Spring context, builds the
 * Elasticsearch client, connects to ZooKeeper and restores the last sync checkpoint stored
 * at {@code Constant.NODE_PATH}, if there is one.
 *
 * <p>A stored checkpoint that {@link Timestamp#valueOf(String)} rejects is logged and the
 * node is deleted, leaving the worker without a checkpoint.
 *
 * @param ctx Spring application context that provides a bean named {@code "jdbcTemplate"}
 */
public Worker(AbstractApplicationContext ctx) {
    // Initialise the Oracle connection.
    jdbcTemplate = (JdbcTemplate) ctx.getBean("jdbcTemplate");
    client = getClient();
    zkClient = new ZKConnector();
    zkClient.createConnection(Constant.ZKSERVER, Constant.SESSION_TIMEOUT);
    // Initialise the ZooKeeper lock node up front, because ZooKeeper cannot create
    // nested paths in a single call.
    if (!zkClient.exist(Constant.ZK_PATH)) {
        zkClient.createPersistNode(Constant.ZK_PATH, "");
    }
    // currentTimestamp is always null at this point (field initialiser), so the stored
    // checkpoint is read unconditionally.
    String zkTimestamp = zkClient.readData(Constant.NODE_PATH);
    if (zkTimestamp != null && !zkTimestamp.isEmpty()) {
        try {
            currentTimestamp = Timestamp.valueOf(zkTimestamp);
            // Seed the rollback value too: doWork restores currentTimestamp from
            // previousTimestamp after a failed bulk request, and a null there would
            // discard the checkpoint that was just restored.
            previousTimestamp = currentTimestamp;
            logger.info("获取zookeeper最后同步时间:" + currentTimestamp);
        } catch (IllegalArgumentException e) {
            // Timestamp.valueOf rejects anything that is not "yyyy-[m]m-[d]d hh:mm:ss[.f...]".
            logger.warn("Discarding unparsable checkpoint '" + zkTimestamp
                    + "' stored at " + Constant.NODE_PATH, e);
            zkClient.deleteNode(Constant.NODE_PATH);
        }
    }
}
/**
 * Runs the synchronisation loop; the visible code has no exit from {@code while(true)}.
 *
 * Each iteration:
 * 1. Builds a query on t_order_changes; the text depends on whether currentTimestamp is set.
 * 2. Runs it through jdbcTemplate. The row mapper returns each row's "order_id" and, as a
 *    side effect, overwrites currentTimestamp with that row's "timestamp" column.
 * 3. If no ids came back, starts the next iteration immediately (no delay in this method).
 * 4. Concatenates one order_id predicate per distinct id, appends it to oggSql and prints
 *    the resulting statement to stdout.
 * 5. Maps every result row to a JSONObject keyed by lower-cased column label.
 * 6. Sends one bulk index request; each document id is the row's "order_id".
 * 7. On reported failures or NoNodeAvailableException, restores currentTimestamp from
 *    previousTimestamp. Otherwise copies currentTimestamp into previousTimestamp and writes
 *    it to Constant.NODE_PATH through zkClient.
 *
 * NOTE(review): several lines in this method are cut off in this copy of the file (marked
 * below); the missing text must be recovered from version control before it can compile.
 */
publicvoiddoWork(){
logger.info("start...");
// Keep synchronising forever.
while(true){
Stringsqlwhere="";
// Fetch the change-table rows (through Mycat) according to the timestamp.
Stringsql="";
// If there is no last-sync time, sort by last update time and take the smallest
// time as the current timestamp.
if(currentTimestamp!=null){
// NOTE(review): text appears to be missing between "rownum" and "to_timestamp(" - presumably
// a row limit and a timestamp comparison; confirm against the original file.
sql="selectorder_id,timestampfromt_order_changeswhererownumto_timestamp(‘"+currentTimestamp.toString()+"‘,‘yyyy-mm-ddhh24:mi:ss.ff6‘)";
}else{
// NOTE(review): the string literal on the next line is not terminated - the rest of the
// statement is missing from this copy.
sql="selectorder_id,timestampfromt_order_changeswhererownum
}
// Query the order ids for this time range.
// NOTE(review): the declaration on the next line is cut off after "List"; it presumably
// declares the "ids" list assigned below - confirm.
List
// With ascending order, the last (that is, the largest) time ends up as currentTimestamp.
// NOTE(review): the next line is cut off after "newRowMapper".
ids=jdbcTemplate.query(sql,newObject[]{},newRowMapper
{
publicStringmapRow(ResultSetresult,introwNum)throwsSQLException{
// Side effect: every mapped row overwrites the checkpoint field.
currentTimestamp=result.getTimestamp("timestamp");
returnresult.getString("order_id");
}
});
if(ids.size()==0){
continue;
}
// i counts the predicates appended so far; only the first one is written without "or".
inti=0;
// NOTE(review): the declaration on the next line is cut off after "List"; it presumably
// declares the "checkIds" collection used below - confirm.
List
for(Stringid:ids){
// Skip the id if it has already been added.
if(checkIds.contains(id)){
continue;
}
if(i==0){
sqlwhere=sqlwhere.concat("t0.order_id=‘"+id+"‘");
}else{
sqlwhere=sqlwhere.concat("ort0.order_id=‘"+id+"‘");
}
checkIds.add(id);
i++;
}
System.out.println(oggSql.concat(sqlwhere));
// objs is the data queried from Oracle that needs to be synchronised.
// NOTE(review): the statement on the next line is cut off after "List"; it presumably declares
// "objs" and runs oggSql.concat(sqlwhere) with the anonymous row mapper below - confirm.
List
{
SimpleDateFormatsdf=newSimpleDateFormat("yyyy-MM-ddHH:mm:ss");
publicJSONObjectmapRow(ResultSetresult,introwNum)throwsSQLException{
intc=result.getMetaData().getColumnCount();
JSONObjectobj=newJSONObject();
// NOTE(review): the loop header on the next line is cut off after "t"; presumably it
// iterates the 1-based column indexes up to c - confirm.
for(intt=1;t
{
// Columns whose value is null are left out of the JSON object.
if(result.getObject(t)==null)
{
continue;
}
if(result.getMetaData().getColumnType(t)==Types.DATE)
{
obj.put(result.getMetaData().getColumnLabel(t).toLowerCase(),result.getDate(t));
}elseif(result.getMetaData().getColumnType(t)==Types.TIMESTAMP)
{
// TIMESTAMP columns are stored as text formatted with sdf.
Datedate=newDate(result.getTimestamp(t).getTime());
// NOTE(review): "f" is never read; the put below formats the date a second time.
Stringf=sdf.format(date);
obj.put(result.getMetaData().getColumnLabel(t).toLowerCase(),sdf.format(date));
}else
{
obj.put(result.getMetaData().getColumnLabel(t).toLowerCase(),result.getObject(t));
}
}
returnobj;
}
});
BulkRequestBuilderbulkRequest=null;
try{
bulkRequest=client.prepareBulk();
for(JSONObjectobj:objs){
byte[]json;
try{
json=mapper.writeValueAsBytes(obj);
// Constant.INDEX is passed as both the first and the second IndexRequest argument;
// the third argument (the document id) is the row's order_id.
bulkRequest.add(newIndexRequest(Constant.INDEX,Constant.INDEX,obj.getString("order_id"))
.source(json));
}catch(JsonProcessingExceptione){
// A row that cannot be serialised is not added to the bulk request; the loop goes on.
e.printStackTrace();
}
}
BulkResponsebulkResponse=bulkRequest.get();
if(bulkResponse.hasFailures()){
logger.info("====================批量创建索引过程中出现错误下面是错误信息==========================");
longcount=0L;
// This loop visits every item of the response without checking whether that item failed,
// so count is the total number of items.
for(BulkItemResponsebulkItemResponse:bulkResponse){
System.out.println("发生错误的索引id为:"+bulkItemResponse.getId()+",错误信息为:"+bulkItemResponse.getFailureMessage());
count++;
}
logger.info("====================批量创建索引过程中出现错误上面是错误信息共有:"+count+"条记录==========================");
// Roll the checkpoint back so the next iteration queries from the previous value again.
currentTimestamp=previousTimestamp;
}else{
logger.info("Thelastestcurrenttimestamp:".concat(currentTimestamp.toString()));
previousTimestamp=currentTimestamp;
// Persist the timestamp to ZooKeeper after a successful write.
zkClient.writeData(Constant.NODE_PATH,String.valueOf(currentTimestamp));
}
}catch(NoNodeAvailableExceptione){
// No Elasticsearch node reachable: roll the checkpoint back and go round the loop again.
currentTimestamp=previousTimestamp;
e.printStackTrace();
}
}
}
}