文章目录
1、实现流程
- 通过@EnableAsync开启异步支持,并注册BeanPostProcessor(AsyncAnnotationBeanPostProcessor)
- 对类上或方法上标注了@Async的Bean进行拦截,并为其创建代理对象
- 最终会由AnnotationAsyncExecutionInterceptor处理异步方法
2、源码解析
按照流程顺序,一步一步解析
2.1 @EnableAsync
直接看代码
- 通过Import导入一个Selector
- 导入配置类ProxyAsyncConfiguration(默认)
- 通过配置类导入AsyncAnnotationBeanPostProcessor
// Imports a selector (AsyncConfigurationSelector) through @Import
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
// ....... remaining content omitted in this excerpt
}
//导入配置类ProxyAsyncConfiguration
/**
 * Import selector for {@link EnableAsync}: translates the {@link AdviceMode} taken from
 * {@link EnableAsync#mode()} into the name of the configuration class to import.
 */
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

    private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
            "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";

    /**
     * Chooses the configuration class for the given advice mode.
     * @param adviceMode the advice mode to select imports for
     * @return a single-element array naming {@link ProxyAsyncConfiguration} for {@code PROXY},
     * or {@code AspectJAsyncConfiguration} for {@code ASPECTJ}; {@code null} for any other value
     */
    @Override
    @Nullable
    public String[] selectImports(AdviceMode adviceMode) {
        final String configurationClassName;
        switch (adviceMode) {
            case PROXY:
                configurationClassName = ProxyAsyncConfiguration.class.getName();
                break;
            case ASPECTJ:
                configurationClassName = ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME;
                break;
            default:
                // Unknown mode: nothing to import.
                return null;
        }
        return new String[] {configurationClassName};
    }
}
// Configuration class that contributes the async BeanPostProcessor
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

    /**
     * Creates the {@link AsyncAnnotationBeanPostProcessor} and configures it from the
     * {@code @EnableAsync} attributes held in {@code this.enableAsync}.
     * @return the configured post-processor
     * @throws IllegalArgumentException via {@code Assert.notNull} when {@code this.enableAsync}
     * is {@code null} (presumably; confirm against Spring's {@code Assert} contract)
     */
    @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
        Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");

        AsyncAnnotationBeanPostProcessor processor = new AsyncAnnotationBeanPostProcessor();
        processor.configure(this.executor, this.exceptionHandler);

        // Only override the annotation type when it differs from the declared default of
        // the "annotation" attribute.
        Class<? extends Annotation> configuredAnnotation = this.enableAsync.getClass("annotation");
        Object defaultAnnotation = AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation");
        if (configuredAnnotation != defaultAnnotation) {
            processor.setAsyncAnnotationType(configuredAnnotation);
        }

        boolean proxyTargetClass = this.enableAsync.getBoolean("proxyTargetClass");
        processor.setProxyTargetClass(proxyTargetClass);

        Integer order = this.enableAsync.<Integer>getNumber("order");
        processor.setOrder(order);

        return processor;
    }
}
2.2 AsyncAnnotationBeanPostProcessor
核心方法
/**
 * Applies {@code this.advisor} to the given bean: either by appending it to an existing,
 * non-frozen proxy's advisor chain, or by creating a new proxy around the bean.
 * @param bean the bean instance to inspect
 * @param beanName the name of the bean
 * @return the same bean (possibly with the advisor added to it) or a newly created proxy
 */
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (this.advisor == null || bean instanceof AopInfrastructureBean) {
        // Ignore AOP infrastructure such as scoped proxies.
        return bean;
    }

    // Article note: this branch can be skipped on a first read.
    if (bean instanceof Advised) {
        Advised existingProxy = (Advised) bean;
        boolean canExtend = !existingProxy.isFrozen() && isEligible(AopUtils.getTargetClass(bean));
        if (canExtend) {
            // Add our local Advisor to the existing proxy's Advisor chain...
            if (this.beforeExistingAdvisors) {
                existingProxy.addAdvisor(0, this.advisor);
            }
            else {
                existingProxy.addAdvisor(this.advisor);
            }
            return bean;
        }
    }

    // Article note (explained in detail later in the article): eligibility is decided
    // through the advisor's Pointcut; here it can be read as "the class, or one of its
    // methods, is marked with @Async" (either one is enough).
    if (!isEligible(bean, beanName)) {
        // No proxy needed.
        return bean;
    }

    ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
    if (!proxyFactory.isProxyTargetClass()) {
        evaluateProxyInterfaces(bean.getClass(), proxyFactory);
    }
    // The advisor the proxy needs; per the article, the asynchronous execution of the
    // method ultimately happens inside it (explained in detail later).
    proxyFactory.addAdvisor(this.advisor);
    customizeProxyFactory(proxyFactory);

    // Use original ClassLoader if bean class not locally loaded in overriding class loader
    ClassLoader proxyLoader = getProxyClassLoader();
    boolean loadedElsewhere =
            proxyLoader instanceof SmartClassLoader && proxyLoader != bean.getClass().getClassLoader();
    if (loadedElsewhere) {
        proxyLoader = ((SmartClassLoader) proxyLoader).getOriginalClassLoader();
    }

    // Create the proxy and return it in place of the bean.
    return proxyFactory.getProxy(proxyLoader);
}
2.3 AnnotationAsyncExecutionInterceptor
@Async标记的方法是怎么异步执行的,又是怎么可以返回结果的
核心代码
/**
 * Intercepts the invocation, wraps it in a {@link Callable} and passes that task to
 * {@code doSubmit} together with the executor resolved for the invoked method.
 * @param invocation the method invocation being intercepted
 * @return the value returned by {@code doSubmit} for the method's declared return type
 * @throws IllegalStateException if no executor could be determined for the method
 */
