searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

JAVA Reactor系列(四)——过滤操作符

2023-07-27 09:46:25
13
0

过滤操作符可以帮助我们过滤掉数据流中不需要的元素,只留下我们感兴趣的元素。本章将详细介绍Reactor中常用的过滤操作符。

filter


filter是最常见的过滤操作符。它接收一个Predicate函数作为参数,该函数会对每个元素进行判断,返回true表示保留,返回false表示过滤掉。

Flux.just(1, 2, 3, 4, 5)
    .filter(i -> i % 2 == 0)
    .subscribe(System.out::println);
 

上例中,filter过滤掉了奇数,只保留了偶数。

distinct


distinct可以过滤掉重复的元素。它会记住每一个已经见过的元素,如果后续元素与之前的元素相同,则会被过滤掉。

Flux.just(1, 2, 1, 3, 2, 4)
    .distinct()
    .subscribe(System.out::println);  
 

上例中,重复的1和2被过滤掉了,只剩下不重复的元素。

distinct默认使用对象的equals方法判断元素是否相同。我们也可以提供一个函数来自定义元素比较逻辑:

Flux.just(new Person("Alice"), new Person("Bob"))
    .distinct(p -> p.getName())
    .subscribe(System.out::println);
 

此时即使Person对象不同,只要名称相同就视为重复元素被过滤。

skip 和 take


skip可以跳过开始的N个元素,take可以只取开始的N个元素,它们可以结合使用来实现切片操作。

 
Flux.just(1, 2, 3, 4, 5)
    .skip(2)
    .take(2)
    .subscribe(System.out::println);
 
 

上例将会跳过前2个元素,然后取2个元素,即结果是3和4。

sample


sample会定期发出数据流中的最新元素。它需要传入一个采样发布周期。

Flux.interval(Duration.ofMillis(100)) 
    .sample(Duration.ofSeconds(1))
    .take(5)
    .subscribe(System.out::println);
 

sample除了基本的定期采样用法外,还支持结合其他发布者作为采样信号。

Flux<Long> interval = Flux.interval(Duration.ofMillis(100));

Flux<String> signal = Flux.just("A", "B", "C");

interval.sample(signal)
        .subscribe(System.out::println); 

上例中,interval作为被采样的序列,每100ms发出一个元素。signal是一个包含A、B、C三个元素的Flux,作为采样发布者。

sample会在signal发出元素时采样interval的最新值并发出。也就是说,当signal发出A时,会取interval的最新值并发出;当signal发出B时,同理取最新值并发出;当signal发出C时,还是取interval的最新值并发出。

interval会持续发出新值,但sample只在signal发出值时进行采样。这种机制可以用来实现定期对数据流的采样,同时采样的时机由另一个发布者控制。

另一个示例是结合switchMap使用sample:

Flux.just("A", "B", "C")
    .switchMap(k -> {
        return doAsyncLookup(k)
                  .sample(Duration.ofMillis(100)); 
    })
    .subscribe(v -> {
        // 使用处理结果 
    });

这里有一个异步查询doAsyncLookup,它返回一个Flux。在switchMap内部,对该Flux加上了sample操作,意思是每100ms对查询结果采样并发出最新值。

这样可以避免查询Flux频繁发出中间结果,而是以固定的频率采样最新的结果进行处理。

sample的这种用法可以实现定期对异步流的采样,非常适合处理异步搜索、网络请求等场景,既可以得到最新的结果,也可以控制发出结果的频率。

总之,sample是一个非常有用的操作符,可以加以灵活运用在复杂的异步流处理场景中。

0条评论
0 / 1000
yurch
7文章数
0粉丝数
yurch
7 文章 | 0 粉丝
原创

JAVA Reactor系列(四)——过滤操作符

2023-07-27 09:46:25
13
0

过滤操作符可以帮助我们过滤掉数据流中不需要的元素,只留下我们感兴趣的元素。本章将详细介绍Reactor中常用的过滤操作符。

filter


filter是最常见的过滤操作符。它接收一个Predicate函数作为参数,该函数会对每个元素进行判断,返回true表示保留,返回false表示过滤掉。

Flux.just(1, 2, 3, 4, 5)
    .filter(i -> i % 2 == 0)
    .subscribe(System.out::println);
 

上例中,filter过滤掉了奇数,只保留了偶数。

distinct


distinct可以过滤掉重复的元素。它会记住每一个已经见过的元素,如果后续元素与之前的元素相同,则会被过滤掉。

Flux.just(1, 2, 1, 3, 2, 4)
    .distinct()
    .subscribe(System.out::println);  
 

上例中,重复的1和2被过滤掉了,只剩下不重复的元素。

distinct默认使用对象的equals方法判断元素是否相同。我们也可以提供一个函数来自定义元素比较逻辑:

Flux.just(new Person("Alice"), new Person("Bob"))
    .distinct(p -> p.getName())
    .subscribe(System.out::println);
 

此时即使Person对象不同,只要名称相同就视为重复元素被过滤。

skip 和 take


skip可以跳过开始的N个元素,take可以只取开始的N个元素,它们可以结合使用来实现切片操作。

 
Flux.just(1, 2, 3, 4, 5)
    .skip(2)
    .take(2)
    .subscribe(System.out::println);
 
 

上例将会跳过前2个元素,然后取2个元素,即结果是3和4。

sample


sample会定期发出数据流中的最新元素。它需要传入一个采样发布周期。

Flux.interval(Duration.ofMillis(100)) 
    .sample(Duration.ofSeconds(1))
    .take(5)
    .subscribe(System.out::println);
 

sample除了基本的定期采样用法外,还支持结合其他发布者作为采样信号。

Flux<Long> interval = Flux.interval(Duration.ofMillis(100));

Flux<String> signal = Flux.just("A", "B", "C");

interval.sample(signal)
        .subscribe(System.out::println); 

上例中,interval作为被采样的序列,每100ms发出一个元素。signal是一个包含A、B、C三个元素的Flux,作为采样发布者。

sample会在signal发出元素时采样interval的最新值并发出。也就是说,当signal发出A时,会取interval的最新值并发出;当signal发出B时,同理取最新值并发出;当signal发出C时,还是取interval的最新值并发出。

interval会持续发出新值,但sample只在signal发出值时进行采样。这种机制可以用来实现定期对数据流的采样,同时采样的时机由另一个发布者控制。

另一个示例是结合switchMap使用sample:

Flux.just("A", "B", "C")
    .switchMap(k -> {
        return doAsyncLookup(k)
                  .sample(Duration.ofMillis(100)); 
    })
    .subscribe(v -> {
        // 使用处理结果 
    });

这里有一个异步查询doAsyncLookup,它返回一个Flux。在switchMap内部,对该Flux加上了sample操作,意思是每100ms对查询结果采样并发出最新值。

这样可以避免查询Flux频繁发出中间结果,而是以固定的频率采样最新的结果进行处理。

sample的这种用法可以实现定期对异步流的采样,非常适合处理异步搜索、网络请求等场景,既可以得到最新的结果,也可以控制发出结果的频率。

总之,sample是一个非常有用的操作符,可以加以灵活运用在复杂的异步流处理场景中。

文章来自个人专栏
文章 | 订阅
0条评论
0 / 1000
请输入你的评论
0
0