码迷,mamicode.com
首页 > 移动开发 > 详细

Android RxJava使用介绍(四) RxJava的操作符

时间:2016-05-27 12:32:30      阅读:223      评论:0      收藏:0      [点我收藏+]

标签:

本篇文章继续介绍以下类型的操作符

  • Observable Utility Operators(Observable的功能性操作符)
  • Conditional and Boolean Operators(Observable的条件操作符)

Observable Utility Operators(Observable的功能性操作符)

combineLatest操作符

顾名思义,Delay操作符就是让发射数据的时机延后一段时间,这样所有的数据都会依次延后一段时间发射。在Rxjava中将其实现为Delay和DelaySubscription。不同之处在于Delay是延时数据的发射,而DelaySubscription是延时注册Subscriber。流程图如下:

技术分享

下面我们使用Delay和DelaySubscribtion操作符来延迟两个Observable数据的发射,调用例子如下:

private Observable<Long> delayObserver() {
        return createObserver(2).delay(2000, TimeUnit.MILLISECONDS);
    }

    private Observable<Long> delaySubscriptionObserver() {
        return createObserver(2).delaySubscription(2000, TimeUnit.MILLISECONDS);
    }

    private Observable<Long> createObserver(int index) {
        return Observable.create(new Observable.OnSubscribe<Long>() {
            @Override
            public void call(Subscriber<? super Long> subscriber) {
                log("subscrib:" + getCurrentTime());
                for (int i = 1; i <= index; i++) {
                    subscriber.onNext(getCurrentTime());
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).subscribeOn(Schedulers.newThread());
    }

    private long getCurrentTime() {
        return System.currentTimeMillis()/1000;
    }

分别对其进行注册

mLButton.setText("delay");
        mLButton.setOnClickListener(e -> {
            log("start subscrib:" + getCurrentTime());
            delayObserver().subscribe(i -> log("delay:" + (getCurrentTime() - i)));
        });
        mRButton.setText("delaySubscription");
        mRButton.setOnClickListener(e -> {
            log("start subscrib:" + getCurrentTime());
            delaySubscriptionObserver().subscribe(i -> log("delaySubscription:" + i));
        });

运行结果如下。可以看到两个操作符都让我们达到了延迟2秒后再发射数据的目的
技术分享

二、Do

Do操作符就是给Observable的生命周期的各个阶段加上一系列的回调监听,当Observable执行到这个阶段的时候,这些回调就会被触发。在Rxjava实现了很多的doxxx操作符。

DoOnEach可以给Observable加上这样的样一个回调:Observable每发射一个数据的时候就会触发这个回调,不仅包括onNext还包括onError和onCompleted。
DoOnNext则只有onNext的时候才会被触发。
技术分享技术分享

doOnSubscribe和doOnUnSubscribe则会在Subscriber进行订阅和反订阅的时候触发回调。当一个Observable通过OnError或者OnCompleted结束的时候,会反订阅所有的Subscriber。
技术分享技术分享
DoOnError会在OnError发生的时候触发回调,并将Throwable对象作为参数传进回调函数里;DoOnComplete会在OnCompleted发生的时候触发回调。
技术分享技术分享
DoOnTerminate会在Observable结束前触发回调,无论是正常还是异常终止;finallyDo会在Observable结束后触发回调,无论是正常还是异常终止。
技术分享技术分享
好了,介绍了这么多do的操作符,我们接下来创建两个Observable对象,并分别用上面的一系列do操作符进行注册回调

private Observable<Integer> doOnEachObserver() {
        return Observable.just(1, 2, 3)
                .doOnEach(notification -> log("doOnEach send " + notification.getValue() + " type:" + notification.getKind()))
                .doOnNext(aInteger -> log("doOnNext send " + aInteger))
                .doOnSubscribe(() -> log("on subscribe"))
                .doOnUnsubscribe(() -> log("on unsubscribe\n"))
                .doOnCompleted(() -> log("onCompleted"));

    }

    private Observable<Integer> doOnErrorObserver() {
        return createObserver()
                .doOnEach(notification -> log("doOnEach send " + notification.getValue() + " type:" + notification.getKind()))
                .doOnError(throwable -> log("OnError:" + throwable.getMessage()))
                .doOnTerminate(() -> log("OnTerminate"))
                .finallyDo(() -> log("finallyDo"));
    }

    private Observable<Integer> createObserver() {
        return Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for (int i = 1; i <= 5; i++) {
                    if (i <= 3) {
                        subscriber.onNext(i);
                    } else {
                        subscriber.onError(new Throwable("num>3"));
                    }
                }
            }
        });
    }

