# WebFlux简介
WebFlux官网 (opens new window) 适用于需要处理大量并发请求、实时数据的场景,对于异步IO和非阻塞IO的应用场景更为合适
# WebFlux 和 SpringMVC
# 编程模型
Spring MVC
# 采用的是传统的同步阻塞模型,基于Servlet API
# Controller中的处理方法是同步执行的,每个请求都会占用一个Servlet线程
# 如果线程阻塞,那么整个请求处理链也会被阻塞
WebFlux
# 采用了响应式编程模型,使用Reactor库来实现异步和非阻塞
# 它允许开发者使用函数式编程风格处理异步事件,能够更好地处理高并发和实时数据
# 容器支持
Spring MVC
# 主要运行在传统的Servlet容器上,如Tomcat、Jetty等
WebFlux
# 可以运行在传统的Servlet容器上,也可以运行在支持Reactive Streams的容器上,如Netty
# 路由方式
Spring MVC
# 采用基于注解的声明式路由,通过@Controller和@RequestMapping来定义
WebFlux
# 引入了一种新的声明式的路由方式,使用RouterFunctions和HandlerFunctions
# WebFlux 的处理模式
在WebFlux中,响应式编程采用了发布订阅模式。这是一种用于处理异步数据流的模式,其中有一个发布者负责生成数据流,而一个或多个订阅者负责接收并处理这些数据
发布者(Publisher)
# 发布者是负责产生数据流的组件
# 在WebFlux中,发布者通常表示一个反应式流
# 可以是Mono(表示0或1个元素的流)或Flux(表示0到N个元素的流)
# 发布者负责将数据推送给订阅者,可以是一次性的数据,也可以是持续的数据流
订阅者(Subscriber)
# 订阅者是负责接收和处理数据的组件
# 在WebFlux中,订阅者通常表示一个订阅了Mono或Flux的处理器或回调函数
# 订阅者通过订阅发布者来表达对数据的兴趣,并在数据到达时接收和处理数据
# 订阅者可以定义处理数据的回调方法
# 如onNext(处理每个元素)、onError(处理错误)、onComplete(处理流的完成)等
订阅(Subscription)
# 订阅是发布者和订阅者之间的关联,表示订阅者对发布者的兴趣
# 订阅可以被取消,这种取消是一种解除订阅关系的操作
public class WebFluxExample {
public static void main(String[] args) {
// 创建一个Mono发布者
Mono<String> monoPublisher = Mono.create(monoSink -> {
// 发布者发出数据
monoSink.success("Hello, WebFlux!");
});
// 订阅者订阅发布者,并定义处理数据的回调方法
monoPublisher.subscribe(
// onNext回调,处理数据
data -> System.out.println("Received: " + data),
// onError回调,处理错误
error -> System.err.println("Error: " + error),
// onComplete回调,处理流的完成
() -> System.out.println("Processing completed")
);
}
}
# 函数式路由
- Spring WebFlux中,仍可使用@Controller和@Service注解,但引入了新的声明式路由方式-函数式路由
- 函数式路由不依赖于@Controller和@Service注解,而是使用Router Functions(路由器函数)和Handler Functions(处理器函数)来定义路由和处理逻辑
@Configuration
public class WebFluxRouter {
@Bean
public RouterFunction<ServerResponse> route() {
return RouterFunctions.route()
.POST("/hello", this::handleHelloRequest)
.build();
}
private Mono<ServerResponse> handleHelloRequest(ServerRequest request) {
Mono<String> requestBody = request.bodyToMono(String.class);
return requestBody.flatMap(body -> {
String responseData = "Hello, " + body;
return ServerResponse.ok()
.contentType(MediaType.TEXT_PLAIN)
.body(BodyInserters.fromValue(responseData));
});
}
}
# 实际操作
- 假设我们有三个异步任务,分别是获取用户信息、获取商品信息和获取订单信息
@Service
public class AsyncService {
@Async
public CompletableFuture<String> getUserInfoAsync() {
// 模拟获取用户信息的异步操作
return CompletableFuture.completedFuture("User Info");
}
@Async
public CompletableFuture<String> getProductInfoAsync() {
// 模拟获取商品信息的异步操作
return CompletableFuture.completedFuture("Product Info");
}
@Async
public CompletableFuture<String> getOrderInfoAsync() {
// 模拟获取订单信息的异步操作
return CompletableFuture.completedFuture("Order Info");
}
}
- 注解驱动的实现方式
@RestController
@RequestMapping("/mvc")
public class AsyncController {
@Autowired
private AsyncService asyncService;
@GetMapping("/getData")
public ResponseEntity<String> getData() throws ExecutionException, InterruptedException {
CompletableFuture<String> userFuture = asyncService.getUserInfoAsync();
CompletableFuture<String> productFuture = asyncService.getProductInfoAsync();
CompletableFuture<String> orderFuture = asyncService.getOrderInfoAsync();
// 等待所有异步任务完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(userFuture, productFuture, orderFuture);
allOf.join();
// 获取异步任务的结果
String userData = userFuture.get();
String productData = productFuture.get();
String orderData = orderFuture.get();
// 合并结果
String result = userData + "\n" + productData + "\n" + orderData;
return ResponseEntity.ok(result);
}
}
- 函数式路由实现方式
@Configuration
public class WebFluxRouter {
@Bean
public RouterFunction<ServerResponse> route(AsyncService asyncService) {
return RouterFunctions.route()
.GET("/webflux/getData", request -> handleGetData(request, asyncService))
.build();
}
private Mono<ServerResponse> handleGetData(ServerRequest request, AsyncService asyncService) {
Mono<String> userMono = asyncService.getUserInfoAsync();
Mono<String> productMono = asyncService.getProductInfoAsync();
Mono<String> orderMono = asyncService.getOrderInfoAsync();
// 合并异步任务的结果
return Mono.zip(userMono, productMono, orderMono)
.flatMap(tuple -> {
String userData = tuple.getT1();
String productData = tuple.getT2();
String orderData = tuple.getT3();
String result = userData + "\n" + productData + "\n" + orderData;
// 返回合并后的结果
return ServerResponse.ok()
.contentType(MediaType.TEXT_PLAIN)
.body(BodyInserters.fromValue(result));
});
}
}