searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

JAVA Reactor系列(五)——组合操作符

2023-07-28 02:07:55
24
0

组合操作符可以将多个流组合在一起,实现比较复杂的组合逻辑。本章将详细介绍Reactor中常用的组合操作符。

merge


merge可以将多个流合并成一个流。它接受多个Flow作为参数,将它们合并后按顺序发出元素。

Flux<String> flux1 = Flux.just("A", "B", "C");
Flux<String> flux2 = Flux.just("D", "E", "F");

Flux.merge(flux1, flux2)
    .subscribe(System.out::println);

上例中,merge了两个Flow,元素按顺序发出,结果是A B C D E F。

mergeSequential


mergeSequential与merge类似,不同的是它会先等待flux1全部发出元素后,再发出flux2中的元素。元素在各自的Flow内是有序的,但不同Flow之间无顺序。

 
Flux<String> flux1 = Flux.just("A", "B", "C").delayElements(Duration.ofMillis(100));
Flux<String> flux2 = Flux.just("D", "E", "F"); 

Flux.mergeSequential(flux1, flux2)
    .subscribe(System.out::println);
 

上例中,会先发出完整的flux1,然后再发出flux2。

zip


zip可以将多个流组合,它会将从每个流中取出一个元素,将这些元素按顺序组成一个新元素,然后发出。

 
Flux.just("A", "B", "C")
    .zipWith(Flux.just(1, 2, 3)) 
    .subscribe(tuple -> {
        System.out.println(tuple.getT1() + ":" + tuple.getT2());
    });
 
 

上例中,将两个流的元素配对后发出,结果是A:1 B:2 C:3。

zip也支持更多流的组合,每个元素会包含对应流中取出的所有元素。

combineLatest


combineLatest会在任意一个流有新元素时,取出每个流最新的一个元素组合成新元素发出。

 
Flux.just("A", "B", "C")
    .combineLatest(Flux.just(1, 2, 3), (s, i) -> s + ":" + i)
    .subscribe(System.out::println);  
 
 

上例中,当第二个流有新元素时,会取出两个流最新的元素组合。结果是A:1 B:1 B:2 C:2 C:3。

concat


concat可以将多个流线性连续,当前一个流完成后,再发出下一个流。

 
Flux.concat(flux1, flux2, flux3)
    .subscribe(System.out::println);
 

concat的元素顺序为flux1的所有元素,然后flux2的所有元素,最后flux3的所有元素。

startWith 和 endWith


startWith可以在流开始之前插入一些数据,endWith可以在流结束之后插入数据。

 
Flux.just("A", "B", "C")
    .startWith("0")
    .endWith("D")
    .subscribe(System.out::println);
 

上例中,在流开始前插入了"0",结尾插入了"D",因此结果是0 A B C D。

 

then 


then结束前一个流,继续发出另一个流。

 
Flux.just("A", "B", "C")
    .then(Mono.just("D"))
    .subscribe(System.out::println);
 

上例中,then会在第一个流完成后开始发出第二个流, 只有第二个流会继续传到下游。

repeat 和 retry


repeat可以重复发出某个流中的元素:

 
Flux.just("A", "B", "C")
    .repeat(3)
    .subscribe(System.out::println);
 
 

上例中,repeat(3)会重复整个流3次。

retry可以在出现错误时重新发出流中的元素:

Flux.just("A", "B")
    .concatWith(Flux.error(new RuntimeException()))
    .retry(1)
    .subscribe(System.out::println);
 

上例中,retry(1)会在异常后重新发出流一次。

0条评论
0 / 1000
yurch
7文章数
0粉丝数
yurch
7 文章 | 0 粉丝
原创

JAVA Reactor系列(五)——组合操作符

2023-07-28 02:07:55
24
0

组合操作符可以将多个流组合在一起,实现比较复杂的组合逻辑。本章将详细介绍Reactor中常用的组合操作符。

merge


merge可以将多个流合并成一个流。它接受多个Flow作为参数,将它们合并后按顺序发出元素。

Flux<String> flux1 = Flux.just("A", "B", "C");
Flux<String> flux2 = Flux.just("D", "E", "F");

Flux.merge(flux1, flux2)
    .subscribe(System.out::println);

上例中,merge了两个Flow,元素按顺序发出,结果是A B C D E F。

mergeSequential


mergeSequential与merge类似,不同的是它会先等待flux1全部发出元素后,再发出flux2中的元素。元素在各自的Flow内是有序的,但不同Flow之间无顺序。

 
Flux<String> flux1 = Flux.just("A", "B", "C").delayElements(Duration.ofMillis(100));
Flux<String> flux2 = Flux.just("D", "E", "F"); 

Flux.mergeSequential(flux1, flux2)
    .subscribe(System.out::println);
 

上例中,会先发出完整的flux1,然后再发出flux2。

zip


zip可以将多个流组合,它会将从每个流中取出一个元素,将这些元素按顺序组成一个新元素,然后发出。

 
Flux.just("A", "B", "C")
    .zipWith(Flux.just(1, 2, 3)) 
    .subscribe(tuple -> {
        System.out.println(tuple.getT1() + ":" + tuple.getT2());
    });
 
 

上例中,将两个流的元素配对后发出,结果是A:1 B:2 C:3。

zip也支持更多流的组合,每个元素会包含对应流中取出的所有元素。

combineLatest


combineLatest会在任意一个流有新元素时,取出每个流最新的一个元素组合成新元素发出。

 
Flux.just("A", "B", "C")
    .combineLatest(Flux.just(1, 2, 3), (s, i) -> s + ":" + i)
    .subscribe(System.out::println);  
 
 

上例中,当第二个流有新元素时,会取出两个流最新的元素组合。结果是A:1 B:1 B:2 C:2 C:3。

concat


concat可以将多个流线性连续,当前一个流完成后,再发出下一个流。

 
Flux.concat(flux1, flux2, flux3)
    .subscribe(System.out::println);
 

concat的元素顺序为flux1的所有元素,然后flux2的所有元素,最后flux3的所有元素。

startWith 和 endWith


startWith可以在流开始之前插入一些数据,endWith可以在流结束之后插入数据。

 
Flux.just("A", "B", "C")
    .startWith("0")
    .endWith("D")
    .subscribe(System.out::println);
 

上例中,在流开始前插入了"0",结尾插入了"D",因此结果是0 A B C D。

 

then 


then结束前一个流,继续发出另一个流。

 
Flux.just("A", "B", "C")
    .then(Mono.just("D"))
    .subscribe(System.out::println);
 

上例中,then会在第一个流完成后开始发出第二个流, 只有第二个流会继续传到下游。

repeat 和 retry


repeat可以重复发出某个流中的元素:

 
Flux.just("A", "B", "C")
    .repeat(3)
    .subscribe(System.out::println);
 
 

上例中,repeat(3)会重复整个流3次。

retry可以在出现错误时重新发出流中的元素:

Flux.just("A", "B")
    .concatWith(Flux.error(new RuntimeException()))
    .retry(1)
    .subscribe(System.out::println);
 

上例中,retry(1)会在异常后重新发出流一次。

文章来自个人专栏
文章 | 订阅
0条评论
0 / 1000
请输入你的评论
0
0