码迷,mamicode.com
首页 > 编程语言 > 详细

Rxjava2 Observable的创建详解及实例

时间:2019-12-31 23:15:33      阅读:72      评论:0      收藏:0      [点我收藏+]

标签:exe   error:   有序   react   部分   测试   重要   ide   版本   

简要:

几种主要的需求

  • 直接创建一个Observable(创建操作)
  • 组合多个Observable(组合操作)
  • 对Observable发射的数据执行变换操作(变换操作)
  • 从Observable发射的数据中取特定的值(过滤操作)
  • 转发Observable的部分值(条件/布尔/过滤操作)
  • 对Observable发射的数据序列求值(算术/聚合操作)

创建Observable的各种方式

  • create():使用一个函数从头创建一个Observable
  • defer():只有当订阅者订阅才创建Observable;为每个订阅创建一个新的 Observable
  • empty() :创建一个什么都不做直接通知完成的Observable
  • never():创建一个不发射任何数据的Observable
  • error():—创建一个什么都不做直接通知错误的Observable
  • just():将一个或多个对象转换成发射这个或这些对象的一个Observable
  • from():将一个Iterable, 一个Future, 或者一个数组转换成一个Observable
  • repeat():创建一个重复发射指定数据或数据序列的Observable
  • repeatWhen() :创建一个重复发射指定数据或数据序列的Observable,它依赖于另一 个Observable发射的数据
  • repeatUntil():根据条件(函数BooleanSupplier)判断是否需要继续订阅
  • range():创建一个发射指定范围的整数序列的Observable
  • interval():创建一个按照给定的时间间隔发射整数序列的Observable
  • timer():—创建一个在给定的延时之后发射单个数据的Observable

1. Create

使用 Create 操作符从头开始创建一个Observable,给这个操作符传递一个接受观察者作为参数的函数,编写这个函数可以调用观察者的 onNextonErroronCompleted 方法,当发生订阅的时候会自动调用观察者的 onSubscribe 方法。

通过 Subscribe 进行Observable 与 Observer 的订阅,其中 subscribe 方法可以接收一个完整通知参数的 Observer 对象,也可以接收部分通知参数的 Consumer(接收数据) 或者 Action (仅接收通知) 对象。

技术图片

实例代码:

    // 创建Observable(被观察者)
    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {

        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("Hello");
            emitter.onNext("World");
            emitter.onComplete();
        }
    });
    
    // 创建Observer(观察者), 可以接受所有通知
    Observer<String> observer = new Observer<String>() {

        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe");
        }

        public void onNext(String t) {
            System.out.println("--> onNext = " + t);
        }

        public void onError(Throwable e) {
            System.out.println("--> onError");
        }

        public void onComplete() {
            System.out.println("--> onComplete");
        }
    };
    
    // 创建只接受 onNext(item) 通知的Consumer(观察者)
    Consumer<String> nextConsumer = new Consumer<String>() {

        @Override
        public void accept(String t) throws Exception {
            System.out.println("--> accept nextConsumer: " + t);
        }
    };
    
    // 创建只接受 onError(Throwable) 通知的Consumer(观察者)
    Consumer<Throwable> errorConsumer = new Consumer<Throwable>() {

        @Override
        public void accept(Throwable t) throws Exception {
            System.out.println("-- accept errorConsumer: " + t);
        }
    };
    
    // 创建只接受 onComplete() 通知的Action(观察者)
    Action completedAction = new Action() {
        
        @Override
        public void run() throws Exception {
            System.out.println("--> run completedAction");
        }
    };
    
    // 创建只接受 onSubscribe 通知的Consumer(观察者)
    Consumer<Disposable> onSubscribeComsumer = new Consumer<Disposable>() {

        @Override
        public void accept(Disposable t) throws Exception {
            System.out.println("--> accept onSubscribeComsumer ");
        }
    };

    // 1. 进行订阅,subscribe(Observer)
    observable.subscribe(observer);
    
    System.out.println("---------------------------------------------");
    // 2. 进行订阅,subscribe(Consumer onNext)
    observable.subscribe(nextConsumer);
    
    System.out.println("---------------------------------------------");
    // 3. 进行订阅,subscribe(Consumer onNext, Consumer onError)
    observable.subscribe(nextConsumer, errorConsumer);
    
    System.out.println("---------------------------------------------");
    // 4. 进行订阅,subscribe(Consumer onNext, Consumer onError, Action onCompleted)
    observable.subscribe(nextConsumer, errorConsumer, completedAction);
    
    System.out.println("---------------------------------------------");
    // 5. 进行订阅,subscribe(Consumer onNext, Consumer onError, Action onCompleted, Consumer onSubscribe)
    observable.subscribe(nextConsumer, errorConsumer, completedAction, onSubscribeComsumer);

