searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

JAVA Reactor系列(二)—— 核心概念和组件

2023-04-28 10:17:46
51
0

在本部分中,我们将深入了解 Reactor 框架的核心概念, Flux 和 Mono,Scheduler以及常用操作符.

Flux 和 Mono

Flux和Mono是Java Reactor框架中最基本的两个概念。Flux表示0到n个元素的异步序列,而Mono表示一个异步操作的结果。
Flux的特点是它可以发出0到n个元素,并且可以不断地发出元素。因此,它可以被看作是一个流,而不是一个固定大小的集合。Flux中的元素可以是任何类型,如数字、字符串、对象等等。在创建Flux时,可以使用许多不同的方式,例如从集合、数组、文件、网络或任何其他数据源中创建Flux。
Mono表示一个异步操作的结果,它可以包含一个元素或没有元素。Mono可以看作是Flux的一种特殊情况,它只包含一个元素。与Flux一样,Mono也可以从多种不同的数据源中创建,例如网络、文件、数据库等。


如上篇文章中提到的, Reactor框架中的Flux和Mono都是非阻塞的,并且使用异步流的方式来处理数据。在使用Flux和Mono时,需要注意它们的订阅关系。只有在订阅时,才会开始实际的数据处理。当Flux或Mono发送所有元素时,订阅将自动取消。

以下代码给出了Flux创建并订阅的流程, 代码中使用了subscribeOn切换线程,以及基本操作符map, 接下来我们详细介绍这些操作的含义。

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class FluxExample {

    public static void main(String[] args) throws InterruptedException {
        Flux.just("A", "B", "C", "D", "E")
                .log() // 打印日志
                .subscribeOn(Schedulers.parallel()) // 使用并行调度器
                .map(s -> s.toLowerCase()) // 转换为小写
                .subscribe(System.out::println); // 输出结果

        Thread.sleep(1000); // 等待 Flux 执行完成
    }
}

 

Scheduler

Scheduler是Reactor中的一个重要组件,它负责处理异步任务的调度和执行。在Reactor中,Scheduler通常用来控制操作符和订阅操作在哪个线程上执行,从而实现异步编程。
Reactor提供了一些常用的Scheduler实现,比如Schedulers.immediate()、Schedulers.single()、Schedulers.elastic()、Schedulers.parallel()等。不同的Scheduler实现有不同的调度策略和线程池配置,可以根据具体的场景选择不同的实现。
Schedulers.immediate():立即执行当前任务,相当于同步调用。该调度器通常不会被直接使用,而是用作其他调度器的 fallback 选项。
Schedulers.single():使用单个线程来执行任务,如果当前线程正在执行任务,则将任务放入任务队列中等待执行。
Schedulers.elastic():使用一个弹性线程池来执行任务。该调度器会根据需要创建新的线程,最多可以创建无限个线程。空闲的线程会在60秒内自动销毁。
Schedulers.parallel():使用一个固定大小的线程池来执行任务。该调度器可以在初始化时指定线程池的大小,默认为CPU核心数。

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class SchedulerExample {
    public static void main(String[] args) throws InterruptedException {
        Flux.just(1, 2, 3, 4, 5)
                .map(i -> i * 2) // 使用当前线程执行 map 操作
                .publishOn(Schedulers.elastic()) // 使用弹性线程池执行后续操作
                .filter(i -> i > 5)
                .subscribeOn(Schedulers.single()) // 使用单线程池执行订阅操作
                .subscribe(System.out::println);

        Thread.sleep(1000); // 等待 Flux 执行完成
    }
}

 

在上面的代码中,首先使用当前线程执行map操作,然后使用publishOn方法切换到弹性线程池执行后续操作,最后使用subscribeOn方法将订阅操作切换到单线程池执行。

在 Java Reactor 中切换线程是非常重要的,通常使用Java Reactor的网络通信框架都只使用很少的主线程, 需要时刻注意防止阻塞主线程,如果一个操作需要花费大量时间来完成,而不使用线程切换,那么它就会在主线程上执行,导致主线程被阻塞,从而导致程序整体无法响应请求。

subscribeOn与publishOn是Java Reactor中切换线程的两种操作符。subscribeOn用于指定在哪个Scheduler上执行整个数据流的订阅操作,这意味着整个数据流的生产者(例如Flux或Mono)都会在指定的Scheduler上执行。一般情况下,subscribeOn只需要调用一次即可,通常放在操作符链的最前面。
例如,以下代码中使用subscribeOn将整个数据流的订阅操作放在了Schedulers.parallel()上执行:

