过滤操作符可以帮助我们过滤掉数据流中不需要的元素,只留下我们感兴趣的元素。本章将详细介绍Reactor中常用的过滤操作符。
filter
filter是最常见的过滤操作符。它接收一个Predicate函数作为参数,该函数会对每个元素进行判断,返回true表示保留,返回false表示过滤掉。
Flux.just(1, 2, 3, 4, 5)
.filter(i -> i % 2 == 0)
.subscribe(System.out::println);
上例中,filter过滤掉了奇数,只保留了偶数。
distinct
distinct可以过滤掉重复的元素。它会记住每一个已经见过的元素,如果后续元素与之前的元素相同,则会被过滤掉。
Flux.just(1, 2, 1, 3, 2, 4)
.distinct()
.subscribe(System.out::println);
上例中,重复的1和2被过滤掉了,只剩下不重复的元素。
distinct默认使用对象的equals方法判断元素是否相同。我们也可以提供一个函数来自定义元素比较逻辑:
Flux.just(new Person("Alice"), new Person("Bob"))
.distinct(p -> p.getName())
.subscribe(System.out::println);
此时即使Person对象不同,只要名称相同就视为重复元素被过滤。
skip 和 take
skip可以跳过开始的N个元素,take可以只取开始的N个元素,它们可以结合使用来实现切片操作。
Flux.just(1, 2, 3, 4, 5)
.skip(2)
.take(2)
.subscribe(System.out::println);
上例将会跳过前2个元素,然后取2个元素,即结果是3和4。
sample
sample会定期发出数据流中的最新元素。它需要传入一个采样发布周期。
Flux.interval(Duration.ofMillis(100))
.sample(Duration.ofSeconds(1))
.take(5)
.subscribe(System.out::println);
sample除了基本的定期采样用法外,还支持结合其他发布者作为采样信号。
Flux<Long> interval = Flux.interval(Duration.ofMillis(100));
Flux<String> signal = Flux.just("A", "B", "C");
interval.sample(signal)
.subscribe(System.out::println);
上例中,interval作为被采样的序列,每100ms发出一个元素。signal是一个包含A、B、C三个元素的Flux,作为采样发布者。
sample会在signal发出元素时采样interval的最新值并发出。也就是说,当signal发出A时,会取interval的最新值并发出;当signal发出B时,同理取最新值并发出;当signal发出C时,还是取interval的最新值并发出。
interval会持续发出新值,但sample只在signal发出值时进行采样。这种机制可以用来实现定期对数据流的采样,同时采样的时机由另一个发布者控制。
另一个示例是结合switchMap使用sample:
Flux.just("A", "B", "C")
.switchMap(k -> {
return doAsyncLookup(k)
.sample(Duration.ofMillis(100));
})
.subscribe(v -> {
// 使用处理结果
});
这里有一个异步查询doAsyncLookup,它返回一个Flux。在switchMap内部,对该Flux加上了sample操作,意思是每100ms对查询结果采样并发出最新值。
这样可以避免查询Flux频繁发出中间结果,而是以固定的频率采样最新的结果进行处理。
sample的这种用法可以实现定期对异步流的采样,非常适合处理异步搜索、网络请求等场景,既可以得到最新的结果,也可以控制发出结果的频率。
总之,sample是一个非常有用的操作符,可以加以灵活运用在复杂的异步流处理场景中。