分别进行订阅

mLButton.setText("do");
        mLButton.setOnClickListener(e -> doOnEachObserver().subscribe(i -> log("do:" + i)));
        mRButton.setText("doOnError");
        mRButton.setOnClickListener(e -> doOnErrorObserver().subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                log("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                log("subscriber onError:" + e.getMessage());
            }

            @Override
            public void onNext(Integer integer) {
                log("subscriber onNext:" + integer);
            }
        }));

运行结果如下所示。可以看到各个回调的监听都被依次触发。
技术分享

三、Meterialize

Meterialize操作符将OnNext/OnError/OnComplete都转化为一个Notification对象并按照原来的顺序发射出来,而DeMeterialize则是执行相反的过程。

下面我们使用这两个操作符来处理两个Observable对象

private Observable<Notification<Integer>> meterializeObserver() {
        return Observable.just(1, 2, 3).materialize();
    }

    private Observable<Integer> deMeterializeObserver() {
        return eterializeObserver().dematerialize();
    }

分别进行订阅

mLButton.setText("meterialize");
mLButton.setOnClickListener(e -> meterializeObserver().subscribe(i -> log("meterialize:" + i.getValue() + " type" + i.getKind())));
mRButton.setText("deMeterialize");
mRButton.setOnClickListener(e -> deMeterializeObserver().subscribe(i->log("deMeterialize:"+i)));

运行结果如下所示,可以看到onComplete也被meterialize包装后发射了出来,onError也同样。
技术分享

四、SubscribOn/ObserverOn

这两个操作符在前面的例子中我们已经使用过多次了,使用起来十分方便。在android开发中,相信大家一定都遇到过不能在主线程修改UI的问题,所以不得不使用Handler、AsyncTask等来更新UI界面。使用SubscribOn和ObserverOn操作符,各种线程的问题都将变得十分地简单。

SubscribOn用来指定Observable在哪个线程上运行,我们可以指定在IO线程上运行也可以让其新开一个线程运行,当然也可以在当前线程上运行。一般来讲会指定在各种后台线程而不是主线程上运行,就如同AsyncTask的doInBackground一样。
技术分享
ObserverOn用来指定观察者所运行的线程,也就是发射出的数据在那个线程上使用。在android中,如果我们要修改UI界面,观察者就必须在主线程上运行,就如同AsyncTask的onPostExecute。
技术分享
下面创建两个Observable并使用ObserverOn和SubscribOn使Observable和观察者运行在不同的线程上。

private Observable<Integer> observerOnObserver() {
        return createObserver()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.newThread());
    }

    private Observable<Integer> subscribeOnObserver() {
        return createObserver()
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.immediate());
    }

    private Observable<Integer> createObserver() {
        return Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                log("on subscrib:" + Thread.currentThread().getName());
                subscriber.onNext(1);
                subscriber.onCompleted();
            }
        });
    }

分别进行订阅

mLButton.setText("observerOn");
mLButton.setOnClickListener(e -> observerOnObserver().subscribe(i -> log("observerOn:" + Thread.currentThread().getName())));
mRButton.setText("subscribeOn");
mRButton.setOnClickListener(e -> subscribeOnObserver().subscribe(i -> log("subscribeOn:" + Thread.currentThread().getName())));

运行结果如下
技术分享

五、TimeInterval\TimeStamp