Flux.range(1, 10)
    .log()
    .subscribeOn(Schedulers.parallel())
    .subscribe(System.out::println);


在上述代码中,当我们订阅这个数据流时,数据的产生和处理都会在Schedulers.parallel()上执行,即使用并行的线程池。
而publishOn则是用于指定在数据流的处理过程中,在哪个Scheduler上执行后续的操作。与subscribeOn不同的是,publishOn可以被多次调用,用于针对每个操作符指定不同的Scheduler。
例如,以下代码中使用publishOn将map操作放在了Schedulers.single()上执行:

Flux.range(1, 10)
    .log()
    .publishOn(Schedulers.single())
    .map(i -> i * 2)
    .subscribe(System.out::println);


在上述代码中,数据的产生依然在默认的线程上执行,但是在map操作之后,后续的操作将会在Schedulers.single()上执行,即使用单线程的线程池。
需要注意的是,如果一个数据流中既使用了subscribeOn又使用了publishOn,则后者会覆盖前者。因此,在操作符链中使用subscribeOn和publishOn时需要注意顺序和位置的影响。

 

map操作符

map 操作符是Java Reactor中最基础和常用的操作符之一,它可以将一个 Flux 或 Mono 中的每个元素映射为一个新的元素,然后再将这些新元素发布到一个新的 Flux 或Mono中,例如:
Flux.just("A", "B", "C")
    .map(s -> s.toLowerCase())
    .subscribe(System.out::println);
在这个例子中,Flux.just("A", "B", "C") 创建了一个包含 "A", "B", "C" 三个元素的 Flux,map 操作符将这三个元素分别转换为小写字母并返回一个新的 Flux,最后 subscribe 方法订阅这个新的 Flux 并输出结果。
map 操作符接收一个 Function 函数作为参数,该函数接收一个元素并返回一个新的元素,例如在上面的例子中,s -> s.toLowerCase() 接收一个字符串元素并返回一个小写字母的字符串。

现在回到第一节中的代码

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class FluxExample {

    public static void main(String[] args) throws InterruptedException {
        Flux.just("A", "B", "C", "D", "E")
                .log() // 打印日志
                .subscribeOn(Schedulers.parallel()) // 使用并行调度器
                .map(s -> s.toLowerCase()) // 转换为小写
                .subscribe(System.out::println); // 输出结果

        Thread.sleep(1000); // 等待 Flux 执行完成
    }
}

我们创建了一个 Flux 对象,并使用 just() 方法添加了一些元素。然后,使用 subscribeOn() 方法指定了 Flux 执行的调度器为并行调度器。这样可以在多个线程上执行 Flux 的元素。接着,使用 map() 方法将元素转换为小写,并使用 subscribe() 方法订阅 Flux 并输出结果。最后使用 Thread.sleep() 方法等待 Flux 执行完成。

过程中,使用 log() 方法打印了 Flux 执行过程中的一些日志信息。

 

这段代码会输出如下结果:

[parallel-1] INFO reactor.Flux.Just.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[parallel-1] INFO reactor.Flux.Just.1 - | request(unbounded)
[parallel-1] INFO reactor.Flux.Just.1 - | onNext(A)
a
[parallel-1] INFO reactor.Flux.Just.1 - | onNext(B)
b
[parallel-1] INFO reactor.Flux.Just.1 - | onNext(C)
c
[parallel-1] INFO reactor.Flux.Just.1 - | onNext(D)
d
[parallel-1] INFO reactor.Flux.Just.1 - | onNext(E)
e
[parallel-1] INFO reactor.Flux.Just.1 - | onComplete()


其中,每行日志的含义为:

| onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription):表示订阅开始,参数为订阅类型及订阅的对象。
| request(unbounded):表示请求下一个元素,参数为请求的数量,这里是不限量请求。
| onNext(A):表示下一个元素为 A。
a:表示对应元素经过小写转换后的结果。
| onComplete():表示订阅完成,没有更多的元素可以发出了

Java Reactor中还有许多基础操作符帮助我们方便地实现各类操作, 大致可以分为转换操作符, 过滤操作符, 组合操作符, 辅助操作符等, map就是最典型的转换操作符。 在下一章中我们会展开讲解更多的操作符。

 

0条评论
0 / 1000
yurch
7文章数
0粉丝数
yurch
7 文章 | 0 粉丝
原创

