标签:介绍 基于 cti 并且 layer obs https turn 登录
目录
需求了解:
在使用 RxJava
开发的过程中,很多时候需要结合多个条件或者数据的逻辑判断,比如登录功能的表单验证,实时数据比对等。这个时候我们就需要使用 RxJava 的结合操作符来完成这一需求,Rx中提供了丰富的结合操作处理的操作方法。
可用于组合多个Observables的操作方法:
当 Observables 中的任何一个发射了数据时,使用一个函数结合每个 Observable 发射的最近数据项,并且基于这个函数的结果发射数据。
CombineLatest
操作符行为类似于zip
,但是只有当原始的Observable中的每一个都发射了一条数据时 zip 才发射数据。 CombineLatest
则在原始的Observable中任意一个发射了数据时发射一条数据。当原始Observables的任何一个发射了一条数据时, CombineLatest 使用一 个函数结合它们最近发射的数据,然后发射这个函数的返回值。
解析: combineLatest
操作符可以结合多个Observable,可以接收 2-9 个Observable对象, 在其中原始Observables的任何一个发射了一条数据时, CombineLatest 使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值。此外combineLatest
操作符还有一些接收 Iterable , 数组方式的变体,以及其他指定参数combiner、bufferSize、和combineLatestDelayError方法等变体,在此就不在详细展开了,有兴趣的可以查看官方的相关API文档了解。
实例代码:
// Observables 创建
Observable<Long> observable1 = Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS);
Observable<Long> observable2 = Observable.intervalRange(1, 5, 1, 2, TimeUnit.SECONDS);
Observable<Long> observable3 = Observable.intervalRange(100, 5, 1, 1, TimeUnit.SECONDS);
// 1. combineLatest(ObservableSource, ObservableSource [支持2-9个参数]..., BiFunction)
// 结合多个Observable, 当他们其中任意一个发射了数据时,使用函数结合他们最近发射的一项数据
Observable.combineLatest(observable1, observable2, new BiFunction<Long, Long, String>() {
@Override
public String apply(Long t1, Long t2) throws Exception {
System.out.println("--> apply(1) t1 = " + t1 + ", t2 = " + t2);
if (t1 + t2 == 10) {
return "Success"; // 满足一定条件,返回指定的字符串
}
return t1 + t2 + ""; // 计算所有数据的和并转换为字符串
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String t) throws Exception {
System.out.println("----> accept combineLatest(1): " + t);
}
});
System.out.println("--------------------------------------------------------");
// 2. combineLatest(T1, T2, T3, Function)
// Observables的结合
Observable.combineLatest(observable1, observable2, observable3, new Function3<Long, Long, Long, String>() {
@Override
public String apply(Long t1, Long t2, Long t3) throws Exception {
System.out.println("--> apply(2): t1 = " + t1 + ", t2 = " + t2 + ", t3 = " + t3);
return t1 + t2 + t3 + ""; // 计算所有数据的和并转换为字符串
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String t) throws Exception {
System.out.println("--> accept(2): " + t);
}
});
输出:
--> apply(1) t1 = 1, t2 = 1
----> accept combineLatest(1): 2
--> apply(1) t1 = 2, t2 = 1
----> accept combineLatest(1): 3
--> apply(1) t1 = 3, t2 = 1
----> accept combineLatest(1): 4
--> apply(1) t1 = 3, t2 = 2
----> accept combineLatest(1): 5
--> apply(1) t1 = 4, t2 = 2
----> accept combineLatest(1): 6
--> apply(1) t1 = 4, t2 = 3
----> accept combineLatest(1): 7
--> apply(1) t1 = 5, t2 = 3
----> accept combineLatest(1): 8
--> apply(1) t1 = 5, t2 = 4
----> accept combineLatest(1): 9
--> apply(1) t1 = 5, t2 = 5
----> accept combineLatest(1): Success
--------------------------------------------------------
--> apply(2): t1 = 1, t2 = 1, t3 = 100
--> accept(2): 102
--> apply(2): t1 = 2, t2 = 1, t3 = 100
--> accept(2): 103
--> apply(2): t1 = 2, t2 = 1, t3 = 101
--> accept(2): 104
--> apply(2): t1 = 2, t2 = 2, t3 = 101
--> accept(2): 105
--> apply(2): t1 = 3, t2 = 2, t3 = 101
--> accept(2): 106
--> apply(2): t1 = 3, t2 = 2, t3 = 102
--> accept(2): 107
--> apply(2): t1 = 4, t2 = 2, t3 = 102
--> accept(2): 108
--> apply(2): t1 = 4, t2 = 2, t3 = 103
--> accept(2): 109
--> apply(2): t1 = 5, t2 = 2, t3 = 103
--> accept(2): 110
--> apply(2): t1 = 5, t2 = 3, t3 = 103
--> accept(2): 111
--> apply(2): t1 = 5, t2 = 3, t3 = 104
--> accept(2): 112
--> apply(2): t1 = 5, t2 = 4, t3 = 104
--> accept(2): 113
--> apply(2): t1 = 5, t2 = 5, t3 = 104
--> accept(2): 114
任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据。
Join
操作符结合两个Observable发射的数据,基于时间窗口(你定义的针对每条数据特定的原则)选择待集合的数据项。你将这些时间窗口实现为一些Observables,它们的生命周期从任何一条Observable发射的每一条数据开始。当这个定义时间窗口的Observable发射了一条数据或者完成时,与这条数据关联的窗口也会关闭。只要这条数据的窗口是打开的,它将继续结合其它Observable发射的任何数据项。你定义一个用于结合数据的函数。
解析: join(other, leftEnd, rightEnd, resultSelector)
相关参数的解析
注意: 这是源Observable和目标Observable发射数据在任意一个基于时间窗口的有效期内才会接收到组合数据,这就意味着可能有数据丢失的情况,在其中一个已经发射完所有数据,并且没有处于时间窗口的数据情况,另一个Observable的数据发射将不会收到组合数据。
示例代码:
// Observable的创建
Observable<Long> sourceObservable = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
Observable<Long> targetObservable = Observable.intervalRange(10, 5, 1, 1000, TimeUnit.MILLISECONDS);
// 1. join(other, leftEnd, rightEnd, resultSelector)
// other: 目标组合的Observable
// leftEnd: 接收一个源数据项,返回一个Observable,这个Observable的生命周期就是源Observable发射数据的有效期
// rightEnd: 接收一个源数据项,返回一个Observable,这个Observable的生命周期就是目标Observable发射数据的有效期
// resultSelector: 接收源Observable和目标Observable发射的数据项, 处理后的数据返回给观察者对象
sourceObservable.join(targetObservable, new Function<Long, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Long t) throws Exception {
System.out.println("-----> t1 is emitter: " + t);
return Observable.timer(1000, TimeUnit.MILLISECONDS); // 源Observable发射数据的有效期为1000毫秒
}
}, new Function<Long, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Long t) throws Exception {
System.out.println("-----> t2 is emitter: " + t);
return Observable.timer(1000, TimeUnit.MILLISECONDS); // 目标Observable发射数据的有效期为1000毫秒
}
}, new BiFunction<Long, Long, String>() {
@Override
public String apply(Long t1, Long t2) throws Exception {
return "t1 = " + t1 + ", t2 = " + t2; // 对数据进行组合后返回和观察者
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String t) throws Exception {
System.out.println("--> accept(1): " + t);
}
});
System.in.read();
输出:
-----> t1 is emitter: 1
-----> t2 is emitter: 10
--> accept(1): t1 = 1, t2 = 10
-----> t1 is emitter: 2
--> accept(1): t1 = 2, t2 = 10
-----> t1 is emitter: 3
--> accept(1): t1 = 3, t2 = 10
-----> t2 is emitter: 11
--> accept(1): t1 = 1, t2 = 11
--> accept(1): t1 = 2, t2 = 11
--> accept(1): t1 = 3, t2 = 11
-----> t1 is emitter: 4
--> accept(1): t1 = 4, t2 = 11
-----> t1 is emitter: 5
--> accept(1): t1 = 5, t2 = 11
-----> t2 is emitter: 12
--> accept(1): t1 = 3, t2 = 12
--> accept(1): t1 = 4, t2 = 12
--> accept(1): t1 = 5, t2 = 12
-----> t2 is emitter: 13
--> accept(1): t1 = 5, t2 = 13
-----> t2 is emitter: 14 // 此时源t1中已经没有数据还处于时间窗口有效期内
groupJoin
groupJoin
操作符与 join
相同,只是参数传递有所区别。groupJoin(other, leftEnd, rightEnd, resultSelector) 中的resultSelector
可以将原始数据转换为 Observable 类型的数据发送给观察者。
示例代码:
// Observable的创建
Observable<Long> sourceObservable = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
Observable<Long> targetObservable = Observable.intervalRange(10, 5, 1, 1000, TimeUnit.MILLISECONDS);
// 2. groupJoin(other, leftEnd, rightEnd, resultSelector)
// groupJoin操作符与join相同,只是参数传递有所区别。
// resultSelector可以将原始数据转换为Observable类型的数据发送给观察者。
sourceObservable.groupJoin(targetObservable, new Function<Long, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Long t) throws Exception {
System.out.println("-----> t1 is emitter: " + t);
return Observable.timer(1000, TimeUnit.MILLISECONDS); // 源Observable发射数据的有效期为1000毫秒
}
}, new Function<Long, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Long t) throws Exception {
System.out.println("-----> t2 is emitter: " + t);
return Observable.timer(1000, TimeUnit.MILLISECONDS); // 目标Observable发射数据的有效期为1000毫秒
}
}, new BiFunction<Long, Observable<Long>, Observable<String>>() {
@Override
public Observable<String> apply(Long t1, Observable<Long> t2) throws Exception {
System.out.println("--> apply(2) combine: " + t1); // 结合操作
return t2.map(new Function<Long, String>() {
@Override
public String apply(Long t) throws Exception {
System.out.println("-----> apply(2) operation: " + t);
return "t1 = " + t1 + ", t2 = " + t;
}
});
}
}).subscribe(new Consumer<Observable<String>>() {
@Override
public void accept(Observable<String> stringObservable) throws Exception {
stringObservable.subscribe(new Consumer<String>() {
@Override
public void accept(String t) throws Exception {
System.out.println("--> accept(2): " + t);
}
});
}
});
输出:
-----> t1 is emitter: 1
--> apply(2) combine: 1
-----> t2 is emitter: 10
-----> apply(2) operation: 10
--> accept(2): t1 = 1, t2 = 10
-----> t1 is emitter: 2
--> apply(2) combine: 2
-----> apply(2) operation: 10
--> accept(2): t1 = 2, t2 = 10
-----> t1 is emitter: 3
--> apply(2) combine: 3
-----> apply(2) operation: 10
--> accept(2): t1 = 3, t2 = 10
-----> t2 is emitter: 11
-----> apply(2) operation: 11
--> accept(2): t1 = 1, t2 = 11
-----> apply(2) operation: 11
--> accept(2): t1 = 2, t2 = 11
-----> apply(2) operation: 11
--> accept(2): t1 = 3, t2 = 11
-----> t1 is emitter: 4
--> apply(2) combine: 4
-----> apply(2) operation: 11
--> accept(2): t1 = 4, t2 = 11
-----> t1 is emitter: 5
--> apply(2) combine: 5
-----> apply(2) operation: 11
--> accept(2): t1 = 5, t2 = 11
-----> t2 is emitter: 12
-----> apply(2) operation: 12
--> accept(2): t1 = 3, t2 = 12
-----> apply(2) operation: 12
--> accept(2): t1 = 4, t2 = 12
-----> apply(2) operation: 12
--> accept(2): t1 = 5, t2 = 12
-----> t2 is emitter: 13
-----> apply(2) operation: 13
--> accept(2): t1 = 5, t2 = 13
-----> t2 is emitter: 14
Javadoc: groupJoin(other, leftEnd, rightEnd, resultSelector)
合并多个Observables的发射物。
使用 Merge
操作符你可以将多个Observables的输出合并,就好像它们是一个单个的 Observable 一样。
Merge 可能会让合并的Observables发射的数据交错(有一个类似的操作符 Concat
不会让数据交错,它会按顺序一个接着一个发射多个Observables的发射物),任何一个原始Observable的 onError
通知会被立即传递给观察者,而且会终止合并后的Observable。
除了传递多个Observable给 merge ,你还可以传递一个Observable列表 List ,数组,甚至是一个发射Observable序列的Observable, merge 将合并它们的输出作为单个Observable的输出。
如果你传递一个发射Observables序列的Observable,你可以指定 merge 应该同时订阅的 Observable 的最大数量。一旦达到订阅数的限制,它将不再订阅原始Observable发射的任何其它Observable,直到某个已经订阅的Observable发射了 onCompleted 通知。
示例代码:
// 创建Observable对象
Observable<Integer> odd = Observable.just(1, 3, 5);
Observable<Integer> even = Observable.just(2, 4, 6);
Observable<Integer> big = Observable.just(188888, 688888, 888888);
// 创建list对象
List<Observable<Integer>> list = new ArrayList<>();
list.add(odd);
list.add(even);
list.add(big);
// 创建Array对象
Observable<Integer>[] observables = new Observable[3];
observables[0] = odd;
observables[1] = even;
observables[2] = big;
// 创建发射Observable序列的Observable
Observable<ObservableSource<Integer>> sources = Observable.create(new ObservableOnSubscribe<ObservableSource<Integer>>() {
@Override
public void subscribe(ObservableEmitter<ObservableSource<Integer>> emitter) throws Exception {
emitter.onNext(Observable.just(1));
emitter.onNext(Observable.just(1, 2));
emitter.onNext(Observable.just(1, 2, 3));
emitter.onNext(Observable.just(1, 2, 3, 4));
emitter.onNext(Observable.just(1, 2, 3, 4, 5));
emitter.onComplete();
}
});
// 1. merge(ObservableSource source1, ObservableSource source2, ..., ObservableSource source4)
// 可接受 2-4 个Observable对象进行merge
Observable.merge(odd, even)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("--> accept(1): " + integer);
}
});
System.out.println("-----------------------------------------------");
// 2. merge(Iterable<? extends ObservableSource<? extends T>> sources, int maxConcurrency, int bufferSize)
// 可选参数, maxConcurrency: 最大的并发处理数, bufferSize: 缓存的数量(从每个内部观察资源预取的项数)
// 接受一个Observable的列表List
Observable.merge(list)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("--> accept(2): " + integer);
}
});
System.out.println("-----------------------------------------------");
// 3. mergeArray(int maxConcurrency, int bufferSize, ObservableSource<? extends T>... sources)
// 可选参数, maxConcurrency: 最大的并发处理数, bufferSize: 缓存的数量(从每个内部观察资源预取的项数)
// 接受一个Observable的数组Array
Observable.mergeArray(observables)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("--> accept(3): " + integer);
}
});
System.out.println("-----------------------------------------------");
// 4. merge(ObservableSource<? extends ObservableSource<? extends T>> sources, int maxConcurrency)
// 可选参数, maxConcurrency: 最大的并发处理数
// 接受一个发射Observable序列的Observable
Observable.merge(sources)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("--> accept(4): " + integer);
}
});
System.out.println("-----------------------------------------------");
// 5. mergeWith(other)
// merge 是静态方法, mergeWith 是对象方法: Observable.merge(odd,even) 等价于 odd.mergeWith(even)
odd.mergeWith(even)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("--> accept(5): " + integer);
}
});
输出:
--> accept(1): 1
--> accept(1): 3
--> accept(1): 5
--> accept(1): 2
--> accept(1): 4
--> accept(1): 6
-----------------------------------------------
--> accept(2): 1
--> accept(2): 3
--> accept(2): 5
--> accept(2): 2
--> accept(2): 4
--> accept(2): 6
--> accept(2): 188888
--> accept(2): 688888
--> accept(2): 888888
-----------------------------------------------
--> accept(3): 1
--> accept(3): 3
--> accept(3): 5
--> accept(3): 2
--> accept(3): 4
--> accept(3): 6
--> accept(3): 188888
--> accept(3): 688888
--> accept(3): 888888
-----------------------------------------------
--> accept(4): 1
--> accept(4): 1
--> accept(4): 2
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3
--> accept(4): 4
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3
--> accept(4): 4
--> accept(4): 5
-----------------------------------------------
--> accept(5): 1
--> accept(5): 3
--> accept(5): 5
--> accept(5): 2
--> accept(5): 4
--> accept(5): 6
Javadoc: merge(source1, ... , source4)
Javadoc: merge(Iterable sources, int maxConcurrency, int bufferSize)
Javadoc: mergeArray(int maxConcurrency, int bufferSize, ObservableSource... sources)
Javadoc: merge(ObservableSourcesources, int maxConcurrency)
如果传递给 merge 的任何一个的Observable发射了 onError
通知终止了, merge 操作符生成的Observable也会立即以onError
通知终止。如果你想让它继续发射数据,在最后才报告错误,可以使用 mergeDelayError
。
MergeDelayError
操作符,mergeDelayError 在合并与交错输出的使用上与 merge
相同,区别在于它会保留 onError
通知直到其他没有Error的Observable所有的数据发射完成,在那时它才会把onError
传递给观察者。
注意: 如果有多个原始Observable出现了Error
, 这些Error通知会被合并成一个 CompositeException
,保留在CompositeException 内部的 List<Throwable> exceptions
中,但是如果只有一个原始Observable出现了Error,则不会生成 CompositeException ,只会发送这个Error通知。
由于MergeDelayError
使用上和merge
相同 ,所以这里就不做详细分析了,这里就简单描述其中的一种的使用实例。
实例代码:
// 创建有Error的Observable序列的Observable
Observable<ObservableSource<Integer>> DelayErrorObservable = Observable.create(new ObservableOnSubscribe<ObservableSource<Integer>>() {
@Override
public void subscribe(ObservableEmitter<ObservableSource<Integer>> emitter) throws Exception {
emitter.onNext(Observable.just(1));
emitter.onNext(Observable.error(new Exception("Error Test1"))); // 发射一个Error的通知的Observable
emitter.onNext(Observable.just(2, 3));
emitter.onNext(Observable.error(new Exception("Error Test2"))); // 发射一个Error的通知的Observable
emitter.onNext(Observable.just(4, 5, 6));
emitter.onComplete();
}
});
// 6. mergeDelayError
// 保留onError通知直到合并后的Observable所有的数据发射完成,在那时它才会把onError传递给观察者
Observable.mergeDelayError(DelayErrorObservable)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(6)");
}
@Override
public void onNext(Integer integer) {
System.out.println("--> onNext(6): " + integer);
}
@Override
public void onError(Throwable e) {
// 判断是否是CompositeException对象(发生多个Observable出现Error时会发送的对象)
if (e instanceof CompositeException) {
CompositeException compositeException = (CompositeException) e;
List<Throwable> exceptions = compositeException.getExceptions();
System.out.println("--> onError(6): " + exceptions);
} else {
System.out.println("--> onError(6): " + e);
}
}
@Override
public void onComplete() {
System.out.println("--> onComplete(6)");
}
});
输出:
--> onSubscribe(6)
--> onNext(6): 1
--> onNext(6): 2
--> onNext(6): 3
--> onNext(6): 4
--> onNext(6): 5
--> onNext(6): 6
--> onError(6): [java.lang.Exception: Error Test1, java.lang.Exception: Error Test2]
Javadoc: mergeDelayError(source1, … , source4)
Javadoc: mergeDelayError(Iterable sources, int maxConcurrency, int bufferSize)
Javadoc: mergeArrayDelayError(int maxConcurrency, int bufferSize, ObservableSource… sources)
Javadoc: mergeDelayError(ObservableSource sources, int maxConcurrency)
通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个 结合体 发射单个数据项。
Zip
操作符与 Merge
类似,都是合并多个Observables的数据,返回一个Obversable,主要不同的是它使用这个函数按顺序结合两个或多个Observables发射的数据项,然后它发射这个函数返回的结果。它按照严格的顺序应用这个函数。 它只发射与发射数据项最少的那个Observable一样多的数据。
解析:
Zip
操作符与 Merge
的使用上基本一致,主要不同的是 zip 发射的数据取决于发射数据项最少的那个Observable并且按照严格的顺序去结合数据。zip
与对象方法 zipWith
,可以传递一个Observable列表 List ,数组,甚至是一个发射Observable序列的Observable。使用上在此就不做详细的展开了,可参照上面的 Merge
使用方法,下面就针对 zip
的特性实现一个简单的实例。
实例代码:
// 创建Observable
Observable<Integer> observable1 = Observable.just(1, 2, 3);
Observable<Integer> observable2 = Observable.just(1, 2, 3, 4, 5, 6);
// zip(sources)
// 可接受2-9个参数的Observable,对其进行顺序合并操作,最终合并的数据项取决于最少的数据项的Observable
Observable.zip(observable1, observable2, new BiFunction<Integer, Integer, String>() {
@Override
public String apply(Integer t1, Integer t2) throws Exception {
System.out.println("--> apply: t1 = " + t1 + ", t2 = " + t2);
return t1 + t2 + "";
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("--> accept: " + s); // 最终接受observable1全部数据项与observable2相同数量顺序部分数据
}
});
输出:
--> apply: t1 = 1, t2 = 1
--> accept: 2
--> apply: t1 = 2, t2 = 2
--> accept: 4
--> apply: t1 = 3, t2 = 3
--> accept: 6
Javadoc: zip( source1, source2, ... , source9, zipper )
Javadoc: zip( Iterable sources, Function zipper )
Javadoc: zipIterable(Iterable<ObservableSource> sources, Function<Object[],R> zipper, boolean delayError, int bufferSize)
Javadoc: zipArray( Function<Object[]> zipper, boolean delayError, int bufferSize, ObservableSource... sources )
Javadoc: zip( ObservableSource<ObservableSource> sources, Function<Object[]> zipper )
在数据序列的开头插入一条指定的数据项或者数据序列。
如果你想要一个Observable在发射数据之前先发射一个指定的数据或者数据序列(可以是单个数据、数组、列表,Observable中的数据),可以使 用 StartWith
操作符。(如果你想一个Observable发射的数据末尾追加一个数据序列可以使用 Concat
操作符。)
实例代码:
// 创建列表List
List<Integer> lists = new ArrayList<>();
lists.add(999);
lists.add(9999);
lists.add(99999);
// 创建数组Array
Integer[] arrays = new Integer[3];
arrays[0] = 999;
arrays[1] = 9999;
arrays[2] = 9999;
// 1. startWith(item)
// 在Observable数据发射前发射item数据项
Observable.just(1, 2, 3)
.startWith(999)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("--> accept(1): " + integer);
}
});
System.out.println("-----------------------------------------");
// 2. startWith(Iterable items)
// 在Observable数据发射前发射items列表中的数据序列
Observable.just(1, 2, 3)
.startWith(lists)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("--> accept(2): " + integer);
}
});
System.out.println("-----------------------------------------");
// 3. startWithArray(items)
// 在Observable数据发射前发射items数组中的数据序列
Observable.just(1, 2, 3)
.startWithArray(arrays)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("--> accept(3): " + integer);
}
});
System.out.println("-----------------------------------------");
// 4. startWith(ObservableSource other)
// 在Observable数据发射前发射other中的数据序列
Observable.just(1, 2, 3)
.startWith(Observable.just(999, 9999, 99999))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("--> accept(4): " + integer);
}
});
输出:
--> accept(1): 999
--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
-----------------------------------------
--> accept(2): 999
--> accept(2): 9999
--> accept(2): 99999
--> accept(2): 1
--> accept(2): 2
--> accept(2): 3
-----------------------------------------
--> accept(3): 999
--> accept(3): 9999
--> accept(3): 9999
--> accept(3): 1
--> accept(3): 2
--> accept(3): 3
-----------------------------------------
--> accept(4): 999
--> accept(4): 9999
--> accept(4): 99999
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3
Javadoc: startWith(item)
Javadoc: startWith(Iterable items)
Javadoc: startWithArray(items)
Javadoc: startWith(ObservableSource other)
将一个发射多个Observables的Observable转换成另一个单独的Observable,后者发射那些 Observables最近发射的数据项。
switchOnNext
订阅一个发射多个Observables的Observable。它每次观察那些Observables中的一个, switchOnNext
发射的这个新Observable并取消订阅前一个发射数据的旧Observable,开始发射最新的Observable发射的数据。
注意: 当原始Observables发射了一个新的Observable时(不是这个新的Observable发射了一条数据时),它将取消订阅之前的那个Observable。这意味着,在 后来那个Observable产生之后到它开始发射数据之前的这段时间里,前一个Observable发射 的数据将被丢弃(就像图例上的那个黄色圆圈一样)。
当Observables
发射一个新的Observable后,则会取消订阅前面的旧observable,直接开始接受新Observable的数据,如果Observables中的Observable有 Error
异常,将保留 onError
通知直到其他没有Error的Observable所有的数据发射完成,在那时它才会把 onError 传递给观察者。
注意: 如果有多个原始Observable出现了Error
, 这些Error通知会被合并成一个 CompositeException
,保留在CompositeException 内部的 List<Throwable> exceptions
中,但是如果只有一个原始Observable出现了Error,则不会生成 CompositeException ,只会发送这个Error通知。
实例代码:
// 创建Observable
Observable<Long> observable1 = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
Observable<Long> observable2 = Observable.intervalRange(10, 5, 1, 500, TimeUnit.MILLISECONDS);
// 创建发射Observable序列的Observable
Observable<Observable<Long>> sources = Observable.create(new ObservableOnSubscribe<Observable<Long>>() {
@Override
public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception {
emitter.onNext(observable1);
Thread.sleep(1000);
// 此时发射一个新的observable2,将会取消订阅observable1
emitter.onNext(observable2);
emitter.onComplete();
}
});
// 创建发射含有Error通知的Observable序列的Observable
Observable<Observable<Long>> sourcesError = Observable.create(new ObservableOnSubscribe<Observable<Long>>() {
@Override
public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception {
emitter.onNext(observable1);
emitter.onNext(Observable.error(new Exception("Error Test1!"))); // 发射一个发射Error通知的Observable
emitter.onNext(Observable.error(new Exception("Error Test2!"))); // 发射一个发射Error通知的Observable
Thread.sleep(1000);
// 此时发射一个新的observable2,将会取消订阅observable1
emitter.onNext(observable2);
emitter.onComplete();
}
});
// 1. switchOnNext(ObservableSource<ObservableSource> sources, int bufferSize)
// 可选参数 bufferSize: 缓存数据项大小
// 接受一个发射Observable序列的Observable类型的sources,
// 当sources发射一个新的Observable后,则会取消订阅前面的旧observable,直接开始接受新Observable的数据
Observable.switchOnNext(sources)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long integer) throws Exception {
System.out.println("--> accept(1): " + integer);
}
});
System.in.read();
System.out.println("--------------------------------------------------------------------");
// 2. switchOnNextDelayError(ObservableSource<ObservableSource> sources, int prefetch)
// 可选参数 prefetch: 与读取数据项大小
// 当sources发射一个新的Observable后,则会取消订阅前面的旧observable,直接开始接受新Observable的数据,
// 保留onError通知直到合并后的Observable所有的数据发射完成,在那时它才会把onError传递给观察者
Observable.switchOnNextDelayError(sourcesError)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(2)");
}
@Override
public void onNext(Long t) {
System.out.println("--> onNext(2): " + t);
}
@Override
public void onError(Throwable e) {
// 判断是否是CompositeException对象(发生多个Observable出现Error时会发送的对象)
if (e instanceof CompositeException) {
CompositeException compositeException = (CompositeException) e;
List<Throwable> exceptions = compositeException.getExceptions();
System.out.println("--> onError(2): " + exceptions);
} else {
System.out.println("--> onError(2): " + e);
}
}
@Override
public void onComplete() {
System.out.println("--> onComplete(2)");
}
});
System.in.read();
输出:
--> accept(1): 1
--> accept(1): 2
--> accept(1): 10
--> accept(1): 11
--> accept(1): 12
--> accept(1): 13
--> accept(1): 14
--------------------------------------------------------------------
--> onSubscribe(2)
--> onNext(2): 10
--> onNext(2): 11
--> onNext(2): 12
--> onNext(2): 13
--> onNext(2): 14
--> onError(2): [java.lang.Exception: Error Test1!, java.lang.Exception: Error Test2!]
Javadoc: switchOnNext(ObservableSource<ObservableSource> sources, int bufferSize)
Javadoc: switchOnNextDelayError(ObservableSource<ObservableSource> sources, int prefetch)
Rxjava 的合并操作符能够同时处理多个被观察者,并发送相应的事件通知以及数据。常常应用于多业务合并处理场景,比如表单的联动验证,网络交互性数据的校验等,rxjava的合并操作符能够很好的去实现和处理。
提示:以上使用的Rxjava2版本: 2.2.12
Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例
实例代码:
标签:介绍 基于 cti 并且 layer obs https turn 登录
原文地址:https://www.cnblogs.com/jiangming-blogs/p/12127299.html