# WebFlux简介
# 随着互联网应用的快速发展,传统的阻塞式 I/O 模型在高并发场景下暴露出性能瓶颈
# 每个请求都需要占用一个线程,导致线程资源消耗巨大,系统的伸缩性受限
# 为了解决这一问题,响应式编程 模型应运而生,旨在通过异步非阻塞的方式,提高系统的吞吐量和响应能力
# Spring Framework 5.0 中引入了 WebFlux,基于响应式编程的 Web 框架,满足对高并发、低延迟的需求
# WebFlux 使得开发者可使用少量线程处理大量并发请求,特别适用于I/密集型应用,如微服务中的网关服务
- 适用于处理大量并发请求、实时数据的场景,对于I/O密集型、高并发应用场景更为合适。
- 响应式编程之所以能用少数线程处理大量请求,核心在于事件循环(Event Loop)机制和非阻塞I/O模型
# Spring WebFlux 架构基于响应式编程理念,核心思想
- 非阻塞 I/O:利用 Reactor 提供的 Flux 和 Mono 类型,实现异步非阻塞的数据处理
- 背压支持:遵循 Reactive Streams 规范,支持背压机制,防止数据生产者压垮消费者
- 函数式编程:支持函数式编程风格,提供函数式路由和处理器定义方式
- 可插拔组件:各组件之间解耦,便于扩展和定制
# Servlet、SpringMVC 与 Spring WebFlux 三者对比

