# SpringAsync
Spring Async的底层是JDK中的异步编程模型
# 灵活使用多种Future机制
- 在使用 Spring 异步编程模型时,我们不可避免地需要使用到 JDK 中的 Future 机制
- 传统 Future 机制存在很多不完善之处,Java 8中引入了 CompletableFuture 对传统Future进行了优化
- 开发人员可以直接通过 CompletableFuture 将异步执行结果交给另外一个异步线程来处理
- 这样在异步任务完成后,我们使用任务结果时则不需要等待
CompletableFuture<String> completableFuture = new CompletableFuture<>();
// 一旦构建了CompletableFuture对象
// 就可以使用该对象的get()方法去获取一个Future对象
// 也可以通过它的complete()方法去手工完成一个Future对象
# CompletableFuture 通常有三种使用方法,分别针对不同的应用场景
- 第一种就是使用runAsync()方法异步执行任务,系统会启动一个新的线程,执行完后不会返回任何值
@RequestMapping(value = "/health_description")
public CompletableFuture<String> syncHealthDescription() {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
try {
completableFuture.complete(healthService.getHealthDescription().get());
} catch (InterruptedException | ExecutionException e) {
completableFuture.completeExceptionally(e);
}
}
});
return completableFuture;
}
- 如果想要在异步执行任务完成之后返回值,那么可以使用CompletableFuture的supplyAsync()方法
@RequestMapping(value = "/health_description")
public CompletableFuture<String> syncHealthDescription () {
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
return healthService.getHealthDescription().get();
} catch (InterruptedException | ExecutionException e) {
LOGGER.error(e);
}
return "No health description found";
}
});
return completableFuture;
}
- 基于CompletableFuture的典型场景使用回调
# 为了构建异步通知效果,系统势必需要一种回调机制,在Future执行完成之后能够自动执行一些回调处理
# CompletableFuture.get()方法实际上也是需要等到Future执行完成之后才能返回
# CompletableFuture提供的thenApply()、thenAccept()、thenRun()方法将回调添加到CompletableFuture
// 这里以thenApply()为例给出示例代码
// 演示了第二个任务依赖第一个任务返回结果的处理场景
// 在第一个任务执行完返回值10时触发了第二个任务的执行逻辑,从而得到结果为10×5=50
public CompletableFuture<Long> thenApplyDemo() {
CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
@Override
public Long get() {
long firstValue = 10;
System.out.println("firstValue="+firstValue);
return firstValue;
}
}).thenApply(new Function<Long, Long>() {
@Override
public Long apply(Long t) {
long secondValue = t*5;
System.out.println("secondValue="+secondValue);
return secondValue;
}
});
return future; // 执行结果为50
}
# 开启异步线程的方法
# 注解@Async
# springboot 框架的注解,使用时有一些限制,@Async 注解不能在类本身直接调用
# 可以使用单独的 Service 实现异步方法,然后在其他的类中调用该 Service 中的异步方法
- 添加注解:在springboot的config中添加 @EnableAsync注解,开启异步线程功能
@Configuration
@EnableAsync
public class MyConfig {
// 自己配置的Config
}
- 创建异步方法Service和实现类,并且返回的类型一定是
Future< T >
T是需要返回的值的类型
// Service类
public interface IExecuteService {
/**
* 一些耗时的操作,使用单独线程处理
* 这里就简单写了一个sleep5秒的操作
*/
@Async
public Future<String> sleepingTest();
}
// Service实现类
@Service
public class ExecuteServiceImpl implements IExecuteService {
private static final Logger log = LoggerFactory.getLogger(ExecuteServiceImpl.class);
@Override
public Future<String> sleepingTest() {
log.info("SleepingTest start");
try {
Thread.sleep(5000);
return "0"
} catch (Exception e) {
log.error("SleepingTest:" + e.toString());
return "1"
}
log.info("SleepingTest end");
}
}
- 调用异步方法
@RestController
public class TestController {
private static final Logger log = LoggerFactory.getLogger(TestController.class);
@Autowired
private IExecuteService executeService;
@RequestMapping("/executeTask")
public String executeTask() {
log.info("executeTask Start!");
// 调用异步方法
Future<String> result = executeService.sleepingTest();
// 写一个死循环 进行线程的调用
while (true) {
// result.isDone() 表示方法执行结束
if (map == null && result.isDone()) {
// 获取到我们原本需要的数据 返回后break
map = result.get();
break;
}
}
log.info("executeTask End!");
return "executeTask";
}
}
Future接口提供几个很方便的函数,判断线程是否完成
future.isDone():判断是否执行完毕
future.get():获取线程的返回值,同样可以返回异常, 子线程可以向上抛出异常给父级
while (true) {
if (task1.isDone() && task2.isDone() && task3.isDone()) {
break;
}
}
// 或者
List<Future<String>> lResult = new ArrayList<>();
for (MultipartFile f : file) {
Future<String> result = tbService.asyncUpload(f, tb.getNumber());
lResult.add(result);
}
// 异步执行是否完成
while (true){
if(lResult.stream().allMatch(a->a.isDone())){
break;
}
}
# AsyncManager
使用AsyncManager方法,也是SpringBoot框架中带的任务管理器,可以实现异步线程
- 创建AsyncManager类,这个在springboot框架中应该也是有的
// 异步任务管理器
public class AsyncManager {
// 操作延迟10毫秒
private final int OPERATE_DELAY_TIME = 10;
// 异步操作任务调度线程池
private ScheduledExecutorService executor = SpringUtils.getBean("scheduledService");
// 单例模式
private AsyncManager() { }
private static AsyncManager me = new AsyncManager();
public static AsyncManager me() {
return me;
}
// 执行任务
public void execute(TimerTask task) {
executor.schedule(task, OPERATE_DELAY_TIME, TimeUnit.MILLISECONDS);
}
// 停止任务线程池
public void shutdown() {
Threads.shutdownAndAwaitTermination(executor);
}
}
- 创建一个耗时的操作类
public TimerTask sleepingTest() {
return new TimerTask() {
@Override
public void run() {
// 耗时操作
try {
Thread.sleep(5000);
} catch (Exception e) {
log.error("SleepingTest:" + e.toString());
}
}
};
}
- 执行异步操作
AsyncManager.me().execute(sleepingTest());
# 线程池
# 线程池是一组预先创建好的线程,这些线程可以被重复利用来执行各种任务
# 使用线程池可以有效地管理线程资源,避免频繁地创建和销毁线程所带来的开销
使用线程池可以设定更多的参数,线程池有很多详细的介绍,在这我只介绍一种,带拒绝策略的线程池
- 创建线程池
// 线程池信息
private static final ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(5, // 核心线程数量5
10, // 最大数量10
30, // 超出核心线程数量的线程存活时间:30秒
TimeUnit.SECONDS,
// 队列大小20
new LinkedBlockingQueue<Runnable>(20),
// 指定拒绝策略的
new RejectedExecutionHandler()
{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.error("有任务被拒绝执行了");
}
});
- 创建一个耗时的操作类
// 由于线程池需要传入一个Runnable,所以此类继承Runnable,还是用sleep模拟耗时操作
static class MyTask implements Runnable {
private int taskNum;
public MyTask(int num) {
this.taskNum = num;
}
@Override
public void run() {
System.out.println("正在执行task " + taskNum);
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task " + taskNum + "执行完毕");
}
}
- 执行线程池
// 开启线程池,这里通过一个for循环模拟一下,可以看一下log输出
// 有兴趣的可以修改一下for循环和sleep的数值,看看线程池具体的操作和拒绝流程
for (int i = 0; i < 20; i++) {
MyTask myTask = new MyTask(i);
threadPoolExecutor.execute(myTask);
System.out.println("线程池中线程数:" + threadPoolExecutor.getPoolSize());
System.out.println("队列中等待执行的任务数:" + threadPoolExecutor.getQueue().size());
System.out.println("已执行完的任务数:" + threadPoolExecutor.getCompletedTaskCount());
}
threadPoolExecutor.shutdown();
# 如何设置线程池
# 当使用异步编程时,需要告诉程序(编译器或运行时环境)这个异步方法在执行的时候要使用哪个线程池
# 默认情况下,Spring将查找容器中已定义的唯一线程池TaskExecutor或查找名为TaskExecutor的Executor类
# 如果这两种情况都没有找到,Spring将使用内置的 SimpleAsyncTaskExecutor 来处理异步方法的执行
但是,有时候我们并不希望应用程序中的所有任务都使用相同的线程池,而是想要为不同的方法配置不同的线程池,这时候就需要为异步方法调用来声明一个线程池定义。
# 设置方法一
- 创建一个类型为ThreadPoolTaskExecutor的Bean
@Bean
public static ThreadPoolTaskExecutor getExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(30);
taskExecutor.setMaxPoolSize(30);
taskExecutor.setQueueCapacity(50);
taskExecutor.setThreadNamePrefix("Web");
return taskExecutor;
}
- 在创建WebAsyncTask时通过在构造函数中传入这个ThreadPoolTaskExecutor来获取线程
@Autowired
private ThreadPoolTaskExecutor executor;
@RequestMapping(value = "case4", method = RequestMethod.GET)
public WebAsyncTask<String> taskWithThreadPool() {
System.err.println("The main Thread name is " + Thread.currentThread().getName());
// 传入自定义的executor,并模拟开启一个异步任务
WebAsyncTask<String> task1 = new WebAsyncTask<String>(10 * 1000L, executor, () -> {
System.out.println("Current Thread name is " + Thread.currentThread().getName());
Thread.sleep(5000L);
return "task executed!";
});
// 任务执行完成时调用该方法
task1.onCompletion(() -> {
System.out.println("task finished!");
});
System.out.println("task can do other things!");
return task1;
}
# 设置方法二
在Spring Boot中,你可以使用@Async注解和TaskExecutor来实现异步方法执行
- 首先,在主配置类中启用异步支持。在SpringBootApplication类上添加@EnableAsync注解:
@EnableAsync
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
- 通过 @Configuration 配置类来配置一个线程池
@Configuration
@EnableAsync
public class AppConfig {
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25);
return executor;
}
}
- 创建一个接口,定义需要异步执行的方法:
public interface AsyncService {
@Async
void asyncMethod();
}
- 实现接口,并添加@Async注解:
@Service
public class AsyncServiceImpl implements AsyncService {
@Override
@Async
public void asyncMethod() {
// 需要异步执行的代码
System.out.println("Execute async method: " + Thread.currentThread().getName());
}
}
- 在其他类中调用异步方法
@RestController
public class MyController {
@Autowired
private AsyncService asyncService;
@GetMapping("/executeAsync")
public String executeAsync() {
System.out.println("Start execution: " + Thread.currentThread().getName());
asyncService.asyncMethod();
System.out.println("End execution: " + Thread.currentThread().getName());
return "Async method executed";
}
}
WebAsyncTask和@Async注解的不同点
相较于@Async注解,WebAsyncTask的主要优势是提供了一组非常实用的回调方法,开发人员可以在执行过程完毕、超时以及发生异常时设置自定义的处理逻辑。
# 判断线程是否执行完毕
# 使用join()方法
Java中的Thread类提供了join()方法,用于等待线程执行完毕。调用join()方法会将当前线程阻塞,直到被调用的线程执行完成后才会继续执行
// 创建了一个新的线程,并在主线程中调用了join()方法
public class JoinExample implements Runnable {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new JoinExample());
thread.start();
thread.join();
System.out.println("Thread execution completed.");
}
@Override
public void run() {
// 线程执行的逻辑
}
}
# 使用CountDownLatch类
CountDownLatch是Java中的一个同步辅助类,它可以用来同步一个或多个线程的执行。我们可以通过创建一个CountDownLatch对象,并设置计数器的初始值,然后在线程执行完毕时调用countDown()方法减少计数器的值,最后使用await()方法等待计数器变为0
// 创建了一个CountDownLatch对象,并将其作为参数传递给线程对象
// 在线程执行完毕时,调用了countDown()方法减少计数器的值
// 主线程通过调用await()方法等待计数器变为0
public class CountDownLatchExample implements Runnable {
private final CountDownLatch latch;
public CountDownLatchExample(CountDownLatch latch) {
this.latch = latch;
}
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Thread thread = new Thread(new CountDownLatchExample(latch));
thread.start();
latch.await();
System.out.println("Thread execution completed.");
}
@Override
public void run() {
// 线程执行的逻辑
latch.countDown();
}
}
# 使用volatile关键字
在Java中,volatile关键字用于修饰变量,保证变量的可见性和禁止指令重排序。我们可以定义一个volatile类型的变量,在线程执行完毕时将其置为true,然后通过判断该变量的值来判断线程是否执行完毕
// 定义了一个volatile类型的变量finished,并将其初始值设置为false
// 在线程执行完毕时将其置为true。主线程通过循环判断finished的值来等待线程执行完毕
public class VolatileExample implements Runnable {
private volatile boolean finished = false;
public static void main(String[] args) throws InterruptedException {
VolatileExample example = new VolatileExample();
Thread thread = new Thread(example);
thread.start();
while (!example.finished) {
Thread.sleep(100);
}
System.out.println("Thread execution completed.");
}
@Override
public void run() {
// 线程执行的逻辑
finished = true;
}
}