组合操作符可以将多个流组合在一起,实现比较复杂的组合逻辑。本章将详细介绍Reactor中常用的组合操作符。
merge
merge可以将多个流合并成一个流。它接受多个Flow作为参数,将它们合并后按顺序发出元素。
Flux<String> flux1 = Flux.just("A", "B", "C");
Flux<String> flux2 = Flux.just("D", "E", "F");
Flux.merge(flux1, flux2)
.subscribe(System.out::println);
上例中,merge了两个Flow,元素按顺序发出,结果是A B C D E F。
mergeSequential
mergeSequential与merge类似,不同的是它会先等待flux1全部发出元素后,再发出flux2中的元素。元素在各自的Flow内是有序的,但不同Flow之间无顺序。
Flux<String> flux1 = Flux.just("A", "B", "C").delayElements(Duration.ofMillis(100));
Flux<String> flux2 = Flux.just("D", "E", "F");
Flux.mergeSequential(flux1, flux2)
.subscribe(System.out::println);
上例中,会先发出完整的flux1,然后再发出flux2。
zip
zip可以将多个流组合,它会将从每个流中取出一个元素,将这些元素按顺序组成一个新元素,然后发出。
Flux.just("A", "B", "C")
.zipWith(Flux.just(1, 2, 3))
.subscribe(tuple -> {
System.out.println(tuple.getT1() + ":" + tuple.getT2());
});
上例中,将两个流的元素配对后发出,结果是A:1 B:2 C:3。
zip也支持更多流的组合,每个元素会包含对应流中取出的所有元素。
combineLatest
combineLatest会在任意一个流有新元素时,取出每个流最新的一个元素组合成新元素发出。
Flux.just("A", "B", "C")
.combineLatest(Flux.just(1, 2, 3), (s, i) -> s + ":" + i)
.subscribe(System.out::println);
上例中,当第二个流有新元素时,会取出两个流最新的元素组合。结果是A:1 B:1 B:2 C:2 C:3。
concat
concat可以将多个流线性连续,当前一个流完成后,再发出下一个流。
Flux.concat(flux1, flux2, flux3)
.subscribe(System.out::println);
concat的元素顺序为flux1的所有元素,然后flux2的所有元素,最后flux3的所有元素。
startWith 和 endWith
startWith可以在流开始之前插入一些数据,endWith可以在流结束之后插入数据。
Flux.just("A", "B", "C")
.startWith("0")
.endWith("D")
.subscribe(System.out::println);
上例中,在流开始前插入了"0",结尾插入了"D",因此结果是0 A B C D。
then
then结束前一个流,继续发出另一个流。
Flux.just("A", "B", "C")
.then(Mono.just("D"))
.subscribe(System.out::println);
上例中,then会在第一个流完成后开始发出第二个流, 只有第二个流会继续传到下游。
repeat 和 retry
repeat可以重复发出某个流中的元素:
Flux.just("A", "B", "C")
.repeat(3)
.subscribe(System.out::println);
上例中,repeat(3)会重复整个流3次。
retry可以在出现错误时重新发出流中的元素:
Flux.just("A", "B")
.concatWith(Flux.error(new RuntimeException()))
.retry(1)
.subscribe(System.out::println);
上例中,retry(1)会在异常后重新发出流一次。