Project Reactor是基于Reactive Streams设计的,要了解map的工作原理,我们需要先了解Reactive Streams。
Reactive Streams
组成部分

Reactive Streams是一个规范, 它包含了四个部分
- Publisher
发布者,在收到订阅者的请求时才发布数据,且总数据大小可能是未知的。 - Subscriber
订阅者,可以请求多次数据,请求时可以根据自身处理能力决定数据的大小。 - Subscription
订阅,代表着发布者和订阅者之间订阅关系。发布者与订阅者的交互都是通过订阅来实现。 - Processor
数据处理器,既是发布者也是订阅者,可以用来对发布的数据进行转换或对错误进行处理。

工作原理
Publisher, Subscriber和Subscription之间的交互流程如下

- 主程序创建
Publiser实例。 - 主程序创建
Subscriber实例。 - 主程序调用
Publisher.subscribe(Subscriber)订阅Publisher。 Publisher创建Subscription。Publisher调用Subscriber.onSubscribe(Subscription)来通知Subscriber订阅成功。Subscriber调用Subscription.request(n)来请求固定大小的数据,该调用可发生多次。Subscription调用Subscriber.onNext(value)来向Subscriber发布数据,该调用可发生多次。- 在不需要订阅时,
Subscriber需要调用Subscription.cancel()来取消订阅。 - 在数据发布结束时,
Subscription需要调用Subscriber.onComplete()来通知Subscriber数据发布结束。 - 在数据发布出错时,
Subscription需要调用Subscriber.onError()来通知Subscriber数据发布错误。
map
使用方法
这里我们以Flux.map为例(Mono.map的工作原理类似), 它的使用方法如下
Flux.just(1).map(x -> x + 1).blockFirst(); // 2
map可以对流中的每个元素进行计算并用计算结果替换原来的元素。
这行代码可以被分为三个部分

- 创建一个初始
Publisher,这里是FluxJust。 - 创建一个新的
Publisher作为FluxJust的装饰类并保存map的转换函数(x -> x+1),这里是FluxMap,它使用了装饰器模式。 - 创建一个
Subscriber来订阅FluxMap,这里是BlockingFirstSubscriber。在FluxMap被订阅后,它会在FluxJust和BlockingFirstSubscriber之间加入一个MapSubscriber,MapSubscriber会作为Processor执行map的转换函数。
工作原理

注意:为了突出重点,这里进行了简化,忽略了一些对理解map工作原理不重要的类,例如InnerOperator,InternalFluxOperator。
可以看到MapSubscriber并没有严格的实现Processor,它直接继承了Subscription,而不是Publisher。FluxMap.subscribe()在每次被调用时都会生成一个新的MapSubscriber实例,因此MapSubscriber永远都只会有一个Subscriber,那么直接继承Subscription就可以简化生成Subscription的步骤。
FluxMap的作用是增加易用性。如果我们只有MapSubscriber,伪代码就会变成下面这样
Subscriber subscriber = new BlockingFirstSubscriber();
MapSubscriber mapSubscriber = new MapSubscriber(subscriber, x -> x + 1);
Flux<Int> flux = Flux.just(1);
flux.subscribe(mapSubscriber);
如果我们要进行两次map,伪代码会变成
Subscriber subscriber = new BlockingFirstSubscriber();
MapSubscriber mapSubscriber1 = new MapSubscriber(subscriber, x -> x + 1);
MapSubscriber mapSubscriber2 = new MapSubscriber(mapSubscriber1, x -> x + 2);
Flux<Int> flux = Flux.just(1);
flux.subscribe(mapSubscriber2);
我们既要嵌套更多的MapSubscriber,又要修改flux.subscribe()的入参,无法解耦Publisher和Subscriber。引入FluxMap后,我们只需要增加更多的FluxMap,不需要修改其他地方。
以上面的代码为例,map的工作流程如下

- 主程序创建
FluxJust实例,它是一个CorePublisher。 - 主程序调用
FluxJust.map(f)对数据进行转换。 FluxJust创建FluxMap实例,它包含了FluxJust和f,并且也是一个CorePublisher。- 主程序创建
BlockingFirstSubscriber实例。 - 主程序调用
FluxMap.subscribe(BlockingFirstSubscriber)订阅FluxMap。 FluxMap创建MapSubscriber实例,它包含了BlockingFirstSubscriber和f。FluxMap调用FluxJust.subscribe(MapSubscriber)订阅FluxJust。FluxJust创建Subscription实例。FluxJust调用MapSubscriber.onSubscribe(Subscription)来通知MapSubscriber订阅成功。MapSubscriber调用BlockingFirstSubscriber.onSubscribe(MapSubscriber)来通知BlockingFirstSubscriber订阅成功,这里MapSubscriber既是一个Subscriber,也是一个Subscription。BlockingFirstSubscriber调用MapSubscriber.request(n)来请求固定大小的数据,该调用可发生多次。MapSubscriber调用Subscription.request(n)来请求固定大小的数据,该调用可发生多次。Subscription调用MapSubscriber.onNext(value)来向MapSubscriber发布数据,该调用可发生多次。MapSubscriber调用f(value)来对数据进行转换,这是map的核心逻辑。MapSubscriber调用BlockingFirstSubscriber.onNext(mappedValue)来向BlockingFirstSubscriber发布转换后的数据,该调用可发生多次。- 在数据发布结束时,
Subscription需要调用MapSubscriber.onComplete()来通知MapSubscriber数据发布结束。 MapSubscriber调用BlockingFirstSubscriber.onComplete()来通知BlockingFirstSubscriber数据发布结束。
总结
map是Reactor中最基本的流操作方法,掌握它的工作原理可以帮助我们更好的理解其他操作,下一篇我们会在其基础上讲解filter的工作原理。