1.说在前边
在同一个类中,一个方法调用另外一个有注解(比如@Async,@Transational)的方法,注解是不会生效的。
2. SpringBoot自定义线程池
2.1 修改application.properties
task.pool.corePoolSize=20
task.pool.maxPoolSize=40
task.pool.keepAliveSeconds=300
task.pool.queueCapacity=50
2.2 线程池配置属性类
package com.dyaqi.async.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* @author: dongyq
* @date: 2021/12/24 15:59
* @since:
* @功能描述: 线程池配置属性类
*/
@ConfigurationProperties(prefix = "task.pool")
public class TaskThreadPoolConfig {
private int corePoolSize;
private int maxPoolSize;
private int keepAliveSeconds;
private int queueCapacity;
//...getter and setter methods...
}
2.3 创建线程池
package com.dyaqi.async.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author: dongyq
* @date: 2021/12/24 16:02
* @since:
* @功能描述: 自定义线程池
*/
@Configuration
@EnableAsync
public class TaskExecutePool {
@Autowired
private TaskThreadPoolConfig config;
@Bean("myTaskAsyncPool")
public Executor myTaskAsyncPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程池大小
executor.setCorePoolSize(config.getCorePoolSize());
//最大线程数
executor.setMaxPoolSize(config.getMaxPoolSize());
//队列容量
executor.setQueueCapacity(config.getQueueCapacity());
//活跃时间
executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
//线程名字前缀
executor.setThreadNamePrefix("TaskPool-");
// setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务
// CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
}
上面我们通过使用ThreadPoolTaskExecutor
创建了一个线程池,同时设置了以下这些参数:
- 核心线程数20:线程池创建时候初始化的线程数
- 最大线程数40:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
- 缓冲队列50:用来缓冲执行任务的队列
- 允许线程的空闲时间300秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
- 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
- 线程池对拒绝任务的处理策略:这里采用了
CallerRunsPolicy
策略,当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务
注:处理策略
ThreadPoolExecutor.AbortPolicy 丢弃任务并抛出RejectedExecutionException异常(默认)。
ThreadPoolExecutor.DiscardPolic 丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy 丢弃队列最前面的任务,然后重新尝试执行任务
ThreadPoolExecutor.CallerRunsPolic 由调用线程处理该任务(常用)
说明:setWaitForTasksToCompleteOnShutdown(true)
该方法就是这里的关键,用来设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean,这样这些异步任务的销毁就会先于Redis线程池的销毁。同时,这里还设置了setAwaitTerminationSeconds(60),该方法用来设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住。
2.4 创建异步线程任务
package com.dyaqi.async.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* @author: dongyq
* @date: 2021/12/24 16:05
* @since:
* @功能描述: 异步线程调用(不能和调用方法同类)
*/
@Component
public class AsyncTask {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
@Async("myTaskAsyncPool") /*使用指定的线程池*/
public void doTaskPool(int i) {
logger.info("Task" + i + " started.");
}
}
2.5 修改启动类
给启动类添加注解
@EnableAsync
@EnableConfigurationProperties({TaskThreadPoolConfig.class} ) // 开启配置属性支持
2.6 测试
package com.dyaqi.async.service.impl;
import com.dyaqi.async.config.AsyncTask;
import com.dyaqi.async.service.IService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author: dongyq
* @date: 2021/12/24 15:43
* @since:
* @功能描述: ServiceImpl
*/
@Service
public class ServiceImpl implements IService {
@Autowired
private AsyncTask asyncTask;
@Override
public String get1() {
for (int i = 0; i < 100; i++) {
asyncTask.doTaskPool(i);
}
return "he";
}
}
注:Controller和IService自行创建
3. 修改SpringBoot默认线程池
因为上面的那个线程池使用时候总要加注解@Async("myTaskAsyncPool")
,(业务系统中的多处需要修改)如果我们想使用默认的线程池,即使用异步线程池时还是使用@Async的注解。但是只是想修改默认线程池的配置,将默认的异步线程池的参数可配置化,方便系统的调优。
具体实现有以下方案:
- 重新实现接口AsyncConfigurer
- 继承AsyncConfigurerSupport
- 配置由自定义的TaskExecutor替代内置的任务执行器
接下来我们介绍两种。
3.1实现AsyncConfigurer类
源码如下:
public interface AsyncConfigurer {
@Nullable
default Executor getAsyncExecutor() {
return null;
}
@Nullable
default AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}
说明:
Executor : 处理异步方法调用时要使用的实例,
AsyncUncaughtExceptionHandler :在使用void
返回类型的异步方法执行期间抛出异常时要使用的实例。
3.1.1 获取属性配置类
这个和上面的TaskThreadPoolConfig类相同,这里不重复。
3.1.2 装配线程池
package com.dyaqi.async.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author: dongyq
* @date: 2021/12/24 16:32
* @since:
* @功能描述: 修改SpringBoot默认线程池
*/
@Configuration
public class ImplAsyncTaskExecutePool implements AsyncConfigurer {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
TaskThreadPoolConfig config;
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程池大小
executor.setCorePoolSize(config.getCorePoolSize());
//最大线程数
executor.setMaxPoolSize(config.getMaxPoolSize());
//队列容量
executor.setQueueCapacity(config.getQueueCapacity());
//活跃时间
executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
//线程名字前缀
executor.setThreadNamePrefix("ImplTaskPool-");
// setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务
// CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncUncaughtExceptionHandler() {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
logger.error("==========================" + ex.getMessage() + "=======================", ex);
logger.error("exception method:" + method.getName());
}
};
}
}
3.1.3 创建异步线程任务
package com.dyaqi.async.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* @author: dongyq
* @date: 2021/12/24 16:05
* @since:
* @功能描述: 异步线程调用(不能和调用方法同类)
*/
@Component
public class AsyncTask {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
@Async /*使用默认的线程池(已被我们修改)*/
public void doNativeTaskPool(int i) {
logger.info("Task" + i + " started.");
}
}
3.1.4 测试
ServiceImpl添加方法
@Override
public String get2() {
for (int i = 0; i < 100; i++) {
asyncTask.doNativeTaskPool(i);
}
return "he";
}
3.2继承AsyncConfigurerSupport类
源码如下:
public class AsyncConfigurerSupport implements AsyncConfigurer {
public AsyncConfigurerSupport() {
}
public Executor getAsyncExecutor() {
return null;
}
@Nullable
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}
说明:
Executor : 处理异步方法调用时要使用的实例,
AsyncUncaughtExceptionHandler :在使用void
返回类型的异步方法执行期间抛出异常时要使用的实例。
3.2.1 获取属性配置类
这个和上面的TaskThreadPoolConfig类相同,这里不重复。
3.2.2 装配线程池
package com.dyaqi.async.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurerSupport;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author: dongyq
* @date: 2021/12/24 16:48
* @since:
* @功能描述: 修改SpringBoot默认线程池
*/
@Configuration
public class ExtendsAsyncTaskExecutePool extends AsyncConfigurerSupport {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
TaskThreadPoolConfig config;
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程池大小
executor.setCorePoolSize(config.getCorePoolSize());
//最大线程数
executor.setMaxPoolSize(config.getMaxPoolSize());
//队列容量
executor.setQueueCapacity(config.getQueueCapacity());
//活跃时间
executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
//线程名字前缀
executor.setThreadNamePrefix("ExtTaskPool-");
// setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务
// CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncUncaughtExceptionHandler() {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
logger.error("==========================" + ex.getMessage() + "=======================", ex);
logger.error("exception method:" + method.getName());
}
};
}
}
3.2.3 接下来和上面相同,这里不重复。
4.有返回值Future调用
4.1 创建异步线程任务
@Async
public Future<String> doNativeTaskPoolRe1() throws InterruptedException {
logger.info("开始执行任务一");
long l1 = System.currentTimeMillis();
Thread.sleep(2000);
long l2 = System.currentTimeMillis();
logger.info("任务一用时:" + (l2 - l1));
return new AsyncResult<>("任务一完成");
}
@Async
public Future<String> doNativeTaskPoolRe2() throws InterruptedException {
logger.info("开始执行任务二");
long l1 = System.currentTimeMillis();
Thread.sleep(2000);
long l2 = System.currentTimeMillis();
logger.info("任务二用时:" + (l2 - l1));
return new AsyncResult<>("任务二完成");
}
@Async
public Future<String> doNativeTaskPoolRe3() throws InterruptedException {
logger.info("开始执行任务三");
long l1 = System.currentTimeMillis();
Thread.sleep(2000);
long l2 = System.currentTimeMillis();
logger.info("任务三用时:" + (l2 - l1));
return new AsyncResult<>("任务三完成");
}
4.2 测试
@Override
public String get3() {
try {
logger.info("开始访问");
long l1 = System.currentTimeMillis();
Future<String> poolRe1 = asyncTask.doNativeTaskPoolRe1();
Future<String> poolRe2 = asyncTask.doNativeTaskPoolRe2();
Future<String> poolRe3 = asyncTask.doNativeTaskPoolRe3();
while (true) {//死循环,每隔2000ms执行一次,判断一下这三个异步调用的方法是否全都执行完了。
if (poolRe1.isDone() && poolRe2.isDone() && poolRe3.isDone()) {//使用Future的isDone()方法返回该方法是否执行完成
//如果异步方法全部执行完,跳出循环
break;
}
Thread.sleep(2000);//每隔2000毫秒判断一次
}
long l2 = System.currentTimeMillis();//跳出while循环时说明此时三个异步调用的方法都执行完成了,此时得到当前时间
String result = poolRe1.get();
logger.info("结束访问,用时:" + (l2 - l1));
logger.info("使用get方法获得的返回内容:" + result);
} catch (Exception e) {
e.printStackTrace();
}
return "he";
}