# WebFlux简介

WebFlux官网 (opens new window)

# 随着互联网应用的快速发展,传统的阻塞式 I/O 模型在高并发场景下暴露出性能瓶颈
# 每个请求都需要占用一个线程,导致线程资源消耗巨大,系统的伸缩性受限
# 为了解决这一问题,响应式编程 模型应运而生,旨在通过异步非阻塞的方式,提高系统的吞吐量和响应能力
# Spring Framework 5.0 中引入了 WebFlux,基于响应式编程的 Web 框架,满足对高并发、低延迟的需求
# WebFlux 使得开发者可使用少量线程处理大量并发请求,特别适用于I/密集型应用,如微服务中的网关服务
  • 适用于处理大量并发请求、实时数据的场景,对于I/O密集型、高并发应用场景更为合适。
  • 响应式编程之所以能用少数线程处理大量请求,核心在于事件循环(Event Loop)机制和非阻塞I/O模型

# Spring WebFlux 架构基于响应式编程理念,核心思想

  • 非阻塞 I/O:利用 Reactor 提供的 Flux 和 Mono 类型,实现异步非阻塞的数据处理
  • 背压支持:遵循 Reactive Streams 规范,支持背压机制,防止数据生产者压垮消费者
  • 函数式编程:支持函数式编程风格,提供函数式路由和处理器定义方式
  • 可插拔组件:各组件之间解耦,便于扩展和定制

# Servlet、SpringMVC 与 Spring WebFlux 三者对比

Servlet、SpringMVC 与 Spring WebFlux 三者对比

# Spring WebFlux 快速入门示例

  • 依赖配置(使用 Maven)
<dependencies>
    <!-- Spring Boot WebFlux -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

    <!-- Reactor Test(可选,用于测试) -->
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
  • 创建启动类
@SpringBootApplication
public class WebFluxApplication {
    public static void main(String[] args) {
        SpringApplication.run(WebFluxApplication.class, args);
    }
}
  • 创建 Controller




 





 








@RestController
@RequestMapping("/hello")
public class HelloController {
	
	// Mono<T>:代表 0 或 1 个元素(异步)
	@GetMapping
	public Mono<String> sayHello(@RequestParam(defaultValue = "World") String name) {
		return Mono.just("Hello, " + name + "!");
	}

    // Flux<T>:代表 0 到 N 个元素(异步流)
	@GetMapping("/stream")
	public Flux<String> streamMessages() {
		return Flux.interval(Duration.ofSeconds(1))
					.map(tick -> "Message " + tick)
					.take(5);
	}
}
  • 运行 & 测试
# http://localhost:8080/hello → 返回 "Hello, World!"
# http://localhost:8080/hello?name=Alice → 返回 "Hello, Alice!"
# http://localhost:8080/hello/stream → 每秒返回一条消息,共 5 条(流式响应)

# WebFlux 的处理模式

在WebFlux中,响应式编程采用了发布订阅模式。这是一种用于处理异步数据流的模式,其中有一个发布者负责生成数据流,而一个或多个订阅者负责接收并处理这些数据

发布者(Publisher)
# 发布者是负责产生数据流的组件
# 在WebFlux中,发布者通常表示一个反应式流
# 可以是Mono(表示0或1个元素的流)或Flux(表示0到N个元素的流)
# 发布者负责将数据推送给订阅者,可以是一次性的数据,也可以是持续的数据流

订阅者(Subscriber)
# 订阅者是负责接收和处理数据的组件
# 在WebFlux中,订阅者通常表示一个订阅了Mono或Flux的处理器或回调函数
# 订阅者通过订阅发布者来表达对数据的兴趣,并在数据到达时接收和处理数据
# 订阅者可以定义处理数据的回调方法
# 如onNext(处理每个元素)、onError(处理错误)、onComplete(处理流的完成)等

订阅(Subscription)
# 订阅是发布者和订阅者之间的关联,表示订阅者对发布者的兴趣
# 订阅可以被取消,这种取消是一种解除订阅关系的操作
public class WebFluxExample {

