Spring中的DeferredResult执行

背景

  在实际业务中,有一些需要接口后端需要处理一定的时间,但是又不能hang住用户接口,需要提前返回,再让用户过一定的时间再去获取对应的结果信息。<br/>
  相信大多数后端小伙伴可能选用Websocket或者是SseEmitter后端(小伙伴们可以看看之前的文章 -> SpringBoot中的服务端消息推送)。又或者让前端小伙伴轮询请求某个单独结果接口(可能你要用上医保了)。<br/>
  于是DeferredResult的存在很好的解决类似问题。

Code

  相关的代码我是直接从网上CV下来一份,如有问题,请私我删除!

DeferredResultController:

@RestController
@RequestMapping(value = "/deferred-result")
public class DeferredResultController {

    @Autowired
    private DeferredResultService deferredResultService;

    private final String requestId = "haha";

    @GetMapping(value = "/getResult")
    public DeferredResult<DeferredResultResponse> getResult(@RequestParam(value = "timeout", required = false, defaultValue = "10000") Long timeout) {
        DeferredResult<DeferredResultResponse> deferredResult = new DeferredResult<>(timeout);
        deferredResultService.getResult(requestId, deferredResult);
        return deferredResult;
    }

    @GetMapping(value = "/result")
    public String settingResult(@RequestParam(value = "desired", required = false, defaultValue = "成功") String desired) {
        DeferredResultResponse deferredResultResponse = new DeferredResultResponse();
        if (DeferredResultResponse.Msg.SUCCESS.getDesc().equals(desired)){
            deferredResultResponse.setCode(HttpStatus.OK.value());
            deferredResultResponse.setMsg(desired);
        }else{
            deferredResultResponse.setCode(HttpStatus.INTERNAL_SERVER_ERROR.value());
            deferredResultResponse.setMsg(DeferredResultResponse.Msg.FAILED.getDesc());
        }
        deferredResultService.settingResult(requestId, deferredResultResponse);
        return "Done";
    }
}

DeferredResultService:

public class DeferredResultService {

    private Map<String, Consumer<DeferredResultResponse>> taskMap;

    public DeferredResultService() {
        taskMap = new ConcurrentHashMap<>();
    }

    public void getResult(String requestId, DeferredResult<DeferredResultResponse> deferredResult) {
        // 请求超时的回调函数
        deferredResult.onTimeout(() -> {
            taskMap.remove(requestId);
            DeferredResultResponse deferredResultResponse = new DeferredResultResponse();
            deferredResultResponse.setCode(HttpStatus.REQUEST_TIMEOUT.value());
            deferredResultResponse.setMsg(DeferredResultResponse.Msg.TIMEOUT.getDesc());
            deferredResult.setResult(deferredResultResponse);
        });

        Optional.ofNullable(taskMap)
                .filter(t -> !t.containsKey(requestId))
                .orElseThrow(() -> new IllegalArgumentException(String.format("requestId=%s is existing", requestId)));
        taskMap.putIfAbsent(requestId, deferredResult::setResult);
    }

    public void settingResult(String requestId, DeferredResultResponse deferredResultResponse) {
        if (taskMap.containsKey(requestId)) {
            Consumer<DeferredResultResponse> deferredResultResponseConsumer = taskMap.get(requestId);
            deferredResultResponseConsumer.accept(deferredResultResponse);
            taskMap.remove(requestId);
        }
    }
}

代码运行情况

   大致可以看出,有两个请求路径,分别对应一个创造数据,一个获取数据,当获取数据使用的时间超时设置的超时时间,则返回对应的超时时间设置的返回值。
  1. 当我们请求获取数据接口,当我们不请求放入数据的接口时,则返回的是TIME_OUT。
  2. 当我们请求获取数据接口,当我们不请求放入数据的接口时,则返回的是SUCCESS。

运行讲解

   放入接口没有什么好理解的,就是一个纯粹的一个同步请求接口。主要关注的是对应的返回值为DeferredResult的接口,主要看看内部是如何进行执行的。从org.springframework.web.servlet.DispatcherServlet#doDispatch执行开始进行。
  1. 经过doDispatch后,接下来会到对应的org.springframework.web.servlet.mvc.method.annotation.DeferredResultMethodReturnValueHandler#handleReturnValue中,判断当前返回值类型。
@Override
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
      ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {

   if (returnValue == null) {
      mavContainer.setRequestHandled(true);
      return;
   }

   DeferredResult<?> result;

   if (returnValue instanceof DeferredResult) { // 本Demo中的数据类型
      result = (DeferredResult<?>) returnValue;
   }
   else if (returnValue instanceof ListenableFuture) { // 原有Future的增强
      result = adaptListenableFuture((ListenableFuture<?>) returnValue);
   }
   else if (returnValue instanceof CompletionStage) { // 用于异步执行中的阶段处理
      result = adaptCompletionStage((CompletionStage<?>) returnValue);
   }
   else {
      // Should not happen...
      throw new IllegalStateException("Unexpected return value type: " + returnValue);
   }
    WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer); // 获得对应异步管理器并执行
}
  1. 经过包装返回值的类型,接下来就进行到了请求处理:org.springframework.web.context.request.async.WebAsyncManager#startDeferredResultProcessing
