本系列文章索引《响应式Spring的道法术器》
前情提要 Reactor Operators
本节的内容来自我翻译的Reactor 3 参考文档——如何选择操作符。由于部分朋友打开github.io网速比较慢或上不去,贴出来方便大家查阅。
如果一个操作符是专属于
Flux
或Mono
的,那么会给它注明前缀。
公共的操作符没有前缀。如果一个具体的用例涉及多个操作符的组合,这里以方法调用的方式展现,
会以一个点(.)开头,并将参数置于圆括号内,比如:.methodCall(parameter)
。
1)创建一个新序列,它...
T
,我已经有了:just
Optional<T>
:Mono#justOrEmpty(Optional<T>)
null
的 T:Mono#justOrEmpty(T)
T
,且还是由 just
方法返回
Mono#fromSupplier
或用 defer
包装 just
T
,这些元素我可以明确列举出来:Flux#just(T...)
Flux#fromArray
Flux#fromIterable
Flux#range
Stream
提供给每一个订阅:Flux#fromStream(Supplier<Stream>)
Supplier<T>
:Mono#fromSupplier
Mono#fromCallable
,Mono#fromRunnable
CompletableFuture<T>
:Mono#fromFuture
empty
error
Throwable
:error(Supplier<Throwable>)
never
defer
using
Flux#generate
Flux#create
Mono#create
也是异步的,只不过只能发一个)2)对序列进行转化
我想转化一个序列:
map
cast
Flux#index
flatMap
+ 使用一个工厂方法handle
flatMap
+ 一个异步的返回类型为 Publisher
的方法Mono.empty()
Flux#flatMapSequential
(对每个元素的异步任务会立即执行,但会将结果按照原序列顺序排序)Mono#flatMapMany
我想添加一些数据元素到一个现有的序列:
Flux#startWith(T...)
Flux#concatWith(T...)
我想将 Flux
转化为集合(一下都是针对 Flux
的)
collectList
,collectSortedList
collectMap
,collectMultiMap
collect
count
reduce
scan
all
any
hasElements
hasElement
我想合并 publishers...
Flux#concat
或 .concatWith(other)
Flux#concatDelayError
Flux#mergeSequential
Flux#merge
/ .mergeWith(other)
Flux#zip
/ Flux#zipWith
Tuple2
:Mono#zipWith
Mono#zip
Mono<Void>
:Mono#and
Mono<Void>
:Mono#when
Flux#zip
Flux#combineLatest
Flux#first
,Mono#first
,mono.or<br/>(otherMono).or(thirdMono)
,`flux.or(otherFlux).or(thirdFlux)flatMap
,不过“喜新厌旧”):switchMap
switchOnNext
我想重复一个序列:repeat
Flux.interval(duration).flatMap(tick -> myExistingPublisher)
我有一个空序列,但是...
defaultIfEmpty
switchIfEmpty
我有一个序列,但是我对序列的元素值不感兴趣:ignoreElements
Mono
来表示序列已经结束:then
thenEmpty
Mono
:Mono#then(mono)
Mono#thenReturn(T)
Flux
:thenMany
我有一个 Mono 但我想延迟完成...
Mono#delayUntilOther
Mono#delayUntil(Function)
expand(Function)
expandDeep(Function)
3)“窥视”(只读)序列
再不对序列造成改变的情况下,我想:
doOnNext
Flux#doOnComplete
,Mono#doOnSuccess
doOnError
doOnCancel
doOnSubscribe
doOnRequest
doOnTerminate
(Mono的方法可能包含有结果)
doAfterTerminate
Signal
):Flux#doOnEach
doFinally
log
single
对象:doOnEach
single
对象:materialize
dematerialize
log
4)过滤序列
我想过滤一个序列
filter
filterWhen
ofType
ignoreElements
Flux#distinct
Flux#distinctUntilChanged
我只想要一部分序列:
Flux#take(long)
Flux#take(Duration)
Mono
中返回:Flux#next()
request(N)
而不是取消:Flux#limitRequest(long)
Flux#takeLast
Flux#takeUntil
(基于判断条件),Flux#takeUntilOther
(基于对 publisher 的比较)Flux#takeWhile
Flux#elementAt
.takeLast(1)
Flux#last()
Flux#last(T)
Flux#skip(long)
Flux#skip(Duration)
Flux#skipLast
Flux#skipUntil
(基于判断条件),Flux#skipUntilOther
(基于对 publisher 的比较)Flux#skipWhile
Flux#sample(Duration)
sampleFirst
Flux#sample(Publisher)
Flux#sampleTimeout
(每一个元素会触发一个 publisher,如果这个 publisher 不被下一个元素触发的 publisher 覆盖就发出这个元素)Flux#single()
Flux#single(T)
Flux#singleOrEmpty
5)错误处理
我想创建一个错误序列:error
...
Flux
:.concat(Flux.error(e))
Mono
:.then(Mono.error(e))
timeout
error(Supplier<Throwable>)
我想要类似 try/catch 的表达方式:
error
onErrorReturn
Flux
或 Mono
:onErrorResume
.onErrorMap(t -> new RuntimeException(t))
doFinally
using
工厂方法我想从错误中恢复...
onErrorReturn
Publisher
:Flux#onErrorResume
和 Mono#onErrorResume
retry
retryWhen
IllegalStateException
:Flux#onBackpressureError
Flux#onBackpressureDrop
Flux#onBackpressureLatest
Flux#onBackpressureBuffer
Flux#onBackpressureBuffer
带有策略 BufferOverflowStrategy
6) 基于时间的操作
我想将元素转换为带有时间信息的 Tuple2<Long, T>
...
elapsed
timestamp
如果元素间延迟过长则中止序列:timeout
以固定的周期发出元素:Flux#interval
在一个给定的延迟后发出 0
:static Mono.delay
.
Mono#delayElement
,Flux#delayElements
delaySubscription
7)拆分 Flux
我想将一个 Flux<T>
拆分为一个 Flux<Flux<T>>
:
window(int)
window(int, int)
window(Duration)
window(Duration, Duration)
windowTimeout(int, Duration)
windowUntil
cutBefore
变量):.windowUntil(predicate, true)
windowWhile
(不满足条件的元素会被丢弃)window(Publisher)
,windowWhen
我想将一个 Flux<T>
的元素拆分到集合...
List
:buffer(int)
buffer(int, int)
buffer(Duration)
buffer(Duration, Duration)
bufferTimeout(int, Duration)
bufferUntil(Predicate)
.bufferUntil(predicate, true)
bufferWhile(Predicate)
buffer(Publisher)
,bufferWhen
buffer(int, Supplier<C>)
Flux<T>
中具有共同特征的元素分组到子 Flux:groupBy(Function<T,K>)
(注意返回值是 Flux<GroupedFlux<K, T>>
,每一个 GroupedFlux
具有相同的 key 值 K
,可以通过 key()
方法获取)。8)回到同步的世界
我有一个 Flux<T>
,我想:
Flux#blockFirst
Flux#blockFirst(Duration)
Flux#blockLast
Flux#blockLast(Duration)
Iterable<T>
:Flux#toIterable
Stream<T>
:Flux#toStream
Mono<T>
,我想:
Mono#block
Mono#block(Duration)
CompletableFuture<T>
:Mono#toFuture
附2:Reactor 3 之选择合适的操作符——响应式Spring的道法术器
原文地址:http://blog.51cto.com/liukang/2094073