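In real-world development, implementing a single feature often requires work on several threads. Take a room temperature monitor: it has to watch the temperature in different rooms, and each temperature sensor reports its readings in real time and saves them to a database, which means we have to handle multiple Observables. How should we deal with these Observables? In this post we look at the combining operators and see how to work with several Observables at the same time to build the Observable we want.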
    List<Student> list_0 = new ArrayList<>();
    List<Student> list_1 = new ArrayList<>();

    list_0.add(new Student("Merge-A11", "20", "1101"));
    list_0.add(new Student("Merge-A12", "23", "1102"));
    list_0.add(new Student("Merge-A13", "22", "1103"));
    list_0.add(new Student("Merge-A14", "21", "1104"));
    list_0.add(new Student("Merge-A15", "20", "1105"));

    list_1.add(new Student("Merge-B11", "20", "1101"));
    list_1.add(new Student("Merge-B12", "23", "1102"));
    list_1.add(new Student("Merge-B13", "22", "1103"));

    Observable<Student> obs_0 = Observable.from(list_0);
    Observable<Student> obs_1 = Observable.from(list_1);
    Observable<Student> obsMerge = Observable.merge(obs_0, obs_1);

    obsMerge.subscribe(new Observer<Student>() {
        @Override
        public void onCompleted() { }

        @Override
        public void onError(Throwable e) { }

        @Override
        public void onNext(Student student) {
            mAdaStu.addData(student);
        }
    });
    Observable<Student> obs_stu = Observable.from(mLists);
    Observable<Long> obs_long = Observable.interval(1, TimeUnit.SECONDS);

    // zip pairs the n-th student with the n-th tick, so one student is emitted per second.
    Observable.zip(obs_stu, obs_long, new Func2<Student, Long, Student>() {
                @Override
                public Student call(Student student, Long aLong) {
                    return updateTitle(student, aLong);
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Student>() {
                @Override
                public void onCompleted() { }

                @Override
                public void onError(Throwable e) { }

                @Override
                public void onNext(Student student) {
                    mAdaStu.addData(student);
                }
            });

    // Prefixes the student's name with the tick value (mutates and returns the same object).
    private Student updateTitle(Student stu, Long time) {
        stu.setName(time + " - " + stu.getName());
        return stu;
    }
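To make the pairing rule of zip easier to see, here is a minimal plain-Java sketch that models it on two finite lists. It is only an illustration, not how RxJava implements the operator: the class `ZipSketch`, the helper `zip`, and the sample names are hypothetical, and the model builds a new string instead of mutating a `Student` the way `updateTitle` does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

public class ZipSketch {
    // Hypothetical helper: combines the i-th element of each list and
    // stops as soon as the shorter list runs out, as zip does.
    public static <A, B, R> List<R> zip(List<A> left, List<B> right,
                                        BiFunction<? super A, ? super B, ? extends R> combiner) {
        int n = Math.min(left.size(), right.size());
        List<R> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            out.add(combiner.apply(left.get(i), right.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> students = Arrays.asList("A11", "A12", "A13"); // illustrative names
        List<Long> ticks = Arrays.asList(0L, 1L);                   // only two ticks available
        // The third student has no partner tick, so it is never emitted.
        System.out.println(zip(students, ticks, (s, t) -> t + " - " + s));
        // prints [0 - A11, 1 - A12]
    }
}
```

In the sample above the second source is an unbounded interval, so the list of students is the shorter side and every student gets paired with exactly one tick.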
    Observable<Student> obs_stu = Observable.interval(1, TimeUnit.SECONDS)
            // interval() never completes; stop after the last student so that
            // mLists.get() is never called with an out-of-range index.
            .take(mLists.size())
            .map(new Func1<Long, Student>() {
                @Override
                public Student call(Long aLong) {
                    return mLists.get(aLong.intValue());
                }
            });
    Observable<Long> obs_time = Observable.interval(1, TimeUnit.SECONDS);

    // Each student stays "open" for 2 seconds and each tick for 2 seconds;
    // every student/tick pair whose windows overlap is combined by updateTitle.
    obs_stu.join(obs_time,
                    student -> Observable.timer(2, TimeUnit.SECONDS),
                    time -> Observable.interval(2, TimeUnit.SECONDS),
                    this::updateTitle)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Student>() {
                @Override
                public void onCompleted() { }

                @Override
                public void onError(Throwable e) { }

                @Override
                public void onNext(Student student) {
                    mAdaStu.addData(student);
                    mListsAdd.add(student);
                    int position = mListsAdd.size() - 1;
                    mAdaStu.addDataByPosition(position, student);
                    rvCombining.smoothScrollToPosition(position);
                }
            });
[Figure: flow diagram of the combineLatest operator]
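    // Note: interval() is unbounded, so position eventually runs past the end of
    // mLists; map() then signals onError and the combined stream terminates.
    Observable<Student> obs_stu = Observable.interval(1, TimeUnit.SECONDS)
            .map(position -> mLists.get(position.intValue()));
    Observable<Long> obs_time = Observable.interval(1500, TimeUnit.MILLISECONDS);

    // Once both sources have emitted, every new item from either one is
    // combined with the latest item from the other.
    Observable.combineLatest(obs_stu, obs_time, this::updateTitle)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Student>() {
                @Override
                public void onCompleted() { }

                @Override
                public void onError(Throwable e) { }

                @Override
                public void onNext(Student student) {
                    mAdaStu.addData(student);
                }
            });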
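The emission order of combineLatest with a 1000 ms source and a 1500 ms source is easy to get wrong, so here is a small plain-Java simulation of the timeline. It is a sketch under stated assumptions, not RxJava's implementation: `CombineLatestSketch` and `simulate` are hypothetical names, the result is a fresh string rather than a mutated `Student`, and when both sources fire at the same instant the model arbitrarily lets the student source go first (RxJava gives no such ordering guarantee across threads).

```java
import java.util.ArrayList;
import java.util.List;

public class CombineLatestSketch {
    // Hypothetical simulation: students arrive every stuPeriodMs, ticks every
    // timePeriodMs, and events are processed in time order up to endMs.
    public static List<String> simulate(String[] students, long stuPeriodMs,
                                        long timePeriodMs, long endMs) {
        List<String> out = new ArrayList<>();
        String latestStudent = null; // latest item from the student source
        Long latestTick = null;      // latest item from the tick source
        int stuIdx = 0;
        long tick = 0;
        long nextStu = stuPeriodMs;
        long nextTime = timePeriodMs;
        while (true) {
            boolean stuAvail = stuIdx < students.length && nextStu <= endMs;
            boolean timeAvail = nextTime <= endMs;
            if (!stuAvail && !timeAvail) {
                break;
            }
            // Assumption: on a tie the student source is handled first.
            if (stuAvail && (!timeAvail || nextStu <= nextTime)) {
                latestStudent = students[stuIdx++];
                nextStu += stuPeriodMs;
            } else {
                latestTick = tick++;
                nextTime += timePeriodMs;
            }
            // Nothing is emitted until both sources have produced a value.
            if (latestStudent != null && latestTick != null) {
                out.add(latestTick + " - " + latestStudent);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String[] students = {"A11", "A12", "A13"}; // illustrative names
        System.out.println(simulate(students, 1000, 1500, 3000));
        // prints [0 - A11, 0 - A12, 0 - A13, 1 - A13]
    }
}
```

The first student (at 1000 ms) produces no output on its own because no tick has arrived yet; the first combined item only appears at 1500 ms.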
[Figure: flow diagram of the And, Then, and When operators]
    Observable<Student> obs_stu = Observable.from(mLists);
    Observable<Long> obs_time = Observable.interval(1, TimeUnit.SECONDS);

    // JoinObservable, Pattern2 and Plan0 come from the separate rxjava-joins module.
    // and() pairs the two sources, then() maps each pair, when() runs the plan.
    Pattern2<Student, Long> pattern = JoinObservable.from(obs_stu).and(obs_time);
    Plan0<Student> plan = pattern.then(this::updateTitle);

    JoinObservable
            .when(plan)
            .toObservable()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Student>() {
                @Override
                public void onCompleted() { }

                @Override
                public void onError(Throwable e) { }

                @Override
                public void onNext(Student student) {
                    mAdaStu.addData(student);
                }
            });
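    Subscription sub = Observable.interval(1, TimeUnit.SECONDS)
            // interval() never completes; stop after the last student so that
            // mLists.get() is never called with an out-of-range index.
            .take(mLists.size())
            .map(position -> mLists.get(position.intValue()))
            // Emit the third student once, before anything from the interval.
            .startWith(mLists.get(2))
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Student>() {
                @Override
                public void onCompleted() { }

                @Override
                public void onError(Throwable e) { }

                @Override
                public void onNext(Student student) {
                    mAdaStu.addData(student);
                }
            });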
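In the sample code, the call to .startWith(mLists.get(2)) is meant to emit the third item ahead of the rest of the sequence. In the resulting screenshot, item A13 is the first one the observer receives, which confirms exactly that.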
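One detail worth spelling out is that startWith prepends an item and does not reorder the source, so the third item shows up again at its normal position. The following plain-Java sketch models this on a finite list; `StartWithSketch`, its `startWith` helper, and the short names are hypothetical and only stand in for the Observable version.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StartWithSketch {
    // Hypothetical helper: models startWith on a finite list by placing
    // `first` ahead of the untouched source sequence.
    public static <T> List<T> startWith(T first, List<T> source) {
        List<T> out = new ArrayList<>(source.size() + 1);
        out.add(first);
        out.addAll(source);
        return out;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("A11", "A12", "A13", "A14"); // illustrative names
        // names.get(2) is "A13": it is emitted first and again in its usual place.
        System.out.println(startWith(names.get(2), names));
        // prints [A13, A11, A12, A13, A14]
    }
}
```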
Original article: http://blog.csdn.net/io_field/article/details/51418233