码迷,mamicode.com
首页 > 编程语言 > 详细

RxJava源码初探

时间:2016-05-18 19:35:04      阅读:287      评论:0      收藏:0      [点我收藏+]

标签:

Demo分析

响应式编程的概念现在火的一塌糊涂,各种RxXXX库层出不穷,虽然这些库的实现语言各不相同,但是原理都是一样的。我的理解是这些库主要都包含三个东西:Observable, OnSubscribe, Subscriber。阅读本文的读者必须懂的这些概念,初学者建议看下RxJava专题 上的文章再来看本文。我们就从源码层级来分析一下这中间的事件流,线程切换是怎么个原理。这里交代下本文分析的RxJava的版本是1.1.0 先来看个简单的Demo实例

Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("hello world");
            subscriber.onCompleted();   
            }
        }).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(String s) {
            }
        });

上面的demo就是简单的创建一个Observable,然后触发事件流。ok, 我们一步一步来分析。我们的Demo先是调用Observabale的create方法,而create方法其实内部就是调用Observale的构造函数来实例化一个Observable对象。再然后就是调用subscrible方法

public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
     // 此处省略一堆验证参数的代码

        // new Subscriber so onStart it
        subscriber.onStart();

        try {
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            try {
                subscriber.onError(hook.onSubscribeError(e));
            } catch (Throwable e2) {
                throw r;
            }
            return Subscriptions.unsubscribed();
        }
    }

创建完Observable对象后,就会调用subscribe方法触发事件流。subscribe方法会先做一些参数验证,然后调用subscriber.onStart()方法,再然后调用我们在创建Observable的时候传递进去的OnSubscribe对象的call方法来触发Subscriber对象的onNext, onComplete 或者onError方法。用个图来表示。
技术分享
这里为了方便都采用了简写
* Observable ———> Ob
* OnSubscribe ———> OnSub
* Subscriber———> Sub
图中的箭头表示调用关系,上图中的箭头表示OnSub的call方法的调用,而call方法中又会调用Sub对象的方法onNext, onError或者onComplete方法。

map/lift方法分析

分析map方法和分析lift方法是一样的,弄懂了一个,另外一个自然也就懂了。ok, 先看下Observable的map方法的代码

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
    }

可以看到map方法实质上就是调用lift方法实现的,这里我们先要知道Func1<? super T, ? extends R> 和OperatorMap<T, R>对象充当什么样的角色。先看下Func1的代码

public interface Func1<T, R> extends Function {
    R call(T t);
}

Func1是一个接口,它只有一个方法, 传入一个T类型的对象,返回一个R类型的对象,所以它是一个转换器。调用它的call方法可以将第一个泛型类型的对象T转换成第二个泛型类型的对象R。
那OperatorMap呢?看下代码

public final class OperatorMap<T, R> implements Operator<R, T> {

    private final Func1<? super T, ? extends R> transformer;

    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        return new Subscriber<T>(o) {

            @Override
            public void onCompleted() {
                o.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }

            @Override
            public void onNext(T t) {
                try {
                    o.onNext(transformer.call(t));
                } catch (Throwable e) {
                    Exceptions.throwOrReport(e, this, t);
                }
            }

        };
    }

看这段代码我们知道OperatorMap<R, T> 实现了Operator< T, R>接口,这里要注意两个泛型 T 和 R 在OperatorMap和Operator里面的顺序是对换的。先来看下Operator接口的代码

public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
        // cover for generics insanity
    }

在这里大家不要去记住泛型T,泛型R啊什么的, 我们按顺序来区分它们,就记住第一个泛型对象,第二个泛型对象。上面我们说道Func1是个转换器, 那Operator也是个转换器咯,不过它不是把第一个泛型的对象转换成第二个泛型的对象, 而是将Susbcriber<第一个泛型>对象转换成Sbscriber<第二个泛型>对象。再根据OperatorMap<T, R> implements Operator<R, T> OperatorMap<R, T> 和Operator< T, R>的泛型顺序是相反的可知,OperatorMap<T, R >转换器的功能是将Subscriber<第二个泛型>对象转换成Subscriber<第一个泛型>对象。 再来看下lift方法

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                try {
                    Subscriber<? super T> st = hook.onLift(operator).call(o);
                    try {
                        st.onStart();
                        onSubscribe.call(st);
                    } catch (Throwable e) {
               // 此处省略无关代码
                        st.onError(e);
                    }
                } catch (Throwable e) {
                   // 此处省略无关代码
                    o.onError(e);
                }
            }
        });
    }

