# 开启异步线程的几种方法

# 注解@Async

springboot框架的注解,使用时也有一些限制,@Async注解不能在类本身直接调用
在springboot框架中,可以使用单独的Service实现异步方法,然后在其他的类中调用该Service中的异步方法即可,具体如下:

  • 添加注解:在springboot的config中添加 @EnableAsync注解,开启异步线程功能
@Configuration
@EnableAsync
public class MyConfig {
    // 自己配置的Config
}
  • 创建异步方法Service和实现类,并且返回的类型一定是Future< T >,T是需要返回的值得类型
//Service类
public interface IExecuteService {
    /**
     * 一些耗时的操作,使用单独线程处理
     * 这里就简单写了一个sleep5秒的操作
     */
    @Async
    public Future<String> sleepingTest();
}

//Service实现类
@Service
public class ExecuteServiceImpl implements IExecuteService {
    private static final Logger log = LoggerFactory.getLogger(ExecuteServiceImpl.class);
    @Override
    public Future<String> sleepingTest() {
        log.info("SleepingTest start");
        try {
            Thread.sleep(5000);
			return "0"
        } catch (Exception e) {
            log.error("SleepingTest:" + e.toString());
			return "1"
        }
        log.info("SleepingTest end");
    }
}
  • 调用异步方法
@RestController
public class TestController {
    private static final Logger log = LoggerFactory.getLogger(TestController.class);
    @Autowired
    private IExecuteService executeService;
    @RequestMapping("/executeTask")
    public String executeTask() {
        log.info("executeTask Start!");
		//调用异步方法
		Future<String> result = executeService.sleepingTest();
		//写一个死循环 进行线程的调用
		while (true) {
		    //result.isDone() 表示方法执行结束
		    if (map == null && result.isDone()) {
		        //获取到我们原本需要的数据 返回后break
		        map = result.get();
		         break;
		     }
		}
        log.info("executeTask End!");
        return "executeTask";
    }
}

Future接口提供几个很方便的函数,判断线程是否完成

future.isDone():判断是否执行完毕
future.get():获取线程的返回值,同样可以返回异常, 子线程可以向上抛出异常给父级

while (true) {
	if (task1.isDone() && task2.isDone() && task3.isDone()) {
		break;
	}
}

// 或者
List<Future<String>> lResult = new ArrayList<>();
for (MultipartFile f : file) {
	Future<String> result = tbService.asyncUpload(f, tb.getNumber());
	lResult.add(result);
}
//异步执行是否完成
while (true){
	if(lResult.stream().allMatch(a->a.isDone())){
		break;
	}
}

# AsyncManager

使用AsyncManager方法,也是SpringBoot框架中带的任务管理器,可以实现异步线程

  • 创建AsyncManager类,这个在springboot框架中应该也是有的
/**
 * 异步任务管理器
 *
 * @author thcb
 */
public class AsyncManager {
    /**
     * 操作延迟10毫秒
     */
    private final int OPERATE_DELAY_TIME = 10;
    /**
     * 异步操作任务调度线程池
     */
    private ScheduledExecutorService executor = SpringUtils.getBean("scheduledService");
    /**
     * 单例模式
     */
    private AsyncManager() {
    }
    private static AsyncManager me = new AsyncManager();
    public static AsyncManager me() {
        return me;
    }
    /**
     * 执行任务
     *
     * @param task 任务
     */
    public void execute(TimerTask task) {
        executor.schedule(task, OPERATE_DELAY_TIME, TimeUnit.MILLISECONDS);
    }
    /**
     * 停止任务线程池
     */
    public void shutdown() {
        Threads.shutdownAndAwaitTermination(executor);
    }
}
  • 创建一个耗时的操作类
public TimerTask sleepingTest() {
	return new TimerTask() {
		@Override
		public void run() {
			// 耗时操作
			try {
				Thread.sleep(5000);
			} catch (Exception e) {
				log.error("SleepingTest:" + e.toString());
			}
		}
	};
}
  • 执行异步操作
AsyncManager.me().execute(sleepingTest());

# 线程池

使用线程池可以设定更多的参数,线程池在网上也有很多详细的介绍,在这我只介绍一种,带拒绝策略的线程池

  • 创建线程池
//线程池信息: 
//核心线程数量5,最大数量10,队列大小20
//超出核心线程数量的线程存活时间:30秒, 指定拒绝策略的
private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10,
   30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(20),new RejectedExecutionHandler(){
	@Override
	public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
		log.error("有任务被拒绝执行了");
	}
});
  • 创建一个耗时的操作类
