知方号

知方号

异步编程

异步编程

概述

我们这里主要探讨Spring框架5.0中引入的新的WebFlux技术栈,并介绍其存在的价值与意义、并发模型与适用场景、如何基于WebFlux实现异步编程,以及其内部的实现原理。

Spring WebFlux概述

Spring框架中包含的原始Web框架Spring Web MVC是专为Servlet API和Servlet容器构建的。

反应式栈的Web框架Spring WebFlux则是在Spring 5.0版中才添加的,它是完全无阻塞的,支持Reactive Streams回压,并可以在Netty、Undertow和Servlet 3.1+容器等服务器上运行。其中,WebFlux中的Flux源自Reactor库中的Flux流对象。

如下图左侧所示是spring-webmvc模块提供的基于Servlet的传统Spring MVC技术栈,右侧所示是spring-webflux模块的反应式编程技术栈(Reactive Stack)。

【Web技术栈对比】

Servlet API最初是为了通过Filter→Servlet链进行单次传递而构建的。Servlet 3.0规范中添加的异步请求处理允许应用程序及时退出Filter-Servlet链(及时释放容器线程),但保持响应打开以便异步线程进行后续处理。Spring MVC的异步处理支持是围绕该机制构建的。当controller返回DeferredResult时,将退出Filter-Servlet链,并释放Servlet容器线程。稍后,当设置DeferredResult时,会对请求进行重新分派,使用DeferredResult值(就像controller返回它一样)以恢复处理。

相比之下,Spring WebFlux既不是基于Servlet API构建的,也不需要额外的异步请求处理功能,因为它在设计上是异步的。其对异步的处理是内置于框架规范中的,并通过请求处理的所有阶段进行内在支持。

从编程模型的角度来看,Spring MVC和Spring WebFlux都支持异步和反应式作为controller方法中的返回值。Spring MVC甚至支持流媒体,包括反应性回压功能,但是其对响应的写入仍然是阻塞的(并且在单独的线程上执行),Servlet 3.1确实为非阻塞IO提供了API,但是使用它会远离Servlet API的其余部分,比如其规范是同步的(Filter,Servlet)或阻塞的(getParameter,getPart)。WebFlux则不同,其依赖于非阻塞IO,并且每次写入都不需要额外的线程进行支持。

Reactive编程&Reactor库

Reactive(反应式编程),其是指围绕变化做出反应的编程模型,比如对IO事件做出反应的网络组件、对鼠标事件做出反应的UI控制器等。从这个意义上说,非阻塞是被动的,因为我们现在处于一种模式,即在操作完成或数据可用时对结果做出反应。

Reactive Streams是一个规范(在Java 9中也采用),用于定义具有回压的异步组件之间的交互。例如,数据存储库(充当发布者)可以产生数据(从数据库迭代出数据),然后HTTP服务器(充当订阅服务器)可以把迭代出的数据写入请求响应中,那么数据库中迭代数据的快慢就取决于HTTP服务器向响应对象里面写入的快慢。Reactive Streams的主要目的是让订阅者控制发布者生成数据的速度。

另外Reactive Streams的目的是建立回压的一种机制和一个边界限制,如果发布者不能降低自己生产数据的速度,那么它要决定是否缓存、丢失或者报错失败。

Reactive Streams在互操作性方面发挥着重要作用。它对库和基础架构组件很有用,但作为应用程序API不太有用,因为它太低级了。应用程序需要更高级别和更丰富的功能API来组成异步逻辑——类似于Java 8 Stream API,但其不仅适用于集合。这是Reactive库所扮演的角色,Java中已有的Reactive库有Reactor和RxJava,Spring团队认为Reactor是Spring WebFlux的首选Reactive库。Reactor提供Mono和Flux API流类型,其提供了与ReactiveX词汇表对齐的丰富运算符,处理0…1(Mono)和0…N(Flux)的数据序列。Reactor是一个Reactive Streams库,因此它的所有运营商都支持非阻塞反压功能,它是与Spring合作开发的。

WebFlux要求Reactor作为核心依赖,但它可以通过Reactive Streams与其他反应库(比如RxJava)进行交互操作。作为一般规则,WebFlux API接收普通Publisher作为输入,在内部使其适配Reactor类型,使用它并返回Flux或Mono作为输出。因此,可以将任何Publisher作为输入传递,并且可以对输出应用操作符,但是需要调整输出以与其他类型的反应库(例如RxJava)一起使用。只要可行(例如带注解的controller),WebFlux就会透明地适配RxJava或其他反应库的使用。

WebFlux服务器

Spring WebFlux可以在Tomcat、Jetty、Servlet 3.1+容器以及非Servlet容器(如Netty和Undertow)上运行。所有服务器都适用于低级别的通用API,因此可以跨服务器支持更高级别的编程模型。

