我们已经在Project Reactor - map的工作原理中讨论了map,filter的工作原理和map很类似,不同是filter不是改变数据,而是忽略不符合条件的数据。
使用方法
这里我们以Flux.filter为例(Mono.filter的工作原理类似), 它的使用方法如下
Flux.just(1, 2).filter(x -> x > 1).blockFirst(); // 2
filter可以判断流中的每个元素是否符合给定条件,如果不符合就会忽略该元素。
这行代码可以被分为三个部分

- 创建一个初始
Publisher,这里是FluxArray。 - 创建一个新的
Publisher作为FluxArray的装饰类并保存filter的判断函数(x -> x > 1),这里是FluxFilter,它使用了装饰器模式。 - 创建一个
Subscriber来订阅FluxFilter,这里是BlockingFirstSubscriber。在FluxFilter被订阅后,它会在FluxArray和BlockingFirstSubscriber之间加入一个FilterSubscriber。FilterSubscriber会作为Processor执行filter的判断函数。
工作原理

注意:这里我们没有考虑ConditionalSubscriber,后续会对其进行单独介绍。
FluxFilter保存了filter的判断函数,在被订阅后,它会创建FilterSubscriber并将判断函数传给它。
以上面的代码为例,filter的工作流程如下

- 主程序创建
FluxArray实例,它是一个CorePublisher。 - 主程序调用
FluxArray.filter(f)对数据进行过滤。 FluxArray创建FluxFilter实例,它包含了FluxArray和f,并且也是一个CorePublisher。- 主程序创建
BlockingFirstSubscriber实例。 - 主程序调用
FluxFilter.subscribe(BlockingFirstSubscriber)订阅FluxFilter。 FluxFilter创建FilterSubscriber实例,它包含了BlockingFirstSubscriber和f。FluxFilter调用FluxArray.subscribe(FilterSubscriber)订阅FluxArray。FluxArray创建Subscription实例。FluxArray调用FilterSubscriber.onSubscribe(Subscription)来通知FilterSubscriber订阅成功。FilterSubscriber调用BlockingFirstSubscriber.onSubscribe(FilterSubscriber)来通知BlockingFirstSubscriber订阅成功,这里FilterSubscriber既是一个Subscriber,也是一个Subscription。BlockingFirstSubscriber调用FilterSubscriber.request(n)来请求固定大小的数据,该调用可发生多次。FilterSubscriber调用Subscription.request(n)来请求固定大小的数据,该调用可发生多次。Subscription调用FilterSubscriber.onNext(value)来向FilterSubscriber发布数据,该调用可发生多次。FilterSubscriber调用f(value)来对数据进行判断,这是filter的核心逻辑。- 如果判断值是true,
FilterSubscriber调用BlockingFirstSubscriber.onNext(value)来向BlockingFirstSubscriber发布数据。 - 如果判断值是false,
FilterSubscriber会忽略当前数据,并调用Subscription.request(1)重新请求一个数据,交互流程会重新回到第13步。 - 在数据发布结束时,
Subscription需要调用FilterSubscriber.onComplete()来通知FilterSubscriber数据发布结束。 FilterSubscriber调用BlockingFirstSubscriber.onComplete()来通知BlockingFirstSubscriber数据发布结束。
总结
map和filter一起已经可以完成很多的功能,但是它们无法用流中的每个元素生成一个新的流并将它们合并起来。要实现这个功能,我们需要flatMap,下一篇我们就介绍一下它的工作原理。