public Object invoke(final MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = null;
    if (invocation.getThis() != null) {
        targetClass = AopUtils.getTargetClass(invocation.getThis());
    }
    Method mostSpecificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(mostSpecificMethod);

    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {
        throw new IllegalStateException(
                "No executor specified and no default executor set on AsyncExecutionInterceptor either");
    }

    // Build the task that will run on the executor.
    Callable<Object> task = () -> {
        try {
            Object outcome = invocation.proceed();
            if (outcome instanceof Future) {
                // Unwrap the Future produced by the target method.
                return ((Future<?>) outcome).get();
            }
        }
        catch (ExecutionException ex) {
            // Report the underlying cause rather than the wrapper.
            handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
        }
        catch (Throwable ex) {
            handleError(ex, userDeclaredMethod, invocation.getArguments());
        }
        return null;
    };

    // Pass the task, the executor and the declared return type; whether @Async hands a
    // result back to the caller is decided from that return type.
    return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
/**
 * Submits the task to the executor in the way that matches the declared return type.
 * @param task the task to execute
 * @param executor the executor to run the task on
 * @param returnType the declared return type of the intercepted method
 * @return a {@link CompletableFuture}, the result of {@code submitListenable}, or the
 * {@link Future} from {@code submit}, depending on {@code returnType}; {@code null} for
 * any other return type
 */
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
    // The declared return type selects the submission strategy.
    if (CompletableFuture.class.isAssignableFrom(returnType)) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();
            }
            catch (Throwable ex) {
                throw new CompletionException(ex);
            }
        }, executor);
    }
    if (ListenableFuture.class.isAssignableFrom(returnType)) {
        return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
    }
    if (Future.class.isAssignableFrom(returnType)) {
        return executor.submit(task);
    }

    // Any other return type: submit the task and hand nothing back to the caller.
    executor.submit(task);
    return null;
}
3、深度解析
后续会补上AOP章节
前面我们提到了几个点:怎么判断@Async,为什么最终会由AnnotationAsyncExecutionInterceptor执行。这些需要有AOP的前置知识,不然会一头雾水。
基础知识虽然很枯燥,但是很重要,是一切实现的基础。
3.1 怎么判断类是否需要被代理
通过AopUtils判断,此处传入了一个advisor.
Advisor接口整合了Advice和Pointcut。
最终执行方法的AnnotationAsyncExecutionInterceptor就是一个Advice。就是在当前的this.advisor对象中存储。
Pointcut是Aop中用来做匹配判断的,它整合了ClassFilter和MethodMatcher。
/**
 * Tells whether {@code this.advisor} applies to the given class, remembering the answer
 * in {@code this.eligibleBeans}.
 * @param targetClass the class to check
 * @return the remembered answer if one exists; {@code false} when no advisor is set;
 * otherwise the result of {@code AopUtils.canApply}
 */
protected boolean isEligible(Class<?> targetClass) {
    Boolean cached = this.eligibleBeans.get(targetClass);
    if (cached != null) {
        return cached;
    }
    if (this.advisor == null) {
        return false;
    }

    // The actual decision is made here.
    boolean applicable = AopUtils.canApply(this.advisor, targetClass);
    this.eligibleBeans.put(targetClass, applicable);
    return applicable;
}
判断方法
/**
 * Checks whether the pointcut can match the given class: its class filter must accept
 * the class, and its method matcher must match at least one method declared on the
 * class or on one of its interfaces.
 * @param pc the pointcut to check (must not be {@code null})
 * @param targetClass the class to test
 * @param hasIntroductions passed through to an {@link IntroductionAwareMethodMatcher}
 * @return {@code true} if the pointcut can apply to some method of the class
 */