TimeInterval会拦截发射出来的数据,取代为前后两个发射两个数据的间隔时间。对于第一个发射的数据,其时间间隔为订阅后到首次发射的间隔。
技术分享
TimeStamp会将每个数据项给重新包装一下,加上了一个时间戳来标明每次发射的时间
技术分享
下面使用这两个操作符来处理两个Observable对象

 private Observable<TimeInterval<Integer>> timeIntervalObserver() {
        return createObserver().timeInterval();
    }

    private Observable<Timestamped<Integer>> timeStampObserver() {
        return createObserver().timestamp();
    }

    private Observable<Integer> createObserver() {
        return Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for (int i = 0; i <= 3; i++) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    subscriber.onNext(i);
                }
                subscriber.onCompleted();
            }
        }).subscribeOn(Schedulers.newThread());
    }
mLButton.setText("timeInterval");
mLButton.setOnClickListener(e -> timeIntervalObserver().subscribe(i -> log("timeInterval:" + i.getValue()+"-"+i.getIntervalInMilliseconds())));
mRButton.setText("timeStamp");
mRButton.setOnClickListener(e -> timeStampObserver().subscribe(i -> log("timeStamp:" + i.getValue()+"-"+i.getTimestampMillis())));

运行结果如下所示。
技术分享

六、Timeout
Timeout操作符给Observable加上超时时间,每发射一个数据后就重置计时器,当超过预定的时间还没有发射下一个数据,就抛出一个超时的异常。
Rxjava将Timeout实现为很多不同功能的操作符,比如说超时后用一个备用的Observable继续发射数据等。
技术分享技术分享
下面我们创建一个Observable,逐渐加大间隔地发射数据,并使用timeout加上超时的限制。

private Observable<Integer> timeoutObserver() {
        return createObserver().timeout(200, TimeUnit.MILLISECONDS);
    }

    private Observable<Integer> timeoutobserverObserver() {
        return createObserver().timeout(200, TimeUnit.MILLISECONDS, Observable.just(5, 6));
    }

    private Observable<Integer> createObserver() {
        return Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for (int i = 0; i <= 3; i++) {
                    try {
                        Thread.sleep(i * 100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    subscriber.onNext(i);
                }
                subscriber.onCompleted();
            }
        });
    }
mLButton.setText("timeout");
        mLButton.setOnClickListener(e -> timeoutObserver().subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {
                log(e);
            }

            @Override
            public void onNext(Integer integer) {
                log("timeout:" + integer);
            }
        }));
        mRButton.setText("timeoutobserver");
        mRButton.setOnClickListener(e -> timeoutobserverObserver().subscribe(i -> log(i)));

运行结果
技术分享

七、Using

Using操作符创建一个在Observable生命周期内存活的资源,也可以这样理解:我们创建一个资源并使用它,用一个Observable来限制这个资源的使用时间,当这个Observable终止的时候,这个资源就会被销毁。

Using需要使用三个参数,分别是:
1.创建这个一次性资源的函数
2.创建Observable的函数
技术分享
下面我们定义了一个Animal类,并使用Using来控制其创建和释放。

private Observable<Long> usingObserver() {
        return Observable.using(() -> new Animal(), i -> Observable.timer(5000,TimeUnit.MILLISECONDS), o -> o.relase());
    }

    private class Animal {
        Subscriber subscriber = new Subscriber() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Object o) {
                log("animal eat");
            }
        };

        public Animal() {
            log("create animal");
            Observable.interval(1000, TimeUnit.MILLISECONDS)
                    .subscribe(subscriber);
        }

        public void relase() {
            log("animal released");
            subscriber.unsubscribe();
        }
    }

订阅

