在微服务架构中,不同服务之间通过应用协议进行数据传输。典型的传输方式包括基于 HTTP 协议的 REST 或 SOAP API 和基于 TCP 字节流的 gRPC 等。HTTP 协议的优势在于其广泛的适用性,有非常多的服务器和客户端实现的支持,但 HTTP 协议本身比较简单,只支持请求-响应模式。gRPC 等基于 TCP 的协议使用二进制字节流传输,保证了传输的效率。不过 gRPC 基于 HTTP/2,不支持其他传输层实现。HTTP/2 协议使用了二进制字节流,但是并没有改变 HTTP/1 协议已有的语义,更多的只是改变了传输时的格式。已有应用协议的问题在于单一的交互模式,只支持请求-响应模式,而此模式对于很多应用场景来说是不合适的。典型的例子是消息推送,以 HTTP 协议为例,如果客户端需要获取最新的推送消息,就必须使用轮询。客户端不停的发送请求到服务器来检查更新,这无疑造成了大量的资源浪费。虽然服务器发送事件(Server-Sent Events,SSE)可以用来推送消息,不过 SSE 是一个简单的文本协议,仅提供有限的功能。此外,WebSocket 可以进行双向数据传输,不过它没有提供应用层协议支持。请求-响应模式的另外一个问题是,如果某个请求的响应时间过长,会阻塞之后的其他请求的处理。RSocket 协议的出现,很好的解决了已有协议的这些问题。
RSocket 是一个 OSL 七层模型中 5/6 层的协议,是 TCP/IP 之上的应用层协议。RSocket 可以使用不同的底层传输层,包括 TCP、WebSocket 和 Aeron。TCP 适用于分布式系统的各个组件之间交互,WebSocket 适用于浏览器和服务器之间的交互,Aeron 是基于 UDP 协议的传输方式,这就保证了 RSocket 可以适应于不同的场景。使用 RSocket 的应用层实现可以保持不变,只需要根据系统环境、设备能力和性能要求来选择合适的底层传输方式即可。RSocket 作为一个应用层协议,可以很容易在其基础上定义应用自己的协议。此外,RSocket 使用二进制格式,保证了传输的高效,节省带宽。而且,通过基于反应式流语义的流控制,RSocket 保证了消息传输中的双方不会因为请求的压力过大而崩溃。
RSocket 支持四种不同的交互模式,见表 1。
模式 | 说明 |
---|---|
请求-响应(request/response) | 这是最典型也最常见的模式。发送方在发送消息给接收方之后,等待与之对应的响应消息。 |
请求-响应流(request/stream) | 发送方的每个请求消息,都对应于接收方的一个消息流作为响应。 |
发后不管(fire-and-forget) | 发送方的请求消息没有与之对应的响应。 |
通道模式(channel) | 在发送方和接收方之间建立一个双向传输的通道。 |
下面介绍 RSocket 协议的具体内容。
RSocket 协议在传输时使用帧(frame)来表示单个消息。每个帧中包含的可能是请求内容、响应内容或与协议相关的数据。一个应用消息可能被切分成多个片段(fragment)以包含在一个帧中。根据底层传输协议的不同,一个表示帧长度的字段可能是必须的。由于 TCP 协议没有提供帧支持,所以 RSocket 的帧长度字段是必须的。对于提供了帧支持的传输协议,RSocket 帧只是简单的封装在传输层消息中;对于没有提供帧支持的传输协议,每个 RSocket 帧之前都需要添加一个 24 字节的字段表示帧长度。
在每个 RSocket 帧中,最起始的部分是帧头部,包括 31 字节的流标识符,6 字节的帧类型和 10 字节的标志位。RSocket 协议中的流(stream)表示的是一个操作的单元。每个流有自己唯一的标识符。流标识符由发送方生成。值为 0 的流标识符表示与连接相关的操作。客户端的流标识符从 1 开始,每次递增 2;服务器端的流标识符从 2 开始,每次递增 2。在帧头部之后的内容与帧类型相关。
RSocket 中定义了不同类型的帧,见表 2。
帧类型 | 说明 |
---|---|
SETUP | 由客户端发送来建立连接,流标识符为 0。 |
REQUEST_RESPONSE | 请求-响应模式中的请求内容。 |
REQUEST_FNF | 发后不管模式中的请求内容。 |
REQUEST_STREAM | 请求-响应流模式中的请求内容。 |
REQUEST_CHANNEL | 通道模式中的建立通道的请求。发送方只能发一个 REQUEST_CHANNEL 帧。 |
REQUEST_N | 基于反应式流语义的流控制,相当于反应式流中的 request(n)。 |
PAYLOAD | 表示负载的帧。通过不同的标志位来表示状态。NEXT 标志位表示接收到流中的数据,COMPLETE 标志位表示流结束。 |
ERROR | 表示连接层或应用层错误。 |
CANCEL | 取消当前请求。 |
KEEPALIVE | 表示连接层或应用层错误。 |
KEEPALIVE | 启用租约模式。 |
METADATA_PUSH | 异步元数据推送。 |
RESUME | 在连接中断后恢复传输。 |
RESUME_OK | 恢复传输成功。 |
EXT | 帧类型扩展。 |
RSocket 中的负载分成元数据和数据两种,二者可以使用不同的编码方式。元数据是可选的。帧头部有标志位指示帧中是否包含元数据。某些特定类型的帧可以添加元数据。
根据不同的交互模式,发送方和接收方之间有不同的帧交互,下面是几个典型的帧交互示例:
在请求-响应模式中,发送方发送 REQUEST_RESPONSE 帧,接收方发送 PAYLOAD 帧并设置 COMPLETE 标志位。
在发后不管模式中,发送方发送 REQUEST_FNF 帧。
在请求-响应流模式中,发送方发送 REQUEST_STREAM 帧,接收方发送多个 PAYLOAD 帧。设置 COMPLETE 标志位的 PAYLOAD 帧表示流结束。
在通道模式中,发送方发送 REQUEST_CHANNEL 帧。发送方和接收方都可以发送 PAYLOAD 帧给对方。设置 COMPLETE 标志位的 PAYLOAD 帧表示其中一方的流结束。
除了发后不管模式之外,其余模式中的接收方都可以通过 ERROR 帧或 CANCEL 帧来结束流。
RSocket 使用 Reactive Streams 语义来进行流控制(flow control),也就是 request(n)模式。流的发送方通过 REQUEST_N 帧来声明它允许接收方发送的 PAYLOAD 帧的数量。REQUEST_N 帧一旦发出就不能收回,而且所产生的效果是累加的。比如,发送方发送 request(2)和 request(3)帧之后,接收方允许发送 5 个 PAYLOAD 帧。
除了基于 Reactive Streams 语义的流程控制之外,RSocket 还可以使用租约模式。租约模式只是限定了在某个时间段内,请求者所能发送的最大请求数量。
RSocket 提供了不同语言的实现,包括 Java、Kotlin、JavaScript、Go、.NET 和 C++ 等。对 Java 项目来说,只需要添加相应的 Maven 依赖即可。RSocket 的 Java 实现库都在 Maven 分组 io.rsocket 中。其中常用的库包括核心功能库 rsocket-core 和表 3中列出的传输层实现。本文中使用的版本是 1.0.0-RC3。
Maven 实现库名称 | 底层实现 | 支持协议 |
---|---|---|
rsocket-transport-netty | Reactor Netty | TCP 和 WebSocket |
rsocket-transport-akka | Akka | TCP 和 WebSocket |
rsocket-transport-aeron | Aeron | UDP |
在代码清单 1 中,Maven 项目中添加了 RSocket 相关的依赖和基于 Reactor Netty 的传输层实现。
<
dependency
>
<
groupId
>io.rsocket</
groupId
>
<
artifactId
>rsocket-core</
artifactId
>
<
version
>1.0.0-RC3</
version
>
</
dependency
>
<
dependency
>
<
groupId
>io.rsocket</
groupId
>
<
artifactId
>rsocket-transport-netty</
artifactId
>
<
version
>1.0.0-RC3</
version
>
</
dependency
>
下面介绍 RSocket 中不同模式的使用方式。
首先从最常用的请求-响应模式开始介绍 RSocket 的用法。代码清单 2 给出了一个请求-响应模式的 RSocket 服务器和客户端的示例。RSocketFactory 类用来创建 RSocket 服务器和客户端。
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import reactor.core.publisher.Mono;
public class RequestResponseExample {
public static void main(String[] args) {
RSocketFactory.receive()
.acceptor(((setup, sendingSocket) -> Mono.just(
new AbstractRSocket() {
@Override
public Mono<
Payload
> requestResponse(Payload payload) {
return Mono.just(DefaultPayload.create("ECHO >> " + payload.getDataUtf8()));
}
}
)))
.transport(TcpServerTransport.create("localhost", 7000)) //指定传输层实现
.start() //启动服务器
.subscribe();
RSocket socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", 7000)) //指定传输层实现
.start() //启动客户端
.block();
socket.requestResponse(DefaultPayload.create("hello"))
.map(Payload::getDataUtf8)
.doOnNext(System.out::println)
.block();
socket.dispose();
}
}
RSocketFactory.receive()
方法返回用来创建服务器的 ServerRSocketFactory
类的对象。ServerRSocketFactory
的 acceptor()
方法的参数是 SocketAcceptor
接口,该接口只有一个方法 Mono<RSocket>
accept(ConnectionSetupPayload setup, RSocket sendingSocket)
。该方法的参数 setup
表示的是 SETUP
帧的负载内容,而 sendingSocket
表示的是发送请求的 RSocket
对象。该方法返回的 Mono<RSocket>
对象包含的是处理请求的 RSocket
对象。代码中使用 Lambda
表达式实现了 SocketAcceptor
接口。在代码中,SocketAcceptor
的 accept()
方法返回的是抽象类 AbstractRSocket
的匿名实现,只实现了其中的 requestResponse()
方法。具体的请求处理逻辑是在请求数据内容上添加 "ECHO >> "
前缀。接下来的 transport()
方法指定 ServerTransport
接口的实现作为 RSocket
底层的传输层实现(这里使用 TcpServerTransport
类的 create()
方法创建在 localhost
上 7000
端口的 TCP
服务器端)。再通过 start()
方法可以得到表示服务器实例的 Mono
对象。最后的 subscribe()
方法用来触发整个启动过程。
RSocketFactory.connect()
方法用来创建 RSocket
客户端,返回 ClientRSocketFactory
类的对象。接下来的 transport()
方法指定传输层 ClientTransport
实现 (这里通过 TcpClientTransport.create()
方法创建到服务器的 TCP
连接)。再通过其 start()
方法可以得到 Mono<RSocket>
对象。最后调用 block()
方法等待客户端启动并返回 RSocket
对象。
RSocket
对象用来与服务器端进行交互。RSocket
类的 requestResponse()
方法发送 Payload
接口表示的负载并等待响应。该方法的返回值是表示响应的 Mono<Payload>
对象。对于返回的响应,示例中只是简单地输出到控制台。DefaultPayload.create()
方法可以简单地创建 Payload
对象。RSocket
类的 dispose()
方法用来销毁该对象。
请求-响应流模式的用法与请求-响应模式很相似。代码清单 3 给出了请求-响应流模式的示例。服务器端的 AbstractRSocket
类的实现覆写了 requestStream()
方法。对于每个请求的 Payload 对象,都需要返回一个表示响应流的 Flux<Payload>
对象。这里的实现逻辑是把请求数据的字符串变成包含单个字符的流。客户端的 RSocket 对象使用 requestStream()
来发送请求,得到的是 Flux<Payload>
对象。
public class RequestStreamExample {
public static void main(String[] args) {
RSocketFactory.receive()
.acceptor(((setup, sendingSocket) -> Mono.just(
new AbstractRSocket() {
@Override
public Flux<
Payload
> requestStream(Payload payload) {
return Flux.fromStream(payload.getDataUtf8().codePoints().mapToObj(c -> String.valueOf((char) c))
.map(DefaultPayload::create));
}
}
)))
.transport(TcpServerTransport.create("localhost", 7000))
.start()
.subscribe();
RSocket socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", 7000))
.start()
.block();
socket.requestStream(DefaultPayload.create("hello"))
.map(Payload::getDataUtf8)
.doOnNext(System.out::println)
.blockLast();
socket.dispose();
}
}
发后不管模式的用法和之前的两种模式也是相似的。在代码清单 4 中,AbstractRSocket
类的实现覆写了 fireAndForget()
方法,对于请求的 Payload 对象,只需要返回 Mono<Void>
对象即可。客户端 RSocket 对象使用 fireAndForget()
方法发送请求。在发后不管模式中,由于发送方不需要等待接收方的响应,因此当程序结束时,服务器端并不一定接收到了请求。
public class FireAndForgetExample {
public static void main(String[] args) {
RSocketFactory.receive()
.acceptor(((setup, sendingSocket) -> Mono.just(
new AbstractRSocket() {
@Override
public Mono<
Void
> fireAndForget(Payload payload) {
System.out.println("Receive: " + payload.getDataUtf8());
return Mono.empty();
}
}
)))
.transport(TcpServerTransport.create("localhost", 7000))
.start()
.subscribe();
RSocket socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", 7000))
.start()
.block();
socket.fireAndForget(DefaultPayload.create("hello")).block();
socket.fireAndForget(DefaultPayload.create("world")).block();
socket.dispose();
}
}
通道模式同样以相似的方式实现。在代码清单 5 中,服务器端的 AbstractRSocket
类的实现覆写了 requestChannel()
方法,对于请求的 Publisher<Payload>
对象,返回 Flux<Payload>
对象。客户端 RSocket
对象使用 requestChannel()
方法发送请求并处理响应。
public class RequestChannelExample {
public static void main(String[] args) {
RSocketFactory.receive()
.acceptor(((setup, sendingSocket) -> Mono.just(
new AbstractRSocket() {
@Override
public Flux<
Payload
> requestChannel(Publisher<
Payload
> payloads) {
return Flux.from(payloads).flatMap(payload ->
Flux.fromStream(
payload.getDataUtf8().codePoints().mapToObj(c -> String.valueOf((char) c))
.map(DefaultPayload::create)));
}
}
)))
.transport(TcpServerTransport.create("localhost", 7000))
.start()
.subscribe();
RSocket socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", 7000))
.start()
.block();
socket.requestChannel(Flux.just("hello", "world", "goodbye").map(DefaultPayload::create))
.map(Payload::getDataUtf8)
.doOnNext(System.out::println)
.blockLast();
socket.dispose();
}
}
Spring 框架提供了对 RSocket 的集成,作为 spring-messaging 模块支持的一种消息传输方式。对于 Spring Boot 应用来说,只需要添加对 spring-boot-starter-rsocket
的依赖即可。本文使用的 Spring Boot 是 2.2.0.M6 版本,对应于 Spring 框架的 5.2.0.RC2
版本。代码清单 6 给出了 Spring Boot 应用中使用 RSocket 需要添加的 Maven
依赖。
<
dependency
>
<
groupId
>org.springframework.boot</
groupId
>
<
artifactId
>spring-boot-starter-rsocket</
artifactId
>
<
version
>2.2.0.M6</
version
>
</
dependency
>
<
dependency
>
<
groupId
>io.projectreactor</
groupId
>
<
artifactId
>reactor-test</
artifactId
>
<
version
>3.3.0.RC1</
version
>
<
scope
>test</
scope
>
</
dependency
>
在 Spring 应用的 application.properties
文件中,添加 spring.rsocket.server.port=7100
来设置内置
RSocket 服务器的端口。代码清单 7 中的 EchoController
是代码清单 2 中的请求-响应模式的实现。方法 echo()
接收 String 类型的请求,并返回
Mono<String>作为响应。@MessageMapping("echo")
注解指定了所处理消息的目的地。与 Spring 集成之后,RSocket
的使用变得很简洁。
@Controller
public class EchoController {
@MessageMapping("echo")
public Mono<
String
> echo(String input) {
return Mono.just("ECHO >> " + input);
}
}
下面添加测试用例来进行测试。代码清单 8 中的 AbstractTest
类是一个抽象的 RSocket
测试类。其中包含的 createRSocketRequester()
方法用来创建发送 RSocket 请求的 RSocketRequester
对象。RSocketRequester.Builder
类用来创建 RSocketRequester
对象,其中 dataMimeType()
方法指定负载中数据的 MIME
类型,connect()
方法指定连接的传输层实现。
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.util.MimeTypeUtils;
abstract class AbstractTest {
@Value("${spring.rsocket.server.port}")
private int serverPort;
@Autowired
private RSocketRequester.Builder builder;
RSocketRequester createRSocketRequester() {
return builder.dataMimeType(MimeTypeUtils.TEXT_PLAIN)
.connect(TcpClientTransport.create(serverPort)).block();
}
}
代码清单 9 中的 EchoServerTest
类测试代码清单 7 中的 EchoController
。在测试中,首先使用 createRSocketRequester()
方法来创建 RSocketRequester
对象,再使用 route()
方法来指定消息的目的地,最后使用 data()
方法指定负载中的数据。retrieveMono()
方法用来发送请求。通过
StepVerifier 类来验证返回的 Mono<String>
对象。
@SpringBootTest
class EchoServerTest extends AbstractTest {
@Test
@DisplayName("Test echo server")
void testEcho() {
RSocketRequester requester = createRSocketRequester();
Mono<
String
> response = requester.route("echo")
.data("hello")
.retrieveMono(String.class);
StepVerifier.create(response)
.expectNext("ECHO >> hello")
.expectComplete()
.verify();
}
}
除了请求-响应模式之外,其他模式也可以在 Spring 中使用。代码清单 10 中的 StringSplitController
类使用了请求-响应流模式。
@Controller
public class StringSplitController {
@MessageMapping("stringSplit")
public Flux<
String
> stringSplit(String input) {
return Flux.fromStream(input.codePoints().mapToObj(c -> String.valueOf((char) c)));
}
}
代码清单 11 是代码清单 10 对应的测试用例。
@SpringBootTest
public class StringSplitTest extends AbstractTest {
@Test
@DisplayName("Test string split")
void testStringSplit() {
RSocketRequester requester = createRSocketRequester();
Flux<
String
> response = requester.route("stringSplit")
.data("hello")
.retrieveFlux(String.class);
StepVerifier.create(response)
.expectNext("h", "e", "l", "l", "o")
.expectComplete()
.verify();
}
}
如果需要使用 WebSocket 作为传输层实现,只需要替换 RSocket 服务器和客户端使用的传输层 Java 类即可,其他代码并不需要改动。在代码清单
12 中,WebsocketServerTransport
类是 WebSocket 服务器端实现,而 WebsocketClientTransport
类是客户端的实现。除了 transport()
方法使用的参数不同之外,其他的代码都与代码清单 2 相同。
public class WebSocketRequestResponseExample {
public static void main(String[] args) {
RSocketFactory.receive()
.acceptor(((setup, sendingSocket) -> Mono.just(
new AbstractRSocket() {
@Override
public Mono<
Payload
> requestResponse(Payload payload) {
return Mono.just(DefaultPayload.create("ECHO >> " + payload.getDataUtf8()));
}
}
)))
.transport(WebsocketServerTransport.create("localhost", 7000))
.start()
.subscribe();
RSocket socket = RSocketFactory.connect()
.transport(WebsocketClientTransport.create("localhost", 7000))
.start()
.block();
socket.requestResponse(DefaultPayload.create("hello"))
.map(Payload::getDataUtf8)
.doOnNext(System.out::println)
.block();
socket.dispose();
}
}
对于使用 Spring WebFlux 的应用,可以配置使用 RSocket 作为 WebSocket 服务器端实现。在代码清单 13 中的 Spring
配置中,在路径 "/ws"
上启用了基于 RSocket 的 WebSocket 支持。
@Configuration
@EnableWebFlux
public class Config {
@Bean
RSocketWebSocketNettyRouteProvider rSocketWebsocketRouteProvider(
RSocketMessageHandler messageHandler) {
return new RSocketWebSocketNettyRouteProvider("/ws",
messageHandler.responder());
}
static class RSocketWebSocketNettyRouteProvider implements NettyRouteProvider {
private final String mappingPath;
private final SocketAcceptor socketAcceptor;
RSocketWebSocketNettyRouteProvider(String mappingPath, SocketAcceptor socketAcceptor) {
this.mappingPath = mappingPath;
this.socketAcceptor = socketAcceptor;
}
@Override
public HttpServerRoutes apply(HttpServerRoutes httpServerRoutes) {
ServerTransport.ConnectionAcceptor acceptor = RSocketFactory.receive()
.acceptor(this.socketAcceptor)
.toConnectionAcceptor();
return httpServerRoutes.ws(this.mappingPath, WebsocketRouteTransport.newHandler(acceptor));
}
}
}
由于 RSocket 有自己的二进制协议,在浏览器端的实现需要使用 RSocket 提供的 JavaScript 客户端与服务器端交互。在 Web 应用中使用 RSocket 提供的
NodeJS 模块 rsocket-websocket-client
即可。本文附带的示例代码中包含一个 React 应用,连接到使用 RSocket
的 WebSocket 服务器并发送消息。代码清单 14中的 MessageService
类负责与服务器交互。首先创建一个 JavaScript 实现的 RSocket 客户端中的 RSocketClient
对象,并使用 RSocketWebSocketClient
作为传输层实现。在连接成功之后,使用 RSocketClient
的 requestStream()
方法发送消息并处理响应。这里的处理逻辑是调用提供的消息回调方法 messageCallback
。
本文系作者在时代Java发表,未经许可,不得转载。
如有侵权,请联系nowjava@qq.com删除。