Spring RSocket:基于服务注册发现的 RSocket 负载均衡

RSocket 分布式通讯协议是 Spring Reactive 的核心内容,从 Spring Framework 5.2 开始,RSocket 已经是 Spring 的内置功能,Spring Boot 2.3 也添加了 spring-boot-starter-rsocket,简化了 RSocket 的服务编写和服务调用。RSocket 通讯的核心架构中包含两种模式,分别是 Broker 代理模式和服务直连通讯模式。

Broker 的通讯模式更灵活,如 Alibaba RSocket Broker,采用的是事件驱动模型架构。而目前更多的架构则是面向服务化设计,也就是我们常说的服务注册发现和服务直连通讯的模式,其中最知名的就是 Spring Cloud 技术栈,涉及到配置推送、服务注册发现、服务网关、断流保护等等。在面向服务化的分布式网络通讯中,如 REST API、gRPC 和 Alibaba Dubbo 等,都与 Spring Cloud 有很好地集成,用户基本不用关心服务注册发现和客户端负载均衡这些底层细节,就可以完成非常稳定的分布式网络通讯架构。

RSocket 作为通讯协议的后起之秀,核心是二进制异步化消息通讯,是否也能和 Spring Cloud 技术栈结合,实现服务注册发现、客户端负载均衡,从而更高效地实现面向服务的架构?这篇文章我们就讨论一下 Spring Cloud 和 RSocket 结合实现服务注册发现和负载均衡。

服务注册发现

服务注册发现的原理非常简单,主要涉及三种角色:服务提供方、服务消费者和服务注册中心。典型的架构如下:



服务提供方,如 RSocket Server,在应用启动后,会向服务注册中心注册应用相关的信息,如应用名称,ip 地址,Web Server 监听端口号等,当然还会包括一些元信息,如服务的分组(group),服务的版本号(version),RSocket 的监听端口号,如果是 WebSocket 通讯,还需要提供 ws 映射路径等,不少开发者会将服务提供方的服务接口列表作为 tags 提交给服务注册中心,方便后续的服务查询和治理。

在本文中,我们采用 Consul 作为服务注册中心,主要是 Consul 比较简单,下载后执行 consul agent -dev 就可以启动对应的服务,当然你可以使用 Docker Compose,配置也非常简单,然后 docker-compose up -d 就可以启动 Consul 服务。

当我们向服务中心注册和查询服务时,都需要有一个应用名称,对应到 Spring Cloud 中,也就是 Spring Boot 对应的 spring.application.name 的值,这里我们称之为应用名称,也就是后续的服务查找都是基于该应用名称进行的。如果你调用 ReactiveDiscoveryClient.getInstances(String serviceId); 查找服务实例列表时,这个 serviceId 参数其实就是 Spring Boot 的应用名称。考虑到服务注册和后续的 RSocket 服务路由的配合以及方便大家理解,这里我们打算设计一个简单的命名规范。

假设你有一个服务应用,功能名称为 calculator,同时提供两个服务: 数学计算器服务(MathCalculatorService)和汇率计算器服务(ExchangeCalculatorService), 那么我们该如何来命名该应用及其对应的服务接口名?

这里我们采用类似 Java package 命名规范,采用域名倒排的方式,如 calculator 应用对应的则为 com-example-calculator 样式,为何是中划线,而不是点?. 在 DNS 解析中作为主机名是非法的,只能作为子域名存在,不能作为主机名,而目前的服务注册中心设计都遵循 DNS 规约,所以我们采用中划线的方式来命名应用。这样采用域名倒排和应用名结合的方式,可以确保应用之间不会重名,另外也方便和 Java Package 名称进行转换,也就是 - 和 . 之间的相互转换。

那么应用包含的服务接口应该如何命名?服务接口全名是由应用名称和 interface 名称组合而成,规则如下:

String serviceFullName = appName.replace("-", ".") + "." + serviceInterfaceName;

例如以下的服务命名都是合乎规范的:

  • com.example.calculator.MathCalculatorService
  • com.example.calculator.ExchangeCalculatorService