    public static void main(String[] args) {
        // 创建一个Mono发布者
        Mono<String> monoPublisher = Mono.create(monoSink -> {
            // 发布者发出数据
            monoSink.success("Hello, WebFlux!");
        });

        // 订阅者订阅发布者,并定义处理数据的回调方法
        monoPublisher.subscribe(
                // onNext回调,处理数据
                data -> System.out.println("Received: " + data),
                // onError回调,处理错误
                error -> System.err.println("Error: " + error),
                // onComplete回调,处理流的完成
                () -> System.out.println("Processing completed")
        );
    }
}

# 函数式路由

  • Spring WebFlux中,仍可使用@Controller和@Service注解,但引入了新的声明式路由方式-函数式路由
  • 函数式路由使用Router Functions路由器函数 和Handler Functions处理器函数 来定义路由和处理逻辑
// RouterFunction 是用来路由的,相当于传统 @RequestMapping("/xxx")
// RouterFunctions.route() 相当于注册了 URL 到处理函数的映射
public interface RouterFunction<T extends ServerResponse> {
    Mono<HandlerFunction<T>> route(ServerRequest request);
}

// 示例
@Configuration
public class RouterConfig {
    @Bean
    public RouterFunction<ServerResponse> route(HelloHandler handler) {
        return RouterFunctions
                .route(RequestPredicates.GET("/hello"), handler::hello)
                .andRoute(RequestPredicates.GET("/hi"), handler::hello);
     }
}

// HandlerFunction 就像传统 MVC 中的 @RequestMapping 方法,用来处理请求
// 入参是 ServerRequest
// 返回值是 Mono<ServerResponse>:表示异步响应
public interface HandlerFunction<T extends ServerResponse> {
	Mono<T> handle(ServerRequest request);
}

// 示例:
public class HelloHandler {
    public Mono<ServerResponse> hello(ServerRequest request) {
        return ServerResponse.ok().bodyValue("Hello, WebFlux with Functional Style!");
    }
}

// 整体示例
@Configuration
public class WebFluxRouter {

    @Bean
    public RouterFunction<ServerResponse> route() {
        return RouterFunctions.route()
                .POST("/hello", this::handleHelloRequest)
                .build();
    }

    private Mono<ServerResponse> handleHelloRequest(ServerRequest request) {
        Mono<String> requestBody = request.bodyToMono(String.class);
        return requestBody.flatMap(body -> {
            String responseData = "Hello, " + body;
            return ServerResponse.ok()
                    .contentType(MediaType.TEXT_PLAIN)
                    .body(BodyInserters.fromValue(responseData));
        });
    }
}

# Mono 和 Flux 的使用

// Mono 用于处理 单一异步值 的场景,比如数据库查询结果、HTTP 请求返回值等:
Mono<String> mono = Mono.just("Hello Mono");
Mono.empty(); // 没有值
Mono.error(new RuntimeException("Error")); // 抛出异常

// Flux 用于处理 多个异步数据元素,如消息流、数据库多条记录等:
Flux<String> flux = Flux.just("A", "B", "C");
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1)).take(5);

// 常用操作符(类似 Stream)

// 创建数据源
Mono.just("one")
Flux.fromIterable(Arrays.asList("A", "B"))

// 转换(map / flatMap)
Mono.just("hello")
    .map(String::toUpperCase)    // 转为大写
    .flatMap(value -> Mono.just(value + " World")); // 拍平嵌套

// 过滤 / 条件
Flux.range(1, 10)
    .filter(i -> i % 2 == 0) // 只保留偶数

// 错误处理
Mono.error(new RuntimeException("Oops"))
    .onErrorResume(e -> Mono.just("Fallback"))

# 背压机制 Backpressure

背压机制是 Reactive Streams 的核心特性,Reactor 中也做了良好支持。通过 subscribe 方法可以指定如何处理数据和错误

flux.subscribe(
    value -> System.out.println("接收到: " + value),
    error -> System.err.println("错误: " + error),
    () -> System.out.println("流结束")
);

