searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

JAVA Reactor系列(三)—— 转换操作符

2023-07-27 09:44:33
46
0

转换操作符是Reactor中最常用的一类操作符,它可以将数据流中的元素进行转换或组合生成新的元素。本章将详细介绍Reactor中的几个典型转换操作符。

map


map是最常见的转换操作符,它可以将数据流中的每个元素映射成一个新的元素。map接受一个Function函数作为参数,将原元素映射为新元素:

Flux.just(1, 2, 3)
    .map(i -> i * 2) 
    .subscribe(System.out::println);

上例中,map操作将每个元素乘以2。

map支持异步转换,如果Function返回Mono或Flux,则map会对其进行订阅并将结果展开:

 
Mono.just("hello")
    .map(word -> Mono.just(word.toUpperCase()))
    .subscribe(System.out::println);
 
 

上例中,map中的Function返回了一个Mono,map会订阅该Mono并将其结果展开,因此打印出"HELLO"。

 

flatMap


flatMap与map类似,不同的是其mapping function的返回值必须是Mono或Flux,flatMap会将其展开为数据流。这可以用来实现各种嵌套结构的展开。

 
Flux.just("A", "B", "C")
    .flatMap(letter -> Flux.just(letter, letter.toLowerCase())) 
    .subscribe(System.out::println);  
 
 

上例中,flatMap将每个元素转换为一个包含两个元素的Flux,并展开这个Flux,因此结果是A a B b C c。

flatMap的mapping function如果返回Flux或Mono,则flatMap会等待它完成再继续处理数据流。这也意味着flatMap内部是异步的。

concatMap


concatMap与flatMap类似,不同的是它要求mapping function返回的Flux严格按顺序执行,前一个Flux完成后才会执行下一个Flux。

 
Flux.just("A", "B", "C")
    .concatMap(letter -> Flux.just(letter, letter.toLowerCase())
                              .delayElements(Duration.ofMillis(100)))
    .subscribe(System.out::println);
 
 

上例中,每个Flux之间有100ms的延迟,因此结果会严格按A a B b C c的顺序打印。

 

switchMap


switchMap与concatMap类似,不同的是它会取消前一个内部Flux的订阅,当一个新的内部Flux到达时,会停止处理前一个,直接开始处理新的。

 
Flux.just("A", "B", "C")
    .switchMap(letter -> Flux.just(letter, letter.toLowerCase())
                             .delayElements(Duration.ofMillis(100)))
    .subscribe(System.out::println);   
 
 

上例中,当B到达时,会停止处理A的Flux,直接开始处理B,所以只会打印B b C c。

switchMap适合用在需要取消前一个异步操作的场景。

groupBy


groupBy可以将数据流中的元素按关键字分组,返回一个Flux<GroupedFlux>,每个GroupedFlux代表一个分组,可以继续对每个分组进行转换操作。

 
Flux.just("A", "B", "C", "D")
    .groupBy(s -> s.toLowerCase().charAt(0)) 
    .flatMap(gf -> gf.map(s -> s.toUpperCase()))
    .subscribe(System.out::println);
 
 

上例中,先按第一个字符分组,然后对每个分组转换为大写。结果是A C B D。

collectList


collectList可以收集数据流中的所有元素到一个List中,最后返回一个Mono<List>。

 
Flux.just("A", "B", "C")
    .collectList()
    .subscribe(list -> {
        list.forEach(System.out::println); 
    });
 
 

上例收集了所有元素到list中,然后遍历打印。

collectMap


collectMap与collectList类似,不同的是它将数据收集到一个Map中,key可以通过参数指定。

 
 
Flux.just("A", "B", "C")
    .collectMap(s -> s.toLowerCase())
    .subscribe(map -> {
        map.forEach((k, v) -> {
            System.out.println(k + ":" + v);
        });
    }); 
 

上例中,key为每个元素的小写形式。结果打印出a:A、b:B、c:C。

buffer


buffer可以将数据流中的元素缓冲到List中,每当列表达到指定大小时就发出这个List。它可以实现流的批处理。

 
 Flux.just("A", "B", "C", "D", "E")
    .buffer(3) 
    .subscribe(list -> {
        list.forEach(System.out::println);
    });
 
 

上例中,buffer大小为3。结果会打印出两次,一次是"A" "B" "C",一次是"D" "E"。

window


window与buffer类似,不同的是它发出的不是List而是Flux,每个Flux包含buffer时指定大小的元素。

 
Flux.just("A", "B", "C", "D", "E")
    .window(3)
    .flatMap(flux -> flux.collectList()) 
    .subscribe(list -> {
        list.forEach(System.out::println); 
    });
 
 

上例中,每个Flux包含3个元素,然后收集到List中打印。

reduce


reduce可以实现流的聚合操作。它接收一个初始值和一个BiFunction函数,该函数接受两个参数并返回一个新值。reduce会利用该函数不断聚合流中的元素。

 
Flux.just(1, 2, 3, 4)
    .reduce(0, (total, element) -> total + element)
    .subscribe(System.out::println);  
 

上例中reduce将元素进行求和,最终得到结果10。

 

scan


scan与reduce类似,不同的是它会发出流中的每一步聚合结果,而reduce只返回最终结果。

 
Flux.just(1, 2, 3, 4)
    .scan(0, (total, element) -> total + element) 
    .subscribe(System.out::println);
 

上例中scan会打印出1、3、6、10。

 

以上介绍了Reactor中一些常用的转换操作符。转换操作符可以将数据流中的元素进行转换、组合、分组、聚合等处理,是构建复杂流水线的基础。后续几章会继续介绍其他种类的操作符。

0条评论
0 / 1000
yurch
7文章数
0粉丝数
yurch
7 文章 | 0 粉丝
原创

