Project Reactor - filter的工作原理

Project Reactor - filter的工作原理

sjmyuan 41 2023-01-19

我们已经在Project Reactor - map的工作原理中讨论了mapfilter的工作原理和map很类似,不同是filter不是改变数据,而是忽略不符合条件的数据。

使用方法

这里我们以Flux.filter为例(Mono.filter的工作原理类似), 它的使用方法如下

Flux.just(1, 2).filter(x -> x > 1).blockFirst(); // 2

filter可以判断流中的每个元素是否符合给定条件,如果不符合就会忽略该元素。

这行代码可以被分为三个部分

  1. 创建一个初始Publisher,这里是FluxArray
  2. 创建一个新的Publisher作为FluxArray的装饰类并保存filter的判断函数(x -> x > 1),这里是FluxFilter,它使用了装饰器模式。
  3. 创建一个Subscriber来订阅FluxFilter,这里是BlockingFirstSubscriber。在FluxFilter被订阅后,它会在FluxArrayBlockingFirstSubscriber之间加入一个FilterSubscriberFilterSubscriber会作为Processor执行filter的判断函数。

工作原理

注意:这里我们没有考虑ConditionalSubscriber,后续会对其进行单独介绍。

FluxFilter保存了filter的判断函数,在被订阅后,它会创建FilterSubscriber并将判断函数传给它。

以上面的代码为例,filter的工作流程如下

  1. 主程序创建FluxArray实例,它是一个CorePublisher
  2. 主程序调用FluxArray.filter(f)对数据进行过滤。
  3. FluxArray创建FluxFilter实例,它包含了FluxArrayf,并且也是一个CorePublisher
  4. 主程序创建BlockingFirstSubscriber实例。
  5. 主程序调用FluxFilter.subscribe(BlockingFirstSubscriber)订阅FluxFilter
  6. FluxFilter创建FilterSubscriber实例,它包含了BlockingFirstSubscriberf
  7. FluxFilter调用FluxArray.subscribe(FilterSubscriber)订阅FluxArray
  8. FluxArray创建Subscription实例。
  9. FluxArray调用FilterSubscriber.onSubscribe(Subscription)来通知FilterSubscriber订阅成功。
  10. FilterSubscriber调用BlockingFirstSubscriber.onSubscribe(FilterSubscriber)来通知BlockingFirstSubscriber订阅成功,这里FilterSubscriber既是一个Subscriber,也是一个Subscription
  11. BlockingFirstSubscriber调用FilterSubscriber.request(n)来请求固定大小的数据,该调用可发生多次。
  12. FilterSubscriber调用Subscription.request(n)来请求固定大小的数据,该调用可发生多次。
  13. Subscription调用FilterSubscriber.onNext(value)来向FilterSubscriber发布数据,该调用可发生多次。
  14. FilterSubscriber调用f(value)来对数据进行判断,这是filter的核心逻辑。
  15. 如果判断值是true, FilterSubscriber调用BlockingFirstSubscriber.onNext(value)来向BlockingFirstSubscriber发布数据。
  16. 如果判断值是false, FilterSubscriber会忽略当前数据,并调用Subscription.request(1)重新请求一个数据,交互流程会重新回到第13步。
  17. 在数据发布结束时,Subscription需要调用FilterSubscriber.onComplete()来通知FilterSubscriber数据发布结束。
  18. FilterSubscriber调用BlockingFirstSubscriber.onComplete()来通知BlockingFirstSubscriber数据发布结束。

总结

mapfilter一起已经可以完成很多的功能,但是它们无法用流中的每个元素生成一个新的流并将它们合并起来。要实现这个功能,我们需要flatMap,下一篇我们就介绍一下它的工作原理。