我们已经在Project Reactor - map的工作原理中讨论了map
,filter
的工作原理和map
很类似,不同是filter
不是改变数据,而是忽略不符合条件的数据。
使用方法
这里我们以Flux.filter
为例(Mono.filter
的工作原理类似), 它的使用方法如下
Flux.just(1, 2).filter(x -> x > 1).blockFirst(); // 2
filter
可以判断流中的每个元素是否符合给定条件,如果不符合就会忽略该元素。
这行代码可以被分为三个部分
- 创建一个初始
Publisher
,这里是FluxArray
。 - 创建一个新的
Publisher
作为FluxArray
的装饰类并保存filter
的判断函数(x -> x > 1
),这里是FluxFilter
,它使用了装饰器模式。 - 创建一个
Subscriber
来订阅FluxFilter
,这里是BlockingFirstSubscriber
。在FluxFilter
被订阅后,它会在FluxArray
和BlockingFirstSubscriber
之间加入一个FilterSubscriber
。FilterSubscriber
会作为Processor
执行filter
的判断函数。
工作原理
注意:这里我们没有考虑ConditionalSubscriber,后续会对其进行单独介绍。
FluxFilter
保存了filter
的判断函数,在被订阅后,它会创建FilterSubscriber
并将判断函数传给它。
以上面的代码为例,filter
的工作流程如下
- 主程序创建
FluxArray
实例,它是一个CorePublisher
。 - 主程序调用
FluxArray.filter(f)
对数据进行过滤。 FluxArray
创建FluxFilter
实例,它包含了FluxArray
和f
,并且也是一个CorePublisher
。- 主程序创建
BlockingFirstSubscriber
实例。 - 主程序调用
FluxFilter.subscribe(BlockingFirstSubscriber)
订阅FluxFilter
。 FluxFilter
创建FilterSubscriber
实例,它包含了BlockingFirstSubscriber
和f
。FluxFilter
调用FluxArray.subscribe(FilterSubscriber)
订阅FluxArray
。FluxArray
创建Subscription
实例。FluxArray
调用FilterSubscriber.onSubscribe(Subscription)
来通知FilterSubscriber
订阅成功。FilterSubscriber
调用BlockingFirstSubscriber.onSubscribe(FilterSubscriber)
来通知BlockingFirstSubscriber
订阅成功,这里FilterSubscriber
既是一个Subscriber
,也是一个Subscription
。BlockingFirstSubscriber
调用FilterSubscriber.request(n)
来请求固定大小的数据,该调用可发生多次。FilterSubscriber
调用Subscription.request(n)
来请求固定大小的数据,该调用可发生多次。Subscription
调用FilterSubscriber.onNext(value)
来向FilterSubscriber
发布数据,该调用可发生多次。FilterSubscriber
调用f(value)
来对数据进行判断,这是filter
的核心逻辑。- 如果判断值是true,
FilterSubscriber
调用BlockingFirstSubscriber.onNext(value)
来向BlockingFirstSubscriber
发布数据。 - 如果判断值是false,
FilterSubscriber
会忽略当前数据,并调用Subscription.request(1)
重新请求一个数据,交互流程会重新回到第13步。 - 在数据发布结束时,
Subscription
需要调用FilterSubscriber.onComplete()
来通知FilterSubscriber
数据发布结束。 FilterSubscriber
调用BlockingFirstSubscriber.onComplete()
来通知BlockingFirstSubscriber
数据发布结束。
总结
map
和filter
一起已经可以完成很多的功能,但是它们无法用流中的每个元素生成一个新的流并将它们合并起来。要实现这个功能,我们需要flatMap
,下一篇我们就介绍一下它的工作原理。