flatMap是Project Reactor中的重要功能,也是很多功能的实现基础。在我们介绍过的Project Reactor - map的工作原理中,map其实可以用 flatMap来实现。本文将介绍flatMap的工作原理。
由于能力有限,我在尽量保证准确性的情况下对多线程部分做了简化,如有错误还请批评指正。
使用方法
这里我们以Flux.flatMap为例, 它的使用方法如下
Flux.just(1, 2).flatMap(x -> Flux.just(x,x)).blockFirst(); // 1,1,2,2
flatMap可以用流中的每个元素生成一个新的流并将它们合并起来。
这行代码可以被分为三个部分

- 创建一个初始
Publisher,这里是FluxArray。 - 创建一个新的
Publisher作为FluxArray的装饰类并保存flatMap的转换函数(x -> FluxJust(x,x)),这里是FluxFlatMap,它使用了装饰器模式。 - 创建一个
Subscriber来订阅FluxFlatMap,这里是BlockingFirstSubscriber。在FluxFlatMap被订阅后,它会在FluxArray和BlockingFirstSubscriber之间加入一个FlatMapMain。FlatMapMain会作为Processor执行flatMap的转换函数,在接收到一个数据后,它会用转换函数生成一个新的Publisher,并创建一个FlatMapInner来订阅该Publisher。FlatMapInner会将接收到的数据转发给FlatMapMain,然后FlatMapMain再发送给BlockingFirstSubscriber。
可以看到,如果每个元素生成的流都只有一个元素的话,最终的效果与map相同。
Flux.just(1, 2).flatMap(x -> Flux.just(x+1)).blockFirst();
// 等价于 Flux.just(1, 2).map(x -> x + 1).blockFirst();
工作原理

注意:这里我们没有考虑Fusabe包中的实现类,它是对交互流程的优化,不影响我们对正常流程的理解。
FluxFlatMap保存了flatMap的转换函数,在被订阅后,它会创建FlatMapMain并将转换函数传给它。
以上面的代码为例,flatMap的工作流程如下