lift方法就是创建了一个Observable<R>对象并返回。每个Observable<R>对象必须有一个OnSubscribe<R>对象,所以lift方法中也实例化了一个OnSubscribe<R>对象并传递给Observable<R>。该OnSubscribe对象的call方法又会调用OperatorMap的call方法创建一个Subscriber,最后组成了一个Observable对象返回。那么一个简单的demo加上一个map方法的图就是这样的。
技术分享
上面的Ob<T>图我们已经见过了,下面的Ob<R>就是lift创建并返回的,从lift代码中我们知道Observable<R>中的OnSubscribe<R>对象的call(Subscriber<R> subscriber)方法会调用原始OnSubscribe<T>对象的call(Subscriber<T> subscriber)方法。在简单domo中,Subscriber<T>对象是我们创建并调用Observable<T>的subscribe方法传递进去的,那这里的Subscriber<T>是哪来的呢。答案就是OperatorMap的call方法,该call方法会将一个Subscriber<R>对象转换成Subscriber<T>对象。 新创建的Subscriber<T>对象的onNext, onError或者onComplete方法都会调用Subscriber<R>的相应方法。这也是上图中右边的由Sub<T>指向Sub<R>的表示意思。我们调用map方法时需要传入的Func1对象的call方法也是在Subscriber<T>的onNext方法中被调用。 那这个Subscriber<R>对象自然就是我们在代码实例化并通过subscribe方法传递给新的Observable<R>对象的参数咯。这里说下图中的虚线和实线的意思,虚线表示OnSubscribe对象的call方法调用, 实线表示Subscriber对象的onNext, onError或者onComplete方法的调用。

flatMap方法分析

先上代码

public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
        if (getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
        }
        return merge(map(func));
    }

可以看到有两条路径可以走,第一条是当原始Observable是ScalarSynchronousObservable对象的时候才会执行的。我能找到的只有Observable的just方法才会创建ScalarSynchornousObservable对象,其他的都是直接new一个Observable对象。那我们先来分析原始Observable<T>是调用just方法创建的情况,即第一条路径ScalarSynchronousObservable.scalarFlatMap(func)的执行情况。

public <R> Observable<R> scalarFlatMap(final Func1<? super T, ? extends Observable<? extends R>> func) {
        return create(new OnSubscribe<R>() {
            @Override
            public void call(final Subscriber<? super R> child) {
                Observable<? extends R> o = func.call(t);
                if (o.getClass() == ScalarSynchronousObservable.class) {
                    child.onNext(((ScalarSynchronousObservable<? extends R>)o).t);
                    child.onCompleted();
                } else {
                    o.unsafeSubscribe(new Subscriber<R>(child) {
                        @Override
                        public void onNext(R v) {
                            child.onNext(v);
                        }
                        @Override
                        public void onError(Throwable e) {
                            child.onError(e);
                        }
                        @Override
                        public void onCompleted() {
                            child.onCompleted();
                        }
                    });
                }
            }
        });
    }

ScalarSynchronousObservable的scalarFlatMap方法创建了一个新的Observable<R>对象。算上原始Observable<T>对象和我们Func1对象的call函数创建的Observable<R>对象,我们现在一共有三个Observable对象。
如果我们提供的Func1对象的call函数中创建Observable<R>对象的时候是调用Observable.just方法创建的,则if里面的条件成立,那么会执行

Observable<? extends R> o = func.call(t);
if (o.getClass() == ScalarSynchronousObservable.class) {
                    child.onNext(((ScalarSynchronousObservable<? extends R>)o).t);
                    child.onCompleted();
                } 

这里要注意下func.call(t)的t对象是原始Observable.just(T t)方法传递进去的t。而ScalarSynchronousObservable<? extends R>)o).t的t是我们在Func1.call方法里面用just(T t)方法创建Observable对象时传递进去的。
技术分享
最后的图就是这样了。原始的Observable<T>已经没什么用了,它就当做一个提供数据T的容器,被Func1.call方法用来取数据Observable<T>.t。Func1.call方法将T对象转换成Observable<R>对象,该Obersvable<R>也没什么用,只是当做数据R的容器,被最后的Observable<R>里面的OnSubscrib<R>.call方法调用到,用来获取R数据对象并传递给Sub<R>.onNext(R r)方法。
ok, 如果Func1.call方法里面提供的Observable<R>不是通过just创建的,那if条件就不成立,那代码就会走else里面的