而 com.example.calculator.math.MathCalculatorService 则是错误的, 因为在应用名称和接口名称之间多了 math。为何要采用这种命名规范?首先让我们看一下服务消费方是如何调用远程服务的。假设服务消费方拿到一个服务接口,如 com.example.calculator.MathCalculatorService,那么他该如何发起服务调用呢?

  • 首先根据 Service 全面提取处对应的应用名称(appName),如 com.example.calculator.MathCalculatorService 服务对应的 appName 则为 com-example-calculator。如果应用和服务接口之间不存在任何关系,那么想要获取服务接口对应的服务提供方信息,你可能还需要应用名称,这会相对来说比较麻烦。如果接口名称中包含对应的应用信息,则会简单很多,你可以理解为应用是服务全面中的一部分。
  • 调用 ReactiveDiscoveryClient.getInstances(appName) 获取应用名对应的服务实例列表(ServiceInstance),ServiceInstance 对象会包含诸如 IP 地址,Web 端口号、RSocket 监听端口号等其他元信息。
  • 根据 RSocketRequester.Builder.transports(servers) 构建具有负载均衡能力的 RSocketRequester 对象。
  • 使用服务全称和具体功能名称作为路由进行 RSocketRequester 的 API 调用,样例代码如下:

rsocketRequester .route("com.example.calculator.MathCalculatorService.square") .data(number) .retrieveMono(Integer.class)

通过上述的命名规范,我们可以从服务接口全称中提取出应用名,然后和服务注册中心交互查找对应的实例列表,然后建立和服务提供者的连接,最后基于服务名称进行服务调用。该命名规范,基本做到到了最小化的依赖,开发者完全是基于服务接口调用,非常简单。

RSocket 服务编写

有了服务的命名规范和服务注册,编写 RSocket 服务,这个还是非常简单,和编写一个 Spring Bean 没有任何区别。引入 spring-boot-starter-rsocket 依赖,创建一个 Controller 类,添加对应的 MessagMapping annotation 作为基础路由,然后实现功能接口添加功能名称,样例代码如下:

@Controller @MessageMapping("com.example.calculator.MathCalculatorService") public class MathCalculatorController implements MathCalculatorService { @MessageMapping("square") public Mono<Integer> square(Integer input) { System.out.println("received: " + input); return Mono.just(input * input); } }

上述代码看起来好像有点奇怪,既然是服务实现,添加 @Controller 和 @MessageMapping,看起来好像有点不伦不类的。当然这些 annotation 都是一些技术细节体现,你也能看出,RSocket 的服务实现是基于 Spring Message 的,是面向消息化的。这里我们其实只需要添加一个自定义的 @SpringRSocketService annotation 就可以解决这个问题,代码如下:

@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Controller @MessageMapping() public @interface SpringRSocketService { @AliasFor(annotation = MessageMapping.class) String[] value() default {}; }

回到服务对应的实现代码,我们改为使用 @SpringRSocketService annotation,这样我们的代码就和标准的 RPC 服务接口完全一模一样啦,也便于理解。此外 @SpringRSocketService 和 @RSocketHandler 这两个 Annotation,也方便我们后续做一些 Bean 扫描、IDE 插件辅助等。

@SpringRSocketService("com.example.calculator.MathCalculatorService") public class MathCalculatorImpl implements MathCalculatorService { @RSocketHandler("square") public Mono<Integer> square(Integer input) { System.out.println("received: " + input); return Mono.just(input * input); } }

最后我们添加一下 spring-cloud-starter-consul-discovery 依赖,设置一下 bootstrap.properties,然后在 application.properties 设置一下 RSocket 监听的端口和元信息,我们还将该应用提供的服务接口列表作为 tags 传给服务注册中心,当然这个也是方便我们后续的服务管理。样例如下:

spring.application.name=com-example-calculator spring.cloud.consul.discovery.instance-id=com-example-calculator-${random.uuid} spring.cloud.consul.discovery.prefer-ip-address=true server.port=0 spring.rsocket.server.port=6565 spring.cloud.consul.discovery.metadata.rsocketPort=${spring.rsocket.server.port} spring.cloud.consul.discovery.tags=com.example.calculator.ExchangeCalculatorService,com.example.calculator.MathCalculatorService

RSocket 服务应用启动后,我们在 Consul 控制台就可以看到服务注册上来的信息,截屏如下:



RSocket 客户端接入

客户端接入稍微有一点复杂,主要是要基于服务接口全面要做一系列相关的操作,但是前面我们已经有了命名规范,所以问题也不大。客户端应用同样会接入服务注册中心,这样我们就可以获得 ReactiveDiscoveryClient bean,接下来就是根据服务接口全名,如 com.example.calculator.ExchangeCalculatorService 构建出具有负载均衡的 RSocketRequester。

