Create
Creating an Observable is straightforward. The most basic way is to call Observable's create method. Here is an example:
Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        // perform the desired work here
    }
});
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    // validate and proceed
    if (subscriber == null) {
        throw new IllegalArgumentException("observer can not be null");
    }
    if (observable.onSubscribe == null) {
        throw new IllegalStateException("onSubscribe function can not be null.");
        /*
         * the subscribe function can also be overridden but generally that's not the appropriate approach
         * so I won't mention that in the exception
         */
    }
    // new Subscriber so onStart it
    subscriber.onStart();
    /*
     * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
     * to user code from within an Observer"
     */
    // if not already wrapped
    if (!(subscriber instanceof SafeSubscriber)) {
        // assign to `observer` so we return the protected version
        subscriber = new SafeSubscriber<T>(subscriber);
    }
    // The code below is exactly the same an unsafeSubscribe but not used because it would
    // add a significant depth to already huge call stacks.
    try {
        // allow the hook to intercept and/or decorate
        hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
        return hook.onSubscribeReturn(subscriber);
    } catch (Throwable e) {
        // special handling for certain Throwable/Error/Exception types
        Exceptions.throwIfFatal(e);
        // in case the subscriber can't listen to exceptions anymore
        if (subscriber.isUnsubscribed()) {
            RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
        } else {
            // if an unhandled error occurs executing the onSubscribe we will propagate it
            try {
                subscriber.onError(hook.onSubscribeError(e));
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);
                // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                // so we are unable to propagate the error correctly and will just throw
                RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                // TODO could the hook be the cause of the error in the on error handling.
                hook.onSubscribeError(r);
                // TODO why aren't we throwing the hook's return value.
                throw r;
            }
        }
        return Subscriptions.unsubscribed();
    }
}
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
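To make the role of that line concrete, here is a minimal sketch in plain Java. It assumes hypothetical stand-in types (MiniObservable, MiniSubscriber, MiniOnSubscribe) instead of the real RxJava classes, and it leaves out hooks, SafeSubscriber and error handling. It only illustrates that create stores the OnSubscribe and that nothing runs until subscribe calls its call method.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins, NOT the RxJava classes: just enough to show the call path.
final class CreateSketch {
    interface MiniSubscriber<T> {
        void onNext(T value);
        void onCompleted();
    }

    interface MiniOnSubscribe<T> {
        void call(MiniSubscriber<? super T> subscriber);
    }

    static final class MiniObservable<T> {
        final MiniOnSubscribe<T> onSubscribe;

        private MiniObservable(MiniOnSubscribe<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        // create() only stores the function; nothing is emitted yet.
        static <T> MiniObservable<T> create(MiniOnSubscribe<T> onSubscribe) {
            return new MiniObservable<T>(onSubscribe);
        }

        // subscribe() is where the stored function finally runs.
        void subscribe(MiniSubscriber<? super T> subscriber) {
            onSubscribe.call(subscriber);
        }
    }

    static List<String> run() {
        final List<String> log = new ArrayList<>();
        MiniObservable<Integer> source = MiniObservable.create(s -> {
            log.add("call");
            s.onNext(1);
            s.onNext(2);
            s.onCompleted();
        });
        log.add("created");
        source.subscribe(new MiniSubscriber<Integer>() {
            @Override
            public void onNext(Integer value) {
                log.add("onNext " + value);
            }

            @Override
            public void onCompleted() {
                log.add("onCompleted");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch "created" is logged before "call", which mirrors the point that the OnSubscribe body is only executed at subscription time.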
Map
First, a simple example:
Observable.just(1, 2, 3, 4, 5)
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return integer + "test";
            }
        })
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.d(TAG, "call: " + s);
            }
        });
The map method transforms an Observable of one type into an Observable of another type. Let's look at its implementation:
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) { return lift(new OperatorMap<T, R>(func)); }
public interface Func1<T, R> extends Function { R call(T t); }
Back in the example above, T is Integer and R is String. We implemented the Func1 interface with an anonymous class whose job is to convert an Integer into a String.
The map method introduces a new class called OperatorMap. Here it is:
public final class OperatorMap<T, R> implements Operator<R, T> {
    final Func1<? super T, ? extends R> transformer;
    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }
    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        return parent;
    }
    static final class MapSubscriber<T, R> extends Subscriber<T> {
        ...
    }
}
One more thing worth looking at is the class declaration: OperatorMap<T, R> implements Operator<R, T>. What is this Operator? Here is its definition:
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
    // cover for generics insanity
}
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
    MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
    o.add(parent);
    return parent;
}
As you can see, calling an Operator's call method converts a Subscriber of one type into a Subscriber of another type. The conversion is done by creating a MapSubscriber and returning it. MapSubscriber is a static nested class of OperatorMap; here is its code:
static final class MapSubscriber<T, R> extends Subscriber<T> {
    final Subscriber<? super R> actual;
    final Func1<? super T, ? extends R> mapper;
    boolean done;
    public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
        this.actual = actual;
        this.mapper = mapper;
    }
    @Override
    public void onNext(T t) {
        R result;
        try {
            result = mapper.call(t);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            unsubscribe();
            onError(OnErrorThrowable.addValueAsLastCause(ex, t));
            return;
        }
        actual.onNext(result);
    }
    @Override
    public void onError(Throwable e) {
        if (done) {
            RxJavaPluginUtils.handleException(e);
            return;
        }
        done = true;
        actual.onError(e);
    }
    @Override
    public void onCompleted() {
        if (done) {
            return;
        }
        actual.onCompleted();
    }
    @Override
    public void setProducer(Producer p) {
        actual.setProducer(p);
    }
}
R result;
result = mapper.call(t);
actual.onNext(result);
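Stripped down, onNext comes to just these three statements. mapper is the anonymous Func1 implementation we passed to map, and actual is the argument passed to OperatorMap's call method. We have not reached that call yet, so we will come back to it. Finally, actual's onNext method is called, and that is all.
Now back to map: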
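The three statements above can be tried out in isolation. The following is only a sketch under stated assumptions: Sink and MappingSink are hypothetical names standing in for rx.Subscriber and MapSubscriber, java.util.function.Function stands in for Func1, and error handling, unsubscription and backpressure are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class MapSubscriberSketch {
    // Hypothetical stand-in for rx.Subscriber, reduced to onNext.
    interface Sink<T> {
        void onNext(T value);
    }

    // Plays the role of MapSubscriber<T, R>: it receives T and forwards R.
    static final class MappingSink<T, R> implements Sink<T> {
        final Sink<? super R> actual;                  // the downstream subscriber
        final Function<? super T, ? extends R> mapper; // stands in for Func1

        MappingSink(Sink<? super R> actual, Function<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onNext(T value) {
            R result = mapper.apply(value); // convert T to R
            actual.onNext(result);          // hand the converted value downstream
        }
    }

    static List<String> run() {
        List<String> received = new ArrayList<>();
        Sink<String> downstream = received::add;
        Sink<Integer> upstream = new MappingSink<Integer, String>(downstream, i -> i + "test");
        upstream.onNext(1);
        upstream.onNext(2);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The caller only ever sees a Sink<Integer>, while the downstream only ever sees String values, which is the type bridge that MapSubscriber provides.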
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) { return lift(new OperatorMap<T, R>(func)); }
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator)); }
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
    static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
    final OnSubscribe<T> parent;
    final Operator<? extends R, ? super T> operator;
    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }
    @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            try {
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }
}
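OnSubscribeLift implements OnSubscribe, so it plays the same role. Its constructor takes two parameters: one is the original onSubscribe, and the other is the Operator. As you may recall, the purpose of the Operator interface is to convert a Subscriber of one type into a Subscriber of another type.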
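The most important method of OnSubscribe is call, which is also the method we override when creating an Observable with create. OnSubscribeLift already provides an implementation of call, so let's take a quick look.
hook.onLift(operator) returns the operator itself (with the default hook), so this statement calls the operator's call method to convert the Subscriber of type R into a Subscriber of type T. It then calls onStart on the newly created Subscriber, and finally invokes call on the original OnSubscribe, passing in the newly created Subscriber.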
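The lift step described above can be sketched in a few lines of plain Java. All names here (Obs, Sub, OnSub, Op) are hypothetical stand-ins for Observable, Subscriber, OnSubscribe and Operator, and the sketch assumes a purely synchronous source with no hooks, no onStart and no error handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class LiftSketch {
    interface Sub<T> {
        void onNext(T value);
    }

    interface OnSub<T> {
        void call(Sub<? super T> subscriber);
    }

    // An operator turns the downstream Sub<R> into an upstream Sub<T>.
    interface Op<R, T> {
        Sub<? super T> call(Sub<? super R> downstream);
    }

    static final class Obs<T> {
        final OnSub<T> onSubscribe;

        Obs(OnSub<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        // Plays the role of OnSubscribeLift: wrap the downstream, then call the parent.
        <R> Obs<R> lift(Op<? extends R, ? super T> operator) {
            final OnSub<T> parent = this.onSubscribe;
            return new Obs<R>(downstream -> {
                Sub<? super T> st = operator.call(downstream); // Sub<R> becomes Sub<T>
                parent.call(st);                               // original emitter feeds the wrapper
            });
        }

        <R> Obs<R> map(Function<? super T, ? extends R> f) {
            Op<R, T> op = downstream -> {
                Sub<T> mapped = value -> downstream.onNext(f.apply(value));
                return mapped;
            };
            return lift(op);
        }

        void subscribe(Sub<? super T> subscriber) {
            onSubscribe.call(subscriber);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Obs<Integer> source = new Obs<Integer>(s -> {
            for (int i = 1; i <= 3; i++) {
                s.onNext(i);
            }
        });
        source.map(i -> i + "test").subscribe(v -> log.add(v));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Note that the Obs returned by lift never emits anything itself. It only rewires the subscriber and then delegates to the parent's emitting logic.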
That completes the call flow of map. If you are still thoroughly confused after reading this far, go through it once more from the top and it should click. Here is the example from the beginning again:
Observable.just(1, 2, 3, 4, 5)
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return integer + "test";
            }
        })
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.d(TAG, "call: " + s);
            }
        });
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator)); }
At subscription time, the call method of OnSubscribeLift is invoked. Here it is again:
public void call(Subscriber<? super R> o) {
    try {
        Subscriber<? super T> st = hook.onLift(operator).call(o);
        try {
            // new Subscriber created and being subscribed with so 'onStart' it
            st.onStart();
            parent.call(st);
        } catch (Throwable e) {
            // localized capture of errors rather than it skipping all operators
            // and ending up in the try/catch of the subscribe method which then
            // prevents onErrorResumeNext and other similar approaches to error handling
            Exceptions.throwIfFatal(e);
            st.onError(e);
        }
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // if the lift function failed all we can do is pass the error to the final Subscriber
        // as we don't have the operator available to us
        o.onError(e);
    }
}
public Subscriber<? super T> call(final Subscriber<? super R> o) {
    MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
    o.add(parent);
    return parent;
}
public void onNext(T t) {
    R result;
    try {
        result = mapper.call(t);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        unsubscribe();
        onError(OnErrorThrowable.addValueAsLastCause(ex, t));
        return;
    }
    actual.onNext(result);
}
Here mapper is the instance of our custom Func1. Remember what Func1 does: it converts T to R, which in this example means Integer to String. This is where the conversion finally happens.
And after the conversion? actual's onNext method is called. This actual is the o from OperatorMap's call method, that is, the Subscriber wrapping the Action1 that we wrote in our code. That completes the whole chain of conversions and calls.
To summarize the whole process briefly: only the original OnSubscribe that we wrote contains the logic for emitting data, so it has to be used. That original OnSubscribe expects a Subscriber of type T, but all we have is a Subscriber of type R. So a wrapper is added that turns the Subscriber of type R into a Subscriber of type T. In the wrapper's onNext method, each item emitted by the original OnSubscribe's call method is converted from T to R using the Func1 we wrote, and then onNext and the other methods of our Subscriber<R> are called. Those methods expect an argument of type R, and the converted value is of type R, so the whole call chain is complete.
I drew a simple flowchart of the process:
FlatMap
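Suppose we have a Shop class that holds an address and a collection of goods, and every good has its own name. Given a collection of shops, how do we print each shop's address?
The map operator discussed above is enough. The code is as follows (the entity classes are simple, so they are not shown):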
List<Shop> shops = new ArrayList<>();
for (int i = 1; i <= 10; ++i) {
    Shop shop = new Shop();
    shop.setAddress(new Address("Shop" + i));
    for (int j = 1; j <= 10; ++j) {
        Good good = new Good("Shop" + i + ":Good" + j);
        shop.getGoods().add(good);
    }
    shops.add(shop);
}
Observable.from(shops)
        .map(new Func1<Shop, Address>() {
            @Override
            public Address call(Shop shop) {
                return shop.getAddress();
            }
        })
        .subscribe(new Action1<Address>() {
            @Override
            public void call(Address address) {
                Log.d(TAG, "call: " + address.getName());
            }
        });
Now suppose we want to print the name of every good in every shop. With map, we could do it like this:
Observable.from(shops)
        .map(new Func1<Shop, List<Good>>() {
            @Override
            public List<Good> call(Shop shop) {
                return shop.getGoods();
            }
        })
        .subscribe(new Action1<List<Good>>() {
            @Override
            public void call(List<Good> goods) {
                for (Good g : goods) {
                    Log.d(TAG, "call: " + g.getName());
                }
            }
        });
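From the problem above we can extract a common pattern: we are still converting an Observable<T> into an Observable<R>, except that R is not a property of T but an element of a collection held by T. In this case we can consider FlatMap. Here is the code using FlatMap: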
Observable.from(shops)
        .flatMap(new Func1<Shop, Observable<Good>>() {
            @Override
            public Observable<Good> call(Shop shop) {
                return Observable.from(shop.getGoods());
            }
        })
        .subscribe(new Action1<Good>() {
            @Override
            public void call(Good good) {
                Log.d(TAG, "call: " + good.getName());
            }
        });
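Since the entity classes are not shown, the following sketch fills them in with assumed shapes (the fields and constructors of Shop and Good here are guesses, and Address is omitted). It also uses java.util.stream as an analogy rather than RxJava itself, purely to contrast the two result shapes: map yields one list per shop, while flatMap yields the goods themselves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

final class FlatMapAnalogy {
    // Assumed shapes for the entities the article omits.
    static final class Good {
        private final String name;

        Good(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }
    }

    static final class Shop {
        private final List<Good> goods = new ArrayList<>();

        List<Good> getGoods() {
            return goods;
        }
    }

    static List<Shop> buildShops(int shopCount, int goodsPerShop) {
        List<Shop> shops = new ArrayList<>();
        for (int i = 1; i <= shopCount; i++) {
            Shop shop = new Shop();
            for (int j = 1; j <= goodsPerShop; j++) {
                shop.getGoods().add(new Good("Shop" + i + ":Good" + j));
            }
            shops.add(shop);
        }
        return shops;
    }

    // With map, the consumer still receives one List<Good> per shop.
    static long listsSeenWithMap(List<Shop> shops) {
        return shops.stream().map(Shop::getGoods).count();
    }

    // With flatMap, the consumer receives each Good directly.
    static List<String> goodNames(List<Shop> shops) {
        return shops.stream()
                .flatMap(shop -> shop.getGoods().stream())
                .map(Good::getName)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Shop> shops = buildShops(2, 2);
        System.out.println(listsSeenWithMap(shops));
        System.out.println(goodNames(shops));
    }
}
```

One difference to keep in mind: a sequential Stream.flatMap keeps the inner elements in order, whereas RxJava's flatMap merges the inner Observables, so items from asynchronous inner sources may interleave.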
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) { ... return merge(map(func)); }
Here is the implementation of merge:
public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) { ... return source.lift(OperatorMerge.<T>instance(false)); }
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator)); }
Operator<T, Observable<T>>
Here is the definition of OperatorMerge:
public final class OperatorMerge<T> implements Operator<T, Observable<? extends T>> { ... }
The instance method returns a singleton, implemented with the static nested holder class idiom. The code is as follows:
public static <T> OperatorMerge<T> instance(boolean delayErrors) {
    if (delayErrors) {
        return (OperatorMerge<T>)HolderDelayErrors.INSTANCE;
    }
    return (OperatorMerge<T>)HolderNoDelay.INSTANCE;
}
private static final class HolderNoDelay {
    /** A singleton instance. */
    static final OperatorMerge<Object> INSTANCE = new OperatorMerge<Object>(false, Integer.MAX_VALUE);
}
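For readers unfamiliar with the holder idiom, here is a small self-contained sketch of it. MergeLikeOperator is a hypothetical class, not part of RxJava. The JVM initializes each nested holder class lazily and thread-safely on first access, so no explicit locking is needed.

```java
final class MergeLikeOperator {
    final boolean delayErrors;

    private MergeLikeOperator(boolean delayErrors) {
        this.delayErrors = delayErrors;
    }

    // Each holder is only initialized when its INSTANCE field is first read.
    private static final class HolderNoDelay {
        static final MergeLikeOperator INSTANCE = new MergeLikeOperator(false);
    }

    private static final class HolderDelayErrors {
        static final MergeLikeOperator INSTANCE = new MergeLikeOperator(true);
    }

    static MergeLikeOperator instance(boolean delayErrors) {
        return delayErrors ? HolderDelayErrors.INSTANCE : HolderNoDelay.INSTANCE;
    }

    public static void main(String[] args) {
        // The same flag always yields the same object.
        System.out.println(instance(false) == instance(false));
    }
}
```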
@Override
public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
    MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
    MergeProducer<T> producer = new MergeProducer<T>(subscriber);
    subscriber.producer = producer;
    child.add(subscriber);
    child.setProducer(producer);
    return subscriber;
}
static final class MergeSubscriber<T> extends Subscriber<Observable<? extends T>> {
    final Subscriber<? super T> child;
    ...
    public MergeSubscriber(Subscriber<? super T> child, boolean delayErrors, int maxConcurrent) {
        this.child = child;
        ...
    }
    ...
    @Override
    public void onNext(Observable<? extends T> t) {
        if (t == null) {
            return;
        }
        if (t == Observable.empty()) {
            emitEmpty();
        } else if (t instanceof ScalarSynchronousObservable) {
            tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
        } else {
            InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
            addInner(inner);
            t.unsafeSubscribe(inner);
            emit();
        }
    }
    ...
}
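This class contains a fair amount of code, so we will only look at selected parts. The key point of the constructor is that it receives a child, which is ultimately the Subscriber we wrote ourselves, as its generic type Subscriber<? super T> also suggests.
Now look at onNext. Setting aside the null check and the special cases, go straight to the last else branch. It first creates an InnerSubscriber; here is its code: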
static final class InnerSubscriber<T> extends Subscriber<T> {
    final MergeSubscriber<T> parent;
    final long id;
    volatile boolean done;
    volatile RxRingBuffer queue;
    int outstanding;
    static final int limit = RxRingBuffer.SIZE / 4;
    public InnerSubscriber(MergeSubscriber<T> parent, long id) {
        this.parent = parent;
        this.id = id;
    }
    @Override
    public void onStart() {
        outstanding = RxRingBuffer.SIZE;
        request(RxRingBuffer.SIZE);
    }
    @Override
    public void onNext(T t) {
        parent.tryEmit(this, t);
    }
    @Override
    public void onError(Throwable e) {
        done = true;
        parent.getOrCreateErrorQueue().offer(e);
        parent.emit();
    }
    @Override
    public void onCompleted() {
        done = true;
        parent.emit();
    }
    public void requestMore(long n) {
        int r = outstanding - (int)n;
        if (r > limit) {
            outstanding = r;
            return;
        }
        outstanding = RxRingBuffer.SIZE;
        int k = RxRingBuffer.SIZE - r;
        if (k > 0) {
            request(k);
        }
    }
}
The statement t.unsafeSubscribe(inner); is the important one. Here is its implementation:
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
    try {
        // new Subscriber so onStart it
        subscriber.onStart();
        // allow the hook to intercept and/or decorate
        hook.onSubscribeStart(this, onSubscribe).call(subscriber);
        return hook.onSubscribeReturn(subscriber);
    } catch (Throwable e) {
        // special handling for certain Throwable/Error/Exception types
        Exceptions.throwIfFatal(e);
        // if an unhandled error occurs executing the onSubscribe we will propagate it
        try {
            subscriber.onError(hook.onSubscribeError(e));
        } catch (Throwable e2) {
            Exceptions.throwIfFatal(e2);
            // if this happens it means the onError itself failed (perhaps an invalid function implementation)
            // so we are unable to propagate the error correctly and will just throw
            RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
            // TODO could the hook be the cause of the error in the on error handling.
            hook.onSubscribeError(r);
            // TODO why aren't we throwing the hook's return value.
            throw r;
        }
        return Subscriptions.unsubscribed();
    }
}
@Override public void onNext(T t) { parent.tryEmit(this, t); }
void tryEmit(InnerSubscriber<T> subscriber, T value) {
    boolean success = false;
    long r = producer.get();
    if (r != 0L) {
        synchronized (this) {
            // if nobody is emitting and child has available requests
            r = producer.get();
            if (!emitting && r != 0L) {
                emitting = true;
                success = true;
            }
        }
    }
    if (success) {
        emitScalar(subscriber, value, r);
    } else {
        queueScalar(subscriber, value);
    }
}
protected void emitScalar(InnerSubscriber<T> subscriber, T value, long r) {
    boolean skipFinal = false;
    try {
        try {
            child.onNext(value);
        } catch (Throwable t) {
            if (!delayErrors) {
                Exceptions.throwIfFatal(t);
                skipFinal = true;
                subscriber.unsubscribe();
                subscriber.onError(t);
                return;
            }
            getOrCreateErrorQueue().offer(t);
        }
        if (r != Long.MAX_VALUE) {
            producer.produced(1);
        }
        subscriber.requestMore(1);
        // check if some state changed while emitting
        synchronized (this) {
            skipFinal = true;
            if (!missed) {
                emitting = false;
                return;
            }
            missed = false;
        }
    } finally {
        if (!skipFinal) {
            synchronized (this) {
                emitting = false;
            }
        }
    }
    /*
     * In the synchronized block below request(1) we check
     * if there was a concurrent emission attempt and if there was,
     * we stay in emission mode and enter the emission loop
     * which will take care all the queued up state and
     * emission possibilities.
     */
    emitLoop();
}
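child.onNext(value); is the key call. As you may remember, child is the Subscriber we wrote ourselves, passed in through MergeSubscriber's constructor.
That is the end of the main flow. If it still feels dizzying, here is a flowchart of FlatMap:
subscribeOn and observeOn
Another remarkable feature of RxJava is that thread switching can also be expressed in the same fluent chain. Here is an example: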
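The main line traced above (flatMap is merge(map(func)), and every inner subscriber forwards to the same child) can be sketched as follows. This is a deliberately simplified, synchronous model: Source is a hypothetical stand-in for Observable, java.util.function.Consumer stands in for Subscriber, and the emitting flag, queues, backpressure and error handling of the real OperatorMerge are all left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class MergeSketch {
    // Hypothetical stand-in for an Observable: subscribing pushes values into a Consumer.
    interface Source<T> {
        void subscribe(Consumer<? super T> subscriber);
    }

    static <T> Source<T> from(List<T> items) {
        return subscriber -> items.forEach(subscriber);
    }

    static <T, R> Source<R> map(Source<T> source, Function<T, R> f) {
        return child -> source.subscribe(v -> child.accept(f.apply(v)));
    }

    // Plays the role of MergeSubscriber: the outer consumer receives inner sources
    // and subscribes an inner consumer to each one, which forwards to the child.
    static <T> Source<T> merge(Source<Source<T>> sources) {
        return child -> sources.subscribe(
                inner -> inner.subscribe(value -> child.accept(value)));
    }

    // Same composition as in the article: flatMap is merge(map(func)).
    static <T, R> Source<R> flatMap(Source<T> source, Function<T, Source<R>> f) {
        return merge(map(source, f));
    }

    static List<String> run() {
        List<String> out = new ArrayList<>();
        Source<List<String>> shops = from(Arrays.asList(
                Arrays.asList("Shop1:Good1", "Shop1:Good2"),
                Arrays.asList("Shop2:Good1")));
        Source<String> goods = flatMap(shops, goodsOfShop -> from(goodsOfShop));
        goods.subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```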
Observable.just(1, 2, 3, 4, 5)
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.newThread())
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
            }
        });
public final Observable<T> subscribeOn(Scheduler scheduler) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return create(new OperatorSubscribeOn<T>(this, scheduler));
}
@Override
public void call(final Subscriber<? super T> subscriber) {
    final Worker inner = scheduler.createWorker();
    subscriber.add(inner);
    inner.schedule(new Action0() {
        @Override
        public void call() {
            final Thread t = Thread.currentThread();
            Subscriber<T> s = new Subscriber<T>(subscriber) {
                @Override
                public void onNext(T t) {
                    subscriber.onNext(t);
                }
                @Override
                public void onError(Throwable e) {
                    try {
                        subscriber.onError(e);
                    } finally {
                        inner.unsubscribe();
                    }
                }
                @Override
                public void onCompleted() {
                    try {
                        subscriber.onCompleted();
                    } finally {
                        inner.unsubscribe();
                    }
                }
                @Override
                public void setProducer(final Producer p) {
                    subscriber.setProducer(new Producer() {
                        @Override
                        public void request(final long n) {
                            if (t == Thread.currentThread()) {
                                p.request(n);
                            } else {
                                inner.schedule(new Action0() {
                                    @Override
                                    public void call() {
                                        p.request(n);
                                    }
                                });
                            }
                        }
                    });
                }
            };
            source.unsafeSubscribe(s);
        }
    });
}
@Override public void onNext(T t) { subscriber.onNext(t); }
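At this point we can make a simple guess: the thread switch has already happened by the time the action passed to inner.schedule runs. Let's check the source code.
The Scheduler used in the example is Schedulers.io. In OperatorSubscribeOn's call method, createWorker is called on the Scheduler returned by Schedulers.io. Let's follow it: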
public static Scheduler io() {
    return INSTANCE.ioScheduler;
}
Scheduler io = hook.getIOScheduler();
if (io != null) {
    ioScheduler = io;
} else {
    ioScheduler = RxJavaSchedulersHook.createIoScheduler();
}
public static Scheduler createIoScheduler() {
    return createIoScheduler(new RxThreadFactory("RxIoScheduler-"));
}
public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
    if (threadFactory == null) throw new NullPointerException("threadFactory == null");
    return new CachedThreadScheduler(threadFactory);
}
@Override
public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}
EventLoopWorker(CachedWorkerPool pool) {
    this.pool = pool;
    this.threadWorker = pool.get();
}
public CachedThreadScheduler(ThreadFactory threadFactory) {
    this.threadFactory = threadFactory;
    this.pool = new AtomicReference<CachedWorkerPool>(NONE);
    start();
}
static {
    NONE = new CachedWorkerPool(null, 0, null);
    NONE.shutdown();
}
Now back to OperatorSubscribeOn's call method, which calls inner.schedule. This inner is an EventLoopWorker, and its schedule method is as follows:
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
    if (innerSubscription.isUnsubscribed()) {
        // don't schedule, we are unsubscribed
        return Subscriptions.unsubscribed();
    }
    ScheduledAction s = threadWorker.scheduleActual(new Action0() {
        @Override
        public void call() {
            if (isUnsubscribed()) {
                return;
            }
            action.call();
        }
    }, delayTime, unit);
    innerSubscription.add(s);
    s.addParent(innerSubscription);
    return s;
}
ThreadWorker get() {
    if (allWorkers.isUnsubscribed()) {
        return SHUTDOWN_THREADWORKER;
    }
    while (!expiringWorkerQueue.isEmpty()) {
        ThreadWorker threadWorker = expiringWorkerQueue.poll();
        if (threadWorker != null) {
            return threadWorker;
        }
    }
    // No cached worker found, so create a new one.
    ThreadWorker w = new ThreadWorker(threadFactory);
    allWorkers.add(w);
    return w;
}
private static final class ThreadWorker extends NewThreadWorker {
    private long expirationTime;
    ThreadWorker(ThreadFactory threadFactory) {
        super(threadFactory);
        this.expirationTime = 0L;
    }
    public long getExpirationTime() {
        return expirationTime;
    }
    public void setExpirationTime(long expirationTime) {
        this.expirationTime = expirationTime;
    }
}
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
    Action0 decoratedAction = schedulersHook.onSchedule(action);
    ScheduledAction run = new ScheduledAction(decoratedAction);
    Future<?> f;
    if (delayTime <= 0) {
        f = executor.submit(run);
    } else {
        f = executor.schedule(run, delayTime, unit);
    }
    run.add(f);
    return run;
}
public NewThreadWorker(ThreadFactory threadFactory) {
    ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
    // Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
    boolean cancelSupported = tryEnableCancelPolicy(exec);
    if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
        registerExecutor((ScheduledThreadPoolExecutor)exec);
    }
    schedulersHook = RxJavaPlugins.getInstance().getSchedulersHook();
    executor = exec;
}
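Stripped of pooling and hooks, the chain above ends in a single-threaded ScheduledExecutorService to which the action is submitted. The sketch below shows just that core using only java.util.concurrent. The class name WorkerSketch and the thread name "SketchIoScheduler-1" are made up for illustration, and worker caching, expiry and unsubscription are not modeled.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class WorkerSketch {
    // Returns the name of the thread that actually ran the scheduled action.
    static String runOnWorker() {
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "SketchIoScheduler-1");
            t.setDaemon(true);
            return t;
        };
        // Same kind of executor that the NewThreadWorker constructor creates.
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, factory);
        final AtomicReference<String> seen = new AtomicReference<>();
        // Corresponds to executor.submit(run) in scheduleActual.
        executor.submit(() -> seen.set(Thread.currentThread().getName()));
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("action: " + runOnWorker());
    }
}
```

Whatever the action does, including subscribing to the source as OperatorSubscribeOn does, therefore happens on the worker's thread rather than on the caller's thread.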
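That completes the analysis of subscribeOn. The overall flow is not hard; the code is just nested deep and not easy to trace. Here is a diagram to summarize:
Now let's look at observeOn: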
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
    if (scheduler instanceof ImmediateScheduler) {
        // avoid overhead, execute directly
        return child;
    } else if (scheduler instanceof TrampolineScheduler) {
        // avoid overhead, execute directly
        return child;
    } else {
        ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
        parent.init();
        return parent;
    }
}
@Override
public void onNext(final T t) {
    if (isUnsubscribed() || finished) {
        return;
    }
    if (!queue.offer(on.next(t))) {
        onError(new MissingBackpressureException());
        return;
    }
    schedule();
}
protected void schedule() {
    if (counter.getAndIncrement() == 0) {
        recursiveScheduler.schedule(this);
    }
}
final Scheduler.Worker recursiveScheduler;
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
    this.child = child;
    this.recursiveScheduler = scheduler.createWorker();
    this.delayError = delayError;
    this.on = NotificationLite.instance();
    int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
    // this formula calculates the 75% of the bufferSize, rounded up to the next integer
    this.limit = calculatedSize - (calculatedSize >> 2);
    if (UnsafeAccess.isUnsafeAvailable()) {
        queue = new SpscArrayQueue<Object>(calculatedSize);
    } else {
        queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
    }
    // signal that this is an async operator capable of receiving this many
    request(calculatedSize);
}
Back to onNext: the Worker's schedule method needs an Action0, and ObserveOnSubscriber happens to implement Action0. So let's look at its call method, which is the method invoked on the new thread:
@Override
public void call() {
    long missed = 1L;
    long currentEmission = emitted;
    // these are accessed in a tight loop around atomics so
    // loading them into local variables avoids the mandatory re-reading
    // of the constant fields
    final Queue<Object> q = this.queue;
    final Subscriber<? super T> localChild = this.child;
    final NotificationLite<T> localOn = this.on;
    // requested and counter are not included to avoid JIT issues with register spilling
    // and their access is is amortized because they are part of the outer loop which runs
    // less frequently (usually after each bufferSize elements)
    for (;;) {
        long requestAmount = requested.get();
        while (requestAmount != currentEmission) {
            boolean done = finished;
            Object v = q.poll();
            boolean empty = v == null;
            if (checkTerminated(done, empty, localChild, q)) {
                return;
            }
            if (empty) {
                break;
            }
            localChild.onNext(localOn.getValue(v));
            currentEmission++;
            if (currentEmission == limit) {
                requestAmount = BackpressureUtils.produced(requested, currentEmission);
                request(currentEmission);
                currentEmission = 0L;
            }
        }
        if (requestAmount == currentEmission) {
            if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
                return;
            }
        }
        emitted = currentEmission;
        missed = counter.addAndGet(-missed);
        if (missed == 0L) {
            break;
        }
    }
}
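The core of onNext, schedule and call above is a queue-drain pattern: the producer enqueues and bumps a counter, and only the 0 to 1 transition schedules a drain on the worker. Here is a reduced sketch of that pattern. DrainingSubscriber is a hypothetical name, a plain ExecutorService stands in for the Scheduler.Worker, and request accounting, termination checks and error handling are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class ObserveOnSketch {
    static final class DrainingSubscriber<T> implements Runnable {
        final Queue<T> queue = new ConcurrentLinkedQueue<>();
        final AtomicInteger counter = new AtomicInteger();
        final ExecutorService worker;
        final Consumer<? super T> child;

        DrainingSubscriber(ExecutorService worker, Consumer<? super T> child) {
            this.worker = worker;
            this.child = child;
        }

        // Called on the producer's thread: enqueue, then make sure a drain is scheduled.
        void onNext(T value) {
            queue.offer(value);
            schedule();
        }

        void schedule() {
            // Only the 0 -> 1 transition schedules; later calls just record missed work.
            if (counter.getAndIncrement() == 0) {
                worker.execute(this);
            }
        }

        // Runs on the worker's thread and delivers the queued items to the child.
        @Override
        public void run() {
            int missed = 1;
            for (;;) {
                T value;
                while ((value = queue.poll()) != null) {
                    child.accept(value);
                }
                missed = counter.addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
    }

    static List<String> run() {
        final List<String> log = new ArrayList<>();
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-observer"));
        DrainingSubscriber<Integer> subscriber = new DrainingSubscriber<Integer>(
                worker, v -> log.add(Thread.currentThread().getName() + ":" + v));
        for (int i = 1; i <= 3; i++) {
            subscriber.onNext(i);
        }
        worker.shutdown();
        try {
            if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The counter is what lets the drain loop notice items that arrived while it was running, so no item is lost and at most one drain is active at a time.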
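That wraps up observeOn. With subscribeOn as groundwork it is fairly easy to follow. As before, here is a summary diagram:
That covers some of RxJava's basic operations. If you spot any mistakes, please let me know. Thanks.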
Original article: http://blog.csdn.net/u014787113/article/details/51531955