# WebFlux 流式响应的实现(为什么能推送)

  • 什么是流式响应?为什么会“推流”?
# 当你访问:http://localhost:8080/hello/stream

# 看到服务器每秒返回一条消息,共 5 条,这其实是服务端以 流的形式 持续发送数据给客户端的一种方式
# 这叫做:响应流(Streaming Response),也叫服务端推送(Server Push)的一种方式
# 它不是 WebSocket,而是基于 HTTP 协议长连接进行的响应拆分推送
  • WebFlux 流式响应的实现(为什么能推送)
// 比如你写的接口是这样的
// 它返回的是一个 Flux<String>,表示这是一个“多值异步流”
@GetMapping(value = "/hello/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> helloStream() {
      return Flux.interval(Duration.ofSeconds(1))
                 .take(5)
                 .map(i -> "hello " + i);
}

// 关键点:TEXT_EVENT_STREAM
// 这是关键,告诉 Spring WebFlux 用 SSE(Server-Sent Events) 格式输出响应
// 响应内容会被“流式写入”HTTP响应体,而不是等到 Flux 全部完成后才一次性返回
// 浏览器或客户端能立即显示每一条数据,不必等待

# 实际操作

  • 假设我们有三个异步任务,分别是获取用户信息、获取商品信息和获取订单信息
@Service
public class AsyncService {

    @Async
    public CompletableFuture<String> getUserInfoAsync() {
        // 模拟获取用户信息的异步操作
        return CompletableFuture.completedFuture("User Info");
    }

    @Async
    public CompletableFuture<String> getProductInfoAsync() {
        // 模拟获取商品信息的异步操作
        return CompletableFuture.completedFuture("Product Info");
    }

    @Async
    public CompletableFuture<String> getOrderInfoAsync() {
        // 模拟获取订单信息的异步操作
        return CompletableFuture.completedFuture("Order Info");
    }
}
  • 注解驱动的实现方式
@RestController
@RequestMapping("/mvc")
public class AsyncController {

    @Autowired
    private AsyncService asyncService;

    @GetMapping("/getData")
    public ResponseEntity<String> getData() throws ExecutionException, InterruptedException
	{
        CompletableFuture<String> userFuture = asyncService.getUserInfoAsync();
        CompletableFuture<String> proFuture = asyncService.getProductInfoAsync();
        CompletableFuture<String> orderFuture = asyncService.getOrderInfoAsync();

        // 等待所有异步任务完成
        CompletableFuture<Void> allOf = CompletableFuture.allOf(userFuture, 
		                                                        proFuture, 
																orderFuture);
        allOf.join();

        // 获取异步任务的结果
        String userData = userFuture.get();
        String proData = proFuture.get();
        String orderData = orderFuture.get();

        // 合并结果
        String result = userData + "\n" + proData + "\n" + orderData;

        return ResponseEntity.ok(result);
    }
}
  • 函数式路由实现方式
@Configuration
public class WebFluxRouter {

    @Bean
    public RouterFunction<ServerResponse> route(AsyncService asyncService) {
        return RouterFunctions.route()
                .GET("/webflux/getData", request -> handleGetData(request, asyncService))
                .build();
    }

    private Mono<ServerResponse> handleGetData(ServerRequest req, AsyncService asyncService)
    {    
		Mono<String> userMono = asyncService.getUserInfoAsync();
        Mono<String> productMono = asyncService.getProductInfoAsync();
        Mono<String> orderMono = asyncService.getOrderInfoAsync();

        // 合并异步任务的结果
        return Mono.zip(userMono, productMono, orderMono)
                .flatMap(tuple -> {
                    String userData = tuple.getT1();
                    String productData = tuple.getT2();
                    String orderData = tuple.getT3();

                    String result = userData + "\n" + productData + "\n" + orderData;

                    // 返回合并后的结果
                    return ServerResponse.ok()
                            .contentType(MediaType.TEXT_PLAIN)
                            .body(BodyInserters.fromValue(result));
                });
    }
}