merge
和concat
都可以把多个流合并成一个流,不同的是merge
会同时订阅所有接收到的流,而concat
是按顺序一个一个的订阅。
例如我们有三个流a1
,a2
和a3
merge
会同时订阅这三个流,并将接收到的数据在不保证顺序的情况下发布给订阅者。concat
会先订阅a1
,将a1
的数据按照接收顺序发布给订阅者。当a1
数据发布结束后订阅a2
,当a2
数据发布结束后再订阅a3
。- Project Reactor - flatMapSequential的工作原理中的
mergeSequential
会和merge
一样同时订阅这三个流,但它会先发布a1
的数据,再发布a2
的数据,最后发布a3
的数据。这就要求每个流都有一个数据缓存,而不像concat
只需要一个数据缓存。
下面我们来介绍这两个方法的工作原理。
merge
使用方法
这里我们以Flux.merge
为例, 它的使用方法如下
场景1:合并一个流中的所有流
Flux.merge(Flux.just(Flux.just(1,1), Flux.just(2,2))).blockFirst(); //1,1,2,2
它等价于
Flux.just(Flux.just(1,1), Flux.just(2,2)).flatMap(x -> x).blockFirst(); //1,1,2,2
而在代码实现上,它也复用了Project Reactor - flatMap的工作原理。
场景2:合并一个数组中的所有流
Flux.merge(Flux.just(1,1), Flux.just(2,2)).blockFirst(); //1,1,2,2
和场景1不同的是,该场景直接以Vargs接收多个Publisher
,它们在函数中以数组形式存在。
工作原理
这里我们只讨论场景2的工作原理,Project Reactor引入了FluxMerge
对这种场景做了优化,它省去了flatMap
中 FluxFlatMap
的订阅操作,直接用FluxMapMain
订阅FluxArray.ArraySubscription
以场景2的代码为例,其工作流程如下
这里FluxMerge
和FluxFlatMap
不同的是,它没有订阅Publihser数组,而是直接构建了ArraySubscription
来通知FlatMapMain
订阅成功。我在这里只列出了前9步,后续步骤和flatMap
一致,就不再赘述。
- 主程序创建
Publisher
数组,它的每一个元素都是一个CorePublisher
。 - 主程序调用
Flux.merge
对Publisher
数组中的CorePublisher
进行合并。 - 主程序创建
FluxMerge
实例,它包含了Publisher
数组,并且也是一个CorePublisher
。 - 主程序创建
BlockingFirstSubscriber
实例。 - 主程序调用
FluxMerge.subscribe(BlockingFirstSubscriber)
订阅FluxMerge
。 FluxMerge
创建FlatMapMain
实例,它包含了BlockingFirstSubscriber
和转换函数f
, 这里f
是identity
函数,该函数会原样返回输入值。FluxMerge
创建ArraySubscription
实例,它包含了FlatMapMain
和Publisher
数组。FluxMerge
调用FlatMapMain.onSubscribe(ArraySubscription)
来通知FlatMapMain
订阅成功。注意,这里FlatMapMain
并不需要发起订阅请求。
concat
这里我们以Flux.concat
为例,它有三种不同的实现方式,对应三种使用场景。
场景1:合并一个数组中的所有流
使用方法
Flux.concat(Flux.just(1,1), Flux.just(2,2)).blockFirst(); //1,1,2,2
这里concat
以Vargs接收多个Publisher
, 而Vargs在函数内部以数组的形式存在。
工作原理
为了充分利用数组长度固定,易于遍历的特点,Project Reactor引入了FluxConcatArray
和ConcatArraySubscriber
来支持该场景。其类图如下
以上述代码为例,其工作流程如下
- 主程序创建
Publisher
数组,它的每一个元素都是一个CorePublisher
。 - 主程序调用
Flux.concat(Publisher array)
对Publisher
数组中的CorePublisher
进行合并。 Flux.concat
创建FluxConcatArray
实例,它包含了Publisher
数组。- 主程序创建
BlockingFirstSubscriber
实例。 - 主程序调用
FluxConcatArray.subscribe(BlockingFirstSubscriber)
订阅FluxConcatArray
。 FluxConcatArray
创建ConcatArraySubscriber
实例,它包含了BlockingFirstSubscriber
和Publisher
数组。FluxConcatArray
调用ConcatArraySubscriber.onComplete()
来通知ConcatArraySubscriber
数据发布结束。大家可能注意到,这里ConcatArraySubscriber
并未请求任何数据却直接收到了onComplete
。这样做的原因是ConcatArraySubscriber
已经拿到了Publisher
数组,不需要再像flatMap
中的FlatMapMain
那样等待数据发布。ConcatArraySubscriber
从Publisher
数组中获取一个Publihser
。ConcatArraySubscriber
调用Publisher.subscribe(ConcatArraySubscriber)
来订阅该Publisher
。Publisher
创建Subscription
实例。Publisher
调用ConcatArraySubscriber.onSubscribe(Subscription)
来通知ConcatArraySubscriber
订阅成功。- 如果该
Publisher
是数组中的第一个Publisher
,ConcatArraySubscriber
调用BlockingFirstSubscriber.onSubscribe(ConcatArraySubscriber)
来通知其订阅成功。 BlockingFirstSubscriber
调用ConcatArraySubscriber.request(n)
来请求固定大小的数据,该调用可发生多次且可在第12步后任何时间点发生。ConcatArraySubscriber
调用Subscription.request(remain)
来请求数据,其大小为BlockingFirstSubscriber
请求大小减去已发布数据大小。Subscription
调用ConcatArraySubscriber.onNext(value)
来向ConcatArraySubscriber
发布数据,该调用可发生多次。ConcatArraySubscriber
调用BlockingFirstSubscriber.onNext(value)
来向BlockingFirstSubscriber
发布数据。- 在数据发布结束时,
Subscription
调用ConcatArraySubscriber.onComplete()
来通知ConcatArraySubscriber
数据发布结束。回到步骤8,直到ConcatArraySubscriber
的Publisher
数组遍历完成。 - 如果
Publihser
数组遍历完成且Subscription
数据发布结束,调用BlockingFirstSubscriber.onComplete()
来通知BlockingFirstSubscriber
数据发布结束。
场景2:合并一个可迭代对象中的所有流
使用方法
import io.vavr.collection.List
Flux.concat(List.of(Flux.just(1,1), Flux.just(2,2))).blockFirst(); //1,1,2,2
这里我们使用了Vavr的List
,它是一个Iterable
。
工作原理
Iterable
的遍历和数组不同,Project Reactor 引入了FluxConcatIterable
和ConcatIterableSubscriber
来支持该场景。其类图如下
以上述代码为例,其工作流程如下
- 主程序创建
Publisher
列表,它的每一个元素都是一个CorePublisher
。 - 主程序调用
Flux.concat(Publisher list)
对Publisher
列表中的CorePublisher
进行合并。 Flux.concat
创建FluxConcatIterable
示例,它包含了Publisher
列表。- 主程序创建
BlockingFirstSubscriber
实例。 - 主程序调用
FluxConcatIterable.subscribe(BlockingFirstSubscriber)
订阅FluxConcatIterable
。 FluxConcatIterable
创建ConcatIterableSubscriber
实例,它包含了BlockingFirstSubscriber
和Publisher
列表。FluxConcatIterable
调用BlockingFirstSubscriber.onSubscribe(ConcatIterableSubscriber)
来通知其订阅成功。BlockingFirstSubscriber
调用ConcatIterableSubscriber.request(n)
来请求固定大小的数据,该调用可发生多次且可在第7步后任何时间点发生。FluxConcatIterable
调用ConcatIterableSubscriber.onComplete()
来通知ConcatIterableSubscriber
数据发布结束。大家可能注意到,这里ConcatIterableSubscriber
并未请求任何数据却直接收到了onComplete
。这样做的原因是ConcatIterableSubscriber
已经拿到了Publisher
列表,不需要再像flatMap
中的FlatMapMain
那样等待数据发布。ConcatIterableSubscriber
从Publisher
列表中获取一个Publihser
。ConcatIterableSubscriber
调用Publisher.subscribe(ConcatIterableSubscriber)
来订阅该Publisher
。Publisher
创建Subscription
实例。Publisher
调用ConcatIterableSubscriber.onSubscribe(Subscription)
来通知ConcatIterableSubscriber
订阅成功。ConcatIterableSubscriber
调用Subscription.request(remain)
来请求数据,其大小为BlockingFirstSubscriber
请求大小减去已发布数据大小。Subscription
调用ConcatIterableSubscriber.onNext(value)
来向ConcatIterableSubscriber
发布数据,该调用可发生多次。ConcatIterableSubscriber
调用BlockingFirstSubscriber.onNext(value)
来向BlockingFirstSubscriber
发布数据。- 在数据发布结束时,
Subscription
调用ConcatIterableSubscriber.onComplete()
来通知ConcatIterableSubscriber
数据发布结束。回到步骤10,直到ConcatIterableSubscriber
的Publisher
列表遍历完成。 - 如果
Publihser
列表遍历完成且Subscription
数据发布结束,调用BlockingFirstSubscriber.onComplete()
来通知BlockingFirstSubscriber
数据发布结束。
场景3:合并一个流中的所有流
使用方法
Flux.concat(Flux.just(Flux.just(1,1), Flux.just(2,2))).blockFirst(); //1,1,2,2
这里concat
接收的是一个流,它的每一个元素都是我们想要合并的流。它是concatMap
的一个特殊实现,上面的代码等价于
Flux.just(Flux.just(1,1), Flux.just(2,2)).concatMap(x -> x).blockFirst(); //1,1,2,2
工作原理
这里我们直接讲解concatMap
的实现原理,其类图如下
以上面的代码为例,其工作流程如下
- 主程序创建
FluxArray
实例,它的每一个元素都是一个CorePublisher
。 - 主程序调用
Flux.concat(FluxArray)
对FluxArray
中的CorePublisher
进行合并。 Flux.concat
创建FluxConcatMap
示例,它包含了FluxArray
和一个转换函数f
,在上述代码中,f
是identity
函数。- 主程序创建
BlockingFirstSubscriber
实例。 - 主程序调用
FluxConcatMap.subscribe(BlockingFirstSubscriber)
订阅FluxConcatMap
。 FluxConcatMap
创建ConcatMapImmediate
实例,它包含了BlockingFirstSubscriber
和f
。ConcatMapImmediate
创建ConcatMapInner
实例,它用来订阅FluxArray
中的每一个CorePublisher
。FluxConcatMap
调用FluxArray.subscribe(ConcatMapImmediate)
订阅FluxArray
。FluxArray
创建Subscription
实例。FluxArray
调用ConcatMapImmediate.onSubscribe(Subscription)
来通知ConcatMapImmediate
订阅成功。ConcatMapImmediate
调用BlockingFirstSubscriber.onSubscribe(ConcatMapImmediate)
来通知BlockingFirstSubscriber
订阅成功,这里ConcatMapImmediate
既是一个Subscriber
,也是一个Subscription
。ConcatMapImmediate
调用Subscription.request(prefetch)
来请求数据,其大小为数据预加载大小。该调用采取预加载的模式,接收的数据会先存储到ConcatMapImmediate
的数据队列中, 不需要等待BlockingFirstSubscriber
调用ConcatMapImmediate.request(n)
。BlockingFirstSubscriber
调用ConcatMapImmediate.request(n)
来请求固定大小的数据,该调用可发生多次且可在步骤11后的任意时间发生。Subscription
调用ConcatMapImmediate.onNext(value)
来向ConcatMapImmediate
发布数据,该调用可发生多次。- 在数据发布结束时,
Subscription
需要调用ConcatMapImmediate.onComplete()
来通知ConcatMapImmediate
数据发布结束。该步骤可以发生在第11步之后的任何时间点,和后续步骤是并行关系 ConcatMapImmediate
将接收到的数据放入数据队列。ConcatMapImmediate
从数据队列中获取一个元素,ConcatMapImmediate
调用f(value)
来对数据进行转换,在上述代码中转换后的数据还是value
, 它是一个Publisher
,这里我们将其命名为MappedPublisher
。ConcatMapImmediate
调用MappedPublisher.subscribe(ConcatMapInner)
,注意这里不会生成新的ConcatMapInner
实例,而是复用同一个实例。MappedPublisher
创建Subscription
实例,这里我们将其命名为MappedSubscription
。MappedPublisher
调用ConcatMapInner.onSubscribe(MappedSubscription)
来通知ConcatMapInner
订阅成功。ConcatMapInner
调用MappedSubscription.request(remain)
来请求数据,其大小为BlockingFirstSubscriber
请求大小减去已发布数据大小。MappedSubscription
调用ConcatMapInner.onNext(value)
来向ConcatMapInner
发布数据,该调用可发生多次。ConcatMapInner
调用ConcatMapImmediate.innerNext(value)
来向ConcatMapImmediate
发布数据。ConcatMapImmediate
调用BlockingFirstSubscriber.onNext(value)
来向BlockingFirstSubscriber
发布数据。- 在数据发布结束时,
MappedSubscription
调用ConcatMapInner.onComplete()
来通知ConcatMapInner
数据发布结束。回到步骤17,直到ConcatMapImmediate
数据队列为空。 - 如果数据队列为空且
Subscription
数据发布结束,调用BlockingFirstSubscriber.onComplete()
来通知BlockingFirstSubscriber
数据发布结束。
不同时间点的状态变化
以场景3为例,我们看一下Flux.concat
在不同时间点的状态变化。
这里我们假设Subscription
发布了三个元素,每个元素都会生成一个MappedSubscription
。
- 状态1
ConcatMapImmediate
从数据队列中获取第一个元素,并生成对应的MappedSubscription
。ConcatMapInner
订阅该MappedSubscription
并向Subscriber
转发数据,直到数据发布结束。ConcatMapImmediate
将第二个元素存储到它的数据队列中。ConcatMapImmediate
将第三个元素存储到它的数据队列中。
- 状态2
- 第一个
MappedSubscription
数据发布结束。 ConcatMapImmediate
从数据队列中获取第二个元素,并生成对应的MappedSubscription
。ConcatMapInner
订阅该MappedSubscription
并向Subscriber
转发数据,直到数据发布结束。ConcatMapImmediate
将第三个元素存储到它的数据队列中。
- 第一个
- 状态3
- 第一个
MappedSubscription
数据发布结束。 - 第二个
MappedSubscription
数据发布结束。 ConcatMapImmediate
从数据队列中获取第三个元素,并生成对应的MappedSubscription
。ConcatMapInner
订阅该MappedSubscription
并向Subscriber
转发数据,直到数据发布结束。
- 第一个
总结
merge
和concat
没有看起来那么简单,它们针对不同的数据源有不同的实现。下一篇我们讲解reduce
,看看它和concat
一起使用会有什么效果。