转换操作符是Reactor中最常用的一类操作符,它可以将数据流中的元素进行转换或组合生成新的元素。本章将详细介绍Reactor中的几个典型转换操作符。
map
map是最常见的转换操作符,它可以将数据流中的每个元素映射成一个新的元素。map接受一个Function函数作为参数,将原元素映射为新元素:
Flux.just(1, 2, 3)
.map(i -> i * 2)
.subscribe(System.out::println);
上例中,map操作将每个元素乘以2。
map支持异步转换,如果Function返回Mono或Flux,则map会对其进行订阅并将结果展开:
Mono.just("hello")
.map(word -> Mono.just(word.toUpperCase()))
.subscribe(System.out::println);
上例中,map中的Function返回了一个Mono,map会订阅该Mono并将其结果展开,因此打印出"HELLO"。
flatMap
flatMap与map类似,不同的是其mapping function的返回值必须是Mono或Flux,flatMap会将其展开为数据流。这可以用来实现各种嵌套结构的展开。
Flux.just("A", "B", "C")
.flatMap(letter -> Flux.just(letter, letter.toLowerCase()))
.subscribe(System.out::println);
上例中,flatMap将每个元素转换为一个包含两个元素的Flux,并展开这个Flux,因此结果是A a B b C c。
flatMap的mapping function如果返回Flux或Mono,则flatMap会等待它完成再继续处理数据流。这也意味着flatMap内部是异步的。
concatMap
concatMap与flatMap类似,不同的是它要求mapping function返回的Flux严格按顺序执行,前一个Flux完成后才会执行下一个Flux。
Flux.just("A", "B", "C")
.concatMap(letter -> Flux.just(letter, letter.toLowerCase())
.delayElements(Duration.ofMillis(100)))
.subscribe(System.out::println);
上例中,每个Flux之间有100ms的延迟,因此结果会严格按A a B b C c的顺序打印。
switchMap
switchMap与concatMap类似,不同的是它会取消前一个内部Flux的订阅,当一个新的内部Flux到达时,会停止处理前一个,直接开始处理新的。
Flux.just("A", "B", "C")
.switchMap(letter -> Flux.just(letter, letter.toLowerCase())
.delayElements(Duration.ofMillis(100)))
.subscribe(System.out::println);
上例中,当B到达时,会停止处理A的Flux,直接开始处理B,所以只会打印B b C c。
switchMap适合用在需要取消前一个异步操作的场景。
groupBy
groupBy可以将数据流中的元素按关键字分组,返回一个Flux<GroupedFlux>,每个GroupedFlux代表一个分组,可以继续对每个分组进行转换操作。
Flux.just("A", "B", "C", "D")
.groupBy(s -> s.toLowerCase().charAt(0))
.flatMap(gf -> gf.map(s -> s.toUpperCase()))
.subscribe(System.out::println);
上例中,先按第一个字符分组,然后对每个分组转换为大写。结果是A C B D。
collectList
collectList可以收集数据流中的所有元素到一个List中,最后返回一个Mono<List>。
Flux.just("A", "B", "C")
.collectList()
.subscribe(list -> {
list.forEach(System.out::println);
});
上例收集了所有元素到list中,然后遍历打印。
collectMap
collectMap与collectList类似,不同的是它将数据收集到一个Map中,key可以通过参数指定。
Flux.just("A", "B", "C")
.collectMap(s -> s.toLowerCase())
.subscribe(map -> {
map.forEach((k, v) -> {
System.out.println(k + ":" + v);
});
});
上例中,key为每个元素的小写形式。结果打印出a:A、b:B、c:C。
buffer
buffer可以将数据流中的元素缓冲到List中,每当列表达到指定大小时就发出这个List。它可以实现流的批处理。
Flux.just("A", "B", "C", "D", "E")
.buffer(3)
.subscribe(list -> {
list.forEach(System.out::println);
});
上例中,buffer大小为3。结果会打印出两次,一次是"A" "B" "C",一次是"D" "E"。
window
window与buffer类似,不同的是它发出的不是List而是Flux,每个Flux包含buffer时指定大小的元素。
Flux.just("A", "B", "C", "D", "E")
.window(3)
.flatMap(flux -> flux.collectList())
.subscribe(list -> {
list.forEach(System.out::println);
});
上例中,每个Flux包含3个元素,然后收集到List中打印。
reduce
reduce可以实现流的聚合操作。它接收一个初始值和一个BiFunction函数,该函数接受两个参数并返回一个新值。reduce会利用该函数不断聚合流中的元素。
Flux.just(1, 2, 3, 4)
.reduce(0, (total, element) -> total + element)
.subscribe(System.out::println);
上例中reduce将元素进行求和,最终得到结果10。
scan
scan与reduce类似,不同的是它会发出流中的每一步聚合结果,而reduce只返回最终结果。
Flux.just(1, 2, 3, 4)
.scan(0, (total, element) -> total + element)
.subscribe(System.out::println);
上例中scan会打印出1、3、6、10。
以上介绍了Reactor中一些常用的转换操作符。转换操作符可以将数据流中的元素进行转换、组合、分组、聚合等处理,是构建复杂流水线的基础。后续几章会继续介绍其他种类的操作符。