//由于线程池需要传入一个Runnable,所以此类继承Runnable,还是用sleep模拟耗时操作
static class MyTask implements Runnable {
	private int taskNum;
	public MyTask(int num) {
		this.taskNum = num;
	}
	@Override
	public void run() {
		System.out.println("正在执行task " + taskNum);
		try {
			Thread.sleep(4000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("task " + taskNum + "执行完毕");
	}
}
  • 执行线程池
//开启线程池,这里通过一个for循环模拟一下,可以看一下log输出
//有兴趣的可以修改一下for循环和sleep的数值,看看线程池具体的操作和拒绝流程
for (int i = 0; i < 20; i++) {
	MyTask myTask = new MyTask(i);
	threadPoolExecutor.execute(myTask);
	System.out.println("线程池中线程数目:" + threadPoolExecutor.getPoolSize() 
	      + ",队列中等待执行的任务数目:" +threadPoolExecutor.getQueue().size()
		  + ",已执行完别的任务数目:" + threadPoolExecutor.getCompletedTaskCount());
}
threadPoolExecutor.shutdown();

# SpringAsync

Spring Async的底层是JDK中的异步编程模型,所以在实战过程中,我们需要对JDK中原有的一些异步编程组件有足够的理解。

# 灵活使用多种Future机制

在使用Spring异步编程模型时,我们不可避免地需要使用到JDK中的Future机制。传统Future机制存在很多不完善之处,Java 8中引入了CompletableFuture对传统Future进行了优化。

开发人员可以直接通过CompletableFuture将异步执行结果交给另外一个异步线程来处理。这样在异步任务完成后,我们使用任务结果时则不需要等待。

CompletableFuture<String> completableFuture = new CompletableFuture<>();

一旦构建了CompletableFuture对象,就可以使用该对象的get()方法去获取一个Future对象。同时,我们也可以通过它的complete()方法去手工完成一个Future对象。

对CompletableFuture,通常有三种使用方法,分别针对不同的应用场景。

  • 第一种就是使用runAsync()方法异步执行任务,系统会启动一个新的线程,执行完后不会返回任何值
@RequestMapping(value = "/health_description")
public CompletableFuture<String> syncHealthDescription() {
	CompletableFuture<String> completableFuture = new CompletableFuture<>();
	CompletableFuture.runAsync(new Runnable() {
		@Override
		public void run() {
		   try {
			  completableFuture.complete(healthService.getHealthDescription().get());
		   } catch (InterruptedException | ExecutionException e) {
			  completableFuture.completeExceptionally(e);
		   }
		}
	});
	return completableFuture;
}
  • 如果想要在异步执行任务完成之后返回值,那么可以使用CompletableFuture的supplyAsync()方法
@RequestMapping(value = "/health_description")
public CompletableFuture<String> syncHealthDescription () {
	CompletableFuture.supplyAsync(new Supplier<String>() {
		@Override
		public String get() {
		   try {
			  return healthService.getHealthDescription().get();
		   } catch (InterruptedException | ExecutionException e) {
			  LOGGER.error(e);
		   }
		  return "No health description found";
		}
	});
	return completableFuture;
}
  • 基于CompletableFuture的典型场景使用回调。我们知道CompletableFuture.get()方法实际上也是需要等到Future执行完成之后才能返回

为了构建异步通知效果,系统势必需要一种回调机制,在Future执行完成之后能够自动执行一些回调处理。我们可以通过使用CompletableFuture所提供的thenApply()、thenAccept()和thenRun()方法来将回调添加到CompletableFuture中。这里以thenApply()为例给出示例代码。

public CompletableFuture<Long> thenApplyDemo() {
	CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
		@Override
		public Long get() {
			long firstValue = 10;
			System.out.println("firstValue="+firstValue);
			return firstValue;
		}
	}).thenApply(new Function<Long, Long>() {
		@Override
		public Long apply(Long t) {
			long secondValue = t*5;
			System.out.println("secondValue="+secondValue);
			return secondValue;
		}
	});
	return future;
}

上述示例代码的执行结果为50。我们通过thenApply()演示了第二个任务依赖第一个任务返回结果的处理场景,在第一个任务执行完返回值10时触发了第二个任务的执行逻辑,从而得到结果为10×5=50。

# 合理设置线程池

既然方法执行的过程使用了异步线程,你可能想知道我们如何声明异步方法来执行所使用的线程池。默认情况下,对于线程池而言,Spring将尝试查找容器中已定义的唯一TaskExecutor,或查找名为TaskExecutor的Executor类。如果这两种情况都没有找到对应的Executor,Spring将使用内置的SimpleAsyncTaskExecutor来处理异步方法的执行。

但是,有时候我们并不希望应用程序中的所有任务都使用相同的线程池,而是想要为不同的方法配置不同的线程池,这时候就需要为异步方法调用来声明一个线程池定义。在Spring中,我们可以创建一个类型为ThreadPoolTaskExecutor的Bean。

@Bean
public static ThreadPoolTaskExecutor getExecutor() {
  ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
  taskExecutor.setCorePoolSize(30);
  taskExecutor.setMaxPoolSize(30);
  taskExecutor.setQueueCapacity(50);
  taskExecutor.setThreadNamePrefix("Web");
  return taskExecutor;
}

然后,我们可以在创建WebAsyncTask时通过在构造函数中传入这个ThreadPoolTaskExecutor来获取线程

@Autowired
private ThreadPoolTaskExecutor executor;