Observable<? extends R> o = func.call(t);
o.unsafeSubscribe(new Subscriber<R>(child) {
                        @Override
                        public void onNext(R v) {
                            child.onNext(v);
                        }
                        @Override
                        public void onError(Throwable e) {
                            child.onError(e);
                        }
                        @Override
                        public void onCompleted() {
                            child.onCompleted();
                        }
                    }

那么最后的图就是这样的
技术分享
上图中标明依赖 字样的虚线表示依赖关系,并不是事件流。那原始Observable<T>不是通过just创建的呢,是普通的Observable对象,在flatMap函数中代码就是走merge(map(func))。map函数我们已经知道是怎么回事了,所以先上执行完map的图
技术分享
再看下merge函数

public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
        if (source.getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
        }
        return source.lift(OperatorMerge.<T>instance(false));
    }

代码将会执行source.lift方法。为什么不会执行上面if里面的代码?因为经过map函数调用生成的Ob<Ob<R>>不是ScalarSynchronousObservable类型的对象。lift方法的图谱我们在分析map方法时已经知道了怎么画,直接上图再解释
技术分享
一个完整的图就是上面那样的了,问题的关键点是Sub<Ob<R>>是如何触发Sub<R>的事件的,即右下角的那条实线箭头是怎么来的。在map方法的分析中,我们知道决定Sub<Ob<R>>触发Sub<R>事件的代码在Operator中。在这里就是OperatorMerge的call方法。

@Override
    public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
        MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
        MergeProducer<T> producer = new MergeProducer<T>(subscriber);
        subscriber.producer = producer;

        child.add(subscriber);
        child.setProducer(producer);

        return subscriber;
    }

当MergeSubscriber的onNext, onComplete或者onError方法被调用时,都会调用其emit方法, 而emit方法又会调用emitLoop方法,看下emitLoop方法

void emitLoop() {
           // 删除掉大量的无关的代码
            try {
                final Subscriber<? super T> child = this.child;
                for (;;) {       
                    Queue<Object> svq = queue;
                    long r = producer.get();
                    boolean unbounded = r == Long.MAX_VALUE;         
                    if (svq != null) {
                        for (;;) {
                            while (r > 0) {
                                o = svq.poll();
                                T v = nl.getValue(o);
                                try {
                                    child.onNext(v);
                                } catch (Throwable t) {
                                }
                                r--;
                            }
                        }
                     }
                 }
             }catch (Exception e){
             }
        }

在这里会多次触发child.onNext()事件,于是就是现实了由一个Subscriber.onNext(T t)事件裂变成多个Subscriber.onNext(R r)事件。具体整个流程相当麻烦,有兴趣的读者可以自己慢慢研究下。

observeOn方法分析

老规矩,先上源码

public final Observable<T> observeOn(Scheduler scheduler) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return lift(new OperatorObserveOn<T>(scheduler));
    }

