# WebFlux简介

WebFlux官网 (opens new window) 适用于需要处理大量并发请求、实时数据的场景,对于异步IO和非阻塞IO的应用场景更为合适

# WebFlux 和 SpringMVC

# 编程模型
Spring MVC 
# 采用的是传统的同步阻塞模型,基于Servlet API
# Controller中的处理方法是同步执行的,每个请求都会占用一个Servlet线程
# 如果线程阻塞,那么整个请求处理链也会被阻塞
WebFlux
# 采用了响应式编程模型,使用Reactor库来实现异步和非阻塞
# 它允许开发者使用函数式编程风格处理异步事件,能够更好地处理高并发和实时数据

# 容器支持
Spring MVC
# 主要运行在传统的Servlet容器上,如Tomcat、Jetty等
WebFlux
# 可以运行在传统的Servlet容器上,也可以运行在支持Reactive Streams的容器上,如Netty

# 路由方式
Spring MVC
# 采用基于注解的声明式路由,通过@Controller和@RequestMapping来定义
WebFlux
# 引入了一种新的声明式的路由方式,使用RouterFunctions和HandlerFunctions

# WebFlux 的处理模式

在WebFlux中,响应式编程采用了发布订阅模式。这是一种用于处理异步数据流的模式,其中有一个发布者负责生成数据流,而一个或多个订阅者负责接收并处理这些数据

发布者(Publisher)
# 发布者是负责产生数据流的组件
# 在WebFlux中,发布者通常表示一个反应式流
# 可以是Mono(表示0或1个元素的流)或Flux(表示0到N个元素的流)
# 发布者负责将数据推送给订阅者,可以是一次性的数据,也可以是持续的数据流

订阅者(Subscriber)
# 订阅者是负责接收和处理数据的组件
# 在WebFlux中,订阅者通常表示一个订阅了Mono或Flux的处理器或回调函数
# 订阅者通过订阅发布者来表达对数据的兴趣,并在数据到达时接收和处理数据
# 订阅者可以定义处理数据的回调方法
# 如onNext(处理每个元素)、onError(处理错误)、onComplete(处理流的完成)等

订阅(Subscription)
# 订阅是发布者和订阅者之间的关联,表示订阅者对发布者的兴趣
# 订阅可以被取消,这种取消是一种解除订阅关系的操作
public class WebFluxExample {

    public static void main(String[] args) {
        // 创建一个Mono发布者
        Mono<String> monoPublisher = Mono.create(monoSink -> {
            // 发布者发出数据
            monoSink.success("Hello, WebFlux!");
        });

        // 订阅者订阅发布者,并定义处理数据的回调方法
        monoPublisher.subscribe(
                // onNext回调,处理数据
                data -> System.out.println("Received: " + data),
                // onError回调,处理错误
                error -> System.err.println("Error: " + error),
                // onComplete回调,处理流的完成
                () -> System.out.println("Processing completed")
        );
    }
}

# 函数式路由

  • Spring WebFlux中,仍可使用@Controller和@Service注解,但引入了新的声明式路由方式-函数式路由
  • 函数式路由不依赖于@Controller和@Service注解,而是使用Router Functions(路由器函数)和Handler Functions(处理器函数)来定义路由和处理逻辑
@Configuration
public class WebFluxRouter {

    @Bean
    public RouterFunction<ServerResponse> route() {
        return RouterFunctions.route()
                .POST("/hello", this::handleHelloRequest)
                .build();
    }

    private Mono<ServerResponse> handleHelloRequest(ServerRequest request) {
        Mono<String> requestBody = request.bodyToMono(String.class);
        return requestBody.flatMap(body -> {
            String responseData = "Hello, " + body;
            return ServerResponse.ok()
                    .contentType(MediaType.TEXT_PLAIN)
                    .body(BodyInserters.fromValue(responseData));
        });
    }
}

# 实际操作

  • 假设我们有三个异步任务,分别是获取用户信息、获取商品信息和获取订单信息
@Service
public class AsyncService {

    @Async
    public CompletableFuture<String> getUserInfoAsync() {
        // 模拟获取用户信息的异步操作
        return CompletableFuture.completedFuture("User Info");
    }

    @Async
    public CompletableFuture<String> getProductInfoAsync() {
        // 模拟获取商品信息的异步操作
        return CompletableFuture.completedFuture("Product Info");
    }

    @Async
    public CompletableFuture<String> getOrderInfoAsync() {
        // 模拟获取订单信息的异步操作
        return CompletableFuture.completedFuture("Order Info");
    }
}
  • 注解驱动的实现方式
@RestController
@RequestMapping("/mvc")
public class AsyncController {

    @Autowired
    private AsyncService asyncService;

    @GetMapping("/getData")
    public ResponseEntity<String> getData() throws ExecutionException, InterruptedException {
        CompletableFuture<String> userFuture = asyncService.getUserInfoAsync();
        CompletableFuture<String> productFuture = asyncService.getProductInfoAsync();
        CompletableFuture<String> orderFuture = asyncService.getOrderInfoAsync();

        // 等待所有异步任务完成
        CompletableFuture<Void> allOf = CompletableFuture.allOf(userFuture, productFuture, orderFuture);
        allOf.join();

        // 获取异步任务的结果
        String userData = userFuture.get();
        String productData = productFuture.get();
        String orderData = orderFuture.get();

        // 合并结果
        String result = userData + "\n" + productData + "\n" + orderData;

        return ResponseEntity.ok(result);
    }
}
  • 函数式路由实现方式
@Configuration
public class WebFluxRouter {

    @Bean
    public RouterFunction<ServerResponse> route(AsyncService asyncService) {
        return RouterFunctions.route()
                .GET("/webflux/getData", request -> handleGetData(request, asyncService))
                .build();
    }

    private Mono<ServerResponse> handleGetData(ServerRequest request, AsyncService asyncService) {
        Mono<String> userMono = asyncService.getUserInfoAsync();
        Mono<String> productMono = asyncService.getProductInfoAsync();
        Mono<String> orderMono = asyncService.getOrderInfoAsync();

        // 合并异步任务的结果
        return Mono.zip(userMono, productMono, orderMono)
                .flatMap(tuple -> {
                    String userData = tuple.getT1();
                    String productData = tuple.getT2();
                    String orderData = tuple.getT3();

                    String result = userData + "\n" + productData + "\n" + orderData;

                    // 返回合并后的结果
                    return ServerResponse.ok()
                            .contentType(MediaType.TEXT_PLAIN)
                            .body(BodyInserters.fromValue(result));
                });
    }
}