// Executor that runs the import tasks submitted below.
private final ThreadPoolExecutor pool;
// Core and maximum pool size are both maxThreads; the keep-alive time is one day.
// The work queue is bounded at maxThreads entries and typed as Runnable
// (the element type ThreadPoolExecutor work queues use) instead of a raw type.
// NOTE(review): "FaceBaseClient" is presumably the thread-name prefix used by
// CustomNameThreadFactory - confirm against that class.
pool = new BlockedThreadPoolExecutor(maxThreads,
maxThreads,
1, TimeUnit.DAYS,
new ArrayBlockingQueue<Runnable>(maxThreads),
new CustomNameThreadFactory("FaceBaseClient"));
// Submit one ImportTask (index is post-incremented, so each submission gets the
// next index) and keep the returned Future in futures.
futures.add(pool.submit(new ImportTask(index++, url, user, passwd,
this.getFetchSql(startRyid),batch,
faceBaseName, metrics, conf)));
/**
 * Runs one import job: executes {@code sql} against the database at {@code url},
 * converts each row with {@code resultSet2FaceRecord} and pushes the records to
 * the face base named {@code faceBaseName} in groups of {@code batch}.
 *
 * <p>Any exception raised while connecting, querying or pushing is printed and
 * swallowed; the method then still returns the number of rows read so far.
 * The JDBC result set, statement and connection, as well as the client, are
 * released in the {@code finally} block.
 *
 * @return the number of rows read from the result set, including rows whose
 *         conversion failed with an {@link SQLException}
 */
public Integer call() {
    int total = 0;
    Connection con = null;
    Statement stmt = null;
    ResultSet results = null;
    System.out.println("Import job ‘" + index + "‘ is starting...");
    FaceBaseClient client = null;
    try {
        this.metrics.incrementTaskCount();
        client = new FaceBaseClient(conf);
        Class.forName("oracle.jdbc.driver.OracleDriver");
        con = DriverManager.getConnection(url, user, passwd);
        System.out.println("TaskID=" + index + ", connect db...");
        stmt = con.createStatement();
        System.out.println("TaskID=" + index + ", query: " + sql);
        results = stmt.executeQuery(sql);
        List<FaceRecord> records = new ArrayList<FaceRecord>();
        while (results.next()) {
            try {
                records.add(resultSet2FaceRecord(results));
            } catch (SQLException e1) {
                // A row that cannot be converted is reported and skipped; the import goes on.
                System.out.println("TaskID=" + index + ", Exception:" + e1.getMessage() + ", continue..");
            }
            // Flush as soon as a full batch has been collected.
            if (records.size() >= this.batch) {
                client.updateFaceRecords(faceBaseName, records, false);
                this.metrics.addAndGet(records.size());
                records.clear();
            }
            total++;
        }
        // Flush the last, partially filled batch.
        // NOTE(review): full batches go through updateFaceRecords(..., false) while this
        // remainder goes through insertFaceRecords(...) - confirm the difference is intended.
        if (records.size() > 0) {
            client.insertFaceRecords(faceBaseName, records);
            this.metrics.addAndGet(records.size());
            records.clear();
        }
    } catch (Exception e) {
        System.out.println("TaskID=" + index + ", Exception:" + e.getMessage());
        e.printStackTrace();
    } finally {
        // Release JDBC resources in reverse order of acquisition. Each close is guarded
        // separately so that one failing close does not prevent the remaining ones.
        if (results != null) {
            try {
                results.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (stmt != null) {
            try {
                stmt.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (con != null) {
            try {
                con.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (client != null) {
            client.cleanup();
        }
    }
    this.metrics.incrementFinishedTaskCount();
    return total;
}