Spring WebFlux
基本介绍
学习文档
WebFlux的特点
- 完全非阻塞
- 支持响应式流的背压
- 在诸如 Netty、Undertow 和 Servlet 容器之类的服务器上运行
为什么需要 WebFlux?
- 用一个非阻塞 web stack 来处理少量线程的并发,并以较少的硬件资源进行扩展。
- 函数式编程
WebFlux 支持两种编程模式
- 基于注解的响应式编程
- 函数式路由和处理
Reactive Streams
Reactive Streams 的介绍 :Reactive Streams 是一种处理异步数据流的标准。
backpressure 背压
什么是背压
背压(backpressure):是一种流控制机制,允许消费者控制生产者的数据生成速度,以防止在数据生产速度超过消费速度时,消费者被压垮。
- Producer 生产者
- Consumer 消费者
- Subscription 生产者和消费者的一种契约,消费者可以通过它来控制生产者的数据生成速度。
背压的实现
- 消费者准备好接收数据时,会创建一个 Subscription ,并将其提供给生产者。这个 Subscription 包含了消费者可以接收的数据量。
- 生产者接收到了 Subscription 后,开始生成数据,但它只会生成和发送消费者指定的数据量。
- 当消费者处理完这些数据后,它会更新 Subscription 中的数据量,并将更新后的 Subscription 传递给生产者。生产者会根据新的数据量生成和发送数据。
- 这个过程会一直重复,直到消费者不再需要数据,或者生产者无法生成更多的数据。
背压与负反馈的区别?
负反馈(英语:negative feedback),是反馈的一种。是指系统的输出会影响系统的输入,在输出变动时,所造成的影响恰和原来变动的趋势相反; 反之,就称为正反馈。另一种说法是系统在一个条件变化时,系统会作出抵抗该变化的行为、变动持续减少。例如人的体温上升时会流汗,流汗会散热使体温下降,就是负反馈的一个例子;
背压是一种在数据流处理中的控制机制,主要用于处理生产者和消费者处理数据速度不匹配的问题。 消费者可以通过背压控制生产者的数据生成速度,防止在数据生产速度超过消费速度时,消费者被压垮。 背压主要关注的是数据流的速度控制。
负反馈是一种在控制系统中的控制机制,主要用于处理系统的稳定性问题。 负反馈通过将系统的输出与系统的输入进行比较,然后根据比较结果对系统的输入进行调整,使系统的输出更加稳定。 负反馈主要关注的是系统的稳定性。
Spring WebFlux 并发模型
调用阻塞API
Reactor 和 RxJava 提供了 publishOn 操作符,可以在不同的线程上继续处理。
publishOn 它可以改变数据流中后续操作符的执行上下文,即将一个 Publisher 的执行切换到另一个 Scheduler 上。
Flux.range(1, 10)
// 并行调度器(由 reactor 提供)
.publishOn(Schedulers.parallel())
.map(i -> i * i)
.subscribe(System.out::println);
Reactor 提供了以下几种类型的调度器(Schedulers):
- Schedulers.immediate():立即在当前线程执行任务,如果当前线程不能立即执行任务,那么它会创建一个新的线程来执行任务。
- Schedulers.single():创建一个只有一个线程的调度器,所有任务都在这个单一线程中执行。这个调度器适合用于不需要并发执行的长时间运行的任务。
- Schedulers.elastic():创建一个弹性的线程池,线程池的大小会根据需要动态调整。这个调度器适合用于 I/O 阻塞的场景。
- Schedulers.parallel():创建一个支持并行执行的调度器,线程池的大小默认等于处理器的数量。这个调度器适合用于 CPU 密集型的工作。
- Schedulers.fromExecutor(Executor executor):创建一个使用自定义 Java Executor 的调度器。
- Schedulers.newSingle(String name):创建一个新的单线程调度器。
- Schedulers.newParallel(String name, int parallelism):创建一 个新的并行调度器,可以指定并行度。
- Schedulers.newElastic(String name, int ttlSeconds):创建一个新的弹性调度器,可以指定线程的生存时间。
可变的状态
在 Reactor 和 RxJava 这样的反应式编程库中,我们通过一系列的操作符来定义数据流的处理逻辑。这些操作符会在运行时形成一个 reactive pipeline,数据会按照这个 pipeline 中的操作符的顺序,依次在各个阶段进行处理。(避免并发问题。)
Flux.range(1, 5)
.map(i -> i * i) // 第一个操作符,用于计算平方
.filter(i -> i % 2 == 0) // 第二个操作符,用于过滤偶数
.subscribe(System.out::println); // 订阅并打印结果
这个 reactive pipeline 在运行时,数据会按照 map -> filter -> subscribe 的顺序依次处理。每个操作符都不会被并发调用,所以我们不需要担心并发问题。例如,即使 map 操作符在处理不同的数据时,它也不会被并发调用,所以我们不需要担心它的内部状态(例如一个计数器)会被并发修改。
线程模型
在使用 Spring WebFlux 运行的服务器上,你应该看到哪些线程?
- 如果你只使用 Spring WebFlux 的基础功能(没有使用如数据库访问等额外的功能),那么你的服务器大概会有一个线程用于服务器本身的运行,还有几个线程用于处理用户的请求,这几个线程的数量通常和你的 CPU 核心数一样多。但是,如果你的服务器是一个 Servlet 容器,比如 Tomcat,那么它可能会有更多的线程。这是因为 Servlet 容器需要同时支持传统的阻塞 I/O 和 Servlet 3.1 引入的非阻塞 I/O。阻塞 I/O 需要为每个请求分配一个线程,所以需要更多的线程。
- 响应式的 WebClint 操作是以事件循环的方式进行的。因此,看到的与之相关的线程数量是小且固定的。
- Reactor 和 RxJava 提供了线程池抽象,称为调度器(scheduler)
- 数据访问库和其他第三方依赖也可以创建和使用它们自己的线程。
配置
Spring 并不提供 start 和 stop 服务器的支持,如果使用的是 Spring Boot ,那么可以直接配置 WebClient。
Reactive 核心
- Server
- HttpHandle
- WebHandle API
- Client
- ClientHttpConnector
WebClient webClient = WebClient.create("http://localhost:8080");
Mono<String> result = webClient.get()
.uri("/hello")
.retrieve()
.bodyToMono(String.class);
HttpHandle
Netty
Tomcat
HttpHandler handler = ...
Servlet servlet = new TomcatHttpHandlerAdapter(handler);
Tomcat server = new Tomcat();
File base = new File(System.getProperty("java.io.tmpdir"));
Context rootContext = server.addContext("", base.getAbsolutePath());
Tomcat.addServlet(rootContext, "main", servlet);
rootContext.addServletMappingDecoded("/", "main");
server.setHost(host);
server.setPort(port);
server.start();
WebHandler API
package: org.springframework.web.server
通过多个 WebExceptionHandler、多个 WebFilter、单个WebHandler 链处理请求。(通过 WebHttpHandlerBuilder 组装)
- WebExceptionHandler 处理请求过程中出现的异常(根据注册顺序来处理异常)
- WebFilter 对请求进行预处理(根据注册顺序来处理请求)
- WebHandler 处理请求,在一个请求处理链中,只能有一个 WebHandler
WebHandler 的作用
- User session with attributes.
- Request attributes.
- Resolved Locale or principal for the request.
- Access to parsed and cached form data.
- Abstractions for multipart data.
- and more...
Filter
在 Spring WebFlux中,WebFilter 是一个用于处理 HTTP 请求的接口,它可以在请求被 WebHandler 处理之前和之后执行一些逻辑。 这种逻辑可以包括修改请求或响应、添加日志、添加安全检查等。
CORS
内置了 CorsWebFilter.
Exceptions
使用 WebExceptionHandler(接口) 来处理 WebHandler 处理链的异常。
WebExceptionHandler 的实现类
- ResponseStatusExceptionHandler 处理 ResponseStatusException 异常,会将响应的HTTP状态码设置为异常的状态码。
- WebFluxResponseStatusExceptionHandler 是ResponseStatusExceptionHandler的扩展。除了处理ResponseStatusException,它还可以处理带有@ResponseStatus注解的异常。当抛出这种异常时,WebFluxResponseStatusExceptionHandler会将响应的HTTP状态码设置为注解的状态码。
Codecs
- Encoder Decoder: 处理HTTP请求和响应的数据的关键组件。编解码器可以将数据从一种格式转换为另一种格式。例如,一个编解码器可以将JSON数据转换为Java对象,或者将Java对象转换为JSON数据。
- HttpMessageReader HttpMessageWriter: 定义了如何读取和写入HTTP消息。你可以使用这些接口来处理特定类型的HTTP消息,例如表单数据、多部分内容、服务器发送的事件等。
- EncoderHttpMessageWriter DecoderHttpMessageReader:
- DataBuffer: 所有的编解码器都在DataBuffer上工作,DataBuffer是一个抽象的字节缓冲区表示,它可以表示多种类型的 字节缓冲区,例如Netty的ByteBuf、Java的ByteBuffer等。
Jackson JSON
Jackson2Decoder
- Jackson 的异步、非阻塞处理器将字节块流转换成 TokenBuffer,每一个 TokenBuffer 代表一个 JSON 对象。
- 每一个 TokenBuffer 对象通过 ObjectMapper 转换成更高级的对象。
- 解码 single-value 时,只有一个 TokenBuffer.
- 解码 multi-value 时,有多个 TokenBuffer.
Jackson2Encoder
- single-value 使用 ObjectMapper
- multi-value(application/json) 使用 Flux#collectToList()
- multi-value(application/x-ndjson or application/stream+x-jackson-simle)
Form Data
FormHttpMessageReader FormHttpMessageWriter 支持编码和解码 application/x-www-form-urlencoded 的内容
一旦你使用了 getFormData()方法,原始的请求体就不能再被读取了。这是因为getFormData() 方法会将请求体中的数据读取到一个缓存中,然后返回这个缓存。一旦数据被读取到缓存中,原始的请求体就会被清空。
Multipart
MultipartHttpMessageReader MultipartHttpMessageWriter 支持编码和解码 multipart/form-data multipart/mixed multipart/related 的内容.
一旦你使用了 getMultipartData()方法,原始的请求体就不能再被读取了。这是因为 getMultipartData() 方法会将请求体中的数据读取到一个缓存中,然后返回这个缓存。一旦数据被读取到缓存中,原始的请求体就会被清空。
Limits
Streaming
DataBuffer
日志
Log id
org.springframework.web.server.ServerWebExchange.getLogPrefix
Sensitive Data
@Configuration
@EnableWebFlux
class MyConfig implements WebFluxConfigurer {
@Override
public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
configurer.defaultCodecs().enableLoggingRequestDetails(true);
}
}
Consumer<ClientCodecConfigurer> consumer = configurer ->
configurer.defaultCodecs().enableLoggingRequestDetails(true);
WebClient webClient = WebClient.builder()
.exchangeStrategies(strategies -> strategies.codecs(consumer))
.build();
Appenders
Custom codecs
WebClient webClient = WebClient.builder()
.codecs(configurer -> {
CustomDecoder decoder = new CustomDecoder();
configurer.customCodecs().registerWithDefaultConfig(decoder);
})
.build();
DispatcherHandler
front controller (前端控制器模式)
central WebHandler: DispatchHandler, 为请求处理 提供一个共享算法。
WebHttpHandlerBuilder
ApplicationContext context = ...
HttpHandler handler = WebHttpHandlerBuilder.applicationContext(context).build();
Special Bean Types
实现了 WebFlux 框架协议的 Spring 管理的对象实例。
- HandleMapping 将请求映射到处理器,返回 Mono 或 Flux 类型
- HandlerAdapter 适配不同类型的处理器
- HandlerResultHandler 处理控制器方法的返回值。在响应式编程模型中,它处理的通常是Mono或Flux类型的返回值,将这些返回值转换为发送给客户端的响应。
WebFlux Config
Processing
- Each HandlerMapping is asked to find a matching handler, and the first match is used.
- If a handler is found, it is run through an appropriate HandlerAdapter, which exposes the return value from the execution as HandlerResult.
- The HandlerResult is given to an appropriate HandlerResultHandler to complete processing by writing to the response directly or by using a view to render.
Result Handling
HandlerAdapter -> HandleResult -> HandlerResultHandler.
- ResponseEntityResultHandler
- ServerResponseResultHandler
- ResponseBodyResultHandler
- ViewResolutionResultHandler
Exceptions
HandlerAdapter 的实现版本能够在内部处理在调用请求处理器(如控制器方法)时产生的异常。
View Resolution
HandlerResultHandler
Handling
传递给 ViewResolutionResultHandler 的 HandlerResult 包含 handler 返回值和在请求处理期间添加了属性的 model 。
返回值是下面的其中一个
- String,CharSequence : 逻辑视图的名称,通过一系列配置的 ViewResolver 解析成一个 View。
- void:基于请求路径(URL)选择一个默认视图名称。
- Rendering:API for view resolution scenarios
- Model,Map:额外的 model attributes 性将被添加到该请求的 model 中。
- ohters
Redirecting
在视图名称 前加入 redirect:
前缀。
UrlBasedViewResolver
Rendering.redirectTo("abc").build()
Content Negotiation
ViewResolutionResultHandler
HttpMessageWriterView
Annotation Controller
Spring WebFlux 提供了一个基于注解的编程模型
@RestController
public class HelloController {
@GetMapping("/hello")
public String handle() {
return "Hello WebFlux";
}
}
@Controller
为了确保 @Controller 被正确使用,需要这个 Bean 能被扫描
@Configuration
@ComponentScan("org.example.web")
public class WebConfig {
// ...
}
@RestControler = @Controller + @ResponseBody
AOP Proxies
for example: @Transactional
@EnableTransactionManagement
Mapping Requests
@RequestMapping
可以匹配 URL, HTTP method, request parameters, headers, and media types.
variants
- @GetMapping
- @PostMapping
- @PutMapping
- @DeleteMapping
- @PatchMapping
URI Patterns
?
匹配一个字符(0..1)*
在一个 Ptah segment 中匹配任意字符(0..n) ,"/resources/*.png"
可匹配"/resources/file.png"
但是不匹配"/resources/folder/file.png"
**
匹配一个或多个 path segment 直到 path 结束。"/resources/**"
可匹配"/resources/file.png"
也可以匹配"/resources/folder/file.png"
{name}
匹配一个 path segment 并且捕获它,作为一个 name 的变量。"/projects/{project}/versions"
可匹配"/projects/spring-web/versions"
并且捕获"spring-web"
作为project
变量的值。{name:[a-z]+}
匹配正则a-z
作为 path variable 的值。"/projects/{project:[a-z]+}/versions"
可匹配"/projects/spring/versions"
但不匹配"/projects/spring1/versions"
{*path}
匹配零个或多个 path segment 直到 path 结束,并将其捕获为 path 变量的值。"/resources/{*file}"
匹配"/resources/images/file.png"
and captures file=/images/file.png
URI 变量会被自动转成合适类型,或者抛出 TypeMismatchException 异常。
@Controller
@RequestMapping("/owners/{ownerId}") // Class-level URI
public class OwnerController {
@GetMapping("/pets/{petId}") // Method-level URI
public Pet findPet(@PathVariable Long ownerId, @PathVariable Long petId) {
// ...
}
}
// spring-web-3.0.5.jar
// version 3.0.5
// ext .jar
@GetMapping("/{name:[a-z-]+}-{version:\\d\\.\\d\\.\\d}{ext:\\.[a-z]+}")
public void handle(@PathVariable String version, @PathVariable String ext) {
// ...
}
Pattern Comparison
当多个模式与 URL 匹配时,必须对它们进行比较以找到最佳匹配。
PathPattern.SPECIFICITY_COMPARATOR
Consumable Media Types
指定 Content-Type
@PostMapping(path = "/pets", consumes = "application/json")
// @PostMapping(path = "/pets", consumes = "!application/json") 表示非 application/json
public void addPet(@RequestBody Pet pet) {
// ...
}
Producible Media Types
指定 Accept
@GetMapping(path = "/pets/{petId}", produces = "application/json")
// @GetMapping(path = "/pets/{petId}", produces = "!application/json") 表示非 application/json
@ResponseBody
public Pet getPet(@PathVariable String petId) {
// ...
}
Parameters and Headers
根据查询参数条件缩小映射范围
- 参数是否存在 myParam !myParam
- 参数是否为特定值 myParam=myValue
参数
// 判断 myParan 是否等于 myValue
// 处理 GET /pets/123?myParam=myValue
// 不处理 GET /pets/123?myParam=otherValue
@GetMapping(path = "/pets/{petId}", params = "myParam=myValue")
public void findPet(@PathVariable String petId) {
// ...
}
请求头
@GetMapping(path = "/pets/{petId}", headers = "myHeader=myValue")
public void findPet(@PathVariable String petId) {
// ...
}