Project Reactor - map的工作原理

Project Reactor - map的工作原理

sjmyuan 64 2023-01-18

Project Reactor是基于Reactive Streams设计的,要了解map的工作原理,我们需要先了解Reactive Streams。

Reactive Streams

组成部分

Reactive Streams是一个规范, 它包含了四个部分

  1. Publisher
    发布者,在收到订阅者的请求时才发布数据,且总数据大小可能是未知的。
  2. Subscriber
    订阅者,可以请求多次数据,请求时可以根据自身处理能力决定数据的大小。
  3. Subscription
    订阅,代表着发布者和订阅者之间订阅关系。发布者与订阅者的交互都是通过订阅来实现。
  4. Processor
    数据处理器,既是发布者也是订阅者,可以用来对发布的数据进行转换或对错误进行处理。

工作原理

Publisher, SubscriberSubscription之间的交互流程如下

  1. 主程序创建Publiser实例。
  2. 主程序创建Subscriber实例。
  3. 主程序调用Publisher.subscribe(Subscriber)订阅Publisher
  4. Publisher创建Subscription
  5. Publisher调用Subscriber.onSubscribe(Subscription)来通知Subscriber订阅成功。
  6. Subscriber调用Subscription.request(n)来请求固定大小的数据,该调用可发生多次。
  7. Subscription调用Subscriber.onNext(value)来向Subscriber发布数据,该调用可发生多次。
  8. 在不需要订阅时,Subscriber需要调用Subscription.cancel()来取消订阅。
  9. 在数据发布结束时,Subscription需要调用Subscriber.onComplete()来通知Subscriber数据发布结束。
  10. 在数据发布出错时,Subscription需要调用Subscriber.onError()来通知Subscriber数据发布错误。

map

使用方法

这里我们以Flux.map为例(Mono.map的工作原理类似), 它的使用方法如下

Flux.just(1).map(x -> x + 1).blockFirst(); // 2

map可以对流中的每个元素进行计算并用计算结果替换原来的元素。

这行代码可以被分为三个部分

  1. 创建一个初始Publisher,这里是FluxJust
  2. 创建一个新的Publisher作为FluxJust的装饰类并保存map的转换函数(x -> x+1),这里是FluxMap,它使用了装饰器模式。
  3. 创建一个Subscriber来订阅FluxMap,这里是BlockingFirstSubscriber。在FluxMap被订阅后,它会在FluxJustBlockingFirstSubscriber之间加入一个MapSubscriberMapSubscriber会作为Processor执行map的转换函数。

工作原理

注意:为了突出重点,这里进行了简化,忽略了一些对理解map工作原理不重要的类,例如InnerOperator,InternalFluxOperator。

可以看到MapSubscriber并没有严格的实现Processor,它直接继承了Subscription,而不是PublisherFluxMap.subscribe()在每次被调用时都会生成一个新的MapSubscriber实例,因此MapSubscriber永远都只会有一个Subscriber,那么直接继承Subscription就可以简化生成Subscription的步骤。

FluxMap的作用是增加易用性。如果我们只有MapSubscriber,伪代码就会变成下面这样

Subscriber subscriber = new BlockingFirstSubscriber();
MapSubscriber mapSubscriber = new MapSubscriber(subscriber, x -> x + 1);
Flux<Int> flux = Flux.just(1);
flux.subscribe(mapSubscriber);

如果我们要进行两次map,伪代码会变成

Subscriber subscriber = new BlockingFirstSubscriber();
MapSubscriber mapSubscriber1 = new MapSubscriber(subscriber, x -> x + 1);
MapSubscriber mapSubscriber2 = new MapSubscriber(mapSubscriber1, x -> x + 2);
Flux<Int> flux = Flux.just(1);
flux.subscribe(mapSubscriber2);

我们既要嵌套更多的MapSubscriber,又要修改flux.subscribe()的入参,无法解耦PublisherSubscriber。引入FluxMap后,我们只需要增加更多的FluxMap,不需要修改其他地方。

以上面的代码为例,map的工作流程如下

  1. 主程序创建FluxJust实例,它是一个CorePublisher
  2. 主程序调用FluxJust.map(f)对数据进行转换。
  3. FluxJust创建FluxMap实例,它包含了FluxJustf,并且也是一个CorePublisher
  4. 主程序创建BlockingFirstSubscriber实例。
  5. 主程序调用FluxMap.subscribe(BlockingFirstSubscriber)订阅FluxMap
  6. FluxMap创建MapSubscriber实例,它包含了BlockingFirstSubscriberf
  7. FluxMap调用FluxJust.subscribe(MapSubscriber)订阅FluxJust
  8. FluxJust创建Subscription实例。
  9. FluxJust调用MapSubscriber.onSubscribe(Subscription)来通知MapSubscriber订阅成功。
  10. MapSubscriber调用BlockingFirstSubscriber.onSubscribe(MapSubscriber)来通知BlockingFirstSubscriber订阅成功,这里MapSubscriber既是一个Subscriber,也是一个Subscription
  11. BlockingFirstSubscriber调用MapSubscriber.request(n)来请求固定大小的数据,该调用可发生多次。
  12. MapSubscriber调用Subscription.request(n)来请求固定大小的数据,该调用可发生多次。
  13. Subscription调用MapSubscriber.onNext(value)来向MapSubscriber发布数据,该调用可发生多次。
  14. MapSubscriber调用f(value)来对数据进行转换,这是map的核心逻辑。
  15. MapSubscriber调用BlockingFirstSubscriber.onNext(mappedValue)来向BlockingFirstSubscriber发布转换后的数据,该调用可发生多次。
  16. 在数据发布结束时,Subscription需要调用MapSubscriber.onComplete()来通知MapSubscriber数据发布结束。
  17. MapSubscriber调用BlockingFirstSubscriber.onComplete()来通知BlockingFirstSubscriber数据发布结束。

总结

map是Reactor中最基本的流操作方法,掌握它的工作原理可以帮助我们更好的理解其他操作,下一篇我们会在其基础上讲解filter的工作原理。