flatMap
是Project Reactor中的重要功能,也是很多功能的实现基础。在我们介绍过的Project Reactor - map的工作原理中,map
其实可以用 flatMap
来实现。本文将介绍flatMap
的工作原理。
由于能力有限,我在尽量保证准确性的情况下对多线程部分做了简化,如有错误还请批评指正。
使用方法
这里我们以Flux.flatMap
为例, 它的使用方法如下
Flux.just(1, 2).flatMap(x -> Flux.just(x,x)).blockFirst(); // 1,1,2,2
flatMap
可以用流中的每个元素生成一个新的流并将它们合并起来。
这行代码可以被分为三个部分
- 创建一个初始
Publisher
,这里是FluxArray
。 - 创建一个新的
Publisher
作为FluxArray
的装饰类并保存flatMap
的转换函数(x -> FluxJust(x,x)
),这里是FluxFlatMap
,它使用了装饰器模式。 - 创建一个
Subscriber
来订阅FluxFlatMap
,这里是BlockingFirstSubscriber
。在FluxFlatMap
被订阅后,它会在FluxArray
和BlockingFirstSubscriber
之间加入一个FlatMapMain
。FlatMapMain
会作为Processor
执行flatMap
的转换函数,在接收到一个数据后,它会用转换函数生成一个新的Publisher
,并创建一个FlatMapInner
来订阅该Publisher
。FlatMapInner
会将接收到的数据转发给FlatMapMain
,然后FlatMapMain
再发送给BlockingFirstSubscriber
。
可以看到,如果每个元素生成的流都只有一个元素的话,最终的效果与map
相同。
Flux.just(1, 2).flatMap(x -> Flux.just(x+1)).blockFirst();
// 等价于 Flux.just(1, 2).map(x -> x + 1).blockFirst();
工作原理
注意:这里我们没有考虑Fusabe包中的实现类,它是对交互流程的优化,不影响我们对正常流程的理解。
FluxFlatMap
保存了flatMap
的转换函数,在被订阅后,它会创建FlatMapMain
并将转换函数传给它。
以上面的代码为例,flatMap
的工作流程如下
注意:这里我们假设数据是顺序发布的,没有体现tryEmit的多线程同步逻辑,后面会解释多线程下tryEmit的工作原理。
- 主程序创建
FluxArray
实例,它是一个CorePublisher
。 - 主程序调用
FluxArray.flatMap(f)
对数据进行过滤。 FluxArray
创建FluxFlatMap
实例,它包含了FluxArray
和f
,并且也是一个CorePublisher
。- 主程序创建
BlockingFirstSubscriber
实例。 - 主程序调用
FluxFlatMap.subscribe(BlockingFirstSubscriber)
订阅`FluxFlatMap。 FluxFlatMap
创建FlatMapMain
实例,它包含了BlockingFirstSubscriber
和f
。FluxFlatMap
调用FluxArray.subscribe(FlatMapMain)
订阅FluxArray
。FluxArray
创建Subscription
实例。FluxArray
调用FlatMapMain.onSubscribe(Subscription)
来通知FlatMapMain
订阅成功。FlatMapMain
调用BlockingFirstSubscriber.onSubscribe(FlatMapMain)
来通知BlockingFirstSubscriber
订阅成功,这里FlatMapMain
既是一个Subscriber
,也是一个Subscription
。FlatMapMain
调用Subscription.request(maxConcurrency)
来请求数据,其大小为flatMap
允许的最大并行数。该调用不需要等待BlockingFirstSubscriber
调用FlatMapMain.request(n)
,因为FlatMapMain
需要先将Subscription
发送的数据转换为新的Publisher
,然后才能将新的Publisher
数据发给BlockingFirstSubscriber
,所以转换过程可以提前发生。BlockingFirstSubscriber
调用FlatMapMain.request(n)
来请求固定大小的数据,该调用可发生多次。Subscription
调用FlatMapMain.onNext(value)
来向FlatMapMain
发布数据,该调用可发生多次。FlatMapMain
调用f(value)
来对数据进行转换,生成新的Publisher
,这里我们将其命名为MappedPublisher
。FlatMapMain
生成一个FlatMapInner
,并使用一个数组来管理所有的FlatMapInner
。- 在数据发布结束时,
Subscription
需要调用FlatMapMain.onComplete()
来通知FlatMapMain
数据发布结束。该步骤可以发生在第11步之后的任何时间点,和后续步骤是并行关系。 FlatMapMain
调用MappedPublisher.subscribe(FlatMapInner)
来订阅MappedPublisher
。MappedPublisher
创建Subscription
实例,这里我们将其命名为MappedSubscription
。MappedPublisher
调用FlatMapInner.onSubscribe(MappedSubscription)
来通知FlatMapInner
订阅成功。FlatMapInner
调用MappedSubscription.request(prefetch)
来请求数据,其大小为自定义的预加载数据大小。MappedSubscription
调用FlatMapInner.onNext(value)
来向FlatMapInner
发布数据,该调用可发生多次。FlatMapInner
调用FlatMapMain.tryEmit(value)
来向FlatMapMain
发布数据。FlatMapMain
调用BlockingFirstSubscriber.onNext(value)
来向BlockingFirstSubscriber
发布数据。图中是单线程的情况,如果有多个FlatMapInner
在同时调用FlatMapMain.tryEmit
,FlatMapMain
会将没有获取到锁的数据存储在FlatMapInner
的数据队列中。在下次tryEmit
调用时,FlatMapMain
会将数据队列中的数据全部发布。FlatMapInner
调用MappedSubscription.request(1)
来继续请求数据。- 在数据发布结束时,
MappedSubscription
调用FlatMapInner.onComplete()
来通知FlatMapInner
数据发布结束。 - 将数据发布结束的
FlatMapInner
标记为结束。 FlatMapInner
调用FlatMapMain.innerComplete(FlatMapInner)
来通知FlatMapMain
数据发布结束。- 如果
FlatMapInner
数据队列已经清空且数据发布已结束,将该FlatMapInner
移出数组,即不再接收其数据。 - 如果所有
FlatMapInner
都已经移出数组,调用BlockingFirstSubscriber.onComplete()
来通知BlockingFirstSubscriber
数据发布结束。
下面我们看一下FlatMapMain.tryEmit
在多线程状态下的工作原理。
这里我们假设Subscription
发布了三个元素,每个元素都生成了一个MappedSubscription
且都有一个FlatMapInner
订阅了它,三个FlatMapInner
都在调用FlatMapMain.tryEmit
发布数据。
-
状态1
- 第一个
FlatMapInner
获取了锁,FlatMapMain
从其数据队列中获取数据并发布给Subscriber
。 - 第二个
FlatMapInner
没有获取了锁,数据全部存储到它的数据队列中。 - 第三个
FlatMapInner
没有获取了锁,数据全部存储到它的数据队列中。
- 第一个
-
状态2
- 第一个
FlatMapInner
数据队列中的数据全部发布完成后,FlatMapMain
开始从第二个FlatMapInner读取数据,它也就获取了锁。第一个FlatMapInner
在这之后发布的数据会存储到它的数据队列中。 - 第二个
FlatMapInner
获取了锁,FlatMapMain
从其数据队列中获取数据并发布给Subscriber
。 - 第三个
FlatMapInner
没有获取了锁,数据全部存储到它的数据队列中。
- 第一个
-
状态3
- 第一个
FlatMapInner
没有获取了锁,数据全部存储到它的数据队列中。 - 第二个
FlatMapInner
数据队列中的数据全部发布完成后,FlatMapMain
开始从第三个FlatMapInner读取数据,它也就获取了锁。第二个FlatMapInner
在这之后发布的数据会存储到它的数据队列中。 - 第三个
FlatMapInner
获取了锁,FlatMapMain
从其数据队列中获取数据并发布给Subscriber
。
- 第一个
-
重复
第三个
FlatMapInner
数据队列中的数据全部发布完成后,FlatMapMain
又开始从第一个FlatMapInner读取数据,它也就获取了锁,又回到了状态1。
总结
从tryEmit
的工作原理可以看出,flatMap
并不能保证数据发布的顺序,它基本是按照先收到先发布的规则来实现的。如果想要数据按顺序发布,我们需要用到flatMapSequential
,下一篇我们就介绍一下它的工作原理。