输出:

--> onSubscribe
--> onNext = Hello
--> onNext = World
--> onComplete
---------------------------------------------
--> accept nextConsumer: Hello
--> accept nextConsumer: World
---------------------------------------------
--> accept nextConsumer: Hello
--> accept nextConsumer: World
---------------------------------------------
--> accept nextConsumer: Hello
--> accept nextConsumer: World
--> run completedAction
---------------------------------------------
--> accept onSubscribeComsumer 
--> accept nextConsumer: Hello
--> accept nextConsumer: World
--> run completedAction

注意:create 方法默认不在任何特定的调度器上执行。

2. Defer

直到有观察者订阅时才创建 Observable,并且为每个观察者创建一个新的 Observable.

Defer 操作符会一直等待直到有观察者订阅它,然后它使用Observable工厂方法生成一个 Observable。它对每个观察者都这样做,因此尽管每个订阅者都以为自己订阅的是同一个 Observable,事实上每个订阅者获取的是它们自己的单独的数据序列。

技术图片

实例代码:

    // 创建一个Defer类型的Observable
    Observable<Integer> deferObservable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
        public ObservableSource<? extends Integer> call() throws Exception {
            // 创建每个观察者订阅所返回的 Observable
            Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onNext(4);
                    emitter.onNext(5);
                    emitter.onComplete();
                }
            });
            return observable;
        }
    });
    
    // 创建第一个观察者并订阅defer Observable
    deferObservable.subscribe(new Consumer<Integer>() {
    
        public void accept(Integer t) throws Exception {
            System.out.println("No.1 --> accept = " + t);
        }
    });
    
    // 创建第二个观察者并订阅defer Observable
    deferObservable.subscribe(new Consumer<Integer>() {
    
        public void accept(Integer t) throws Exception {
            System.out.println("No.2 --> accept = " + t);
        }
    });
    
    // 创建第三个观察者并订阅defer Observable
    deferObservable.subscribe(new Consumer<Integer>() {
    
        public void accept(Integer t) throws Exception {
            System.out.println("No.3 --> accept = " + t);
        }
    });

输出:

No.1 --> accept = 1
No.1 --> accept = 2
No.1 --> accept = 3
No.1 --> accept = 4
No.1 --> accept = 5
No.2 --> accept = 1
No.2 --> accept = 2
No.2 --> accept = 3
No.2 --> accept = 4
No.2 --> accept = 5
No.3 --> accept = 1
No.3 --> accept = 2
No.3 --> accept = 3
No.3 --> accept = 4
No.3 --> accept = 5

注意:defer 方法默认不在任何特定的调度器上执行。
Javadoc: defer(Func0)

3. Empty/Never/Error

Empty:创建一个不发射任何数据但是正常终止的Observable
Never:创建一个不发射数据也不终止的Observable
Error:创建一个不发射数据以一个错误终止的Observable

这三个操作符生成的 Observable 行为非常特殊和受限,多用于一些特殊的场景(某些操作状态异常后返回一个error、empty、never 的 Observable)。测试的时候很有用,有时候也用于结合其它的 Observables,或者作为其它需要 Observable 的操作符的参数。