原理也非常简单,前面说过,根据服务接口全称,获得其对应的应用名称,然后调用 ReactiveDiscoveryClient.getInstances(appName) 获得服务应用对应的实例列表,接下来将服务实例(ServiceInstance)列表转换为 RSockt 的 LoadbalanceTarget 列表,其实就是 POJO 转换,最后将转 LoadbalanceTarget 列表进行 Flux 封装(如使用 Sink 接口),传递给 RSocketRequester.Builder 就完成具有负载均衡能力的 RSocketRequester 构建,详细的代码细节大家可以参考项目的代码库。

这里要注意的是接下来如何感知服务端实例列表的变化,如应用上下线,服务暂停等。这里我采用一个定时任务方案,定时查询服务对应的地址列表。当然还有其他的机制,如果是标准的 Spring Cloud 服务发现接口,目前是需要客户端轮询的,当然也可以结合 Spring Cloud Bus 或者消息中间件,实现服务端列表变化的监听。如果客户端感知到服务列表的变化,只需要调用 Reactor 的 Sink 接口发送新的列表即可,RSocket Load Balance 在感知到变化后,会自动做出响应,如关闭即将失效的连接、创建新的连接等工作。

在实际的应用之间的相互通讯,会存在一些服务提供方不可用的情况,如服务方突然宕机或者其网络不可用,这就导致了服务应用列表中部分服务不可用,那么 RSocket 这个时候会如何处理?不用担心,RSocket Load Balance 有重试机制,当一个服务调用出现连接等异常,会重新从列表中获取一个连接进行通讯,而那个错误的连接也会标识为可用性为 0,不会再被后续请求所使用。服务列表推送和通讯期间的容错重试机制,这两者保证了分布式通讯的高可用性。

最后让我们启动 client-app,然后从客户端发起一个远程的 RSocket 调用,截屏如下:



上图中 com-example-calculator 服务应用包括三个实例,服务的调用会在这三个服务实例交替进行(RoundRobin 策略)。

开发体验的一些考量

虽然服务注册和发现、客户端的负载均衡这些都完成啦,调用和容错这些都没有问题,但是还有一些使用体验上的问题,这里我们也阐述一下,让开发体验做的更好。

1. 基于服务接口通讯

大多数 RPC 通讯都是基于接口的,如 Apache Dubbo、gRPC 等。那么 RSocket 能否做到?答案是其实完全可以。在服务端,我们已经是基于服务接口来实现 RSocket 服务啦,接下来我们只需要在客户端实现基于该接口的调用就可以。对于 Java 开发者来说,这不是大问题,我们只需要基于 Java Proxy 机制构建就可以,而 Proxy 对应的 InvocationHandler 会使用 RSocketRequester 来实现 invoke() 的函数调用。详细的细节请参考应用代码中的的 RSocketRemoteServiceBuilder.java 文件,而且在 client-app module 中也已经包含了解基于接口调用的 bean 实现。

2. 服务接口函数的单参数问题

使用 RSocketRequester 调用远程接口时,对应的处理函数只能接受单个参数,这个和 gRPC 的设计是类似的,当然也考虑了不同对象序列化框架的支持问题。但是考虑到实际的使用体验,可能会涉及到多参函数的情况,让调用方开发体验更好,那么这个时候该如何处理?其实从 Java 1.8 后,interface 是允许增加 default 函数的,我们可以添加一些体验更友好的 default 函数,而且还不影响服务通讯接口,样例如下:

public interface ExchangeCalculatorService { double exchange(ExchangeRequest request); default double rmbToDollar(double amount) { return exchange(new ExchangeRequest(amount, "CNY", "USD")); } }

通过 interface 的 default method,我们可以为调用方提供给便捷函数,如在网络传输的是字节数组 (byte[]),但是在 default 函数中,我们可以添加 File 对象支持,方便调用方使用。Interface 中的函数 API 负责服务通讯规约,default 函数来提升使用方的体验,这两者的配合,可以非常容易解决函数多参问题,当然 default 函数在一定程度上还可以作为数据验证的前哨来使用。

3. RSocket Broker 支持

展开阅读全文

本文系作者在时代Java发表,未经许可,不得转载。

如有侵权,请联系nowjava@qq.com删除。

编辑于

关注时代Java

关注时代Java