Spring WebFlux
基本介绍
学习文档
WebFlux的特点
- 完全非阻塞
- 支持响应式流的背压
- 在诸如 Netty、Undertow 和 Servlet 容器之类的服务器上运行
为什么需要 WebFlux?
- 用一个非阻塞 web stack 来处理少量线程的并发,并以较少的硬件资源进行扩展。
- 函数式编程
WebFlux 支持两种编程模式
- 基于注解的响应式编程
- 函数式路由和处理
Reactive Streams
Reactive Streams 的介绍 :Reactive Streams 是一种处理异步数据流的标准。
backpressure 背压
什么是背压
背压(backpressure):是一种流控制机制,允许消费者控制生产者的数据生成速度,以防止在数据生产速度超过消费速度时,消费者被压垮。
- Producer 生产者
- Consumer 消费者
- Subscription 生产者和消费者的一种契约,消费者可以通过它来控制生产者的数据生成速度。
背压的实现
- 消费者准备好接收数据时,会创建一个 Subscription ,并将其提供给生产者。这个 Subscription 包含了消费者可以接收的数据量。
- 生产者接收到了 Subscription 后,开始生成数据,但它只会生成和发送消费者指定的数据量。
- 当消费者处理完这些数据后,它会更新 Subscription 中的数据量,并将更新后的 Subscription 传递给生产者。生产者会根据新的数据量生成和发送数据。
- 这个过程会一直重复,直到消费者不再需要数据,或者生产者无法生成更多的数据。
背压与负反馈的区别?
负反馈(英语:negative feedback),是反馈的一种。是指系统的输出会影响系统的输入,在输出变动时,所造成的影响恰和原来变动的趋势相反; 反之,就称为正反馈。另一种说法是系统在一个条件变化时,系统会作出抵抗该变化的行为、变动持续减少。例如人的体温上升时会流汗,流汗会散热使体温下降,就是负反馈的一个例子;
背压是一种在数据流处理中的控制机制,主要用于处理生产者和消费者处理数据速度不匹配的问题。 消费者可以通过背压控制生产者的数据生成速度,防止在数据生产速度超过消费速度时,消费者被压垮。 背压主要关注的是数据流的速度控制。
负反馈是一种在控制系统中的控制机制,主要用于处理系统的稳定性问题。 负反馈通过将系统的输出与系统的输入进行比较,然后根据比较结果对系统的输入进行调整,使系统的输出更加稳定。 负反馈主要关注的是系统的稳定性。
Spring WebFlux 并发模型
调用阻塞API
Reactor 和 RxJava 提供了 publishOn 操作符,可以在不同的线程上继续处理。
publishOn 它可以改变数据流中后续操作符的执行上下文,即将一个 Publisher 的执行切换到另一个 Scheduler 上。
Flux.range(1, 10)
// 并行调度器(由 reactor 提供)
.publishOn(Schedulers.parallel())
.map(i -> i * i)
.subscribe(System.out::println);
Reactor 提供了以下几种类型的调度器(Schedulers):
- Schedulers.immediate():立即在当前线程执行任务,如果当前线程不能立即执行任务,那么它会创建一个新的线程来执行任务。
- Schedulers.single():创建一个只有一个线程的调度器,所有任务都在这个单一线程中执行。这个调度器适合用于不需要并发执行的长时间运行的任务。
- Schedulers.elastic():创建一个弹性的线程池,线程池的大小会根据需要动态调整。这个调度器适合用于 I/O 阻塞的场景。
- Schedulers.parallel():创建一个支持并行执行的调度器,线程池的大小默认等于处理器的数量。这个调度器适合用于 CPU 密集型的工作。
- Schedulers.fromExecutor(Executor executor):创建一个使用自定义 Java Executor 的调度器。
- Schedulers.newSingle(String name):创建一个新的单线程调度器。
- Schedulers.newParallel(String name, int parallelism):创建一个新的并行调度器,可以指定并行度。
- Schedulers.newElastic(String name, int ttlSeconds):创建一个新的弹性调度器,可以指定线程的生存时间。
可变的状态
在 Reactor 和 RxJava 这样的反应式编程库中,我们通过一系列的操作符来定义数据流的处理逻辑。这些操作符会在运行时形成一个 reactive pipeline,数据会按照这个 pipeline 中的操作符的顺序,依次在各个阶段进行处理。(避免并发问题。)
Flux.range(1, 5)
.map(i -> i * i) // 第一个操作符,用于计算平方
.filter(i -> i % 2 == 0) // 第二个操作符,用于过滤偶数
.subscribe(System.out::println); // 订阅并打印结果
这个 reactive pipeline 在运行时,数据会按照 map -> filter -> subscribe 的顺序依次处理。每个操作符都不会被并发调用,所以我们不需要担心并发问题。例如,即使 map 操作符在处理不同的数据时,它也不会被并发调用,所以我们不需要担心它的内部状态(例如一个计数器)会被并发修改。
线程模型
在使用 Spring WebFlux 运行的服务器上,你应该看到哪些线程?
- 如果你只使用 Spring WebFlux 的基础功能(没有使用如数据库访问等额外的功能),那么你的服务器大概会有一个线程用于服务器本身的运行,还有几个线程用于处理用户的请求,这几个线程的数量通常和你的 CPU 核心数一样多。但是,如果你的服务器是一个 Servlet 容器,比如 Tomcat,那么它可能会有更多的线程。这是因为 Servlet 容器需要同时支持传统的阻塞 I/O 和 Servlet 3.1 引入的非阻塞 I/O。阻塞 I/O 需要为每个请求分配一个线程,所以需要更多的线程。
- 响应式的 WebClint 操作是以事件循环的方式进行的。因此,看到的与之相关的线程数量是小且固定的。
- Reactor 和 RxJava 提供了线程池抽象,称为调度器(scheduler)
- 数据访问库和其他第三方依赖也可以创建和使用它们自己的线程。
配置
Spring 并不提供 start 和 stop 服务器的支持,如果使用的是 Spring Boot ,那么可以直接配置 WebClient。