JAVA Reactor系列(三)—— 转换操作符

2023-07-27 09:44:33
46
0

转换操作符是Reactor中最常用的一类操作符,它可以将数据流中的元素进行转换或组合生成新的元素。本章将详细介绍Reactor中的几个典型转换操作符。

map


map是最常见的转换操作符,它可以将数据流中的每个元素映射成一个新的元素。map接受一个Function函数作为参数,将原元素映射为新元素:

Flux.just(1, 2, 3)
    .map(i -> i * 2) 
    .subscribe(System.out::println);

上例中,map操作将每个元素乘以2。

map支持异步转换,如果Function返回Mono或Flux,则map会对其进行订阅并将结果展开:

 
Mono.just("hello")
    .map(word -> Mono.just(word.toUpperCase()))
    .subscribe(System.out::println);
 
 

上例中,map中的Function返回了一个Mono,map会订阅该Mono并将其结果展开,因此打印出"HELLO"。

 

flatMap


flatMap与map类似,不同的是其mapping function的返回值必须是Mono或Flux,flatMap会将其展开为数据流。这可以用来实现各种嵌套结构的展开。

 
Flux.just("A", "B", "C")
    .flatMap(letter -> Flux.just(letter, letter.toLowerCase())) 
    .subscribe(System.out::println);  
 
 

上例中,flatMap将每个元素转换为一个包含两个元素的Flux,并展开这个Flux,因此结果是A a B b C c。

flatMap的mapping function如果返回Flux或Mono,则flatMap会等待它完成再继续处理数据流。这也意味着flatMap内部是异步的。

concatMap


concatMap与flatMap类似,不同的是它要求mapping function返回的Flux严格按顺序执行,前一个Flux完成后才会执行下一个Flux。

 
Flux.just("A", "B", "C")
    .concatMap(letter -> Flux.just(letter, letter.toLowerCase())
                              .delayElements(Duration.ofMillis(100)))
    .subscribe(System.out::println);
 
 

上例中,每个Flux之间有100ms的延迟,因此结果会严格按A a B b C c的顺序打印。

 

switchMap


switchMap与concatMap类似,不同的是它会取消前一个内部Flux的订阅,当一个新的内部Flux到达时,会停止处理前一个,直接开始处理新的。

 
Flux.just("A", "B", "C")
    .switchMap(letter -> Flux.just(letter, letter.toLowerCase())
                             .delayElements(Duration.ofMillis(100)))
    .subscribe(System.out::println);   
 
 

上例中,当B到达时,会停止处理A的Flux,直接开始处理B,所以只会打印B b C c。

switchMap适合用在需要取消前一个异步操作的场景。

groupBy


groupBy可以将数据流中的元素按关键字分组,返回一个Flux<GroupedFlux>,每个GroupedFlux代表一个分组,可以继续对每个分组进行转换操作。

 
Flux.just("A", "B", "C", "D")
    .groupBy(s -> s.toLowerCase().charAt(0)) 
    .flatMap(gf -> gf.map(s -> s.toUpperCase()))
    .subscribe(System.out::println);
 
 

上例中,先按第一个字符分组,然后对每个分组转换为大写。结果是A C B D。

collectList


collectList可以收集数据流中的所有元素到一个List中,最后返回一个Mono<List>。

 
Flux.just("A", "B", "C")
    .collectList()
    .subscribe(list -> {
        list.forEach(System.out::println); 
    });
 
 

上例收集了所有元素到list中,然后遍历打印。

collectMap


collectMap与collectList类似,不同的是它将数据收集到一个Map中,key可以通过参数指定。

 
 
Flux.just("A", "B", "C")
    .collectMap(s -> s.toLowerCase())
    .subscribe(map -> {
        map.forEach((k, v) -> {
            System.out.println(k + ":" + v);
        });
    }); 
 

上例中,key为每个元素的小写形式。结果打印出a:A、b:B、c:C。

buffer


buffer可以将数据流中的元素缓冲到List中,每当列表达到指定大小时就发出这个List。它可以实现流的批处理。

 
 Flux.just("A", "B", "C", "D", "E")
    .buffer(3) 
    .subscribe(list -> {
        list.forEach(System.out::println);
    });
 
 

上例中,buffer大小为3。结果会打印出两次,一次是"A" "B" "C",一次是"D" "E"。

window


window与buffer类似,不同的是它发出的不是List而是Flux,每个Flux包含buffer时指定大小的元素。

 
Flux.just("A", "B", "C", "D", "E")
    .window(3)
    .flatMap(flux -> flux.collectList()) 
    .subscribe(list -> {
        list.forEach(System.out::println); 
    });
 
 

上例中,每个Flux包含3个元素,然后收集到List中打印。

reduce


reduce可以实现流的聚合操作。它接收一个初始值和一个BiFunction函数,该函数接受两个参数并返回一个新值。reduce会利用该函数不断聚合流中的元素。

 
Flux.just(1, 2, 3, 4)
    .reduce(0, (total, element) -> total + element)
    .subscribe(System.out::println);  
 

上例中reduce将元素进行求和,最终得到结果10。

 

scan


scan与reduce类似,不同的是它会发出流中的每一步聚合结果,而reduce只返回最终结果。

 
Flux.just(1, 2, 3, 4)
    .scan(0, (total, element) -> total + element) 
    .subscribe(System.out::println);
 

上例中scan会打印出1、3、6、10。

 

以上介绍了Reactor中一些常用的转换操作符。转换操作符可以将数据流中的元素进行转换、组合、分组、聚合等处理,是构建复杂流水线的基础。后续几章会继续介绍其他种类的操作符。

文章来自个人专栏
文章 | 订阅
0条评论
0 / 1000
请输入你的评论
0
0