深入探究Project Reactor

一、Project Reactor 简介


// 以下是 import 部分
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;

public class ReactorDemo {
    public static void main(String[] args) throws InterruptedException {
        Flux flux = Flux.just("Hello", "World", "Reactor");
        flux.subscribe(System.out::println);
    }
}

Project Reactor 是一个基于 Reactive Stream API 之上的 reactive 库。Reactive Stream 是一个规范,用于通讯无阻塞背压异步流的交互。Project Reactor 实现了 Reactive Stream API 规范,同时针对 Java 8 的特性也进行了进一步的扩展,支持 Reactor 的 Mono 和 Flux API,让使用者能够方便地使用 Java 8 的 Stream 相关 API 进行数据处理。

Reactor 的核心构建模块为 Mono 和 Flux,该模块提供了类似于Java Stream 的编程模型,支持生成、处理序列数据。其中 Flux 表示的是包含 0 到 N 个数据元素的异步序列,而 Mono 则表示包含 0 或者 1 个数据元素的异步序列,也就是可以返回数据也可以返回值为空,同时在底层实现上,这两个类也提供了流式操作 API,例如:map、filter、reduce 等。

以下是 Flux 的示例代码:


Flux.just("Hello", "World", "Reactor")
    .map(s -> s.concat(", "))
    .zipWith(Flux.range(1, Integer.MAX_VALUE), (s, index) -> index + ". " + s)
    .subscribe(System.out::print);
// Output: 1. Hello, 2. World, 3. Reactor, 

在上述代码中,我们先通过 Flux.just 初始化了一个有三个元素的 Flux 对象,接着使用 map 进行字符串拼接,通过 zipWith 将两个 Flux 对象先后对应地组合在一起,最后通过 subscribe 方法将结果输出。

二、Project Reactor 编程模型

1. Flux 模型

Flux 表示包含 0 到 N 个数据元素的异步序列,可以用于推送数据到一个订阅者流中。Flux 支持两种方式的数据推送:订阅和异步执行。当数据源被订阅时,Flux 将按照订阅者指定的方式推送数据。当没有订阅者时,数据源不会进行推送。异步执行时,Flux 采用 Reactor 提供的 Schedulers 线程调度进行异步处理。

以下是 Flux 的基本用法:


// 以下是 import 部分
import reactor.core.publisher.Flux;

public class FluxDemo {
    public static void main(String[] args) {
        Flux.just("Hello", "World", "Reactor")
            .subscribe(System.out::println);
    }
}

这段代码演示了如何在控制台打印 Flux 数据流中的元素。在 subscribe 方法执行后,Flux 将立即开始推送在 just 方法中指定的数据。

2. Mono 模型

Mono 则表示一个包含 0 或 1 个数据元素的异步序列,同样也可以用于推送数据到一个订阅者流中。它和 Flux 唯一的区别在于数据元素的数量。

以下是 Mono 的基本用法:


// 以下是 import 部分
import reactor.core.publisher.Mono;

public class MonoDemo {
    public static void main(String[] args) {
        Mono.just("Hello Reactor")
            .subscribe(System.out::println);
    }
}

这段代码演示了如何在控制台打印 Mono 数据流中的元素。在 subscribe 方法执行后,Mono 将立即开始推送在 just 方法中指定的数据。

三、Project Reactor 批处理

当需要对数据流进行批处理时,可以使用 buffer 操作符将多个元素合并到一个 List 中,形成一个批次,然后把这个 List 推给下游流。如下所示:


// 以下是 import 部分
import reactor.core.publisher.Flux;

public class BufferDemo {
    public static void main(String[] args) {
        Flux.range(1, 10)
            .buffer(3)
            .subscribe(System.out::println);
    }
}

这个例子的中 buffer(3) 操作符创建了一个可以包含 3 个元素的 List。Flux 已经发出了 10 个元素,因此在推送完所有元素之前,将生成三个 List。输出如下:


[1, 2, 3]
[4, 5, 6]
[7, 8, 9]
[10]

四、Project Reactor 选择线程

Reactor 提供了众多的线程池支持,以便于让开发者可以自定义线程池,通过配置不同的线程池来实现多线程编程。以下是官网提供的线程选择策略,针对不同的场景可以选择不同的策略:

  • boundedElastic:一个支持生命周期的线程池(回收线程),默认可以使用 Integer.MAX_VALUE 个线程,其中每个线程都是预定义的,以保证线程可以在几纳秒内就可以被重用。此线程池适用于执行计算密集型任务,如单独线程的映射,同步 IO 或 CPU 绑定的阻塞。此线程池不支持任务队列,每次提交任务都会立即执行。
  • elastic:使用无界队列的线程池,每次提交任务都会创建一个新的线程。此线程池适用于执行 CPU 密集型任务,因为这样可以保证线程不会被阻塞。
  • single:使用一个线程处理任务,适用于执行多个任务中的串行计算,并使得任务在相同的线程上运行。此线程池适用于执行串行计算,如 Stream -> Stream。
  • immediate:直接在订阅线程上执行任务。

以下是使用 elastic 线程池的代码示例:


