/**
 * Adds call-chain (tracing) information on top of {@link CompletableFuture}.
 *
 * <p>The submitting thread's EagleEye RPC context and MDC map are captured when a task is
 * submitted and installed on the thread that actually runs the task. Once the task finishes,
 * the executing thread is put back into the state it had before the task started.
 */
@Component
public class CompletableFutureWrapper {

    /** Executor on which every wrapped task is scheduled; injected via {@code @Autowired}. */
    @Autowired
    private ExecutorService asyncRpcExecutorService;

    /**
     * A unit of work that produces a value of type {@code U}.
     *
     * @param <U> type of the produced value
     */
    @FunctionalInterface
    public interface Task<U> {
        /**
         * Runs the work.
         *
         * @return the result of the work
         */
        U callback();
    }

    /**
     * Schedules {@code task} on {@code asyncRpcExecutorService}, carrying the submitting
     * thread's EagleEye RPC context and MDC map over to the executing thread.
     *
     * @param task the work to run asynchronously
     * @param <U>  type of the value produced by {@code task}
     * @return a future completed with the value returned by {@code task.callback()}
     */
    public <U> CompletableFuture<U> supplyAsync(Task<U> task) {
        // Snapshot the tracing state on the submitting thread; the lambda below runs later,
        // possibly on a different thread.
        RpcContext_inner contextInner = EagleEye.getRpcContext();
        Map<String, String> contextMap = MDC.getCopyOfContextMap();
        return CompletableFuture.supplyAsync(() -> {
            // Remember what the executing thread held before the propagated context is
            // installed. The executor may run the task on the submitting thread itself
            // (e.g. a caller-runs rejection policy or a direct executor); blindly clearing
            // afterwards would then wipe the caller's own trace context and MDC.
            RpcContext_inner previousRpcContext = EagleEye.getRpcContext();
            Map<String, String> previousMdc = MDC.getCopyOfContextMap();
            try {
                if (contextInner != null) {
                    EagleEye.setRpcContext(contextInner);
                }
                if (contextMap != null) {
                    MDC.setContextMap(contextMap);
                }
                return task.callback();
            } finally {
                // Restore the executing thread's prior state; when there was none, clear so
                // that nothing leaks into the next task run by this thread.
                if (previousRpcContext != null) {
                    EagleEye.setRpcContext(previousRpcContext);
                } else {
                    EagleEye.clearRpcContext();
                }
                if (previousMdc != null) {
                    MDC.setContextMap(previousMdc);
                } else {
                    MDC.clear();
                }
            }
        }, asyncRpcExecutorService);
    }
}
记得注册一个名为 asyncRpcExecutorService 的线程池。
另外还要记得对我们新增的线程池进行监控，例如通过 jstack 查看线程池中各线程的状态：
$ jstack 20275|grep AsyncRpcUtils
"AsyncRpcUtils-pool-99" #484 prio=5 os_prio=0 tid=0x00002b98383f9800 nid=0x58be waiting on condition [0x00002b98c3ae6000]
"AsyncRpcUtils-pool-98" #483 prio=5 os_prio=0 tid=0x00002b9838551800 nid=0x58bd waiting on condition [0x00002b98c39e5000]
...
$ jstack 20275|grep AsyncRpcUtils|grep waiting|wc -l
100
$ jstack 20275|grep AsyncRpcUtils|grep runnable|wc -l
0
$ jstack 20275|grep "java.lang.Thread.State"|awk '{print $2}'|sort|uniq -c
43 RUNNABLE
110 TIMED_WAITING
313 WAITING