# SpringAsync

Spring Async的底层是JDK中的异步编程模型

# 灵活使用多种Future机制

  • 在使用 Spring 异步编程模型时,我们不可避免地需要使用到 JDK 中的 Future 机制
  • 传统 Future 机制存在很多不完善之处,Java 8中引入了 CompletableFuture 对传统Future进行了优化
  • 开发人员可以直接通过 CompletableFuture 将异步执行结果交给另外一个异步线程来处理
  • 这样在异步任务完成后,我们使用任务结果时则不需要等待
CompletableFuture<String> completableFuture = new CompletableFuture<>();

// 一旦构建了CompletableFuture对象
// 就可以使用该对象的get()方法去获取一个Future对象
// 也可以通过它的complete()方法去手工完成一个Future对象

# CompletableFuture 通常有三种使用方法,分别针对不同的应用场景

  • 第一种就是使用runAsync()方法异步执行任务,系统会启动一个新的线程,执行完后不会返回任何值
@RequestMapping(value = "/health_description")
public CompletableFuture<String> syncHealthDescription() {
	CompletableFuture<String> completableFuture = new CompletableFuture<>();
	CompletableFuture.runAsync(new Runnable() {
		@Override
		public void run() {
		   try {
			  completableFuture.complete(healthService.getHealthDescription().get());
		   } catch (InterruptedException | ExecutionException e) {
			  completableFuture.completeExceptionally(e);
		   }
		}
	});
	return completableFuture;
}
  • 如果想要在异步执行任务完成之后返回值,那么可以使用CompletableFuture的supplyAsync()方法
@RequestMapping(value = "/health_description")
public CompletableFuture<String> syncHealthDescription () {
	CompletableFuture.supplyAsync(new Supplier<String>() {
		@Override
		public String get() {
		   try {
			  return healthService.getHealthDescription().get();
		   } catch (InterruptedException | ExecutionException e) {
			  LOGGER.error(e);
		   }
		  return "No health description found";
		}
	});
	return completableFuture;
}
  • 基于CompletableFuture的典型场景使用回调
# 为了构建异步通知效果,系统势必需要一种回调机制,在Future执行完成之后能够自动执行一些回调处理
# CompletableFuture.get()方法实际上也是需要等到Future执行完成之后才能返回
# CompletableFuture提供的thenApply()、thenAccept()、thenRun()方法将回调添加到CompletableFuture
// 这里以thenApply()为例给出示例代码
// 演示了第二个任务依赖第一个任务返回结果的处理场景
// 在第一个任务执行完返回值10时触发了第二个任务的执行逻辑,从而得到结果为10×5=50
public CompletableFuture<Long> thenApplyDemo() {
	CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
		@Override
		public Long get() {
			long firstValue = 10;
			System.out.println("firstValue="+firstValue);
			return firstValue;
		}
	}).thenApply(new Function<Long, Long>() {
		@Override
		public Long apply(Long t) {
			long secondValue = t*5;
			System.out.println("secondValue="+secondValue);
			return secondValue;
		}
	});
	return future; // 执行结果为50
}

# 开启异步线程的方法

# 注解@Async

# springboot 框架的注解,使用时有一些限制,@Async 注解不能在类本身直接调用
# 可以使用单独的 Service 实现异步方法,然后在其他的类中调用该 Service 中的异步方法
  • 添加注解:在springboot的config中添加 @EnableAsync注解,开启异步线程功能
@Configuration
@EnableAsync
public class MyConfig {
    // 自己配置的Config
}
  • 创建异步方法Service和实现类,并且返回的类型一定是Future< T > T是需要返回的值的类型
// Service类
public interface IExecuteService {
    /**
     * 一些耗时的操作,使用单独线程处理
     * 这里就简单写了一个sleep5秒的操作
     */
    @Async
    public Future<String> sleepingTest();
}

// Service实现类
@Service
public class ExecuteServiceImpl implements IExecuteService {
	
    private static final Logger log = LoggerFactory.getLogger(ExecuteServiceImpl.class);
	
    @Override
    public Future<String> sleepingTest() {
        log.info("SleepingTest start");
        try {
            Thread.sleep(5000);
			return "0"
        } catch (Exception e) {
            log.error("SleepingTest:" + e.toString());
			return "1"
        }
        log.info("SleepingTest end");
    }
}
  • 调用异步方法
@RestController
public class TestController {
    private static final Logger log = LoggerFactory.getLogger(TestController.class);
	
    @Autowired
    private IExecuteService executeService;
	
    @RequestMapping("/executeTask")
    public String executeTask() {
        log.info("executeTask Start!");
		// 调用异步方法
		Future<String> result = executeService.sleepingTest();
		// 写一个死循环 进行线程的调用
		while (true) {
		    // result.isDone() 表示方法执行结束
		    if (map == null && result.isDone()) {
		        // 获取到我们原本需要的数据 返回后break
		        map = result.get();
		         break;
		     }
		}
        log.info("executeTask End!");
        return "executeTask";
    }
}