@RequestMapping(value = "case4", method = RequestMethod.GET)
public WebAsyncTask<String> taskWithThreadPool() {
	System.err.println("The main Thread name is " + Thread.currentThread().getName());
	// 传入自定义的executor,并模拟开启一个异步任务
	WebAsyncTask<String> task1 = new WebAsyncTask<String>(10 * 1000L,executor, () -> {
		System.out.println("Current Thread name is " + Thread.currentThread().getName());
		Thread.sleep(5000L);
		return "task executed!";
	});
	// 任务执行完成时调用该方法
	task1.onCompletion(() -> {
		System.out.println("task finished!");
	});
	System.out.println("task can do other things!");
	return task1;
}

# WebAsyncTask和@Async注解的不同点

相较于@Async注解,WebAsyncTask的主要优势是提供了一组非常实用的回调方法,开发人员可以在执行过程完毕、超时以及发生异常时设置自定义的处理逻辑。

# @EnableAsync注解与@Async注解的关联关系

从定位上讲,@EnableAsync注解是一个全局性的注解,用于为应用程序启动异步编程模型;而@Async注解可以作用于类和方法之上,用于提供方法级别的异步执行流程。@EnableAsync注解的效果相当于通过一系列的配置类,为所有标有@Async注解的方法添加异步代理机制。

# Spring Boot异步执行,提高并发能力

在Spring Boot中,你可以使用@Async注解和TaskExecutor来实现异步方法执行。以下是如何在Spring Boot应用中实现异步方法执行的示例:

  • 首先,在主配置类中启用异步支持。在SpringBootApplication类上添加@EnableAsync注解:
@EnableAsync
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        executor.setThreadNamePrefix("Async-");
        executor.initialize();
        return executor;
    }
}
  • 创建一个接口,定义需要异步执行的方法:
public interface AsyncService {
    @Async
    void asyncMethod();
}
  • 实现接口,并添加@Async注解:
@Service
public class AsyncServiceImpl implements AsyncService {
    @Override
    @Async
    public void asyncMethod() {
        // 需要异步执行的代码
        System.out.println("Execute async method: " + Thread.currentThread().getName());
    }
}
  • 在其他类中调用异步方法
@RestController
public class MyController {
    @Autowired
    private AsyncService asyncService;

    @GetMapping("/executeAsync")
    public String executeAsync() {
        System.out.println("Start execution: " + Thread.currentThread().getName());
        asyncService.asyncMethod();
        System.out.println("End execution: " + Thread.currentThread().getName());
        return "Async method executed";
    }
}
  • 在浏览器中访问你会看到控制台打印的信息,说明异步方法已经在一个单独的线程中执行,而不会阻塞主线程。请注意,如果你需要在异步方法中返回值,可以使用Future或CompletableFuture类型作为方法返回类型。

# 判断线程是否执行完毕

# 使用join()方法

Java中的Thread类提供了join()方法,用于等待线程执行完毕。调用join()方法会将当前线程阻塞,直到被调用的线程执行完成后才会继续执行

//创建了一个新的线程,并在主线程中调用了join()方法
public class JoinExample implements Runnable {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new JoinExample());
        thread.start();
        thread.join();
        System.out.println("Thread execution completed.");
    }

    @Override
    public void run() {
        // 线程执行的逻辑
    }
}

# 使用CountDownLatch类

CountDownLatch是Java中的一个同步辅助类,它可以用来同步一个或多个线程的执行。我们可以通过创建一个CountDownLatch对象,并设置计数器的初始值,然后在线程执行完毕时调用countDown()方法减少计数器的值,最后使用await()方法等待计数器变为0

//创建了一个CountDownLatch对象,并将其作为参数传递给线程对象
//在线程执行完毕时,调用了countDown()方法减少计数器的值。主线程通过调用await()方法等待计数器变为0
public class CountDownLatchExample implements Runnable {
    private final CountDownLatch latch;

    public CountDownLatchExample(CountDownLatch latch) {
        this.latch = latch;
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        Thread thread = new Thread(new CountDownLatchExample(latch));
        thread.start();
        latch.await();
        System.out.println("Thread execution completed.");
    }

    @Override
    public void run() {
        // 线程执行的逻辑
        latch.countDown();
    }
}

# 使用volatile关键字

在Java中,volatile关键字用于修饰变量,保证变量的可见性和禁止指令重排序。我们可以定义一个volatile类型的变量,在线程执行完毕时将其置为true,然后通过判断该变量的值来判断线程是否执行完毕

//定义了一个volatile类型的变量finished,并将其初始值设置为false
//在线程执行完毕时将其置为true。主线程通过循环判断finished的值来等待线程执行完毕
public class VolatileExample implements Runnable {
    private volatile boolean finished = false;

    public static void main(String[] args) throws InterruptedException {
        VolatileExample example = new VolatileExample();
        Thread thread = new Thread(example);
        thread.start();
        while (!example.finished) {
            Thread.sleep(100);
        }
        System.out.println("Thread execution completed.");
    }

    @Override
    public void run() {
        // 线程执行的逻辑
        finished = true;
    }
}