[es与数据库同步写模式]四海骄阳

[es与数据库同步写模式]四海骄阳

publicclassWorker{

privatestaticfinalLoggerlogger=LoggerFactory.getLogger(Worker.class);

privatestaticJdbcTemplatejdbcTemplate;

privatefinalObjectMappermapper=newObjectMapper();

privateZKConnectorzkClient=null;

privateTransportClientclient=null;

privateTimestampcurrentTimestamp=null;

privateTimestamppreviousTimestamp=null;

privatestaticfinalStringoggSql="select*fromt_ordert0leftjoint_order_attachedinfot1ont0.order_id=t1.order_idwhere";

privateStringsql;

publicStringgetSql(){

returnsql;

}

publicvoidsetSql(Stringsql){

this.sql=sql;

}

privateTransportClientgetClient(){

Settingssettings=Settings.settingsBuilder().put("cluster.name",Constant.CLUSTER).build();

TransportClientclient=TransportClient.builder().settings(settings).build();

try{

client.addTransportAddress(newInetSocketTransportAddress(InetAddress.getByName(Constant.ESHOST),Constant.ESPORT));

}catch(UnknownHostExceptione){

e.printStackTrace();

}

returnclient;

}

publicWorker(AbstractApplicationContextctx){

//初始化Oracle连接

jdbcTemplate=(JdbcTemplate)ctx.getBean("jdbcTemplate");

client=getClient();

zkClient=newZKConnector();

zkClient.createConnection(Constant.ZKSERVER,Constant.SESSION_TIMEOUT);

//初始化zookeeper锁,由于zookeeper不能联级创建

if(!zkClient.exist(Constant.ZK_PATH)){

zkClient.createPersistNode(Constant.ZK_PATH,"");

}

if(currentTimestamp==null){

StringzkTimestamp=zkClient.readData(Constant.NODE_PATH);

if(zkTimestamp!=null&&!zkTimestamp.equals(""))

{

try

{

currentTimestamp=Timestamp.valueOf(zkTimestamp);

logger.info("获取zookeeper最后同步时间:"+currentTimestamp);

}catch(Exceptione){

zkClient.deleteNode(Constant.NODE_PATH);

}

}

}

}

publicvoiddoWork(){

logger.info("start...");

//一直进行同步工作

while(true){

Stringsqlwhere="";

//根据时间戳获取Mycat中规则表数据

Stringsql="";

//若最后一次同步时间为空,则按最后更新时间排序,取最小的时间作为当前时间戳

if(currentTimestamp!=null){

sql="selectorder_id,timestampfromt_order_changeswhererownumto_timestamp(‘"+currentTimestamp.toString()+"‘,‘yyyy-mm-ddhh24:mi:ss.ff6‘)";

}else{

sql="selectorder_id,timestampfromt_order_changeswhererownum

}

//查詢该时间段的订单id

Listids=newArrayList();

//升序会将最后一次的时间也就是最大的时间作为当前的currentTimeStamp

ids=jdbcTemplate.query(sql,newObject[]{},newRowMapper()

{

publicStringmapRow(ResultSetresult,introwNum)throwsSQLException{

currentTimestamp=result.getTimestamp("timestamp");

returnresult.getString("order_id");

}

});

if(ids.size()==0){

continue;

}

inti=0;

ListcheckIds=newArrayList();

for(Stringid:ids){

//若存在更新的id则跳过

if(checkIds.contains(id)){

continue;

}

if(i==0){

sqlwhere=sqlwhere.concat("t0.order_id=‘"+id+"‘");

}else{

sqlwhere=sqlwhere.concat("ort0.order_id=‘"+id+"‘");

}

checkIds.add(id);

i++;

}

System.out.println(oggSql.concat(sqlwhere));

//objs即是Oracle里面查询出来需要同步的数据

Listobjs=jdbcTemplate.query(oggSql.concat(sqlwhere),newObject[]{},newRowMapper()

{

SimpleDateFormatsdf=newSimpleDateFormat("yyyy-MM-ddHH:mm:ss");

publicJSONObjectmapRow(ResultSetresult,introwNum)throwsSQLException{

intc=result.getMetaData().getColumnCount();

JSONObjectobj=newJSONObject();

for(intt=1;t

{

if(result.getObject(t)==null)

{

continue;

}

if(result.getMetaData().getColumnType(t)==Types.DATE)

{

obj.put(result.getMetaData().getColumnLabel(t).toLowerCase(),result.getDate(t));

}elseif(result.getMetaData().getColumnType(t)==Types.TIMESTAMP)

{

Datedate=newDate(result.getTimestamp(t).getTime());

Stringf=sdf.format(date);

obj.put(result.getMetaData().getColumnLabel(t).toLowerCase(),sdf.format(date));

}else

{

obj.put(result.getMetaData().getColumnLabel(t).toLowerCase(),result.getObject(t));

}

}

returnobj;

}

});

BulkRequestBuilderbulkRequest=null;

try{

bulkRequest=client.prepareBulk();

for(JSONObjectobj:objs){

byte[]json;

try{

json=mapper.writeValueAsBytes(obj);

bulkRequest.add(newIndexRequest(Constant.INDEX,Constant.INDEX,obj.getString("order_id"))

.source(json));

}catch(JsonProcessingExceptione){

e.printStackTrace();

}

}

BulkResponsebulkResponse=bulkRequest.get();

if(bulkResponse.hasFailures()){

logger.info("====================批量创建索引过程中出现错误下面是错误信息==========================");

longcount=0L;

for(BulkItemResponsebulkItemResponse:bulkResponse){

System.out.println("发生错误的索引id为:"+bulkItemResponse.getId()+",错误信息为:"+bulkItemResponse.getFailureMessage());

count++;

}

logger.info("====================批量创建索引过程中出现错误上面是错误信息共有:"+count+"条记录==========================");

currentTimestamp=previousTimestamp;

}else{

logger.info("Thelastestcurrenttimestamp:".concat(currentTimestamp.toString()));

previousTimestamp=currentTimestamp;

//将写入成功后的时间写到zookeeper中

zkClient.writeData(Constant.NODE_PATH,String.valueOf(currentTimestamp));

}

}catch(NoNodeAvailableExceptione){

currentTimestamp=previousTimestamp;

e.printStackTrace();

}

}

}

}

[es与数据库同步写模式]四海骄阳

[es与数据库同步写模式]四海骄阳

上一篇:sql中#与$的区别


下一篇:PHPCMS V9{loop subcat(0,0,0,$siteid) $r}怎么解释?