Future接口提供几个很方便的函数,判断线程是否完成

future.isDone():判断是否执行完毕
future.get():获取线程的返回值,同样可以返回异常, 子线程可以向上抛出异常给父级

while (true) {
	if (task1.isDone() && task2.isDone() && task3.isDone()) {
		break;
	}
}

// 或者
List<Future<String>> lResult = new ArrayList<>();
for (MultipartFile f : file) {
	Future<String> result = tbService.asyncUpload(f, tb.getNumber());
	lResult.add(result);
}
// 异步执行是否完成
while (true){
	if(lResult.stream().allMatch(a->a.isDone())){
		break;
	}
}

# AsyncManager

使用AsyncManager方法,也是SpringBoot框架中带的任务管理器,可以实现异步线程

  • 创建AsyncManager类,这个在springboot框架中应该也是有的
// 异步任务管理器
public class AsyncManager {
    // 操作延迟10毫秒
    private final int OPERATE_DELAY_TIME = 10;
    
	// 异步操作任务调度线程池
    private ScheduledExecutorService executor = SpringUtils.getBean("scheduledService");
    
	// 单例模式
    private AsyncManager() { }
    private static AsyncManager me = new AsyncManager();
    public static AsyncManager me() {
        return me;
    }
    
	// 执行任务
    public void execute(TimerTask task) {
        executor.schedule(task, OPERATE_DELAY_TIME, TimeUnit.MILLISECONDS);
    }
    
	// 停止任务线程池
    public void shutdown() {
        Threads.shutdownAndAwaitTermination(executor);
    }
}
  • 创建一个耗时的操作类
public TimerTask sleepingTest() {
	return new TimerTask() {
		@Override
		public void run() {
			// 耗时操作
			try {
				Thread.sleep(5000);
			} catch (Exception e) {
				log.error("SleepingTest:" + e.toString());
			}
		}
	};
}
  • 执行异步操作
AsyncManager.me().execute(sleepingTest());

# 线程池

# 线程池是一组预先创建好的线程,这些线程可以被重复利用来执行各种任务
# 使用线程池可以有效地管理线程资源,避免频繁地创建和销毁线程所带来的开销

使用线程池可以设定更多的参数,线程池有很多详细的介绍,在这我只介绍一种,带拒绝策略的线程池

  • 创建线程池
// 线程池信息
private static final ThreadPoolExecutor threadPoolExecutor = 
                     new ThreadPoolExecutor(5, // 核心线程数量5
											10, // 最大数量10
                                            30, // 超出核心线程数量的线程存活时间:30秒
											TimeUnit.SECONDS,
											// 队列大小20
											new LinkedBlockingQueue<Runnable>(20),
											// 指定拒绝策略的
											new RejectedExecutionHandler()
{
	@Override
	public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
		log.error("有任务被拒绝执行了");
	}
});
  • 创建一个耗时的操作类
