标签:
Filter
—> 只发射通过了谓词测试的数据项Filter
操作符使用你指定的一个谓词函数测试数据项,只有通过测试的数据才会被发射。
示例代码
Observable.just(1, 2, 3, 4, 5)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer item) {
return( item < 4 );
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
输出:
Next: 1
Next: 2
Next: 3
Sequence complete.
filter
默认不在任何特定的调度器上执行。
OfType
—> ofType是filter操作符的一个特殊形式。它过滤一个Observable只返回指定类型的数据。ofType
默认不在任何特定的调度器上指定。
示例代码:
Observable.just(1,"sb",0.1f).ofType(String.class).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
输出:
sb
Take
—> 只发射开始的N项数据
使用Take
操作符让你可以修改Observable
的行为,只返回前面的N
项数据,然后发射完成通知,忽略剩余的数据。
如果你对一个Observable
使用take(n)
(或它的同义词limit(n)
)操作符,而那个Observable
发射的数据少于N项,那么take操作生成的Observable
不会抛异常或发射onError通知,在完成前它只会发射相同的少量数据。
示例代码:
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.take(4)
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
输出:
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.
take(int)
默认不任何特定的调度器上执行。
take
的这个变体接受一个时长而不是数量参数。它会丢发射Observable
开始的那段时间发射的数据,时长和时间单位通过参数指定。
take
的这个变体默认在computation
调度器上执行,但是你可以使用第三个参数指定其它的调度器。
TakeLast
—> 只发射最后N个元素takeLast
操作符是把源Observable
产生的结果的后n
项提交给订阅者,提交时机是Observable
发布onCompleted
通知之时。
示例代码:
Observable.just(1,2,3,4,5,6,7).takeLast(2)
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
输出:
Next: 6
Next: 7
Sequence complete.
TakeLastBuffer
—> 将最后的N项数据当做单个数据发射它和takeLast
类似,,唯一的不同是它把所有的数据项收集到一个List
再发射,而不是依次发射一个。
示例代码:
Observable.just(1,2,3,4).takeLastBuffer(2).subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
String s = "";
for(Integer str : integers){
s = s +str +",";
}
System.out.println(s);
}
});
输出:
I/System.out: 3,4,
Skip
—> 跳过开始的N项数据抑制Observable
发射的前N
项数据
使用Skip
操作符,你可以忽略Observable
发射的前N
项数据,只保留之后的数据。
skip
的这个变体默认不在任何特定的调度器上执行。
示例代码
Observable.just(1,2,3,4).skip(1).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer+"");
}
});
输出
2
3
4
skip
的这个变体接受一个时长而不是数量参数。它会丢弃原始Observable
开始的那段时间发射的数据,时长和时间单位通过参数指定。
skip
的这个变体默认在computation
调度器上执行,但是你可以使用第三个参数指定其它的调度器。
SkipLast
—> 跳过后面的N项数据抑制Observable
发射的后N
项数据
使用SkipLast
操作符修改原始Observable
,你可以忽略Observable
发射的后N
项数据,只保留前面的数据。
使用SkipLast
操作符,你可以忽略原始Observable
发射的后N
项数据,只保留之前的数据。注意:这个机制是这样实现的:延迟原始Observable
发射的任何数据项,直到它发射了N
项数据。
示例代码:
Observable.just(1,2,3,4).skipLast(1).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer+"");
}
});
输出
1
2
3
skipLast
的这个变体默认不在任何特定的调度器上执行。
还有一个skipLast
变体接受一个时长而不是数量参数。它会丢弃在原始Observable
的生命周期内最后一段时间内发射的数据。时长和时间单位通过参数指定。
注意:这个机制是这样实现的:延迟原始Observable
发射的任何数据项,直到自这次发射之后过了给定的时长。
skipLast
的这个变体默认在computation
调度器上执行,但是你可以使用第三个参数指定其它的调度器。
Distinct
—> 过滤掉重复数据Distinct
的过滤规则是:只允许还没有发射过的数据项通过。
在某些实现中,有一些变体允许你调整判定两个数据不同(distinct
)的标准。还有一些实现只比较一项数据和它的直接前驱,因此只会从序列中过滤掉连续重复的数据。
distinct()
示例代码:
Observable.just(1, 2, 1, 1, 2, 3)
.distinct()
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
输出
Next: 1
Next: 2
Next: 3
Sequence complete.
distinct(Func1)
这个操作符有一个变体接受一个函数。这个函数根据原始Observable
发射的数据项产生一个Key
,然后,比较这些Key
而不是数据本身,来判定两个数据是否是不同的。
示例代码:
Observable.just(1,2,3,4,5,6).distinct(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
return integer%3;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer+"");
}
});
输出: 1% 3= 1 , 2%3 =2 ,3%3 = 0 , 4%3 = 1 , 5%3=2 ,6%3 = 0 ,后面三个和前面三个的值重复去掉
I/System.out: 1
I/System.out: 2
I/System.out: 3
DistinctUntilChanged
—> 过滤掉连续重复的数据DistinctUntilChanged()
示例代码:
Observable.just(1,2,2,2,5,6).distinctUntilChanged().subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer+"");
}
});
输出:
1
2
5
6
DistinctUntilChanged(Func1)
和distinct(Func1)
一样,根据一个函数产生的Key
判定两个相邻的数据项是不是不同的。
示例代码
Observable.just(1,2,2,2,5,11).distinctUntilChanged(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
return integer %2;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer+"");
}
});
输出
1
2
5
distinct
和distinctUntilChanged
默认不在任何特定的调度器上执行。
ElementAt
—> 发射第N项数据
ElementAt
操作符获取原始Observable
发射的数据序列指定索引位置的数据项,然后当做自己的唯一数据发射。
RxJava
将这个操作符实现为elementAt
,给它传递一个基于0
的索引值,它会发射原始Observable
数据序列对应索引位置的值,如果你传递给elementAt
的值为5
,那么它会发射第六
项的数据。
如果你传递的是一个负数
,或者原始Observable
的数据项数小于index+1
,将会抛出一个IndexOutOfBoundsException
异常。
示例代码:
Observable.just(1,2,3,4,5,6).elementAt(2)
.subscribe(
new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("Next:" + integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println("Error:" + throwable.getMessage());
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed!");
}
});
输出:
Next:3
completed!
ElementAtOrDefault
—> 发射第N项数据,如果索引值大于数据项数,它会发射一个默认值(通过额外的参数指定)RxJava
还实现了elementAtOrDefault
操作符。与elementAt
的区别是,如果索引值大于数据项数,它会发射一个默认值(通过额外的参数指定),而不是抛出异常。但是如果你传递一个负数索引值,它仍然会抛出一个IndexOutOfBoundsException
异常。
示例代码:
Observable.just(1,2,3,4,5,6).elementAtOrDefault(13,999).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer+"");
}
});
输出:
999
elementA
t和elementAtOrDefault
默认不在任何特定的调度器上执行。
First
—> 只发射第一项数据如果你只对Observable
发射的第一项数据,或者满足某个条件的第一项数据感兴趣,你可以使用First
操作符。
在某些实现中,First
没有实现为一个返回Observable
的过滤操作符,而是实现为一个在当时就发射原始Observable
指定数据项的阻塞函数。在这些实现中,如果你想要的是一个过滤操作符,最好使用Take(1)
或者ElementAt(0)
。
在一些实现中还有一个Single
操作符。它的行为与First
类似,但为了确保只发射单个值,它会等待原始Observable
终止(否则,不是发射那个值,而是以一个错误通知终止)。你可以使用它从原始Observable
获取第一项数据,而且也确保只发射一项数据。
在RxJava
中,这个操作符被实现为first,firstOrDefault
和takeFirst
。
可能容易混淆,BlockingObservable
也有名叫first
和firstOrDefault
的操作符,它们会阻塞并返回值,不是立即返回一个Observable
。
还有几个其它的操作符执行类似的功能。
First()
示例代码:
Observable.just(1, 2, 3)
.first()
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
输出:
Next: 1
Sequence complete.
First(Func1)
传递一个谓词函数给first,然后发射这个函数判定为true的第一项数据。
示例代码:
Observable.just(1,2,3,4,5,6).first(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer>3;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer+"");
}
});
输出
4
FirstOrDefault(T)
firstOrDefault
与first
类似,但是在Observable
没有发射任何数据时发射一个你在参数中指定的默认值。
示例代码:
Observable.empty().firstOrDefault("fuck you").subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
System.out.println(o+"");
}
});
输出
fuck you
FirstOrDefault(T, Func1)
传递一个谓词函数给firstOrDefault,然后发射这个函数判定为true的第一项数据,如果没有数据通过了谓词测试就发射一个默认值。
示例代码:
Observable.just(1,2,3,4,5,6).firstOrDefault(99, new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer == 4;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer+"");
}
});
输出:
4
TakeFirst
—> 返回一个可观察到的发射仅由源观测中满足指定条件发射的第一个项目。takeFirst
操作符类似于take
操作符,同时也类似于first
操作符,都是获取源Observable
产生的结果列表中符合指定条件的前一个或多个,与first
操作符不同的是,first
操作符如果获取不到数据,则会抛出NoSuchElementException
异常,而takeFirst
则会返回一个空的Observable
,该Observable
只有onCompleted
通知而没有onNext
通知。
示例代码:
Observable.just(1,2,3,4,5,6,7).takeFirst(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
//获取数值大于3的数据
return integer>3;
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
输出
Next: 4
Sequence complete.
Single
—> single操作符也与first类似Single()
single
操作符也与first类似,但是如果原始Observable
在完成之前不是正好发射一次数据,它会抛出一个NoSuchElementException
。
示例代码:
Observable.just(1,2).single().subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("===============>"+integer+"");
}
});
输出
rx.exceptions.OnErrorNotImplementedException: Sequence contains too many elements
Single(Func1)
single
的变体接受一个谓词函数,发射满足条件的单个值,如果不是正好只有一个数据项满足条件,会以错误通知终止。
示例代码:
Observable.just(1,2,3,4,5,6).single(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer>5; // 输出值为6
return integer>3; // 报错 Sequence contains too many elements
return integer>6; // 报错 Sequence contains no elements
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer+"");
}
});
输出:
return integer>5; // 输出值为6
return integer>3; // 报错 Sequence contains too many elements
return integer>6; // 报错 Sequence contains no elements
singleOrDefault(T)
和firstOrDefault
类似,但是如果原始Observable发射超过一个的数据,会以错误通知终止。
示例代码:
Observable.just("1","233").singleOrDefault("fuck you two").subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
Log.i("sss",o+"");
}
});
输出:
rx.exceptions.OnErrorNotImplementedException: Sequence contains too many elements
和firstOrDefault(T, Func1)
类似,如果没有数据满足条件,返回默认值;如果有多个数据满足条件,以错误通知终止。
示例代码
Observable.just(1,2,3,4,5,6,7,8).singleOrDefault(666, new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer s) {
return s>4; //rx.exceptions.OnErrorNotImplementedException: Sequence contains too many elements
return s>12; // 666
return s>7; // 8
}
}).subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
Log.i("sss",o+"");
}
});
Last
—> 只发射最后一项(或者满足某个条件的最后一项)数据
如果你只对Observable
发射的最后一项数据,或者满足某个条件的最后一项数据感兴趣,你可以使用Last
操作符。
在某些实现中,Last
没有实现为一个返回Observable
的过滤操作符,而是实现为一个在当时就发射原始Observable
指定数据项的阻塞函数。在这些实现中,如果你想要的是一个过滤操作符,最好使用TakeLast(1)。
在RxJava
中的实现是last
和lastOrDefault
。
可能容易混淆,BlockingObservable
也有名叫last
和lastOrDefault
的操作符,它们会阻塞并返回值,不是立即返回一个Observable
。
last()
只发射最后一项数据,使用没有参数的last操作符。
示例代码:
Observable.just(1, 2, 3)
.last()
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
输出:
Next: 3
Sequence complete.
last(Func1)
这个版本的last
也是接受一个谓词函数,返回一个发射原始Observable
中满足条件的最后一项数据的Observable
。
示例代码:
Observable.just(1,2,3,4,5,6,7,8).last(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer < 6;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer o) {
Log.i("sss",o+"");
tv.setText(""+o);
}
});
输出:
5
lastOrDefault(T)
lastOrDefault
与last
类似,不同的是,如果原始Observable
没有发射任何值,它发射你指定的默认值。
示例代码:
Observable.empty().lastOrDefault(99).subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
Log.i("sss",o+"");
}
});
输出:
99
lastOrDefault(T,Fun1)
示例代码:
Observable.just(1,2,3,4,5,6,7,8,9).lastOrDefault(99, new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer > 10;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer o) {
Log.i("sss",o+"");
}
});
输出:
999
Sample
—> 定期发射Observable最近发射的数据项Sample
操作符定时查看一个Observable
,然后发射自上次采样以来它最近发射的数据。
在某些实现中,有一个ThrottleFirst
操作符的功能类似,但不是发射采样期间的最近的数据,而是发射在那段时间内的第一项数据。
RxJava
将这个操作符实现为sample
和throttleLast
。
注意:如果自上次采样以来,原始Observable
没有发射任何数据,这个操作返回的Observable
在那段时间内也不会发射任何数据。
sample
的这个变体每当第二个Observable
发射一个数据(或者当它终止)时就对原始Observable
进行采样。第二个Observable
通过参数传递给sample
。
sample
的这个变体默认不在任何特定的调度器上执行。
示例代码:
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if(subscriber.isUnsubscribed()) return;
try {
//前8个数字产生的时间间隔为1秒,后一个间隔为3秒
for (int i = 1; i < 9; i++) {
subscriber.onNext(i);
Thread.sleep(1000);
}
Thread.sleep(2000);
subscriber.onNext(9);
subscriber.onCompleted();
} catch(Exception e){
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.newThread())
.sample(2200, TimeUnit.MILLISECONDS) //采样间隔时间为2200毫秒
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
输出:
Next: 3
Next: 5
Next: 7
Next: 8
Next: 9
Sequence complete.
sample
(别名throttleLast
)的一个变体按照你参数中指定的时间间隔定时采样(TimeUnit
指定时间单位)。
sample
的这个变体默认在computation
调度器上执行,但是你可以使用第三个参数指定其它的调度器。
ThrottleFirst
—> throttleFirst与throttleLast/sample不同,在每个采样周期内,它总是发射原始Observable的第一项数据,而不是最近的一项。
throttleFirst
操作符默认在computation
调度器上执行,但是你可以使用第三个参数指定其它的调度器。
示例代码:
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if(subscriber.isUnsubscribed()) return;
try {
//前8个数字产生的时间间隔为1秒,后一个间隔为3秒
for (int i = 1; i < 9; i++) {
subscriber.onNext(i);
Thread.sleep(1000);
}
Thread.sleep(2000);
subscriber.onNext(9);
subscriber.onCompleted();
} catch(Exception e){
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.newThread())
.throttleFirst(2200, TimeUnit.MILLISECONDS) //采样间隔时间为2200毫秒
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
输出:
Next: 1
Next: 4
Next: 7
Next: 9
Sequence complete.
ThrottleWithTimeout(?) or Debounce(?)
—> 只有当Observable在指定的时间后还没有发射数据时,才发射一个数据Debounce
操作符会过滤掉发射速率过快的数据项。
RxJava
将这个操作符实现为throttleWithTimeout
和debounce
。
注意:这个操作符会接着最后一项数据发射原始Observable
的onCompleted
通知,即使这个通知发生在你指定的时间窗口内(从最后一项数据的发射算起)。也就是说,onCompleted
通知不会触发限流。
throtleWithTimeout/debounce
的一个变体根据你指定的时间间隔进行限流,时间单位通过TimeUnit
参数指定。
这种操作符默认在computation
调度器上执行,但是你可以通过第三个参数指定。
示例代码:
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if(subscriber.isUnsubscribed()) return;
try {
for (int i = 0; i < 10; i++) {
subscriber.onNext(i);
Thread.sleep(i * 100);
}
subscriber.onCompleted();
}catch(Exception e){
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.newThread())
.debounce(400, TimeUnit.MILLISECONDS)
.subscribe(
new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("Next:" + integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println("Error:" + throwable.getMessage());
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed!");
}
});
输出:
Next:4
Next:5
Next:6
Next:7
Next:8
Next:9
completed!
debounce
操作符的一个变体通过对原始Observable
的每一项应用一个函数进行限流,这个函数返回一个Observable
。如果原始Observable
在这个新生成的Observable
终止之前发射了另一个数据,debounce
会抑制(suppress
)这个数据项。
debounce
的这个变体默认不在任何特定的调度器上执行。
Timeout
—> 如果在一个指定的时间段后还没发射数据,就发射一个异常
timeout(long,TimeUnit)
如果原始Observable
过了指定的一段时长没有发射任何数据,Timeout
操作符会以一个onError
通知终止这个Observable
。
第一个变体接受一个时长参数,每当原始Observable
发射了一项数据,timeout
就启动一个计时器,如果计时器超过了指定指定的时长而原始Observable
没有发射另一项数据,timeout
就抛出TimeoutException
,以一个错误通知终止Observable
。
这个timeout
默认在computation
调度器上执行,你可以通过参数指定其它的调度器。
示例代码:
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if(subscriber.isUnsubscribed()) return;
try {
for (int i = 0; i < 10; i++) {
subscriber.onNext(i);
Thread.sleep(i*100);
}
subscriber.onCompleted();
}catch(Exception e){
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.newThread())
.timeout(300,TimeUnit.MILLISECONDS)
.subscribe(
new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("Next:" + integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println("Error:" + throwable.getMessage());
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed!");
}
});
输出:
Next:0
Next:1
Next:2
Next:3
Next:4
Error:null
timeout(long,TimeUnit,Observable)
这个版本的timeout
在超时时会切换到使用一个你指定的备用的Observable
,而不是发错误通知。它也默认在computation
调度器上执行。
示例代码:
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if(subscriber.isUnsubscribed()) return;
try {
for (int i = 0; i < 10; i++) {
subscriber.onNext(i);
Thread.sleep(i*100);
}
subscriber.onCompleted();
}catch(Exception e){
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.newThread())
.timeout(300,TimeUnit.MILLISECONDS,Observable.just(555))
.subscribe(
new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("Next:" + integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println("Error:" + throwable.getMessage());
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed!");
}
});
输出:
Next:0
Next:1
Next:2
Next:3
Next:555
completed!
timeout(Func1)
这个版本的timeout
使用一个函数针对原始Observable
的每一项返回一个Observable
,如果当这个Observable
终止时原始Observable
还没有发射另一项数据,就会认为是超时了,timeout
就抛出TimeoutException
,以一个错误通知终止Observable
。
这个timeout
默认在immediate
调度器上执行。
这个版本的timeout
同时指定超时时长和备用的Observable
。它默认在immediate
调度器上执行。
这个版本的time
除了给每一项设置超时,还可以单独给第一项设置一个超时。它默认在immediate
调度器上执行。
同上,但是同时可以指定一个备用的Observable
。它默认在immediate
调度器上执行。
IgnoreElements
—> 丢弃所有的正常数据,只发射错误或完成通知不发射任何数据,只发射Observable
的终止通知
IgnoreElements
操作符抑制原始Observable
发射的所有数据,只允许它的终止通知(onError
或onCompleted
)通过。
如果你不关心一个Observable
发射的数据,但是希望在它完成时或遇到错误终止时收到通知,你可以对Observable
使用ignoreElements
操作符,它会确保永远不会调用观察者的onNext()
方法。
RxJava
将这个操作符实现为ignoreElements
。
示例代码:
Observable.just(1,2,3,4,5,6,7,8).ignoreElements()
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
输出:
Sequence complete.
RxJava 学习笔记(七) --- Filtering 过滤操作
标签:
原文地址:http://blog.csdn.net/urrjdg/article/details/51889738