跟flatMap很像,有两种情况,先分析第一种情况原始Observable<T>是由just方法创建的情况,just方法创建的是ScalrSynchronousObservable对象,所以代码走((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);

public Observable<T> scalarScheduleOn(Scheduler scheduler) {
        if (scheduler instanceof EventLoopsScheduler) {
            EventLoopsScheduler es = (EventLoopsScheduler) scheduler;
            return create(new DirectScheduledEmission<T>(es, t));
        }
        return create(new NormalScheduledEmission<T>(scheduler, t));
    }

这里可以看到根据Scheduler的类型又分为两种情况,不过都是创建一个新的Observable<T>对象。那么不同的就是两个OnSubscribe<T>对象了,一个是DirectScheduledEmission,另一个是NormalScheduleEmission。

static final class DirectScheduledEmission<T> implements OnSubscribe<T> {
        private final EventLoopsScheduler es;
        private final T value;
        DirectScheduledEmission(EventLoopsScheduler es, T value) {
            this.es = es;
            this.value = value;
        }
        @Override
        public void call(final Subscriber<? super T> child) {
            child.add(es.scheduleDirect(new ScalarSynchronousAction<T>(child, value)));
            //这个方法的代码看不懂,可以把它拆开看,拆开后如下
            //ScalarSynchronousAction<T> scalarAction = new ScalarSynchronousAction<T>(child, value);
            //Subscription subscription = es.scheduleDirect(scalrAction);
            //child.add(subscription);
        }
    }
    /** Emits a scalar value on a general scheduler. */
    static final class NormalScheduledEmission<T> implements OnSubscribe<T> {
        private final Scheduler scheduler;
        private final T value;

        NormalScheduledEmission(Scheduler scheduler, T value) {
            this.scheduler = scheduler;
            this.value = value;
        }

        @Override
        public void call(final Subscriber<? super T> subscriber) {
            Worker worker = scheduler.createWorker();
            subscriber.add(worker);
            worker.schedule(new ScalarSynchronousAction<T>(subscriber, value));
        }
    }
    /** Action that emits a single value when called. */
    static final class ScalarSynchronousAction<T> implements Action0 {
        private final Subscriber<? super T> subscriber;
        private final T value;

        private ScalarSynchronousAction(Subscriber<? super T> subscriber,
                T value) {
            this.subscriber = subscriber;
            this.value = value;
        }

        @Override
        public void call() {
            try {
                subscriber.onNext(value);
            } catch (Throwable t) {
                subscriber.onError(t);
                return;
            }
            subscriber.onCompleted();
        }
    }

ScalarSynchronousAction是一个Action0,它的call方法会触发subscriber的onNext, onError或者onComplete事件。而DirectScheduledEmission和NormalScheduledEmission都是OnSubscribe对象,它们的call方法中都会将ScalarSychronousAction放入一个线程池Scheduler中去执行。所以当Observable<T>是ScalarSynchronousObservable<T>的时候即通过just方法创建出来的时候,事件图是这样的。
技术分享
Scheduler是在OnSubscribe<T>的call方法中启动并切换线程的。
那么原始Observable<T>不是ScalrSynchronousObservable的时候,observeOn方法的代码走向是lift(new OperatorObserveOn(scheduler))。lift方法的事件图我们已经知道了,先上图
技术分享
lift方法里面会创建一个新的Observable<T>对象和一个新的OnSubscribe<T>对象,新的OnSubscribe<T>.call方法会调用原始的OnSubscribe<T>.call(Subscriber<T> subscriber)方法。这个Subscriber<T>参数对象是通过OperatorObserveOn对象的call方法获取的。

@Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        if (scheduler instanceof ImmediateScheduler) {
            // avoid overhead, execute directly
            return child;
        } else if (scheduler instanceof TrampolineScheduler) {
            // avoid overhead, execute directly
            return child;
        } else {
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child);
            parent.init();
            return parent;
        }
    }

当Scheduler是ImmediateScheduler或者TrampolineScheduler对象时,返回的是我们代码里面创建并传递给Observable.subscribe()方法的Subscriber对象而且这个过程并没有使用到Scheduler去切换线程,也就是说这个过程依然运行在当前线程。那么事件图应该是这样的。
技术分享
如果Scheduler不是ImmediateScheduler和TrampolineScheduler的话,OperatorObserveOn对象的call方法就会执行以下代码

@Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        //省略掉一些代码
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child);
            parent.init();
            return parent;

    }

这里要关注的就是ObserveOnSubscriber对象,它是一个Subscriber对象。ok,也看下代码

private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
        final Subscriber<? super T> child;
        final Scheduler.Worker recursiveScheduler;

        public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child) {
            this.child = child;
            this.recursiveScheduler = scheduler.createWorker();
            //省略掉无关代码
        }

        void init() {
            child.add(scheduledUnsubscribe);
            child.setProducer(new Producer() {

                @Override
                public void request(long n) {
                    BackpressureUtils.getAndAddRequest(requested, n);
                    schedule();
                }

            });
            child.add(recursiveScheduler);
            child.add(this);
        }

        @Override
        public void onStart() {
            request(RxRingBuffer.SIZE);
        }

        @Override
        public void onNext(final T t) {
            //省略掉无关代码
            schedule();
        }

        @Override
        public void onCompleted() {
            //省略掉无关代码
            schedule();
        }

        @Override
        public void onError(final Throwable e) {
           //省略掉无关代码
            schedule();
        }

        final Action0 action = new Action0() {

            @Override
            public void call() {
                pollQueue();
            }

        };

        protected void schedule() {
            if (counter.getAndIncrement() == 0) {
                recursiveScheduler.schedule(action);
            }
        }

        // only execute this from schedule()
        void pollQueue() {
            //省略掉无关代码
            do {
                for (;;) {
                    if (finished) {
                        if ((error = this.error) != null) {
                            queue.clear();
                            child.onError(error);
                            return;
                        } else if (queue.isEmpty()) {
                            child.onCompleted();
                            return;
                        }
                    }
                    if (r > 0) {
                        Object o = queue.poll();
                        if (o != null) {
                            child.onNext(on.getValue(o));
                        } else {
                            break;
                        }
                    } else {
                        break;
                    }
                }
            if (emitted > 0) {
                request(emitted);
            }
        }
    }

