添加一个类ThreadPoolConfig.java
package com.cjcx.inter.framework.config; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor; @Configuration
@EnableAsync
public class ThreadPoolConfig {

    // Sizing values applied to the executor built by interfaceAsync().

    /** Core pool size passed to the executor. */
    private int corePoolSize = 3;

    /** Maximum pool size passed to the executor. */
    private int maxPoolSize = 7;

    /** Capacity of the executor's task queue. */
    private int queueCapacity = 5;

    /**
     * Builds and initializes the thread pool exposed as the {@code interfaceAsync} bean.
     *
     * @return an initialized {@link ThreadPoolTaskExecutor} using the thread name prefix
     *     {@code interface-}
     */
    @Bean
    public Executor interfaceAsync() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

        pool.setThreadNamePrefix("interface-");
        pool.setCorePoolSize(corePoolSize);
        pool.setMaxPoolSize(maxPoolSize);
        pool.setQueueCapacity(queueCapacity);

        // Rejection policy: decides how a new task is handled once the pool has reached its
        // maximum size. CallerRunsPolicy does not run the task on a new thread; the thread
        // that submitted the task executes it instead.
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        pool.initialize();
        return pool;
    }
}
方法调用
/**
 * Uploads one receipt via {@code testAync} and waits at most five seconds for the outcome.
 *
 * <p>The upload counts as successful only when the result map's {@code errorCode} entry parses
 * to 0. A timeout, a failed or cancelled task, an interruption, or a missing/unparsable code
 * all yield {@code false}.
 *
 * <p>NOTE(review): {@code testAync} is called directly here. If both methods live on the same
 * Spring bean, the call does not pass through the async proxy and runs on the calling thread,
 * which would make the timeout ineffective. Confirm, and if so move {@code testAync} to a
 * separate bean.
 *
 * @param dto receipt to upload; its order number is used in the log lines
 * @return {@code true} if the upload reported error code 0 within the time limit,
 *     {@code false} otherwise
 */
public boolean doSendTask(ShoppingReceiptContentDto dto) {
    boolean bool = false;
    logger.info("单号:{}, ====开始上传数据", dto.getOrderNum());
    Future<HashMap<String, Object>> future = null;
    try {
        future = testAync(dto);
        // Wait on the future with a deadline instead of spinning on isDone(), which would
        // keep a CPU core busy for the whole wait.
        HashMap<String, Object> futureResult =
                future.get(5000, java.util.concurrent.TimeUnit.MILLISECONDS);
        Object rawCode = futureResult == null ? null : futureResult.get("errorCode");
        logger.info("单号:{}, ====上传的结果是:{}", dto.getOrderNum(), rawCode);
        // A missing code is treated as a failure rather than relying on a caught NPE.
        bool = rawCode != null && Integer.parseInt(rawCode.toString()) == 0;
    } catch (java.util.concurrent.TimeoutException e) {
        // No response within 5 seconds: give up and cancel the task.
        logger.info("单号:{}, ====上传超时,5秒钟内K11服务器无返回", dto.getOrderNum());
        future.cancel(true);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can still observe it.
        Thread.currentThread().interrupt();
        if (future != null) {
            future.cancel(true);
        }
        logger.error("单号:{}, ====上传被中断", dto.getOrderNum(), e);
    } catch (Exception e) {
        // Covers task failure (ExecutionException), cancellation and an unparsable code.
        logger.error("单号:{}, ====上传异常", dto.getOrderNum(), e);
    }
    logger.info("单号:{}, ====结束, 上传{}", dto.getOrderNum(), (bool ? "成功" : "失败"));
    return bool;
}
/**
 * Upload task annotated to run on the {@code interfaceAsync} executor.
 *
 * <p>The returned map carries the outcome under the key {@code errorCode}: 0 on success,
 * -10001 when processing throws.
 *
 * @param dto receipt to upload
 * @return a completed future wrapping the result map
 * @throws InterruptedException declared for callers; the current body does not throw it
 */
@Async("interfaceAsync")
public Future<HashMap<String, Object>> testAync(ShoppingReceiptContentDto dto) throws InterruptedException {
    HashMap<String, Object> map = new HashMap<>();
    try {
        // Data processing goes here.
        map.put("errorCode", 0); // hard-coded to success for testing for now
    } catch (Exception e) {
        logger.error("====上传处理异常", e);
        map.put("errorCode", -10001);
    }
    return new AsyncResult<>(map);
}
@Async 内部实现也就是FutureTask