标签:
在实际开发过程中,实现一个功能,需要通过多个线程来处理。比如,房间温控监视仪需要监视不同房间的温度,温度传感器会实时传递并保存至数据库,意味着我们需要处理多个Observables。在这种情况下,该如何处理这些Observables呢?本篇博客,我们将学习组合操作符,了解如何同时处理多个Observables来创建我们想要的Observable。
List<Student> list_0 = new ArrayList<>();
List<Student> list_1 = new ArrayList<>();
list_0.add(new Student("Merge-A11", "20", "1101"));
list_0.add(new Student("Merge-A12", "23", "1102"));
list_0.add(new Student("Merge-A13", "22", "1103"));
list_0.add(new Student("Merge-A14", "21", "1104"));
list_0.add(new Student("Merge-A15", "20", "1105"));
list_1.add(new Student("Merge-B11", "20", "1101"));
list_1.add(new Student("Merge-B12", "23", "1102"));
list_1.add(new Student("Merge-B13", "22", "1103"));
Observable<Student> obs_0 = Observable.from(list_0);
Observable<Student> obs_1 = Observable.from(list_1);
Observable<Student> obsMerge = Observable.merge(obs_0, obs_1);
obsMerge.subscribe(new Observer<Student>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Student student) {
mAdaStu.addData(student);
}
}); Observable<Student> obs_stu = Observable.from(mLists);
Observable<Long> obs_long = Observable.interval(1, TimeUnit.SECONDS);
Observable.zip(obs_stu, obs_long, new Func2<Student, Long, Student>() {
@Override
public Student call(Student student, Long aLong) {
return updateTitle(student, aLong);
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Student>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Student student) {
mAdaStu.addData(student);
}
});
private Student updateTitle(Student stu, Long time) {
stu.setName(time + " - " + stu.getName());
return stu;
} Observable<Student> obs_stu = Observable.interval(1, TimeUnit.SECONDS)
.map(new Func1<Long, Student>() {
@Override
public Student call(Long aLong) {
return mLists.get(aLong.intValue());
}
});
Observable<Long> obs_time = Observable.interval(1, TimeUnit.SECONDS);
obs_stu.join(obs_time,
student -> Observable.timer(2,TimeUnit.SECONDS),
time -> Observable.interval(2, TimeUnit.SECONDS),
this::updateTitle)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Student>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Student student) {
mAdaStu.addData(student);
mListsAdd.add(student);
int position = mListsAdd.size() - 1;
mAdaStu.addDataByPosition(position, student);
rvCombining.smoothScrollToPosition(position);
}
});combineLatest操作符流程图:
Observable<Student> obs_stu = Observable.interval(1, TimeUnit.SECONDS)
.map(position -> mLists.get(position.intValue()));
Observable<Long> obs_time =
Observable.interval(1500, TimeUnit.MILLISECONDS);
Observable.combineLatest(obs_stu,
obs_time,
this::updateTitle)
. observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Student>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Student student) {
mAdaStu.addData(student);
}
}); And,Then和When操作符的流程图如下:
Observable<Student> obs_stu = Observable.from(mLists);
Observable<Long> obs_time = Observable.interval(1, TimeUnit.SECONDS);
Pattern2<Student, Long> pattern = JoinObservable.from(obs_stu).and(obs_time);
Plan0<Student> plan = pattern.then(this::updateTitle);
JoinObservable
.when(plan)
.toObservable()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Student>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Student student) {
mAdaStu.addData(student);
}
}); Subscription sub = Observable.interval(1, TimeUnit.SECONDS)
.map(position -> mLists.get(position.intValue()))
.startWith(mLists.get(2))
. observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Student>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Student student) {
mAdaStu.addData(student);
}
});正如示例代码中,调用.startWith(mLists.get(2))函数,欲将第三个数据优先从数列中发出。而在实际效果图上,数据A13优先被观察者接收,恰恰印证了这一点。
标签:
原文地址:http://blog.csdn.net/io_field/article/details/51418233