可以看出当ObserveOnSubscriber对象的onNext, onError或者onComplete事件被触发时,它们会调用schedule方法,而schedule方法会将Action0加入线程池去执行,也就是说Action0的call方法执行的代码是在scheduler切换后的线程中执行,Action0.call方法会调用pollQueue去触发新的Subscriber<T>的onNext, onError或者onComplete事件。最后的事件图是这样的。
技术分享
上图中的 Scheduler作用于 表示 在Subscriber<T>对象的onNext, onError或者onComplete方法中调用Scheduler的方法去切换线程。

subscribeOn方法分析

先看subscriveOn方法的代码

public final Observable<T> subscribeOn(Scheduler scheduler) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return nest().lift(new OperatorSubscribeOn<T>(scheduler));
    }

当原始Observable<T>对象是ScalarSynchronousObservable对象时,即原始Observable<T>是由just方法创建的时候,代码和observeOn方法中情况一样,直接上事件流图
技术分享
Scheduler是在OnSubscribe<T>的call方法中启动并切换线程的。那么原始Observable<T>不是ScalrSynchronousObservable的时候,subscribeOn方法的代码走向是nest().lift(new OperatorSubscribeOn<T>(scheduler));
先看下nest方法的代码

public final Observable<Observable<T>> nest() {
        return just(this);
    }

public final static <T> Observable<T> just(final T value) {
        return ScalarSynchronousObservable.create(value);
    }

public final class ScalarSynchronousObservable<T> extends Observable<T> {

    public static final <T> ScalarSynchronousObservable<T> create(T t) {
        return new ScalarSynchronousObservable<T>(t);
    }

    private final T t;

    protected ScalarSynchronousObservable(final T t) {
        super(new OnSubscribe<T>() {

            @Override
            public void call(Subscriber<? super T> s) {
                s.onNext(t);
                s.onCompleted();
            }

        });
        this.t = t;
    }

    public T get() {
        return t;
    }
    //此处省略掉无关代码
}

nest方法其实是调用just方法创建一个ScalarSynchronousObservable对象。这里要注意的是just方法的参数是原始Observable<T>对象,执行完nest方法的图如下
技术分享
原始Observable<T>对象被当做一个属性保存在新创建出来的ScalarSynchronousObservable对象当中。以上就是nest方法执行完之后的情况,按照nest().lift(new OperatorSubscribeOn<T>(scheduler)); 的代码走向,我们来看下执行lift的时候事件流的图是怎么样的。先上一张没有分析OperatorSubscribeOn代码的图
技术分享
lift方法会创建一个Observable<T>对象,该对象的内部有一个OnSubscribe<T>对象,它的call方法会调用OperatorSubscribeOn.call方法得到一个Subscriber<Observable<T>>对象,并在调用上面的Observable<Observable<T>>.OnSubscribe<Observable<T>>.call方法时传递进去。那么来看下OperatorSubscribeOn.call是如何将一个Subscribe<T>转换成Subscriber<Observable<T>>对象的。

public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {

    private final Scheduler scheduler;

    public OperatorSubscribeOn(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    @Override
    public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);
        return new Subscriber<Observable<T>>(subscriber) {

            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }

            @Override
            public void onNext(final Observable<T> o) {
                inner.schedule(new Action0() {

                    @Override
                    public void call() {
                        final Thread t = Thread.currentThread();
                        o.unsafeSubscribe(new Subscriber<T>(subscriber) {

                            @Override
                            public void onCompleted() {
                                subscriber.onCompleted();
                            }

                            @Override
                            public void onError(Throwable e) {
                                subscriber.onError(e);
                            }

                            @Override
                            public void onNext(T t) {
                                subscriber.onNext(t);
                            }

                            @Override
                            public void setProducer(final Producer producer) {
                                subscriber.setProducer(new Producer() {

                                    @Override
                                    public void request(final long n) {
                                        if (Thread.currentThread() == t) {
                                            producer.request(n);
                                        } else {
                                            inner.schedule(new Action0() {

                                                @Override
                                                public void call() {
                                                    producer.request(n);
                                                }
                                            });
                                        }
                                    }

                                });
                            }

                        });
                    }
                });
            }

        };
    }
}

可以看到OperatorSubscribeOn对象的call方法里面实例化了一个Subscriber<T>对象并返回。该Subscriber<T>的onNext方法中会调用Scheduler.schedule将一个Action0加入线程池里执行,该Action0的call方法会触发原始Observable<T>的unsafeSubscribe方法,然后会触发原始OnSubscribe<T>.call方法,也就是说原始OnSubscribe<T>.call方法是在切换后的线程中执行。如图
技术分享

RxJava源码初探

标签:

原文地址:http://blog.csdn.net/p892848153/article/details/51412589

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!