Project Reactor - flatMap的工作原理

Project Reactor - flatMap的工作原理

sjmyuan 64 2023-02-02

flatMap是Project Reactor中的重要功能,也是很多功能的实现基础。在我们介绍过的Project Reactor - map的工作原理中,map其实可以用 flatMap来实现。本文将介绍flatMap的工作原理。

由于能力有限,我在尽量保证准确性的情况下对多线程部分做了简化,如有错误还请批评指正。

使用方法

这里我们以Flux.flatMap为例, 它的使用方法如下

Flux.just(1, 2).flatMap(x -> Flux.just(x,x)).blockFirst(); // 1,1,2,2

flatMap可以用流中的每个元素生成一个新的流并将它们合并起来。

这行代码可以被分为三个部分

  1. 创建一个初始Publisher,这里是FluxArray
  2. 创建一个新的Publisher作为FluxArray的装饰类并保存flatMap的转换函数(x -> FluxJust(x,x)),这里是FluxFlatMap,它使用了装饰器模式。
  3. 创建一个Subscriber来订阅FluxFlatMap,这里是BlockingFirstSubscriber。在FluxFlatMap被订阅后,它会在FluxArrayBlockingFirstSubscriber之间加入一个FlatMapMainFlatMapMain会作为Processor执行flatMap的转换函数,在接收到一个数据后,它会用转换函数生成一个新的Publisher,并创建一个FlatMapInner来订阅该PublisherFlatMapInner会将接收到的数据转发给FlatMapMain,然后FlatMapMain再发送给BlockingFirstSubscriber

可以看到,如果每个元素生成的流都只有一个元素的话,最终的效果与map相同。

Flux.just(1, 2).flatMap(x -> Flux.just(x+1)).blockFirst(); 
// 等价于 Flux.just(1, 2).map(x -> x + 1).blockFirst();

工作原理

注意:这里我们没有考虑Fusabe包中的实现类,它是对交互流程的优化,不影响我们对正常流程的理解。

FluxFlatMap保存了flatMap的转换函数,在被订阅后,它会创建FlatMapMain并将转换函数传给它。

以上面的代码为例,flatMap的工作流程如下