实例代码:

    System.out.println("--> 1 -----------------------------------");
    // 1.  创建一个不发射任何数据但是正常终止的Observable
    Observable.empty()
        .subscribe(new Observer<Object>() {
    
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }
    
            @Override
            public void onNext(Object t) {
                System.out.println("onNext: " + t);
            }
    
            @Override
            public void onError(Throwable e) {
                System.out.println("onError: " + e);
            }
    
            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
    
    System.out.println("--> 2 -----------------------------------");
    // 2.  创建一个不输出数据,并且不会终止的Observable
    Observable.never()
        .subscribe(new Observer<Object>() {
    
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }
    
            @Override
            public void onNext(Object t) {
                System.out.println("onNext: " + t);
            }
    
            @Override
            public void onError(Throwable e) {
                System.out.println("onError: " + e);
            }
    
            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
    
    System.out.println("--> 3 -----------------------------------");
    // 3.  创建一个不发射数据以一个错误终止的Observable
    Observable.error(new NullPointerException("error test"))
        .subscribe(new Observer<Object>() {
    
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }
    
            @Override
            public void onNext(Object t) {
                System.out.println("onNext: " + t);
            }
    
            @Override
            public void onError(Throwable e) {
                System.out.println("onError: " + e);
            }
    
            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

输出:

--> 1 -----------------------------------
onSubscribe
onComplete
--> 2 -----------------------------------
onSubscribe
--> 3 -----------------------------------
onSubscribe
onError: java.lang.NullPointerException: error test

注意

  • RxJava将这些操作符实现为 empty,never和 error。
  • error 操作符需要一 个 Throwable参数,你的Observable会以此终止。
  • 这些操作符默认不在任何特定的调度器上执行,但是 empty 和 error 有一个可选参数是Scheduler,如果你传递了Scheduler参数,它 们会在这个调度器上发送通知.
    Javadoc: empty()
    Javadoc: never()
    Javadoc: error(java.lang.Throwable)

4. Just

创建一个发射指定值的Observable。

Just 将单个数据转换为发射那个数据的Observable。类似于From,但是From会将数组或Iterable的数据取出然后逐个发射,而Just只是简单的原样发射,将数组或Iterable当做单个数据。

注意: 如果你传递 nullJust,它会返回一个发射 null 值的 Observable。不要误认为它会返回一个空Observable(完全不发射任何数据的Observable),如果需要空Observable你应该使用Empty操作符。
技术图片

实例代码:

    // 单个对象发送
    Observable.just(1)
            .subscribe(new Consumer<Integer>() {

                public void accept(Integer t) throws Exception {
                    System.out.println("--> singe accept: " + t);
                }
            });
    
    System.out.println("---------------------------------");
    // 多个对象发送,内部实际使用from实现 (接受一至九个参数,返回一个按参数列表顺序发射这些数据的Observable)
    Observable.just(1, 2, 3, 4, 5)
            .subscribe(new Consumer<Integer>() {

                public void accept(Integer t) throws Exception {
                    System.out.println("--> mutil accept: " + t);
                }
            });

输出:

--> singe accept: 1
---------------------------------
--> mutil accept: 1
--> mutil accept: 2
--> mutil accept: 3
--> mutil accept: 4
--> mutil accept: 5

Javadoc: just(item ...)

5. From

将其它种类的对象和数据类型转换为Observable,发射来自对应数据源数据类型的数据,在RxJava中,from 操作符可以转换 FutureIterable数组。对于Iterable和数组,产生的Observable会发射Iterable或数组的每一项数据。
技术图片

实例代码:

        // 初始化数据
        Integer[] array = { 1, 2, 3, 4, 5, 6 };
        List<String> iterable = new ArrayList<String>();
        iterable.add("A");
        iterable.add("B");
        iterable.add("C");
        iterable.add("D");
        iterable.add("E");
        
        // 1. fromArray
        Observable.fromArray(array).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept(1):fromArray: " + t);
            }
        });
        
        System.out.println("---------------------------------------");
        // 2. fromIterable
        Observable.fromIterable(iterable)
            .subscribe(new Consumer<String>() {

                @Override
                public void accept(String t) throws Exception {
                    System.out.println("--> accept(2) fromIterable: " + t);
                }
            });
        
        System.out.println("---------------------------------------");
        // 3. fromCallable
        Observable.fromCallable(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                return 1;
            }
        }).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept(3): fromCallable: " + t);
            }
        });
        
        System.out.println("---------------------------------------");
        // 4. fromFuture
        Observable.fromFuture(new Future<String>() {

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                return false;
            }

            @Override
            public String get() throws InterruptedException, ExecutionException {
                System.out.println("--> fromFutrue: get()");
                return "hello";
            }

            @Override
            public boolean isCancelled() {
                return false;
            }

            @Override
            public boolean isDone() {
                return false;
            }

            @Override
            public String get(long timeout, TimeUnit unit)
                    throws InterruptedException, ExecutionException, TimeoutException {
                return null;
            }
        }).subscribe(new Consumer<String>() {

            @Override
            public void accept(String t) throws Exception {
                System.out.println("--> accept(4): fromFuture: " + t);
            }
        });