注意:这里我们假设数据是顺序发布的,没有体现tryEmit的多线程同步逻辑,后面会解释多线程下tryEmit的工作原理。
- 主程序创建
FluxArray实例,它是一个CorePublisher。 - 主程序调用
FluxArray.flatMap(f)对数据进行过滤。 FluxArray创建FluxFlatMap实例,它包含了FluxArray和f,并且也是一个CorePublisher。- 主程序创建
BlockingFirstSubscriber实例。 - 主程序调用
FluxFlatMap.subscribe(BlockingFirstSubscriber)订阅`FluxFlatMap。 FluxFlatMap创建FlatMapMain实例,它包含了BlockingFirstSubscriber和f。FluxFlatMap调用FluxArray.subscribe(FlatMapMain)订阅FluxArray。FluxArray创建Subscription实例。FluxArray调用FlatMapMain.onSubscribe(Subscription)来通知FlatMapMain订阅成功。FlatMapMain调用BlockingFirstSubscriber.onSubscribe(FlatMapMain)来通知BlockingFirstSubscriber订阅成功,这里FlatMapMain既是一个Subscriber,也是一个Subscription。FlatMapMain调用Subscription.request(maxConcurrency)来请求数据,其大小为flatMap允许的最大并行数。该调用不需要等待BlockingFirstSubscriber调用FlatMapMain.request(n),因为FlatMapMain需要先将Subscription发送的数据转换为新的Publisher,然后才能将新的Publisher数据发给BlockingFirstSubscriber,所以转换过程可以提前发生。BlockingFirstSubscriber调用FlatMapMain.request(n)来请求固定大小的数据,该调用可发生多次。Subscription调用FlatMapMain.onNext(value)来向FlatMapMain发布数据,该调用可发生多次。FlatMapMain调用f(value)来对数据进行转换,生成新的Publisher,这里我们将其命名为MappedPublisher。FlatMapMain生成一个FlatMapInner,并使用一个数组来管理所有的FlatMapInner。- 在数据发布结束时,
Subscription需要调用FlatMapMain.onComplete()来通知FlatMapMain数据发布结束。该步骤可以发生在第11步之后的任何时间点,和后续步骤是并行关系。 FlatMapMain调用MappedPublisher.subscribe(FlatMapInner)来订阅MappedPublisher。MappedPublisher创建Subscription实例,这里我们将其命名为MappedSubscription。MappedPublisher调用FlatMapInner.onSubscribe(MappedSubscription)来通知FlatMapInner订阅成功。FlatMapInner调用MappedSubscription.request(prefetch)来请求数据,其大小为自定义的预加载数据大小。MappedSubscription调用FlatMapInner.onNext(value)来向FlatMapInner发布数据,该调用可发生多次。FlatMapInner调用FlatMapMain.tryEmit(value)来向FlatMapMain发布数据。FlatMapMain调用BlockingFirstSubscriber.onNext(value)来向BlockingFirstSubscriber发布数据。图中是单线程的情况,如果有多个FlatMapInner在同时调用FlatMapMain.tryEmit,FlatMapMain会将没有获取到锁的数据存储在FlatMapInner的数据队列中。在下次tryEmit调用时,FlatMapMain会将数据队列中的数据全部发布。FlatMapInner调用MappedSubscription.request(1)来继续请求数据。- 在数据发布结束时,
MappedSubscription调用FlatMapInner.onComplete()来通知FlatMapInner数据发布结束。 - 将数据发布结束的
FlatMapInner标记为结束。 FlatMapInner调用FlatMapMain.innerComplete(FlatMapInner)来通知FlatMapMain数据发布结束。- 如果
FlatMapInner数据队列已经清空且数据发布已结束,将该FlatMapInner移出数组,即不再接收其数据。 - 如果所有
FlatMapInner都已经移出数组,调用BlockingFirstSubscriber.onComplete()来通知BlockingFirstSubscriber数据发布结束。
下面我们看一下FlatMapMain.tryEmit在多线程状态下的工作原理。

这里我们假设Subscription发布了三个元素,每个元素都生成了一个MappedSubscription且都有一个FlatMapInner订阅了它,三个FlatMapInner都在调用FlatMapMain.tryEmit发布数据。
-
状态1
- 第一个
FlatMapInner获取了锁,FlatMapMain从其数据队列中获取数据并发布给Subscriber。 - 第二个
FlatMapInner没有获取了锁,数据全部存储到它的数据队列中。 - 第三个
FlatMapInner没有获取了锁,数据全部存储到它的数据队列中。
- 第一个
-
状态2
- 第一个
FlatMapInner数据队列中的数据全部发布完成后,FlatMapMain开始从第二个FlatMapInner读取数据,它也就获取了锁。第一个FlatMapInner在这之后发布的数据会存储到它的数据队列中。 - 第二个
FlatMapInner获取了锁,FlatMapMain从其数据队列中获取数据并发布给Subscriber。 - 第三个
FlatMapInner没有获取了锁,数据全部存储到它的数据队列中。
- 第一个
-
状态3
- 第一个
FlatMapInner没有获取了锁,数据全部存储到它的数据队列中。 - 第二个
FlatMapInner数据队列中的数据全部发布完成后,FlatMapMain开始从第三个FlatMapInner读取数据,它也就获取了锁。第二个FlatMapInner在这之后发布的数据会存储到它的数据队列中。 - 第三个
FlatMapInner获取了锁,FlatMapMain从其数据队列中获取数据并发布给Subscriber。
- 第一个
-
重复
第三个
FlatMapInner数据队列中的数据全部发布完成后,FlatMapMain又开始从第一个FlatMapInner读取数据,它也就获取了锁,又回到了状态1。
总结
从tryEmit的工作原理可以看出,flatMap并不能保证数据发布的顺序,它基本是按照先收到先发布的规则来实现的。如果想要数据按顺序发布,我们需要用到flatMapSequential,下一篇我们就介绍一下它的工作原理。