Spring WebFlux没有内置用来启动或停止服务器的功能,但是可以通过Spring配置和WebFlux基础架构组装应用程序,写简单的几行代码就可以启动服务器。

Spring Boot有一个WebFlux启动器(starter),可以自动启动。另外默认情况下,starter使用Netty作为服务器(基于reactor-netty支持),可以通过更改Maven或Gradle依赖项轻松切换到Tomcat、Jetty或Undertow服务器。Spring Boot之所以默认用Netty作为服务器,是因为Netty在异步、非阻塞领域中使用得比较广泛,并允许客户端和服务器共享资源(比如共享NioEventLoopGroup)。

Tomcat、Jetty容器可以与Spring MVC、WebFlux一起使用。但请记住,它们的使用方式不同。Spring MVC依赖于Servlet阻塞IO,并允许应用程序在需要时直接使用Servlet API。Spring WebFlux依赖于Servlet 3.1非阻塞IO,并在低级适配器后面使用Servlet API,而不是直接使用。

Undertow作为服务器时,Spring WebFlux直接使用Undertow API而不使用Servlet API。

那么WebFlux是如何做到平滑地切换不同服务器的呢?在WebFlux中HttpHandler有一个简单的规范,只有一个方法来处理请求和响应:

代码语言:javascript复制public interface HttpHandler { /** * Handle the given request and write to the response. * @param request current request * @param response current response * @return indicates completion of request handling */ Mono handle(ServerHttpRequest request, ServerHttpResponse response);}

该方法是故意被设计为最小化的,它的主要目的是成为不同HTTP服务器API的最小抽象,而且WebFlux底层基础设施是基于其进行编程的,所以不同类型的服务器只需要添加一个适配器来适配HttpHandler即可,主要服务器与其对应的适配器如表所示

比如,基于Reactor Netty实现服务器时,可以使用下面代码适配HttpHandler并启动服务器:

代码语言:javascript复制HttpHandler handler = ...ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);HttpServer.create(host, port).newHandler(adapter).block();

Netty服务器启动后会监听客户端的请求,当boss IO线程接收到完成TCP三次握手的请求后,会把连接套接字通道传递给worker IO线程进行具体处理,后者则会调用适配器ReactorHttpHandlerAdapter的apply方法进行处理,然后适配器就会把请求再转发给基础层的HttpHandler的实现类HttpWebHandlerAdapter的handle方法进行处理,其内部则会调用请求分配器DispatcherHandler的handle方法把请求分配到具体的controller进行执行。

比如,基于Tomcat实现服务器时,可以使用下面的代码适配HttpHandler并启动服务器:

代码语言:javascript复制HttpHandler handler = ...Servlet servlet = new TomcatHttpHandlerAdapter(handler);Tomcat server = new Tomcat();File base = new File(System.getProperty("java.io.tmpdir"));Context rootContext = server.addContext("", base.getAbsolutePath());Tomcat.addServlet(rootContext, "main", servlet);rootContext.addServletMappingDecoded("/", "main");server.setHost(host);server.setPort(port);server.start();

Tomcat服务器启动后会监听客户端的请求,当请求监听线程接收到完成TCP三次握手的请求后,会把请求交给Tomcat容器内的HTTP处理器(比如Http11Processor)进行处理,后者则会使请求经过一层层容器后再经过Filter链调用到Tomcat的TomcatHttpHandlerAdapter适配器的service方法,然后适配器就会把请求转发给基础层的HttpHandler的实现类HttpWebHandlerAdapter的handle方法进行处理,其内部则会调用请求分配器DispatcherHandler的handle方法把请求分配到具体的controller进行执行。

在WebFlux提供的HttpHandler层以下是通用的基础设施,上层具体服务器只需要创建自己的适配器,即可方便地使用WebFlux底层功能。

WebFlux的并发模型

Spring MVC和Spring WebFlux都支持带注解的controllers,但并发模型和对线程是否阻塞的假设存在关键差异。

在Spring MVC(及一般的Servlet应用程序)中,假设应用程序可以阻塞当前线程(例如远程过程调用),则Servlet容器一般使用大型线程池来化解请求期间的潜在阻塞问题。

在Spring WebFlux(以及一般的非阻塞服务器,例如Netty)中,假设应用程序不会阻塞,因此非阻塞服务器使用小的固定大小的线程池(事件循环IO工作线程)来处理请求。

如果确实需要使用阻塞库,该怎么办?Reactor和RxJava分别提供了publishOn和observeOn运算符将流上的后续操作切换到其他的线程上进行处理。这意味着在阻塞API方案中,有一个简单的适配方案。但请记住,阻塞API不适合这种并发模型。

在Reactor和RxJava中,可以使用操作符

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至lizi9903@foxmail.com举报,一经查实,本站将立刻删除。