输出:

--> accept(1):fromArray: 1
--> accept(1):fromArray: 2
--> accept(1):fromArray: 3
--> accept(1):fromArray: 4
--> accept(1):fromArray: 5
--> accept(1):fromArray: 6
---------------------------------------
--> accept(2) fromIterable: A
--> accept(2) fromIterable: B
--> accept(2) fromIterable: C
--> accept(2) fromIterable: D
--> accept(2) fromIterable: E
---------------------------------------
--> accept(3): fromCallable: 1
---------------------------------------
--> fromFutrue: get()
--> accept(4): fromFuture: hello

注意:from默认不在任何特定的调度器上执行。然而你可以将Scheduler作为可选的第二个参数传递给Observable,它会在那个调度器上管理这个Future。
Javadoc: from(array)
Javadoc: from(Iterable)
Javadoc: from(Callable)
Javadoc: from(Future)
Javadoc: from(Future,Scheduler)
Javadoc: from(Future,timeout,timeUnit)

6. Repeat

创建一个发射特定数据重复多次的Observable,它不是创建一个Observable,而是重复发射原始 Observable的数据序列,这个序列或者是无限的,或者通过 repeat(n) 指定重复次数。
技术图片

实例代码:

    // 1. repeat(): 一直重复发射原始 Observable的数据序列
    Observable.range(1, 5)
            .repeat()
            .subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept(1): " + t);
                }
            });
    
    System.out.println("----------------------------------------");
    // 2. repeat(n): 重复执行5次
    Observable.range(1, 2)
            .repeat(3)
            .subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept(2): " + t);
                }
            });

输出:

--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
--> accept(1): 4
--> accept(1): 5
--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
--> accept(1): 4
--> accept(1): 5
--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
--> accept(1): 4
--> accept(1): 5
......
----------------------------------------
--> accept(2): 1
--> accept(2): 2
--> accept(2): 1
--> accept(2): 2
--> accept(2): 1
--> accept(2): 2

注意: repeat 操作符默认在 trampoline 调度器上执行。有一个变体可以通过可选参数指定 Scheduler。
Javadoc: repeat()
Javadoc: repeat(long)
Javadoc: repeat(Scheduler)
Javadoc: repeat(long,Scheduler)

7. RepeatWhen

repeatWhen的操作符,它不是缓存和重放原始 Observable 的数据序列,接收到原始 Observable 终止通知后,有条件的决定是否重新订阅原来的 Observable 。

将原始 Observable 的终止通知(完成或错误)当做一个 void 数据传递给一个通知处理器,它以此来决定是否要重新订阅和发射原来的 Observable。这个通知处理器就像一个 Observable 操作符,接受一个发射 void通知的 Observable为输入,返回一个发射 void 数据(意思是,重新订阅和发射原始 Observable)或者直接终止(意思是,使用 repeatWhen 终止发射数据)的 Observable。