JAVA Reactor系列(二)—— 核心概念和组件

2023-04-28 10:17:46
51
0

在本部分中,我们将深入了解 Reactor 框架的核心概念, Flux 和 Mono,Scheduler以及常用操作符.

Flux 和 Mono

Flux和Mono是Java Reactor框架中最基本的两个概念。Flux表示0到n个元素的异步序列,而Mono表示一个异步操作的结果。
Flux的特点是它可以发出0到n个元素,并且可以不断地发出元素。因此,它可以被看作是一个流,而不是一个固定大小的集合。Flux中的元素可以是任何类型,如数字、字符串、对象等等。在创建Flux时,可以使用许多不同的方式,例如从集合、数组、文件、网络或任何其他数据源中创建Flux。
Mono表示一个异步操作的结果,它可以包含一个元素或没有元素。Mono可以看作是Flux的一种特殊情况,它只包含一个元素。与Flux一样,Mono也可以从多种不同的数据源中创建,例如网络、文件、数据库等。


如上篇文章中提到的, Reactor框架中的Flux和Mono都是非阻塞的,并且使用异步流的方式来处理数据。在使用Flux和Mono时,需要注意它们的订阅关系。只有在订阅时,才会开始实际的数据处理。当Flux或Mono发送所有元素时,订阅将自动取消。

以下代码给出了Flux创建并订阅的流程, 代码中使用了subscribeOn切换线程,以及基本操作符map, 接下来我们详细介绍这些操作的含义。

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class FluxExample {

    public static void main(String[] args) throws InterruptedException {
        Flux.just("A", "B", "C", "D", "E")
                .log() // 打印日志
                .subscribeOn(Schedulers.parallel()) // 使用并行调度器
                .map(s -> s.toLowerCase()) // 转换为小写
                .subscribe(System.out::println); // 输出结果

        Thread.sleep(1000); // 等待 Flux 执行完成
    }
}

 

Scheduler

Scheduler是Reactor中的一个重要组件,它负责处理异步任务的调度和执行。在Reactor中,Scheduler通常用来控制操作符和订阅操作在哪个线程上执行,从而实现异步编程。
Reactor提供了一些常用的Scheduler实现,比如Schedulers.immediate()、Schedulers.single()、Schedulers.elastic()、Schedulers.parallel()等。不同的Scheduler实现有不同的调度策略和线程池配置,可以根据具体的场景选择不同的实现。
Schedulers.immediate():立即执行当前任务,相当于同步调用。该调度器通常不会被直接使用,而是用作其他调度器的 fallback 选项。
Schedulers.single():使用单个线程来执行任务,如果当前线程正在执行任务,则将任务放入任务队列中等待执行。
Schedulers.elastic():使用一个弹性线程池来执行任务。该调度器会根据需要创建新的线程,最多可以创建无限个线程。空闲的线程会在60秒内自动销毁。
Schedulers.parallel():使用一个固定大小的线程池来执行任务。该调度器可以在初始化时指定线程池的大小,默认为CPU核心数。

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class SchedulerExample {
    public static void main(String[] args) throws InterruptedException {
        Flux.just(1, 2, 3, 4, 5)
                .map(i -> i * 2) // 使用当前线程执行 map 操作
                .publishOn(Schedulers.elastic()) // 使用弹性线程池执行后续操作
                .filter(i -> i > 5)
                .subscribeOn(Schedulers.single()) // 使用单线程池执行订阅操作
                .subscribe(System.out::println);

        Thread.sleep(1000); // 等待 Flux 执行完成
    }
}

 

在上面的代码中,首先使用当前线程执行map操作,然后使用publishOn方法切换到弹性线程池执行后续操作,最后使用subscribeOn方法将订阅操作切换到单线程池执行。

在 Java Reactor 中切换线程是非常重要的,通常使用Java Reactor的网络通信框架都只使用很少的主线程, 需要时刻注意防止阻塞主线程,如果一个操作需要花费大量时间来完成,而不使用线程切换,那么它就会在主线程上执行,导致主线程被阻塞,从而导致程序整体无法响应请求。

subscribeOn与publishOn是Java Reactor中切换线程的两种操作符。subscribeOn用于指定在哪个Scheduler上执行整个数据流的订阅操作,这意味着整个数据流的生产者(例如Flux或Mono)都会在指定的Scheduler上执行。一般情况下,subscribeOn只需要调用一次即可,通常放在操作符链的最前面。
例如,以下代码中使用subscribeOn将整个数据流的订阅操作放在了Schedulers.parallel()上执行:

