1.引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
2.Flux 和 Mono
Flux 和 Mono 是 Reactor 中的两个基本概念。Flux 表示的是包含 0 到 N 个元素的异步序列。在该序列中可以包含三种不同类型的消息通知:正常的包含元素的消息、序列结束的消息和序列出错的消息。当消息通知产生时,订阅者中对应的方法 onNext(), onComplete()和 onError()会被调用。Mono 表示的是包含 0 或者 1 个元素的异步序列。该序列中同样可以包含与 Flux 相同的三种类型的消息通知。Flux 和 Mono 之间可以进行转换。对一个 Flux 序列进行计数操作,得到的结果是一个 Mono<Long>对象。把两个 Mono 序列合并在一起,得到的是一个 Flux 对象。
可以简单理解
Mono:一个object 对象
Flux:List 集合
3.创建Flux
// just():可以指定序列中包含的全部元素。创建出来的 Flux 序列在发布这些元素之后会自动结束。
Flux.just("hahaha","heheha").subscribe(System.out::println);
// fromArray(),fromIterable()和 fromStream():可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。
Flux.fromArray(new Integer[] {1,2,3}).subscribe(System.out::println);
// empty():创建一个不包含任何元素,只发布结束消息的序列。
Flux.empty().subscribe(System.out::println);
// error(Throwable error):创建一个只包含错误消息的序列。
Flux.error(new RuntimeException()).subscribe(System.out::println);
// never():创建一个不包含任何消息通知的序列。
Flux.never().subscribe(System.out::println);
// range(int start, int count):创建包含从 start 起始的 count 个数量的 Integer 对象的序列。
Flux.range(1, 10).subscribe(System.out::println);
// interval(Duration period)和 interval(Duration delay, Duration period):
// 创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间隔来发布。
// 除了间隔时间之外,还可以指定起始元素发布之前的延迟时间。
// 如果发现没有输出,因为这个方法是异步的,可能是 还没有到输出的时间,主线程就已经关闭了
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
// intervalMillis(long period)和 intervalMillis(long delay, long period):与 interval()方法的作用相同,
// 只不过该方法通过毫秒数来指定时间间隔和延迟时间。
// 使用 generate() 方法生成 Flux 序列
Flux.generate(e -> {
e.next("1111");
e.complete();
}).subscribe(System.out::println);
final Random random = new Random();
Flux.generate(HashMap::new,(map, sink) -> {
int i = random.nextInt(100);
map.put(i,i);
sink.next(i);
if(map.size() == 10){
// 执行这个方法代表着结束流的生成
sink.complete();
}
return map;
}).subscribe(System.out::println);
// 使用create() 方法生成 Flux 序列
Flux.create(sink -> {
for (int i = 0; i < 10; i++) {
sink.next(i);
}
sink.complete();
}).subscribe(System.out::println);
4.创建Mono
// fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():
// 分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。
Mono.fromSupplier(() -> "hello world").subscribe(System.out::println);
// delay(Duration duration)和 delayMillis(long duration):
// 创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
Mono.delay(Duration.ofSeconds(1)).subscribe(System.out::println);
// 每太理解这个方法有什么作用
// ignoreElements(Publisher<T> source):创建一个 Mono 序列,忽略作为源的 Publisher 中的所有元素,只产生结束消息。
Mono<String> originalMono = Mono.just("Hello, World!");
// 直接在 originalMono 上调用 ignoreElements()
Mono<String> ignoredMono = Mono.ignoreElements(originalMono);
ignoredMono.subscribe(
System.out::println,
error -> System.out.println("Error: " + error),
() -> System.out.println("Completed")
);
// create() 方法
Mono.create(e -> e.success(111)).subscribe(System.out::println);