// 以下是 import 部分
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class SchedulerDemo {
    public static void main(String[] args) {
        Flux.just("Hello", "World", "Reactor")
            .publishOn(Schedulers.elastic())
            .map(String::toUpperCase)
            .subscribe(System.out::println);
    }
}

在上述示例中,通过 publishOn 方法选择 Schedulers.elastic() 线程池,使用其线程池进行数据处理,然后通过 map 方法将数据进行处理、转化为大写,最后输出结果。

五、Project Reactor 反应式API

Project Reactor 提供了 Flux 和 Mono 类,这些类提供了一组反应式的 API,用于处理异步数据流,并提供了相关的方法进行数据处理。

以下代码示例使用反应式 API 计算 Stream 数据的平均值:


// 以下是 import 部分
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.stream.Stream;

public class ReactiveAPIDemo {
    public static void main(String[] args) {
        Flux intervalFlux = Flux.interval(Duration.ofSeconds(1));
        Stream inputStream = Stream.of(5.0f, 6.0f, 7.0f);
        Flux.fromStream(inputStream)
                .zipWith(intervalFlux, (i, t) -> i)
                .buffer(3)
                .map(buf -> {
                    float sum = buf.stream().reduce(Float::sum).orElse(0f);
                    return sum / buf.size();
                })
                .subscribe(System.out::println);
    }
}

在上面的代码示例中我们首先创建了自动发出递增长整数序列的 Flux,接着使用 zipWith 操作符将其和 Stream 数据进行组合,每 3 个组成一组。最后,我们使用 map 函数计算组的平均值,并将组对象返回。最终输出结果如下:


6.0

六、结语

通过本文介绍,我们已经学习了 Project Reactor 的核心构建模块 Flux 和 Mono 的使用方法,以及 Project Reactor 的编程模型和批处理。通过调整选择线程池的位置可以控制代码的并发程度,提高代码的处理效率。同时通过之前的示例我们也学习了反应式 API 的使用,我们可以在项目中通过这个 API 实现多线程异步计算,优化项目性能,提高代码质量和可读性。

原创文章,作者:小蓝,如若转载,请注明出处:https://www.506064.com/n/228713.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
小蓝小蓝
上一篇 2024-12-10 12:08
下一篇 2024-12-10 12:08

相关推荐

  • 深入解析Vue3 defineExpose

    Vue 3在开发过程中引入了新的API `defineExpose`。在以前的版本中,我们经常使用 `$attrs` 和` $listeners` 实现父组件与子组件之间的通信,但…

    编程 2025-04-25
  • 深入理解byte转int

    一、字节与比特 在讨论byte转int之前,我们需要了解字节和比特的概念。字节是计算机存储单位的一种,通常表示8个比特(bit),即1字节=8比特。比特是计算机中最小的数据单位,是…

    编程 2025-04-25
  • 深入理解Flutter StreamBuilder

    一、什么是Flutter StreamBuilder? Flutter StreamBuilder是Flutter框架中的一个内置小部件,它可以监测数据流(Stream)中数据的变…

    编程 2025-04-25
  • 深入探讨OpenCV版本

    OpenCV是一个用于计算机视觉应用程序的开源库。它是由英特尔公司创建的,现已由Willow Garage管理。OpenCV旨在提供一个易于使用的计算机视觉和机器学习基础架构,以实…

    编程 2025-04-25
  • 深入了解scala-maven-plugin

    一、简介 Scala-maven-plugin 是一个创造和管理 Scala 项目的maven插件,它可以自动生成基本项目结构、依赖配置、Scala文件等。使用它可以使我们专注于代…

    编程 2025-04-25
  • 深入了解LaTeX的脚注(latexfootnote)

    一、基本介绍 LaTeX作为一种排版软件,具有各种各样的功能,其中脚注(footnote)是一个十分重要的功能之一。在LaTeX中,脚注是用命令latexfootnote来实现的。…

    编程 2025-04-25
  • 深入理解Python字符串r

    一、r字符串的基本概念 r字符串(raw字符串)是指在Python中,以字母r为前缀的字符串。r字符串中的反斜杠(\)不会被转义,而是被当作普通字符处理,这使得r字符串可以非常方便…

    编程 2025-04-25
  • 深入了解Python包

    一、包的概念 Python中一个程序就是一个模块,而一个模块可以引入另一个模块,这样就形成了包。包就是有多个模块组成的一个大模块,也可以看做是一个文件夹。包可以有效地组织代码和数据…

    编程 2025-04-25
  • 深入剖析MapStruct未生成实现类问题

    一、MapStruct简介 MapStruct是一个Java bean映射器,它通过注解和代码生成来在Java bean之间转换成本类代码,实现类型安全,简单而不失灵活。 作为一个…

    编程 2025-04-25
  • 深入探讨冯诺依曼原理

    一、原理概述 冯诺依曼原理,又称“存储程序控制原理”,是指计算机的程序和数据都存储在同一个存储器中,并且通过一个统一的总线来传输数据。这个原理的提出,是计算机科学发展中的重大进展,…

    编程 2025-04-25

发表回复

登录后才能评论