# Spring WebFlux 快速入门示例
- 依赖配置(使用 Maven)
<dependencies>
<!-- Spring Boot WebFlux -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- Reactor Test(可选,用于测试) -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
- 创建启动类
@SpringBootApplication
public class WebFluxApplication {
public static void main(String[] args) {
SpringApplication.run(WebFluxApplication.class, args);
}
}
- 创建 Controller
@RestController
@RequestMapping("/hello")
public class HelloController {
// Mono<T>:代表 0 或 1 个元素(异步)
@GetMapping
public Mono<String> sayHello(@RequestParam(defaultValue = "World") String name) {
return Mono.just("Hello, " + name + "!");
}
// Flux<T>:代表 0 到 N 个元素(异步流)
@GetMapping("/stream")
public Flux<String> streamMessages() {
return Flux.interval(Duration.ofSeconds(1))
.map(tick -> "Message " + tick)
.take(5);
}
}
- 运行 & 测试
# http://localhost:8080/hello → 返回 "Hello, World!"
# http://localhost:8080/hello?name=Alice → 返回 "Hello, Alice!"
# http://localhost:8080/hello/stream → 每秒返回一条消息,共 5 条(流式响应)
# WebFlux 的处理模式
在WebFlux中,响应式编程采用了发布订阅模式。这是一种用于处理异步数据流的模式,其中有一个发布者负责生成数据流,而一个或多个订阅者负责接收并处理这些数据
发布者(Publisher)
# 发布者是负责产生数据流的组件
# 在WebFlux中,发布者通常表示一个反应式流
# 可以是Mono(表示0或1个元素的流)或Flux(表示0到N个元素的流)
# 发布者负责将数据推送给订阅者,可以是一次性的数据,也可以是持续的数据流
订阅者(Subscriber)
# 订阅者是负责接收和处理数据的组件
# 在WebFlux中,订阅者通常表示一个订阅了Mono或Flux的处理器或回调函数
# 订阅者通过订阅发布者来表达对数据的兴趣,并在数据到达时接收和处理数据
# 订阅者可以定义处理数据的回调方法
# 如onNext(处理每个元素)、onError(处理错误)、onComplete(处理流的完成)等
订阅(Subscription)
# 订阅是发布者和订阅者之间的关联,表示订阅者对发布者的兴趣
# 订阅可以被取消,这种取消是一种解除订阅关系的操作
public class WebFluxExample {
public static void main(String[] args) {
// 创建一个Mono发布者
Mono<String> monoPublisher = Mono.create(monoSink -> {
// 发布者发出数据
monoSink.success("Hello, WebFlux!");
});
// 订阅者订阅发布者,并定义处理数据的回调方法
monoPublisher.subscribe(
// onNext回调,处理数据
data -> System.out.println("Received: " + data),
// onError回调,处理错误
error -> System.err.println("Error: " + error),
// onComplete回调,处理流的完成
() -> System.out.println("Processing completed")
);
}
}
# 函数式路由
- Spring WebFlux中,仍可使用@Controller和@Service注解,但引入了新的声明式路由方式-函数式路由
- 函数式路由使用Router Functions路由器函数 和Handler Functions处理器函数 来定义路由和处理逻辑
// RouterFunction 是用来路由的,相当于传统 @RequestMapping("/xxx")
// RouterFunctions.route() 相当于注册了 URL 到处理函数的映射
public interface RouterFunction<T extends ServerResponse> {
Mono<HandlerFunction<T>> route(ServerRequest request);
}
// 示例
@Configuration
public class RouterConfig {
@Bean
public RouterFunction<ServerResponse> route(HelloHandler handler) {
return RouterFunctions
.route(RequestPredicates.GET("/hello"), handler::hello)
.andRoute(RequestPredicates.GET("/hi"), handler::hello);
}
}
// HandlerFunction 就像传统 MVC 中的 @RequestMapping 方法,用来处理请求
// 入参是 ServerRequest
// 返回值是 Mono<ServerResponse>:表示异步响应
public interface HandlerFunction<T extends ServerResponse> {
Mono<T> handle(ServerRequest request);
}
// 示例:
public class HelloHandler {
public Mono<ServerResponse> hello(ServerRequest request) {
return ServerResponse.ok().bodyValue("Hello, WebFlux with Functional Style!");
}
}
// 整体示例
@Configuration
public class WebFluxRouter {
@Bean
public RouterFunction<ServerResponse> route() {
return RouterFunctions.route()
.POST("/hello", this::handleHelloRequest)
.build();
}
private Mono<ServerResponse> handleHelloRequest(ServerRequest request) {
Mono<String> requestBody = request.bodyToMono(String.class);
return requestBody.flatMap(body -> {
String responseData = "Hello, " + body;
return ServerResponse.ok()
.contentType(MediaType.TEXT_PLAIN)
.body(BodyInserters.fromValue(responseData));
});
}
}
# Mono 和 Flux 的使用
// Mono 用于处理 单一异步值 的场景,比如数据库查询结果、HTTP 请求返回值等:
Mono<String> mono = Mono.just("Hello Mono");
Mono.empty(); // 没有值
Mono.error(new RuntimeException("Error")); // 抛出异常
// Flux 用于处理 多个异步数据元素,如消息流、数据库多条记录等:
Flux<String> flux = Flux.just("A", "B", "C");
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1)).take(5);
// 常用操作符(类似 Stream)
// 创建数据源
Mono.just("one")
Flux.fromIterable(Arrays.asList("A", "B"))
// 转换(map / flatMap)
Mono.just("hello")
.map(String::toUpperCase) // 转为大写
.flatMap(value -> Mono.just(value + " World")); // 拍平嵌套
// 过滤 / 条件
Flux.range(1, 10)
.filter(i -> i % 2 == 0) // 只保留偶数
// 错误处理
Mono.error(new RuntimeException("Oops"))
.onErrorResume(e -> Mono.just("Fallback"))
# 背压机制 Backpressure
背压机制是 Reactive Streams 的核心特性,Reactor 中也做了良好支持。通过 subscribe 方法可以指定如何处理数据和错误
flux.subscribe(
value -> System.out.println("接收到: " + value),
error -> System.err.println("错误: " + error),
() -> System.out.println("流结束")
);
# WebFlux 流式响应的实现(为什么能推送)
- 什么是流式响应?为什么会“推流”?
# 当你访问:http://localhost:8080/hello/stream
# 看到服务器每秒返回一条消息,共 5 条,这其实是服务端以 流的形式 持续发送数据给客户端的一种方式
# 这叫做:响应流(Streaming Response),也叫服务端推送(Server Push)的一种方式
# 它不是 WebSocket,而是基于 HTTP 协议长连接进行的响应拆分推送
- WebFlux 流式响应的实现(为什么能推送)
// 比如你写的接口是这样的
// 它返回的是一个 Flux<String>,表示这是一个“多值异步流”
@GetMapping(value = "/hello/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> helloStream() {
return Flux.interval(Duration.ofSeconds(1))
.take(5)
.map(i -> "hello " + i);
}
// 关键点:TEXT_EVENT_STREAM
// 这是关键,告诉 Spring WebFlux 用 SSE(Server-Sent Events) 格式输出响应
// 响应内容会被“流式写入”HTTP响应体,而不是等到 Flux 全部完成后才一次性返回
// 浏览器或客户端能立即显示每一条数据,不必等待
# 实际操作
- 假设我们有三个异步任务,分别是获取用户信息、获取商品信息和获取订单信息
@Service
public class AsyncService {
@Async
public CompletableFuture<String> getUserInfoAsync() {
// 模拟获取用户信息的异步操作
return CompletableFuture.completedFuture("User Info");
}
@Async
public CompletableFuture<String> getProductInfoAsync() {
// 模拟获取商品信息的异步操作
return CompletableFuture.completedFuture("Product Info");
}
@Async
public CompletableFuture<String> getOrderInfoAsync() {
// 模拟获取订单信息的异步操作
return CompletableFuture.completedFuture("Order Info");
}
}
- 注解驱动的实现方式
@RestController
@RequestMapping("/mvc")
public class AsyncController {
@Autowired
private AsyncService asyncService;
@GetMapping("/getData")
public ResponseEntity<String> getData() throws ExecutionException, InterruptedException
{
CompletableFuture<String> userFuture = asyncService.getUserInfoAsync();
CompletableFuture<String> proFuture = asyncService.getProductInfoAsync();
CompletableFuture<String> orderFuture = asyncService.getOrderInfoAsync();
// 等待所有异步任务完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(userFuture,
proFuture,
orderFuture);
allOf.join();
// 获取异步任务的结果
String userData = userFuture.get();
String proData = proFuture.get();
String orderData = orderFuture.get();
// 合并结果
String result = userData + "\n" + proData + "\n" + orderData;
return ResponseEntity.ok(result);
}
}
- 函数式路由实现方式
@Configuration
public class WebFluxRouter {
@Bean
public RouterFunction<ServerResponse> route(AsyncService asyncService) {
return RouterFunctions.route()
.GET("/webflux/getData", request -> handleGetData(request, asyncService))
.build();
}
private Mono<ServerResponse> handleGetData(ServerRequest req, AsyncService asyncService)
{
Mono<String> userMono = asyncService.getUserInfoAsync();
Mono<String> productMono = asyncService.getProductInfoAsync();
Mono<String> orderMono = asyncService.getOrderInfoAsync();
// 合并异步任务的结果
return Mono.zip(userMono, productMono, orderMono)
.flatMap(tuple -> {
String userData = tuple.getT1();
String productData = tuple.getT2();
String orderData = tuple.getT3();
String result = userData + "\n" + productData + "\n" + orderData;
// 返回合并后的结果
return ServerResponse.ok()
.contentType(MediaType.TEXT_PLAIN)
.body(BodyInserters.fromValue(result));
});
}
}