Spring中异步注解@Async的使⽤、原理及使⽤时可能导致的问题及解决⽅法前⾔
其实最近都在研究事务相关的内容,之所以写这么⼀篇⽂章是因为前⾯写了⼀篇关于循环依赖的⽂章:
《》
然后,很多同学碰到了下⾯这个问题,添加了Spring提供的⼀个异步注解@Async循环依赖⽆法被解决了,下⾯是⼀些读者的留⾔跟⾥同学碰到的问题:
本着讲⼀个知识点就要讲明⽩、讲透彻的原则,我决定单独写⼀篇这样的⽂章对@Async这个注解做⼀下详细的介绍,这个注解带来的问题远远不⽌循环依赖这么简单,如果对它不够熟悉的话建议慎⽤。
⽂章要点
@Async的基本使⽤
这个注解的作⽤在于可以让被标注的⽅法异步执⾏,但是有两个前提条件
配置类上添加@EnableAsync注解需要异步执⾏的⽅法的所在类由Spring管理需要异步执⾏的⽅法上添加了@Async注解
我们通过⼀个Demo体会下这个注解的作⽤吧
第⼀步,配置类上开启异步:
@EnableAsync
@Configuration
@ComponentScan("com.dmz.spring.async")
public class Config {
}
第⼆步,
[code]@Component // 这个类本⾝要被Spring管理public class DmzAsyncService { @Async // 添加注解表⽰这
@Component // 这个类本⾝要被Spring管理
public class DmzAsyncService {
@Async // 添加注解表⽰这个⽅法要异步执⾏
public void testAsync(){
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("testAsync invoked");
}
}
第三步,测试异步执⾏
public class Main {
public static void main(String[] args) {
AnnotationConfigApplicationContext ac = new AnnotationConfigApplicationContext(Config.class);
DmzAsyncService bean = ac.getBean(DmzAsyncService.class);
System.out.println("main函数执⾏完成");
}
}
// 程序执⾏结果如下:
// main函数执⾏完成
// testAsync invoked
通过上⾯的例⼦我们可以发现,DmzAsyncService中的testAsync⽅法是异步执⾏的,那么这背后的原理是什么呢?我们接着分析
原理分析
我们在分析某⼀个技术的时候,最重要的事情是,⼀定⼀定要到代码的⼊⼝,像Spring这种都很明显,⼊⼝必定是在@EnableAsync这个注解上⾯,我们来看看这个注解⼲了啥事(本⽂基于5.2.x版本)
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
// 这⾥是重点,导⼊了⼀个ImportSelector
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
// 这个配置可以让程序员配置需要被检查的注解,默认情况下检查的就是@Async注解
Class<? extends Annotation> annotation() default Annotation.class;
// 默认使⽤jdk代理
boolean proxyTargetClass() default false;
// 默认使⽤Spring AOP
AdviceMode mode() default AdviceMode.PROXY;
// 在后续分析我们会发现,这个注解实际往容器中添加了⼀个
/
/ AsyncAnnotationBeanPostProcessor,这个后置处理器实现了Ordered接⼝
// 这个配置主要代表了AsyncAnnotationBeanPostProcessor执⾏的顺序
int order() default Ordered.LOWEST_PRECEDENCE;
}
上⾯这个注解做的最重要的事情就是导⼊了⼀个AsyncConfigurationSelector,这个类的源码如下:
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
// 默认会使⽤SpringAOP进⾏代理
case PROXY:
return new String[] {Name()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
}
这个类的作⽤是像容器中注册了⼀个ProxyAsyncConfiguration,这个类的继承关系如下:
我们先看下它的⽗类AbstractAsyncConfiguration,其源码如下:
@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {
@Nullable
protected AnnotationAttributes enableAsync;
@Nullable
protected Supplier<Executor> executor;
@Nullable
protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;
// 这⾥主要就是检查将其导⼊的类上是否有EnableAsync注解
// 如果没有的话就报错
@Override
public void setImportMetadata(AnnotationMetadata importMetadata) {
if (ableAsync == null) {
throw new IllegalArgumentException(
"@EnableAsync is not present on importing class " + ClassName());
}
}
// 将容器中配置的AsyncConfigurer注⼊
// 异步执⾏嘛,所以我们可以配置使⽤的线程池
// 另外也可以配置异常处理器
@Autowired(required = false)
void setConfigurers(Collection<AsyncConfigurer> configurers) {
if (CollectionUtils.isEmpty(configurers)) {
return;
}
if (configurers.size() > 1) {
throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
AsyncConfigurer configurer = configurers.iterator().next();
}
}
再来看看ProxyAsyncConfiguration这个类的源码
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
// 将通过AsyncConfigurer配置好的线程池跟异常处理器设置到这个后置处理器中
Class<? extends Annotation> customAsyncAnnotation = Class("annotation");  if (customAsyncAnnotation != DefaultValue(EnableAsync.class, "annotation")) {  bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
bpp.Boolean("proxyTargetClass"));
bpp.ableAsync.<Integer>getNumber("order"));
return bpp;
}
}
这个类本⾝是⼀个配置类,它的作⽤是向容器中添加⼀个AsyncAnnotationBeanPostProcessor。到这⼀步我们基本上就可以明⽩了,@Async注解的就是通
过AsyncAnnotationBeanPostProcessor这个后置处理器⽣成⼀个代理对象来实现异步的,接下来我们
就具体看看AsyncAnnotationBeanPostProcessor是如何⽣成代理对象的,我们主要关注⼀下⼏点即可:
是在⽣命周期的哪⼀步完成的代理?
切点的逻辑是怎么样的?它会对什么样的类进⾏拦截?
通知的逻辑是怎么样的?是如何实现异步的?
基于上⾯⼏个问题,我们进⾏逐⼀分析
是在⽣命周期的哪⼀步完成的代理?
我们抓住重点,AsyncAnnotationBeanPostProcessor是⼀个后置处理器器,按照我们对Spring的了解,⼤概率是在这个后置处理器的postProcessAfterInitialization⽅法中完成了代理,直接定位到这个⽅法,这个⽅法位于⽗类AbstractAdvisingBeanPostProcessor中,具体代码如下:
public Object postProcessAfterInitialization(Object bean, String beanName) {
// 没有通知,或者是AOP的基础设施类,那么不进⾏代理
if (this.advisor == null || bean instanceof AopInfrastructureBean) {
return bean;
}
// 对已经被代理的类,不再⽣成代理,只是将通知添加到代理类的逻辑中
// 这⾥通过beforeExistingAdvisors决定是将通知添加到所有通知之前还是添加到所有通知之后
// 在使⽤@Async注解的时候,beforeExistingAdvisors被设置成了true
// 意味着整个⽅法及其拦截逻辑都会异步执⾏
if (bean instanceof Advised) {
Advised advised = (Advised) bean;
if (!advised.isFrozen() && TargetClass(bean))) {
if (this.beforeExistingAdvisors) {
advised.addAdvisor(0, this.advisor);
}
else {
jdk怎么使用advised.addAdvisor(this.advisor);
}
return bean;
}
}
// 判断需要对哪些Bean进⾏来代理
if (isEligible(bean, beanName)) {
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
if (!proxyFactory.isProxyTargetClass()) {
Class(), proxyFactory);
}
proxyFactory.addAdvisor(this.advisor);
customizeProxyFactory(proxyFactory);
Proxy(getProxyClassLoader());
}
return bean;
}
果不其然,确实是在这个⽅法中完成的代理。接着我们就要思考,切点的过滤规则是什么呢?
切点的逻辑是怎么样的?
其实也不难猜到肯定就是类上添加了@Async注解或者类中含有被@Async注解修饰的⽅法。基于此,我们看看这个isEligible这个⽅法的实现逻辑,这个⽅位位
于AbstractBeanFactoryAwareAdvisingPostProcessor中,也是AsyncAnnotationBeanPostProcessor的⽗类,对应代码如下:
// AbstractBeanFactoryAwareAdvisingPostProcessor的isEligible⽅法
// 调⽤了⽗类
protected boolean isEligible(Object bean, String beanName) {
return (!AutoProxyUtils.isOriginalInstance(beanName, Class()) &&
super.isEligible(bean, beanName));
}
protected boolean isEligible(Object bean, String beanName) {
return Class());
}
protected boolean isEligible(Class<?> targetClass) {
Boolean eligible = (targetClass);
if (eligible != null) {
return eligible;
}
if (this.advisor == null) {
return false;
}
// 这⾥完成的判断
eligible = AopUtils.canApply(this.advisor, targetClass);
this.eligibleBeans.put(targetClass, eligible);
return eligible;
}
实际上最后就是根据advisor来确定是否要进⾏代理,在这篇⽂章中我们提到过,advisor实际就是⼀个绑定了切点的通知,那么AsyncAnnotationBeanPostProcessor这个advisor是什么时候被初始化的呢?我们直接定位到AsyncAnnotationBeanPostProcessor的setBeanFactory⽅法,其源码如下:
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
// 在这⾥new了⼀个AsyncAnnotationAdvisor
AsyncAnnotationAdvisor advisor = new utor, ptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
// 完成了初始化
this.advisor = advisor;
}
我们来看看AsyncAnnotationAdvisor中的切点匹配规程是怎么样的,直接定位到这个类的buildPointcut⽅法中,其源码如下:
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
ComposablePointcut result = null;
for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
// 就是根据这两个匹配器进⾏匹配的
Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
if (result == null) {
result = new ComposablePointcut(cpc);
}
else {
result.union(cpc);
}
result = result.union(mpc);
}
return (result != null ? result : Pointcut.TRUE);
}
代码很简单,就是根据cpc跟mpc两个匹配器来进⾏匹配的,第⼀个是检查类上是否有@Async注解,第⼆个是检查⽅法是是否有@Async注解。
那么,到现在为⽌,我们已经知道了它在何时创建代理,会为什么对象创建代理,最后我们还需要解决⼀个问题,代理的逻辑是怎么样的,异步到底是如何实现的?
通知的逻辑是怎么样的?是如何实现异步的?
前⾯也提到了advisor是⼀个绑定了切点的通知,前⾯分析了它的切点,那么现在我们就来看看它的通知逻辑,直接定位到AsyncAnnotationAdvisor中的buildAdvice⽅法,源码如下:protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
return interceptor;
}
简单吧,加了⼀个⽽已,对于interceptor类型的对象,我们关注它的核⼼⽅法invoke就⾏了,代码如下:
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (This() != null ? This()) : null);
Method specificMethod = Method(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
// 异步执⾏嘛,先获取到⼀个线程池
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
// 然后将这个⽅法封装成⼀个 Callable对象传⼊到线程池中执⾏
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
Cause(), userDeclaredMethod, Arguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, Arguments());
}
return null;
};
// 将任务提交到线程池
return doSubmit(task, executor, Method().getReturnType());
}
导致的问题及解决⽅案
问题1:循环依赖报错
就像在这张图⾥这个读者问的问题,