背景
在实际业务中,有一些需要接口后端需要处理一定的时间,但是又不能hang
住用户接口,需要提前返回,再让用户过一定的时间再去获取对应的结果信息。<br/>
相信大多数后端小伙伴可能选用Websocket
或者是SseEmitter
后端(小伙伴们可以看看之前的文章 -> SpringBoot中的服务端消息推送)。又或者让前端小伙伴轮询请求某个单独结果接口(可能你要用上医保了)。<br/>
于是DeferredResult
的存在很好的解决类似问题。
Code
相关的代码我是直接从网上CV
下来一份,如有问题,请私我删除!
DeferredResultController:
@RestController
@RequestMapping(value = "/deferred-result")
public class DeferredResultController {
@Autowired
private DeferredResultService deferredResultService;
private final String requestId = "haha";
@GetMapping(value = "/getResult")
public DeferredResult<DeferredResultResponse> getResult(@RequestParam(value = "timeout", required = false, defaultValue = "10000") Long timeout) {
DeferredResult<DeferredResultResponse> deferredResult = new DeferredResult<>(timeout);
deferredResultService.getResult(requestId, deferredResult);
return deferredResult;
}
@GetMapping(value = "/result")
public String settingResult(@RequestParam(value = "desired", required = false, defaultValue = "成功") String desired) {
DeferredResultResponse deferredResultResponse = new DeferredResultResponse();
if (DeferredResultResponse.Msg.SUCCESS.getDesc().equals(desired)){
deferredResultResponse.setCode(HttpStatus.OK.value());
deferredResultResponse.setMsg(desired);
}else{
deferredResultResponse.setCode(HttpStatus.INTERNAL_SERVER_ERROR.value());
deferredResultResponse.setMsg(DeferredResultResponse.Msg.FAILED.getDesc());
}
deferredResultService.settingResult(requestId, deferredResultResponse);
return "Done";
}
}
DeferredResultService:
public class DeferredResultService {
private Map<String, Consumer<DeferredResultResponse>> taskMap;
public DeferredResultService() {
taskMap = new ConcurrentHashMap<>();
}
public void getResult(String requestId, DeferredResult<DeferredResultResponse> deferredResult) {
// 请求超时的回调函数
deferredResult.onTimeout(() -> {
taskMap.remove(requestId);
DeferredResultResponse deferredResultResponse = new DeferredResultResponse();
deferredResultResponse.setCode(HttpStatus.REQUEST_TIMEOUT.value());
deferredResultResponse.setMsg(DeferredResultResponse.Msg.TIMEOUT.getDesc());
deferredResult.setResult(deferredResultResponse);
});
Optional.ofNullable(taskMap)
.filter(t -> !t.containsKey(requestId))
.orElseThrow(() -> new IllegalArgumentException(String.format("requestId=%s is existing", requestId)));
taskMap.putIfAbsent(requestId, deferredResult::setResult);
}
public void settingResult(String requestId, DeferredResultResponse deferredResultResponse) {
if (taskMap.containsKey(requestId)) {
Consumer<DeferredResultResponse> deferredResultResponseConsumer = taskMap.get(requestId);
deferredResultResponseConsumer.accept(deferredResultResponse);
taskMap.remove(requestId);
}
}
}
代码运行情况
大致可以看出,有两个请求路径,分别对应一个创造数据,一个获取数据,当获取数据使用的时间超时设置的超时时间,则返回对应的超时时间设置的返回值。
- 当我们请求获取数据接口,当我们不请求放入数据的接口时,则返回的是TIME_OUT。
- 当我们请求获取数据接口,当我们不请求放入数据的接口时,则返回的是SUCCESS。
运行讲解
放入接口没有什么好理解的,就是一个纯粹的一个同步请求接口。主要关注的是对应的返回值为DeferredResult
的接口,主要看看内部是如何进行执行的。从org.springframework.web.servlet.DispatcherServlet#doDispatch
执行开始进行。
- 经过
doDispatch
后,接下来会到对应的org.springframework.web.servlet.mvc.method.annotation.DeferredResultMethodReturnValueHandler#handleReturnValue
中,判断当前返回值类型。
@Override
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
if (returnValue == null) {
mavContainer.setRequestHandled(true);
return;
}
DeferredResult<?> result;
if (returnValue instanceof DeferredResult) { // 本Demo中的数据类型
result = (DeferredResult<?>) returnValue;
}
else if (returnValue instanceof ListenableFuture) { // 原有Future的增强
result = adaptListenableFuture((ListenableFuture<?>) returnValue);
}
else if (returnValue instanceof CompletionStage) { // 用于异步执行中的阶段处理
result = adaptCompletionStage((CompletionStage<?>) returnValue);
}
else {
// Should not happen...
throw new IllegalStateException("Unexpected return value type: " + returnValue);
}
WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer); // 获得对应异步管理器并执行
}
- 经过包装返回值的类型,接下来就进行到了请求处理:
org.springframework.web.context.request.async.WebAsyncManager#startDeferredResultProcessing
public void startDeferredResultProcessing(
final DeferredResult<?> deferredResult, Object... processingContext) throws Exception {
Assert.notNull(deferredResult, "DeferredResult must not be null");
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
Long timeout = deferredResult.getTimeoutValue();
if (timeout != null) {
this.asyncWebRequest.setTimeout(timeout);
}
// 增加拦截器设置 此时只有第一个和第三个(第一个是DeferredResult类中匿名实现)
List<DeferredResultProcessingInterceptor> interceptors = new ArrayList<>();
interceptors.add(deferredResult.getInterceptor());
interceptors.addAll(this.deferredResultInterceptors.values());
interceptors.add(timeoutDeferredResultInterceptor);
final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors);
// 增加超时 错误 完成回调处理,进行触发
this.asyncWebRequest.addTimeoutHandler(() -> {
try {
interceptorChain.triggerAfterTimeout(this.asyncWebRequest, deferredResult);
}
catch (Throwable ex) {
setConcurrentResultAndDispatch(ex);
}
});
this.asyncWebRequest.addErrorHandler(ex -> {
if (!this.errorHandlingInProgress) {
try {
if (!interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, ex)) {
return;
}
deferredResult.setErrorResult(ex);
}
catch (Throwable interceptorEx) {
setConcurrentResultAndDispatch(interceptorEx);
}
}
});
this.asyncWebRequest.addCompletionHandler(()
-> interceptorChain.triggerAfterCompletion(this.asyncWebRequest, deferredResult));
// 请求初始之前的处理 -空实现
interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, deferredResult);
// 开始异步调用处理
startAsyncProcessing(processingContext);
try {
//
interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult);
deferredResult.setResultHandler(result -> {
result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);
setConcurrentResultAndDispatch(result);
});
}
catch (Throwable ex) {
setConcurrentResultAndDispatch(ex);
}
}
- 请求接着往下走,会进行到
org.apache.catalina.core.AsyncContextImpl#setStarted
。在其中进行异步事件处理。
public void setStarted(Context context, ServletRequest request,
ServletResponse response, boolean originalRequestResponse) {
synchronized (asyncContextLock) {
this.request.getCoyoteRequest().action(ActionCode.ASYNC_START, this); // 异步事件处理 - 钩子函数
this.context = context;
context.incrementInProgressAsyncCount();
this.servletRequest = request;
this.servletResponse = response;
this.hasOriginalRequestAndResponse = originalRequestResponse;
this.event = new AsyncEvent(this, request, response);
// 注册异步事件监听器 - 当前为空 不进行监听
List<AsyncListenerWrapper> listenersCopy = new ArrayList<>(listeners);
listeners.clear();
if (log.isDebugEnabled()) {
log.debug(sm.getString("asyncContextImpl.fireOnStartAsync"));
}
for (AsyncListenerWrapper listener : listenersCopy) {
try {
listener.fireOnStartAsync(event);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.warn(sm.getString("asyncContextImpl.onStartAsyncError",
listener.getClass().getName()), t);
}
}
}
}
- 当异步事件处理完成之后,则会还需要对应
Dispatch
进行响应。
private void setConcurrentResultAndDispatch(Object result) {
synchronized (WebAsyncManager.this) {
if (this.concurrentResult != RESULT_NONE) {
return;
}
this.concurrentResult = result;
this.errorHandlingInProgress = (result instanceof Throwable);
}
if (this.asyncWebRequest.isAsyncComplete()) {
if (logger.isDebugEnabled()) {
logger.debug("Async result set but request already complete: " + formatRequestUri());
}
return;
}
if (logger.isDebugEnabled()) {
boolean isError = result instanceof Throwable;
logger.debug("Async " + (isError ? "error" : "result set") + ", dispatch to " + formatRequestUri());
}
this.asyncWebRequest.dispatch(); // ASYNC_DISPATCH事件注册
}
- 判断当前请求是否超时逻辑
org.apache.coyote.AbstractProcessor#timeoutAsync
:
@Override
public void timeoutAsync(long now) {
if (now < 0) {
doTimeoutAsync();
} else {
long asyncTimeout = getAsyncTimeout();
if (asyncTimeout > 0) {
long asyncStart = asyncStateMachine.getLastAsyncStart();
if ((now - asyncStart) > asyncTimeout) {
doTimeoutAsync();
}
} else if (!asyncStateMachine.isAvailable()) {
// Timeout the async process if the associated web application
// is no longer running.
doTimeoutAsync();
}
}
}
private void doTimeoutAsync() {
// Avoid multiple timeouts
setAsyncTimeout(-1);
asyncTimeoutGeneration = asyncStateMachine.getCurrentGeneration();
processSocketEvent(SocketEvent.TIMEOUT, true); // 发送timeout事件
}
-
HTTP
事件响应可以看看org.apache.coyote.AbstractProcessor#action
类中,针对HTTP各种事件进行各种处理(code太多,这里就不贴了)。
对比Callable异同
相同之处:<br/>
二者都是实现了异步返回数据的作用:<br/>Callable
方式通过提交任务到TaskExecutor
进行执行,任务完成之后由DispatcherServlet
进行调用返回客户端。 DeferredResult
中的结果不一定是当前处理线程放入,当结果存放完成之后再进行DispatcherServlet
将结果返回到客户端。<br/>
不同之处:<br/>
结果存放线程不同:<br/>Callable
执行过程与获取结果可以理解为是同一线程完成。DeferredResult
的结果存放不一定是当前线程。
总结
以上是关于DeferredResult
使用以及原理的简单介绍,主要让大家了解到当前框架内部支持的返回值类型(相信大家很少在实际使用),具体使用场景需要结合业务情况。