(此处)折叠或打开
Observable<Long> observable = usingObserver();
        Subscriber subscriber = new Subscriber() {
            @Override
            public void onCompleted() {
                log("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                log("onError");
            }

            @Override
            public void onNext(Object o) {
                log("onNext"+o);
            }
        };
        mLButton.setText("using");
        mLButton.setOnClickListener(e -> observable.subscribe(subscriber));
        mRButton.setText("unSubscrib");
        mRButton.setOnClickListener(e -> subscriber.unsubscribe());

运行结果如下。在订阅了几秒之后,对其进行反订阅,Observable就会终止从而触发Animal的释放。
技术分享
关于辅助操作符就到这里了,本文中的demo程序见https://github.com/Chaoba/RxJavaDemo

上面内容参考:http://blog.chinaunix.net/uid-20771867-id-5206187.html


Conditional and Boolean Operators(Observable的条件操作符)

一、All/Amb

All操作符根据一个函数对源Observable发射的所有数据进行判断,最终返回的结果就是这个判断结果。这个函数使用发射的数据作为参数,内部判断所有的数据是否满足我们定义好的判断条件,如果全部都满足则返回true,否则就返回false。
技术分享
Amb操作符可以将至多9个Observable结合起来,让他们竞争。哪个Observable首先发射了数据(包括onError和onComplete)就会继续发射这个Observable的数据,其他的Observable所发射的数据都会别丢弃。
技术分享
下面使用这两个操作符,对于all操作符我们做了这样的限制,初次使用的时候tag为false,会创建6个数字的Observable,以后都会创建5个数字的Observable。

private Observable<Boolean> allObserver() {
        Observable<Integer> just;
        if (tag) {
            just = Observable.just(1, 2, 3, 4, 5);
        } else {
            just = Observable.just(1, 2, 3, 4, 5, 6);
        }
        tag = true;
        return just.all(integer -> integer < 6);
    }

    private Observable<Integer> ambObserver() {
        Observable<Integer> delay3 = Observable.just(1, 2, 3).delay(3000, TimeUnit.MILLISECONDS);
        Observable<Integer> delay2 = Observable.just(4, 5, 6).delay(2000, TimeUnit.MILLISECONDS);
        Observable<Integer> delay1 = Observable.just(7, 8, 9).delay(1000, TimeUnit.MILLISECONDS);
        return Observable.amb(delay1, delay2, delay3);
    }

分别进订阅

mLButton.setText("all");
mLButton.setOnClickListener(e -> allObserver().subscribe(i -> log("all:" + i)));
mRButton.setText("amb");
mRButton.setOnClickListener(e -> ambObserver().subscribe(i -> log("amb:" + i)));

运行结果如下所示。第一次返回的6个数据的Observable不满足所有都小于6的条件,所以结果是false,以后的都满足条件,所以结果是true。使用amb操作符的Observable,第一个发射的数据的是7,所以输出了7,8,9,其他的数据都丢弃了。
技术分享

二、Contains、IsEmpty
Contains操作符用来判断源Observable所发射的数据是否包含某一个数据,如果包含会返回true,如果源Observable已经结束了却还没有发射这个数据则返回false。
IsEmpty操作符用来判断源Observable是否发射过数据,如果发射过就会返回false,如果源Observable已经结束了却还没有发射这个数据则返回true。
技术分享技术分享
用这两个操作符来判断一下两个Observable对象是否包含某个数据及是否为空

private Observable<Boolean> containsObserver() {
        if (tag) {
            return Observable.just(1, 2, 3).contains(3);
        }
        tag = true;
        return Observable.just(1, 2, 3).contains(4);
    }

    private Observable<Boolean> defaultObserver() {
        return Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onCompleted();
            }
        }).isEmpty();
    }

分别进行订阅

mLButton.setText("contains");
mLButton.setOnClickListener(e -> containsObserver().subscribe(i -> log("contains:" + i)));
mRButton.setText("isEmpty");
mRButton.setOnClickListener(e -> defaultObserver().subscribe(i -> log("isEmpty:" + i)));

运行结果如下
技术分享

三、DefaultIfEmpty
DefaultIfEmpty操作符会判断源Observable是否发射数据,如果源Observable发射了数据则正常发射这些数据,如果没有则发射一个默认的数据
技术分享
下面我们用这个操作符来处理一个空的和一个非空的Observable,如果为空的话就返回默认值10

private Observable<Integer> emptyObserver() {
        return Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onCompleted();
            }
        }).defaultIfEmpty(10);
    }

    private Observable<Integer> notEmptyObserver() {
        return Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onCompleted();
            }
        }).defaultIfEmpty(10);
    }