技术图片

实例代码:

    // repeatWhen(Func1()):接收到终止通知后,在函数中决定是否重新订阅原来的Observable
    // 需要注意的是repeatWhen的objectObservable处理(也可以单独自定义Observable返回),这里使用flathMap进行处理,
    // 让它延时发出onNext,这里onNext发出什么数据都不重要,它只是仅仅用来处理重订阅的通知,如果发出的是onComplete/onError,则不会触发重订阅
    Observable.range(1, 2)
            .doOnComplete(new Action() {

                @Override
                public void run() throws Exception {
                    System.out.println("-----------> 完成一次订阅");
                }
            }).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
                private int n = 0;
                
                @Override
                public ObservableSource<?> apply(Observable<Object> t) throws Exception {
                    // 接收到原始Observable的终止通知,决定是否重新订阅
                    System.out.println("--> apply repeat ");
                    return t.flatMap(new Function<Object, ObservableSource<?>>() {

                        @Override
                        public ObservableSource<?> apply(Object t) throws Exception {
                            if(n < 3) { // 重新订阅3次
                                n ++;
                                return Observable.just(0);   
                            } else {
                                return Observable.empty();
                            }
                        }
                    });
                    // return Observable.timer(1, TimeUnit.SECONDS);        // 间隔一秒后重新订阅一次
                    // return Observable.interval(1, TimeUnit.SECONDS); // 每间隔一秒重新订阅一次
                }
            }).subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept: " + t);
                }
            });

输出:

--> apply repeat 
--> accept: 1
--> accept: 2
-----------> 完成一次订阅
--> accept: 1
--> accept: 2
-----------> 完成一次订阅
--> accept: 1
--> accept: 2
-----------> 完成一次订阅
--> accept: 1
--> accept: 2
-----------> 完成一次订阅

注意:repeatWhen操作符默认在 trampoline 调度器上执行。

Javadoc: repeatWhen(Func1)

8. RepeatUntil

根据条件(函数BooleanSupplier)判断是否需要继续订阅: false:继续订阅; true:取消订阅
技术图片

实例代码:

    // repeatUntil 根据条件(BooleanSupplier)判断是否需要继续订阅
    Observable.range(1, 2)
            .doOnComplete(new Action() {

                @Override
                public void run() throws Exception {
                    System.out.println("-----------> 完成一次订阅");
                }
            }).repeatUntil(new BooleanSupplier() {

                private int n = 0;

                @Override
                public boolean getAsBoolean() throws Exception {
                    System.out.println("getAsBoolean = " + (n < 3? false:true) );
                    // 是否需要终止
                    if (n < 3) {
                        n++;
                        return false;   // 继续重新订阅
                    }
                    return true;        // 终止重新订阅
                }
            }).subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept: " + t);
                }
            });

输出:

--> accept: 1
--> accept: 2
-----------> 完成一次订阅
getAsBoolean = false
--> accept: 1
--> accept: 2
-----------> 完成一次订阅
getAsBoolean = false
--> accept: 1
--> accept: 2
-----------> 完成一次订阅
getAsBoolean = false
--> accept: 1
--> accept: 2
-----------> 完成一次订阅
getAsBoolean = true

Javadoc: repeatWhen(Func1)

9. Range

创建一个发射特定整数序列的Observable。

Range操作符发射一个范围内的有序整数序列,你可以指定范围的起始和长度。

RxJava将这个操作符实现为 range 函数,它接受两个参数,一个是范围的起始值,一个是范围的数据的数目。如果你将第二个参数设为0,将导致Observable不发射任何数据(如果设置 为负数,会抛异常)。
技术图片

实例代码:

    // 1. range(n,m) 发射从n开始的m个整数序列,序列区间[n,n+m-1)
    Observable.range(0, 5)
            .subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("-- accept(range): " + t);
                }
            });
    
    System.out.println("------------------------------");
    // 2. rangeLong(n,m) 发射从n开始的m个长整型序列,序列区间[n,n+m-1)
    Observable.rangeLong(1, 5)
            .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("-- accept(rangeLong): " + t);
                }
            });