Flux.range(1, 10)
    .log()
    .subscribeOn(Schedulers.parallel())
    .subscribe(System.out::println);


在上述代码中,当我们订阅这个数据流时,数据的产生和处理都会在Schedulers.parallel()上执行,即使用并行的线程池。
而publishOn则是用于指定在数据流的处理过程中,在哪个Scheduler上执行后续的操作。与subscribeOn不同的是,publishOn可以被多次调用,用于针对每个操作符指定不同的Scheduler。
例如,以下代码中使用publishOn将map操作放在了Schedulers.single()上执行:

Flux.range(1, 10)
    .log()
    .publishOn(Schedulers.single())
    .map(i -> i * 2)
    .subscribe(System.out::println);


在上述代码中,数据的产生依然在默认的线程上执行,但是在map操作之后,后续的操作将会在Schedulers.single()上执行,即使用单线程的线程池。
需要注意的是,如果一个数据流中既使用了subscribeOn又使用了publishOn,则后者会覆盖前者。因此,在操作符链中使用subscribeOn和publishOn时需要注意顺序和位置的影响。

 

map操作符

map 操作符是Java Reactor中最基础和常用的操作符之一,它可以将一个 Flux 或 Mono 中的每个元素映射为一个新的元素,然后再将这些新元素发布到一个新的 Flux 或Mono中,例如:
Flux.just("A", "B", "C")
    .map(s -> s.toLowerCase())
    .subscribe(System.out::println);
在这个例子中,Flux.just("A", "B", "C") 创建了一个包含 "A", "B", "C" 三个元素的 Flux,map 操作符将这三个元素分别转换为小写字母并返回一个新的 Flux,最后 subscribe 方法订阅这个新的 Flux 并输出结果。
map 操作符接收一个 Function 函数作为参数,该函数接收一个元素并返回一个新的元素,例如在上面的例子中,s -> s.toLowerCase() 接收一个字符串元素并返回一个小写字母的字符串。

现在回到第一节中的代码

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class FluxExample {

    public static void main(String[] args) throws InterruptedException {
        Flux.just("A", "B", "C", "D", "E")
                .log() // 打印日志
                .subscribeOn(Schedulers.parallel()) // 使用并行调度器
                .map(s -> s.toLowerCase()) // 转换为小写
                .subscribe(System.out::println); // 输出结果

        Thread.sleep(1000); // 等待 Flux 执行完成
    }
}

我们创建了一个 Flux 对象,并使用 just() 方法添加了一些元素。然后,使用 subscribeOn() 方法指定了 Flux 执行的调度器为并行调度器。这样可以在多个线程上执行 Flux 的元素。接着,使用 map() 方法将元素转换为小写,并使用 subscribe() 方法订阅 Flux 并输出结果。最后使用 Thread.sleep() 方法等待 Flux 执行完成。

过程中,使用 log() 方法打印了 Flux 执行过程中的一些日志信息。

 

这段代码会输出如下结果:

[parallel-1] INFO reactor.Flux.Just.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[parallel-1] INFO reactor.Flux.Just.1 - | request(unbounded)
[parallel-1] INFO reactor.Flux.Just.1 - | onNext(A)
a
[parallel-1] INFO reactor.Flux.Just.1 - | onNext(B)
b
[parallel-1] INFO reactor.Flux.Just.1 - | onNext(C)
c
[parallel-1] INFO reactor.Flux.Just.1 - | onNext(D)
d
[parallel-1] INFO reactor.Flux.Just.1 - | onNext(E)
e
[parallel-1] INFO reactor.Flux.Just.1 - | onComplete()


其中,每行日志的含义为:

| onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription):表示订阅开始,参数为订阅类型及订阅的对象。
| request(unbounded):表示请求下一个元素,参数为请求的数量,这里是不限量请求。
| onNext(A):表示下一个元素为 A。
a:表示对应元素经过小写转换后的结果。
| onComplete():表示订阅完成,没有更多的元素可以发出了

Java Reactor中还有许多基础操作符帮助我们方便地实现各类操作, 大致可以分为转换操作符, 过滤操作符, 组合操作符, 辅助操作符等, map就是最典型的转换操作符。 在下一章中我们会展开讲解更多的操作符。

 

文章来自个人专栏
文章 | 订阅
0条评论
0 / 1000
请输入你的评论
0
0