Preserving trace information in asynchronous Java processing

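import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
// EagleEye and RpcContext_inner come from the EagleEye tracing client;
// import them from the package your version provides.

/**
 * Wraps CompletableFuture so that call-chain (trace) information
 * is carried over to the worker thread.
 */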
@Component
public class CompletableFutureWrapper {

    @Autowired
    private ExecutorService asyncRpcExecutorService;

    public interface Task<U> {
        U callback();
    }

    public <U> CompletableFuture<U> supplyAsync(Task<U> task){

        // Capture the trace context on the calling thread, before the task is handed to the pool.
        RpcContext_inner contextInner = EagleEye.getRpcContext();
        Map<String, String> contextMap = MDC.getCopyOfContextMap();

        return CompletableFuture.supplyAsync(() -> {
            try {
                if (contextInner != null) {
                    EagleEye.setRpcContext(contextInner);
                }

                if (contextMap != null) {
                    MDC.setContextMap(contextMap);
                }

                return task.callback();
            } finally {
                // Pool threads are reused, so clear the context to avoid leaking it into the next task.
                EagleEye.clearRpcContext();
                MDC.clear();
            }
        }, asyncRpcExecutorService);
    }
}
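
The wrapper above cannot be run without EagleEye and a logging backend, so here is a minimal sketch of the same capture / restore / clear pattern using only the JDK. It assumes the trace state lives in a plain ThreadLocal; the names TraceContextDemo and TRACE_ID are hypothetical stand-ins, not part of EagleEye or SLF4J.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

final class TraceContextDemo {

    // Hypothetical stand-in for per-thread trace state (what MDC / the RPC context hold).
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    static <U> CompletableFuture<U> supplyAsync(Supplier<U> task, ExecutorService executor) {
        // Runs on the calling thread: capture the context before handing off.
        String captured = TRACE_ID.get();
        return CompletableFuture.supplyAsync(() -> {
            try {
                // Runs on the worker thread: restore the captured context.
                if (captured != null) {
                    TRACE_ID.set(captured);
                }
                return task.get();
            } finally {
                // Worker threads are reused, so always clean up.
                TRACE_ID.remove();
            }
        }, executor);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            TRACE_ID.set("trace-123");
            // Plain supplyAsync: the worker thread does not see the caller's ThreadLocal.
            String plain = CompletableFuture.supplyAsync(() -> TRACE_ID.get(), pool).join();
            // Wrapped supplyAsync: the value is carried over explicitly.
            String wrapped = supplyAsync(() -> TRACE_ID.get(), pool).join();
            System.out.println("plain=" + plain + ", wrapped=" + wrapped);
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }
}
```

The capture has to happen outside the lambda: anything read inside the lambda is read on the worker thread, where the caller's context is no longer visible.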

Remember to register a thread pool named asyncRpcExecutorService.
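
One possible way to build such a pool is sketched below. The original post does not show its pool configuration, so everything here is an assumption: the class name AsyncRpcExecutorFactory, the poolSize and queueCapacity parameters, and the rejection policy are illustrative. The "AsyncRpcUtils-pool-" thread name prefix is chosen only to match the thread names in the jstack output of this post. In a Spring application the returned executor would typically be exposed through a @Bean method so the @Autowired field can pick it up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class AsyncRpcExecutorFactory {

    // poolSize and queueCapacity are hypothetical knobs; tune them for your workload.
    static ThreadPoolExecutor newAsyncRpcExecutor(int poolSize, int queueCapacity) {
        AtomicInteger seq = new AtomicInteger();
        // Named threads make the pool easy to find with jstack | grep.
        ThreadFactory factory = r -> new Thread(r, "AsyncRpcUtils-pool-" + seq.getAndIncrement());
        return new ThreadPoolExecutor(
                poolSize, poolSize,               // fixed-size pool
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueCapacity), // bounded queue
                factory,
                // AbortPolicy rejects when saturated. CallerRunsPolicy is avoided here:
                // it would run the wrapped task on the submitting thread, where the
                // wrapper's finally block would clear the caller's own trace context.
                new ThreadPoolExecutor.AbortPolicy());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = newAsyncRpcExecutor(4, 100);
        String name = CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), executor)
                .join();
        System.out.println("task ran on " + name);
        executor.shutdown();
    }
}
```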

Also remember to monitor the thread pool we added, for example by counting its threads by state with jstack:

$ jstack 20275|grep AsyncRpcUtils
"AsyncRpcUtils-pool-99" #484 prio=5 os_prio=0 tid=0x00002b98383f9800 nid=0x58be waiting on condition [0x00002b98c3ae6000]
"AsyncRpcUtils-pool-98" #483 prio=5 os_prio=0 tid=0x00002b9838551800 nid=0x58bd waiting on condition [0x00002b98c39e5000]
...
$ jstack 20275|grep AsyncRpcUtils|grep waiting|wc -l
100
$ jstack 20275|grep AsyncRpcUtils|grep runnable|wc -l
0
$ jstack 20275|grep "java.lang.Thread.State"|awk '{print $2}'|sort|uniq -c
     43 RUNNABLE
    110 TIMED_WAITING
    313 WAITING
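
As a complement to jstack, the pool can also be inspected from inside the process. The sketch below assumes the executor is a java.util.concurrent.ThreadPoolExecutor (the wrapper only declares ExecutorService, so a cast or a differently typed bean would be needed); PoolStats and snapshot are hypothetical names.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolStats {

    // Builds a one-line summary suitable for periodic logging or a metrics gauge.
    // The values are approximate, because the pool keeps changing while they are read.
    static String snapshot(ThreadPoolExecutor pool) {
        return "poolSize=" + pool.getPoolSize()
                + " active=" + pool.getActiveCount()
                + " queued=" + pool.getQueue().size()
                + " completed=" + pool.getCompletedTaskCount();
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.execute(() -> { });
        System.out.println(snapshot(pool));
        pool.shutdown();
    }
}
```

A steadily growing queued value, or active staying equal to poolSize, suggests the pool is saturated; many waiting threads and no runnable ones, as in the jstack output above, suggests it is idle.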