// 由于线程池需要传入一个Runnable,所以此类继承Runnable,还是用sleep模拟耗时操作
static class MyTask implements Runnable {
	private int taskNum;
	public MyTask(int num) {
		this.taskNum = num;
	}
	@Override
	public void run() {
		System.out.println("正在执行task " + taskNum);
		try {
			Thread.sleep(4000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("task " + taskNum + "执行完毕");
	}
}
  • 执行线程池
// 开启线程池,这里通过一个for循环模拟一下,可以看一下log输出
// 有兴趣的可以修改一下for循环和sleep的数值,看看线程池具体的操作和拒绝流程
for (int i = 0; i < 20; i++) {
	MyTask myTask = new MyTask(i);
	threadPoolExecutor.execute(myTask);
	System.out.println("线程池中线程数:" + threadPoolExecutor.getPoolSize());
	System.out.println("队列中等待执行的任务数:" + threadPoolExecutor.getQueue().size());
	System.out.println("已执行完的任务数:" + threadPoolExecutor.getCompletedTaskCount());
}
threadPoolExecutor.shutdown();

# 如何设置线程池

# 当使用异步编程时,需要告诉程序(编译器或运行时环境)这个异步方法在执行的时候要使用哪个线程池
# 默认情况下,Spring将查找容器中已定义的唯一线程池TaskExecutor或查找名为TaskExecutor的Executor类
# 如果这两种情况都没有找到,Spring将使用内置的 SimpleAsyncTaskExecutor 来处理异步方法的执行

但是,有时候我们并不希望应用程序中的所有任务都使用相同的线程池,而是想要为不同的方法配置不同的线程池,这时候就需要为异步方法调用来声明一个线程池定义。

# 设置方法一

  • 创建一个类型为ThreadPoolTaskExecutor的Bean
@Bean
public static ThreadPoolTaskExecutor getExecutor() {
	ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
	taskExecutor.setCorePoolSize(30);
	taskExecutor.setMaxPoolSize(30);
	taskExecutor.setQueueCapacity(50);
	taskExecutor.setThreadNamePrefix("Web");
	return taskExecutor;
}
  • 在创建WebAsyncTask时通过在构造函数中传入这个ThreadPoolTaskExecutor来获取线程
@Autowired
private ThreadPoolTaskExecutor executor;

@RequestMapping(value = "case4", method = RequestMethod.GET)
public WebAsyncTask<String> taskWithThreadPool() {
	
	System.err.println("The main Thread name is " + Thread.currentThread().getName());
	
	// 传入自定义的executor,并模拟开启一个异步任务
	WebAsyncTask<String> task1 = new WebAsyncTask<String>(10 * 1000L, executor, () -> {
		System.out.println("Current Thread name is " + Thread.currentThread().getName());
		Thread.sleep(5000L);
		return "task executed!";
	});
	
	// 任务执行完成时调用该方法
	task1.onCompletion(() -> {
		System.out.println("task finished!");
	});
	
	System.out.println("task can do other things!");
	return task1;
}

# 设置方法二

在Spring Boot中,你可以使用@Async注解和TaskExecutor来实现异步方法执行

  • 首先,在主配置类中启用异步支持。在SpringBootApplication类上添加@EnableAsync注解:
@EnableAsync
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
  • 通过 @Configuration 配置类来配置一个线程池
@Configuration
@EnableAsync
public class AppConfig {
    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        return executor;
    }
}
  • 创建一个接口,定义需要异步执行的方法:
public interface AsyncService {
    @Async
    void asyncMethod();
}
  • 实现接口,并添加@Async注解:
@Service
public class AsyncServiceImpl implements AsyncService {
    @Override
    @Async
    public void asyncMethod() {
        // 需要异步执行的代码
        System.out.println("Execute async method: " + Thread.currentThread().getName());
    }
}
  • 在其他类中调用异步方法
@RestController
public class MyController {
    @Autowired
    private AsyncService asyncService;

    @GetMapping("/executeAsync")
    public String executeAsync() {
        System.out.println("Start execution: " + Thread.currentThread().getName());
        asyncService.asyncMethod();
        System.out.println("End execution: " + Thread.currentThread().getName());
        return "Async method executed";
    }
}

WebAsyncTask和@Async注解的不同点

相较于@Async注解,WebAsyncTask的主要优势是提供了一组非常实用的回调方法,开发人员可以在执行过程完毕、超时以及发生异常时设置自定义的处理逻辑。

# 判断线程是否执行完毕

# 使用join()方法

Java中的Thread类提供了join()方法,用于等待线程执行完毕。调用join()方法会将当前线程阻塞,直到被调用的线程执行完成后才会继续执行

// 创建了一个新的线程,并在主线程中调用了join()方法
public class JoinExample implements Runnable {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new JoinExample());
        thread.start();
        thread.join();
        System.out.println("Thread execution completed.");
    }

    @Override
    public void run() {
        // 线程执行的逻辑
    }
}

# 使用CountDownLatch类

CountDownLatch是Java中的一个同步辅助类,它可以用来同步一个或多个线程的执行。我们可以通过创建一个CountDownLatch对象,并设置计数器的初始值,然后在线程执行完毕时调用countDown()方法减少计数器的值,最后使用await()方法等待计数器变为0

// 创建了一个CountDownLatch对象,并将其作为参数传递给线程对象
// 在线程执行完毕时,调用了countDown()方法减少计数器的值
// 主线程通过调用await()方法等待计数器变为0
public class CountDownLatchExample implements Runnable {
    private final CountDownLatch latch;

    public CountDownLatchExample(CountDownLatch latch) {
        this.latch = latch;
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        Thread thread = new Thread(new CountDownLatchExample(latch));
        thread.start();
        latch.await();
        System.out.println("Thread execution completed.");
    }

    @Override
    public void run() {
        // 线程执行的逻辑
        latch.countDown();
    }
}

# 使用volatile关键字

在Java中,volatile关键字用于修饰变量,保证变量的可见性和禁止指令重排序。我们可以定义一个volatile类型的变量,在线程执行完毕时将其置为true,然后通过判断该变量的值来判断线程是否执行完毕

// 定义了一个volatile类型的变量finished,并将其初始值设置为false
// 在线程执行完毕时将其置为true。主线程通过循环判断finished的值来等待线程执行完毕
public class VolatileExample implements Runnable {
    private volatile boolean finished = false;

    public static void main(String[] args) throws InterruptedException {
        VolatileExample example = new VolatileExample();
        Thread thread = new Thread(example);
        thread.start();
        while (!example.finished) {
            Thread.sleep(100);
        }
        System.out.println("Thread execution completed.");
    }

    @Override
    public void run() {
        // 线程执行的逻辑
        finished = true;
    }
}