public void startDeferredResultProcessing(
      final DeferredResult<?> deferredResult, Object... processingContext) throws Exception {

   Assert.notNull(deferredResult, "DeferredResult must not be null");
   Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");

   Long timeout = deferredResult.getTimeoutValue();
   if (timeout != null) {
      this.asyncWebRequest.setTimeout(timeout);
   }
   // 增加拦截器设置   此时只有第一个和第三个(第一个是DeferredResult类中匿名实现)
   List<DeferredResultProcessingInterceptor> interceptors = new ArrayList<>();
   interceptors.add(deferredResult.getInterceptor());
   interceptors.addAll(this.deferredResultInterceptors.values());
   interceptors.add(timeoutDeferredResultInterceptor);

   final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors);
   // 增加超时  错误  完成回调处理,进行触发
   this.asyncWebRequest.addTimeoutHandler(() -> {
      try {
         interceptorChain.triggerAfterTimeout(this.asyncWebRequest, deferredResult);
      }
      catch (Throwable ex) {
         setConcurrentResultAndDispatch(ex);
      }
   });

   this.asyncWebRequest.addErrorHandler(ex -> {
      if (!this.errorHandlingInProgress) {
         try {
            if (!interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, ex)) {
               return;
            }
            deferredResult.setErrorResult(ex);
         }
         catch (Throwable interceptorEx) {
            setConcurrentResultAndDispatch(interceptorEx);
         }
      }
   });

   this.asyncWebRequest.addCompletionHandler(()
         -> interceptorChain.triggerAfterCompletion(this.asyncWebRequest, deferredResult));
   // 请求初始之前的处理 -空实现
   interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, deferredResult);
   // 开始异步调用处理
   startAsyncProcessing(processingContext);

   try {
      // 
      interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult);
      deferredResult.setResultHandler(result -> {
         result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);
         setConcurrentResultAndDispatch(result);
      });
   }
   catch (Throwable ex) {
      setConcurrentResultAndDispatch(ex);
   }
}
  1. 请求接着往下走,会进行到org.apache.catalina.core.AsyncContextImpl#setStarted。在其中进行异步事件处理。
public void setStarted(Context context, ServletRequest request,
        ServletResponse response, boolean originalRequestResponse) {

    synchronized (asyncContextLock) { 
        this.request.getCoyoteRequest().action(ActionCode.ASYNC_START, this); // 异步事件处理 - 钩子函数
        this.context = context;
        context.incrementInProgressAsyncCount();
        this.servletRequest = request;
        this.servletResponse = response;
        this.hasOriginalRequestAndResponse = originalRequestResponse;
        this.event = new AsyncEvent(this, request, response);
        // 注册异步事件监听器 - 当前为空 不进行监听
        List<AsyncListenerWrapper> listenersCopy = new ArrayList<>(listeners);
        listeners.clear();
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("asyncContextImpl.fireOnStartAsync"));
        }
        for (AsyncListenerWrapper listener : listenersCopy) {
            try {
                listener.fireOnStartAsync(event);
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.warn(sm.getString("asyncContextImpl.onStartAsyncError",
                        listener.getClass().getName()), t);
            }
        }
    }
}
  1. 当异步事件处理完成之后,则会还需要对应Dispatch进行响应。
private void setConcurrentResultAndDispatch(Object result) {
   synchronized (WebAsyncManager.this) {
      if (this.concurrentResult != RESULT_NONE) {
         return;
      }
      this.concurrentResult = result;
      this.errorHandlingInProgress = (result instanceof Throwable);
   }

   if (this.asyncWebRequest.isAsyncComplete()) {
      if (logger.isDebugEnabled()) {
         logger.debug("Async result set but request already complete: " + formatRequestUri());
      }
      return;
   }

   if (logger.isDebugEnabled()) {
      boolean isError = result instanceof Throwable;
      logger.debug("Async " + (isError ? "error" : "result set") + ", dispatch to " + formatRequestUri());
   }
   this.asyncWebRequest.dispatch(); // ASYNC_DISPATCH事件注册
}
  1. 判断当前请求是否超时逻辑org.apache.coyote.AbstractProcessor#timeoutAsync:
@Override
public void timeoutAsync(long now) {
    if (now < 0) {
        doTimeoutAsync();
    } else {
        long asyncTimeout = getAsyncTimeout();
        if (asyncTimeout > 0) {
            long asyncStart = asyncStateMachine.getLastAsyncStart();
            if ((now - asyncStart) > asyncTimeout) {
                doTimeoutAsync();
            }
        } else if (!asyncStateMachine.isAvailable()) {
            // Timeout the async process if the associated web application
            // is no longer running.
            doTimeoutAsync();
        }
    }
}


private void doTimeoutAsync() {
    // Avoid multiple timeouts
    setAsyncTimeout(-1);
    asyncTimeoutGeneration = asyncStateMachine.getCurrentGeneration();
    processSocketEvent(SocketEvent.TIMEOUT, true); // 发送timeout事件
}
  1. HTTP事件响应可以看看org.apache.coyote.AbstractProcessor#action类中,针对HTTP各种事件进行各种处理(code太多,这里就不贴了)。

对比Callable异同

相同之处:<br/>
二者都是实现了异步返回数据的作用:<br/>
Callable方式通过提交任务到TaskExecutor进行执行,任务完成之后由DispatcherServlet进行调用返回客户端。 DeferredResult中的结果不一定是当前处理线程放入,当结果存放完成之后再进行DispatcherServlet将结果返回到客户端。<br/>

不同之处:<br/>
结果存放线程不同:<br/>
Callable执行过程与获取结果可以理解为是同一线程完成。DeferredResult的结果存放不一定是当前线程。

总结

   以上是关于DeferredResult使用以及原理的简单介绍,主要让大家了解到当前框架内部支持的返回值类型(相信大家很少在实际使用),具体使用场景需要结合业务情况。
上一篇:线程池监控 - 简易版


下一篇:Spring中的定时器都会了?