注意:这里我们假设数据是顺序发布的,没有体现tryEmit的多线程同步逻辑,后面会解释多线程下tryEmit的工作原理。

  1. 主程序创建FluxArray实例,它是一个CorePublisher
  2. 主程序调用FluxArray.flatMap(f)对数据进行过滤。
  3. FluxArray创建FluxFlatMap实例,它包含了FluxArrayf,并且也是一个CorePublisher
  4. 主程序创建BlockingFirstSubscriber实例。
  5. 主程序调用FluxFlatMap.subscribe(BlockingFirstSubscriber)订阅`FluxFlatMap。
  6. FluxFlatMap创建FlatMapMain实例,它包含了BlockingFirstSubscriberf
  7. FluxFlatMap调用FluxArray.subscribe(FlatMapMain)订阅FluxArray
  8. FluxArray创建Subscription实例。
  9. FluxArray调用FlatMapMain.onSubscribe(Subscription)来通知FlatMapMain订阅成功。
  10. FlatMapMain调用BlockingFirstSubscriber.onSubscribe(FlatMapMain)来通知BlockingFirstSubscriber订阅成功,这里FlatMapMain既是一个Subscriber,也是一个Subscription
  11. FlatMapMain调用Subscription.request(maxConcurrency)来请求数据,其大小为flatMap允许的最大并行数。该调用不需要等待BlockingFirstSubscriber调用FlatMapMain.request(n),因为FlatMapMain需要先将Subscription发送的数据转换为新的Publisher,然后才能将新的Publisher数据发给BlockingFirstSubscriber,所以转换过程可以提前发生。
  12. BlockingFirstSubscriber调用FlatMapMain.request(n)来请求固定大小的数据,该调用可发生多次。
  13. Subscription调用FlatMapMain.onNext(value)来向FlatMapMain发布数据,该调用可发生多次。
  14. FlatMapMain调用f(value)来对数据进行转换,生成新的Publisher,这里我们将其命名为MappedPublisher
  15. FlatMapMain生成一个FlatMapInner,并使用一个数组来管理所有的FlatMapInner
  16. 在数据发布结束时,Subscription需要调用FlatMapMain.onComplete()来通知FlatMapMain数据发布结束。该步骤可以发生在第11步之后的任何时间点,和后续步骤是并行关系。
  17. FlatMapMain调用MappedPublisher.subscribe(FlatMapInner)来订阅MappedPublisher
  18. MappedPublisher创建Subscription实例,这里我们将其命名为MappedSubscription
  19. MappedPublisher调用FlatMapInner.onSubscribe(MappedSubscription)来通知FlatMapInner订阅成功。
  20. FlatMapInner调用MappedSubscription.request(prefetch)来请求数据,其大小为自定义的预加载数据大小。
  21. MappedSubscription调用FlatMapInner.onNext(value)来向FlatMapInner发布数据,该调用可发生多次。
  22. FlatMapInner调用FlatMapMain.tryEmit(value)来向FlatMapMain发布数据。
  23. FlatMapMain调用BlockingFirstSubscriber.onNext(value)来向BlockingFirstSubscriber发布数据。图中是单线程的情况,如果有多个FlatMapInner在同时调用FlatMapMain.tryEmitFlatMapMain会将没有获取到锁的数据存储在FlatMapInner的数据队列中。在下次tryEmit调用时,FlatMapMain会将数据队列中的数据全部发布。
  24. FlatMapInner调用MappedSubscription.request(1)来继续请求数据。
  25. 在数据发布结束时,MappedSubscription调用FlatMapInner.onComplete()来通知FlatMapInner数据发布结束。
  26. 将数据发布结束的FlatMapInner标记为结束。
  27. FlatMapInner调用FlatMapMain.innerComplete(FlatMapInner)来通知FlatMapMain数据发布结束。
  28. 如果FlatMapInner数据队列已经清空且数据发布已结束,将该FlatMapInner移出数组,即不再接收其数据。
  29. 如果所有FlatMapInner都已经移出数组,调用BlockingFirstSubscriber.onComplete()来通知BlockingFirstSubscriber数据发布结束。

下面我们看一下FlatMapMain.tryEmit在多线程状态下的工作原理。

这里我们假设Subscription发布了三个元素,每个元素都生成了一个MappedSubscription且都有一个FlatMapInner订阅了它,三个FlatMapInner都在调用FlatMapMain.tryEmit发布数据。

  • 状态1

    • 第一个FlatMapInner获取了锁,FlatMapMain从其数据队列中获取数据并发布给Subscriber
    • 第二个FlatMapInner没有获取了锁,数据全部存储到它的数据队列中。
    • 第三个FlatMapInner没有获取了锁,数据全部存储到它的数据队列中。
  • 状态2

    • 第一个FlatMapInner数据队列中的数据全部发布完成后, FlatMapMain开始从第二个FlatMapInner读取数据,它也就获取了锁。第一个FlatMapInner在这之后发布的数据会存储到它的数据队列中。
    • 第二个FlatMapInner获取了锁,FlatMapMain从其数据队列中获取数据并发布给Subscriber
    • 第三个FlatMapInner没有获取了锁,数据全部存储到它的数据队列中。
  • 状态3

    • 第一个FlatMapInner没有获取了锁,数据全部存储到它的数据队列中。
    • 第二个FlatMapInner数据队列中的数据全部发布完成后, FlatMapMain开始从第三个FlatMapInner读取数据,它也就获取了锁。第二个FlatMapInner在这之后发布的数据会存储到它的数据队列中。
    • 第三个FlatMapInner获取了锁,FlatMapMain从其数据队列中获取数据并发布给Subscriber
  • 重复

    第三个FlatMapInner数据队列中的数据全部发布完成后, FlatMapMain又开始从第一个FlatMapInner读取数据,它也就获取了锁,又回到了状态1。

总结

tryEmit的工作原理可以看出,flatMap并不能保证数据发布的顺序,它基本是按照先收到先发布的规则来实现的。如果想要数据按顺序发布,我们需要用到flatMapSequential,下一篇我们就介绍一下它的工作原理。