输出:

-- accept(range): 0
-- accept(range): 1
-- accept(range): 2
-- accept(range): 3
-- accept(range): 4
------------------------------
-- accept(rangeLong): 1
-- accept(rangeLong): 2
-- accept(rangeLong): 3
-- accept(rangeLong): 4
-- accept(rangeLong): 5

Javadoc: range(int start,int count)
Javadoc: rangeLong(long start, long count)

10. interval

创建一个按固定时间间隔发射整数序列的Observable,它按固定的时间间隔发射一个无限递增的整数序列。
RxJava将这个操作符实现为 interval 方法。它接受一个表示时间间隔的参数和一个表示时间单位的参数。
技术图片

实例代码:

    // [1] interval(long period, TimeUnit unit) 
    // 每间隔period时间单位,发射一次整数序列
    Observable.interval(1, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {

                public void accept(Long l) throws Exception {
                    System.out.println("--> accept(1): " + l);
                }
            });

    System.out.println("------------------------------------");
    // [2] interval(long initialDelay, long period, TimeUnit unit)
    // 在延迟initialDelay秒后每隔period时间单位发射一个整数序列
    Observable.interval(0, 1, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {

                public void accept(Long t) throws Exception {
                    System.out.println("--> accept(2): " + t);
                }
            });
    
    System.out.println("------------------------------------");
    // [3] intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
    // 延迟initialDelay秒后从起始数据start开始,每隔period秒发送一个数字序列,一共发送count个数据
    Observable.intervalRange(1, 5, 3, 2, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {

                public void accept(Long t) throws Exception {
                    System.out.println("--> accept(3): " + t);
                }
            });

注意:interval 默认在 computation 调度器上执行, 有一个变体可以通过可选参数指定 Scheduler。
Javadoc: interval(long period, TimeUnit unit)
Javadoc: interval(long period, TimeUnit unit, Scheduler scheduler)
Javadoc: interval(long initialDelay, long period, TimeUnit unit)
Javadoc: interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
Javadoc: intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
Javadoc: intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

输出:

--> accept(1): 0
--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
--> accept(1): 4
--> accept(1): 5
...
------------------------------------
--> accept(2): 0
--> accept(2): 1
--> accept(2): 2
--> accept(2): 3
--> accept(2): 4
--> accept(2): 5
...
------------------------------------
--> accept(3): 1
--> accept(3): 2
--> accept(3): 3
--> accept(3): 4
--> accept(3): 5

11. Timer

创建一个给定的延迟后发射一个特殊的值的Observable。

RxJava将这个操作符实现为 timer 函数。timer 返回一个Observable,它在延迟一段给定的时间后发射一个简单的数字0
技术图片

实例代码:

    // timer(long delay, TimeUnit unit, Scheduler scheduler)
    // 定时delay时间 单位后发送数字0,指定可选参数Schedule调度器为trampoline(当前线程排队执行)
    Observable.timer(5, TimeUnit.SECONDS, Schedulers.trampoline())
            .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> accept: " + t);
                }
            });

输出:

--> accept: 0

注意:timer 操作符默认在 computation 调度器上执行。有一个变体可以通过可选参数指定 Scheduler。
Javadoc: timer(long delay, TimeUnit unit)
Javadoc: timer(long delay, TimeUnit unit, Scheduler scheduler)

小结

根据实际情况,使用不同的方式创建不同种类的Observable,这个在开发中非常有用,可以减少很多重复、复杂、冗余的操作,可以快速的创建一个符合要求的Observable,一定程度上提高了开发的效率。

提示:以上使用的Rxjava2版本: 2.2.12

Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例

实例代码:

Rxjava2 Observable的创建详解及实例

标签:exe   error:   有序   react   部分   测试   重要   ide   版本   

原文地址:https://www.cnblogs.com/jiangming-blogs/p/12127246.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!