分别进行订阅

mLButton.setText("empty");
mLButton.setOnClickListener(e -> emptyObserver().subscribe(i -> log("empty:" + i)));
mRButton.setText("notEmpty");
mRButton.setOnClickListener(e -> notEmptyObserver().subscribe(i -> log("notEmpty:" + i)));

运行结果如下
技术分享

四、SequenceEqual
SequenceEqual操作符用来判断两个Observable发射的数据序列是否相同(发射的数据相同,数据的序列相同,结束的状态相同),如果相同返回true,否则返回false
技术分享
下面用SequenceEqual分别来判断两个相同的和不相同的Observable

private Observable<Boolean> equalObserver() {
        return Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(1, 2, 3));
    }

    private Observable<Boolean> notEqualObserver() {
        return Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(1, 2));
    }

分别进行订阅

mLButton.setText("equal");
mLButton.setOnClickListener(e -> equalObserver().subscribe(i -> log("equal:" + i)));
mRButton.setText("notequal");
mRButton.setOnClickListener(e -> notEqualObserver().subscribe(i -> log("notequal:" + i)));

运行结果如下
技术分享

五、SkipUntil、SkipWhile

这两个操作符都是根据条件来跳过一些数据,不同之处在于SkipUnitl是根据一个标志Observable来判断的,当这个标志Observable没有发射数据的时候,所有源Observable发射的数据都会被跳过;当标志Observable发射了一个数据,则开始正常地发射数据。SkipWhile则是根据一个函数来判断是否跳过数据,当函数返回值为true的时候则一直跳过源Observable发射的数据;当函数返回false的时候则开始正常发射数据。
技术分享技术分享
下面使用这两个操作符来跳过一些数据项。

private Observable<Long> skipUntilObserver() {
        return Observable.interval(1, TimeUnit.SECONDS).skipUntil(Observable.timer(3, TimeUnit.SECONDS));
    }

    private Observable<Long> skipWhileObserver() {
        return Observable.interval(1, TimeUnit.SECONDS).skipWhile(aLong -> aLong < 5);
    }

分别进行订阅

mLButton.setText("skipUntil");
mLButton.setOnClickListener(e -> skipUntilObserver().subscribe(i -> log("skipUntil:" + i)));
mRButton.setText("skipWhile");
mRButton.setOnClickListener(e -> skipWhileObserver().subscribe(i -> log("skipWhile:" + i)));

运行结果如下
技术分享

六、TakeUntil、TakeWhile

TakeUntil和TakeWhile操作符可以说和SkipUnitl和SkipWhile操作符是完全相反的功能。TakeUntil也是使用一个标志Observable是否发射数据来判断,当标志Observable没有发射数据时,正常发射数据,而一旦标志Observable发射过了数据则后面的数据都会被丢弃。TakeWhile则是根据一个函数来判断是否发射数据,当函数返回值为true的时候正常发射数据;当函数返回false的时候丢弃所有后面的数据。

技术分享技术分享

下面使用这两个操作符来take两个Observable发射的数据

private Observable<Long> takeUntilObserver() {
        return Observable.interval(1, TimeUnit.SECONDS).takeUntil(Observable.timer(3, TimeUnit.SECONDS));
    }

    private Observable<Long> takeWhileObserver() {
        return Observable.interval(1, TimeUnit.SECONDS).takeWhile(aLong -> aLong < 5);
    }

分别进行订阅

mLButton.setText("takeUntil");
mLButton.setOnClickListener(e -> takeUntilObserver().subscribe(i -> log("takeUntil:" + i)));
mRButton.setText("takeWhile");
mRButton.setOnClickListener(e -> takeWhileObserver().subscribe(i -> log("takeWhile:" + i)));

运行结果如下
技术分享

关于条件和布尔操作符就到这了,本文中所有的demo程序见:https://github.com/Chaoba/RxJavaDemo

Android RxJava使用介绍(四) RxJava的操作符

标签:

原文地址:http://blog.csdn.net/jdsjlzx/article/details/51489061

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!