public static boolean canApply(Pointcut pc, Class<?> targetClass, boolean hasIntroductions) {
    Assert.notNull(pc, "Pointcut must not be null");
    if (!pc.getClassFilter().matches(targetClass)) {
        return false;
    }

    // Decide through the Pointcut's matcher. Article note: for the async advisor this is
    // a ClassFilterAwareUnionMethodMatcher.
    MethodMatcher methodMatcher = pc.getMethodMatcher();
    if (methodMatcher == MethodMatcher.TRUE) {
        // No need to iterate the methods if we're matching any method anyway...
        return true;
    }

    IntroductionAwareMethodMatcher introductionAware =
            (methodMatcher instanceof IntroductionAwareMethodMatcher ?
                    (IntroductionAwareMethodMatcher) methodMatcher : null);

    Set<Class<?>> candidates = new LinkedHashSet<>();
    if (!Proxy.isProxyClass(targetClass)) {
        candidates.add(ClassUtils.getUserClass(targetClass));
    }
    candidates.addAll(ClassUtils.getAllInterfacesForClassAsSet(targetClass));

    for (Class<?> candidate : candidates) {
        for (Method method : ReflectionUtils.getAllDeclaredMethods(candidate)) {
            boolean matched;
            if (introductionAware != null) {
                matched = introductionAware.matches(method, targetClass, hasIntroductions);
            }
            else {
                // The plain per-method check happens here.
                matched = methodMatcher.matches(method, targetClass);
            }
            if (matched) {
                return true;
            }
        }
    }
    return false;
}
最终判断的MethodMatcher是AnnotationMethodMatcher
判断Method上是否携带annotationType注解 那annotationType会不会是@Async呢? 答案是肯定的
// Abridged excerpt: members other than those shown here (e.g. checkInherited) are omitted.
public class AnnotationMethodMatcher extends StaticMethodMatcher {

    private final Class<? extends Annotation> annotationType;

    /**
     * Checks whether the method carries {@code annotationType}. Per the article, for the
     * async advisor that annotation type is {@code @Async}.
     */
    private boolean matchesMethod(Method method) {
        if (this.checkInherited) {
            return AnnotatedElementUtils.hasAnnotation(method, this.annotationType);
        }
        return method.isAnnotationPresent(this.annotationType);
    }
}
线索还需要通过AsyncAnnotationBeanPostProcessor找。我们刚才说了Advisor整合了Advice和Pointcut,Pointcut又整合了ClassFilter和MethodMatcher,最终是通过MethodMatcher处理的。从之前的源码可以知道,advisor是AsyncAnnotationBeanPostProcessor中的一个属性,所以我们从advisor被赋值的地方下手。
AsyncAnnotationBeanPostProcessor是一个BeanFactoryAware,在bean生命周期回调setBeanFactory时对advisor赋值:通过AsyncAnnotationAdvisor构造器对Advice和Pointcut进行初始化,Pointcut中传入的Annotation就是@Async,Advice就是最开始我们说的AnnotationAsyncExecutionInterceptor。
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);,
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
/**
 * Creates the advisor: collects the supported async annotation types, then builds the
 * advice and the pointcut from them.
 * @param executor supplier of the executor handed to {@code buildAdvice} (may be {@code null})
 * @param exceptionHandler supplier of the exception handler handed to {@code buildAdvice}
 * (may be {@code null})
 */
public AsyncAnnotationAdvisor(
        @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

    Set<Class<? extends Annotation>> supportedAnnotations = new LinkedHashSet<>(2);
    // @Async is always supported.
    supportedAnnotations.add(Async.class);
    try {
        Class<?> ejbAsynchronous =
                ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader());
        supportedAnnotations.add((Class<? extends Annotation>) ejbAsynchronous);
    }
    catch (ClassNotFoundException ex) {
        // If EJB 3.1 API not present, simply ignore.
    }

    this.advice = buildAdvice(executor, exceptionHandler);
    this.pointcut = buildPointcut(supportedAnnotations);
}
// Combines the per-annotation pointcuts through ComposablePointcut.union
// (article note: with a union, matching any one of them is enough).
/**
 * Builds one pointcut out of the given annotation types. Two
 * {@link AnnotationMatchingPointcut}s are created per type and all of them are merged
 * through {@code union}.
 * @param asyncAnnotationTypes the annotation types to match
 * @return the combined pointcut, or {@code Pointcut.TRUE} when the set is empty
 */
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
    ComposablePointcut combined = null;
    for (Class<? extends Annotation> annotationType : asyncAnnotationTypes) {
        Pointcut cpc = new AnnotationMatchingPointcut(annotationType, true);
        Pointcut mpc = new AnnotationMatchingPointcut(null, annotationType, true);
        if (combined != null) {
            combined.union(cpc);
        }
        else {
            combined = new ComposablePointcut(cpc);
        }
        combined = combined.union(mpc);
    }

    if (combined == null) {
        return Pointcut.TRUE;
    }
    return combined;
}
/**
 * Creates the advice of this advisor: an {@link AnnotationAsyncExecutionInterceptor}
 * configured with the given executor and exception handler. Per the article, this is
 * the advice that finally executes the method asynchronously.
 * @param executor supplier of the executor to configure (may be {@code null})
 * @param exceptionHandler supplier of the exception handler to configure (may be {@code null})
 * @return the configured interceptor
 */
protected Advice buildAdvice(
        @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

    AnnotationAsyncExecutionInterceptor asyncInterceptor = new AnnotationAsyncExecutionInterceptor(null);
    asyncInterceptor.configure(executor, exceptionHandler);
    return asyncInterceptor;
}