标签:
package com.yue.test; import java.awt.Cursor; import java.util.ArrayList; import java.util.List; import com.yue.bean.Course; import com.yue.bean.Student; import rx.Observable; import rx.Subscription; import rx.Observable.OnSubscribe; import rx.Observable.Operator; import rx.exceptions.Exceptions; import rx.exceptions.OnErrorFailedException; import rx.exceptions.OnErrorThrowable; import rx.functions.Func1; import rx.internal.operators.OnSubscribeLift; import rx.internal.operators.OperatorMerge; import rx.internal.operators.OnSubscribeMap.MapSubscriber; import rx.internal.operators.OperatorMerge.InnerSubscriber; import rx.internal.util.ScalarSynchronousObservable; import rx.internal.util.UtilityFunctions; import rx.plugins.RxJavaHooks; import rx.plugins.RxJavaPlugins; import rx.subscriptions.Subscriptions; import rx.Subscriber; /** * flatmap 一对多的转换 * * RxJava的其他方式的观察者模式的实现都和普通的一样,所以楼主用最普通的方式实现实例 ,而没有用just、from 或ActionX等方式 * * @ClassName: RxText3 * @Description: TODO * @author shimingyue * @date 2016-7-14 上午10:25:57 * */ public class RxText3 { public static void main(String[] args) { RxText3 rxText3 = new RxText3(); rxText3.method3(); } /** * method1 */ private void method1() { final Integer[] student = { 110, 120, 119 }; /** * 泛型为String的观察者 */ Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onNext(String t) { System.out.println(t); } @Override public void onError(Throwable e) { } @Override public void onCompleted() { } }; /** * 泛型为Integer的源被观察者 */ Observable<Integer> observable = Observable .create(new OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { // 这里和from的效果是一样的 Observable.from() subscriber.onNext(student[0]); subscriber.onNext(student[1]); subscriber.onNext(student[2]); subscriber.onCompleted(); } }); /** * 转换后的被观察者observable2 泛型为String ,我们要在这里进行类型 */ Observable<String> observable2 = observable .map(new Func1<Integer, String>() { @Override public String call(Integer t) { System.out.println("funcx 源被传的值:" + t); return "转换后:" + t; } }); observable2.subscribe(subscriber); } /** * 使用map和for循环 一对一 打印 */ private void method2() { final Student[] students = getParams(); /** * 定义一个观察者 */ Subscriber<Student> subscriber = new Subscriber<Student>() { @Override public void onNext(Student student) { // 接收被观察者的消息通知,被观察者会传过来student这个值 System.out.println("--------------------"); System.out.println(student.getName()); List<Course> courses = student.getCourses(); for (Course course : courses) { System.out.println(course.getName()); } } @Override public void onError(Throwable e) { } @Override public void onCompleted() { } }; /** * 定义一个被观察者 */ Observable<Student> observable = Observable .create(new OnSubscribe<Student>() { /** * 这个方法将在订阅时被调用, */ @Override public void call(Subscriber<? super Student> subscriber) { subscriber.onNext(students[0]); subscriber.onNext(students[1]); subscriber.onNext(students[2]); subscriber.onCompleted(); } }); // 观察者订阅被观察者 observable.subscribe(subscriber); } /** * 上面的方法(method1)是用for循环逐个打印每个学生对应的书籍的 */ private void method3() { final Student[] students = getParams(); /** * 创建一个观察者 泛型类型为Course */ Subscriber<Course> subscriber = new Subscriber<Course>() { @Override public void onNext(Course course) { System.out.println(course.getName()); } @Override public void onError(Throwable e) { } @Override public void onCompleted() { } }; /** * 创建一个观察者 泛型类型为Student */ Observable<Student> observable = Observable .create(new OnSubscribe<Student>() { //自己写的 @Override public void call(Subscriber<? super Student> subscriber) { subscriber.onNext(students[0]); subscriber.onNext(students[1]); subscriber.onNext(students[2]); subscriber.onCompleted(); } }); /** * 使用flatmap进行类型转换 将泛型为Student的被观察者转换为泛型为Course */ Observable<Course> observable2 = observable .flatMap(new Func1<Student, Observable<? extends Course>>() { @Override public Observable<? extends Course> call(Student student) { System.out.println("------------------"); System.out.println(student.getName()); return Observable.from(student.getCourses()); } }); // 两个不同类型的观察者和被观察者定义好啦 observable2.subscribe(subscriber); } /** * flatMap源码分析 * * 首先打开flatMap 方法 * public final <R> Observable<R> flatMap(Func1<? super T, ? * extends Observable<? extends R>> func) { if (getClass() == * ScalarSynchronousObservable.class) { return * ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func); } return * merge(map(func)); } * * 可以看到一个merge方法里有一个map方法的执行,map方法我们前面分析过,但是里面的FuncX有所不同, * 它的call方法里返回的是一个Observable被观察者对象,所以我们要重新审视一下map这个方法。 * * 原来正常的map 如果观察者订阅被观察者导致执行的话 * 我们知道一起方法的执行都是起源于subscribe订阅方法的 * 而subscribe最后又会执行被观察者的OnSubscribe里的call(Subscriber)方法, * call里又执行观察者的onNext方法进行通知,而在map转换里面,问题就出在OnSubscribe.call * 里啦。因为经过map转换后会产生一个新的被观察者,而这个新的被观察者会持有一个新的OnSubscribe * 监听实例OnSubscribeMap[继承于OnSubscribe], * 所以在我们的观察者订阅被观察者的时候,在被观察者的订阅方法subscribe里执行的OnSubscribe.call * 实际上是OnSubscribeMap的call方法,在这个call方法里会初始化一个观察者的实例MapSubscriber, * 它持有源被观察者和funcx,然后兜兜圈圈又神奇的执行到源Observable的OnSubscribe实例的call方法 * ,然而此时这个源call方法里subscriber对象俨然变成了MapSubscriber,所以后续执行的是MapSubscriber * 的onNext方法,进行转换 * * * * * * * * * 创建 * * flatMap方法,上面的map是经过了执行的,不执行其实就是上面的新观察者的创建过程 * * 我们知道map返回了一个上述的被观察者,如果不使用flatMap的话会按照套路走,那使用flatmap后呢, * 我们看到它吧map生成的被观察者对象放入了一个叫merge的方法,进去看看 * * @SuppressWarnings({"unchecked", "rawtypes"}) public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) { if (source.getClass() == ScalarSynchronousObservable.class) { return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity()); } /** * 含有使用源被观察者和FuncX实例的新被观察者source * OperatorMerge:Operator:经营者,操作者 Merge:合并 * 他创建了一个合并的类,但这个类合并的是什么暂时不知道 * 但可以到我们新的被观察者的lift里去看看 return source.lift(OperatorMerge.<T>instance(false)); } * * 我们看到执行lift后又反悔了一个Observable,说明在经过map转换后又进行了一次调整 * * public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return create(new OnSubscribeLift<T, R>(onSubscribe, operator)); } * 在看看create方法 OnSubscribeLift,这是什么鬼, * 可以看出这也是一个OnSubscribe的子类,和OnSubscribeMap一样,看看声明时里面传的参数 * onSubscribe:因为传入lift的是一个经过map转换的新被观察者对象[提示merge],所以这个参数是OnSubscribeMap监听实例 * operator:暂时不知道什么作用的一个对象 * 到这里生成了一个持有新被观察者OnSubscribeMap和一个operator参数的OnSubscribeLift监听实例 * * 继续向下, * public static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(RxJavaHooks.onCreate(f)); } * * 果然,OnSubscribeLift被用来创建一个2级的新的被观察者,这样源被观察者被动了两次手术,但有一个手术刀使我们不知道怎么用的,就是operator * * 这个参数: 没关系我们可以百度: * http://www.tuicool.com/articles/VrQRvur * Observable -> Operator 1 -> Operator 2 -> Operator 3 -> Subscriber * 因为subscriber通常在主线程中执行,因此设计上要求其代码尽可能简单,只对事件进行响应,而修改事件的工作全部由operator执行。 * Observable生产事件,Operator修改事件,最后由Subscriber去及时响应事件,现在我们知道这个参数是干啥的啦,就可以继续往下看啦 * * * * * * * * * * * * * 执行 * * * 创建完啦,转换也完啦,就剩执行啦,拉莫还是回头看subscribe()订阅方法 * 我们知道在里面执行了 OnSubscribe.call方法,那这个OnSubscribe是啥呢。 * 没有错,他就是我们flatMap转换后返回的Observable实例,经过上面的分析,我们知道flatMap返回的实例是一个 * 含有OnSubscribeLift监听实例的3手的被观察者, * 所以会执行OnSubscribeLift.call(),进去看看做了什么 * * @Override public void call(Subscriber<? super R> o) { try { 创建一个新的观察者,还记得map的MapSubscriber吗,它也是个新的观察者,基于源观察者o的,在map里会兜兜圈圈执行这个新观察者 onNext,MapSubscriber持有FuncX对象和源观察者。 Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o); try { // 看这里 st.onStart(); parent.call(st); } catch (Throwable e) { Exceptions.throwIfFatal(e); st.onError(e); } } catch (Throwable e) { Exceptions.throwIfFatal(e); o.onError(e); } } * * 其实走到这里我们应该捋一捋 * 1.我们首先使用map创建了一个新的被观察者,他持有OnSubscribeMap:他持有源被观察者和FuncX * 2.然后我们用又使用这个新的被观察者创建了一个3手的被观察者,而它持有一个OnSubscribeLift:它持有OnSubscribeMap和一个operator[事件操作者] * 4.然后我们调用3手被观察者的OnSubscribeLift.call()方法,到这里OnSubscribeLift又持有一个观察者对象 * * 好啦,到了关键时候啦,我们看到OnSubscribeLift只有几行重要的代码 * 先看新观察者的创建过程,回顾一下前面的operator:在merge中 return source.lift(OperatorMerge.<T>instance(false)); * instance 里的返回 T是源被观察者的泛型 * static final OperatorMerge<Object> INSTANCE = new OperatorMerge<Object>(false, Integer.MAX_VALUE); * operator原来是这个样子的,都快忘啦。 * * 好,回顾完啦。我们看RxJavaHooks.onObservableLift(operator);这一句做了什么 * 它传进去了一个operator,那是我们的operator,它用它做了什么呢,其实很前面一样,啥都没做, * 就是判断了一下,然后就将这个operator又原路返回啦,源码如下: * @SuppressWarnings({ "rawtypes", "unchecked" }) public static <T, R> Operator<R, T> onObservableLift(Operator<R, T> operator) { Func1<Operator, Operator> f = onObservableLift; if (f != null) { return f.call(operator); } return operator; } * * * onObservableLift = new Func1<Operator, Operator>() { @Override public Operator call(Operator t) { return RxJavaPlugins.getInstance().getObservableExecutionHook().onLift(t); } }; 所以Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o); 这儿调用的call方法是我们前面OperatorMerge的call方法,进去看看 * * * maxConcurrent:Integer.MAX_VALUE 65535 * delayErrors:延迟错误,传过来的是false OperatorMerge.<T>instance(false) * child:源观察者 * @Override public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) { MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent); MergeProducer<T> producer = new MergeProducer<T>(subscriber); subscriber.producer = producer; //这两句楼主也没明白啥意思,有知道的请告知哦 child.add(subscriber); child.setProducer(producer); return subscriber; } * *可以看出创建了一个MergeSubscriber新观察者,它持有上面三个参数,是不是和OnSubscribeMap里的内部类MapSubscriber差不多, *没错,他也是一个内部类 *可以看到除了一个MergeSubscriber对象外,还有一个MergeProducer对象,他把MergeSubscriber传进了构造方法,到这里,我们的 *源观察者放进了MergeSubscriber,而MergeSubscriber放进了MergeProducer,仅仅是放了进去,啥也没有做呢,再向下看 *subscriber.producer = producer;将新创建的MergeProducer赋值。再往下看。又有add方法,与上面一样将我们的新观察者放进 *了list里面,还不知道有什么卵用,那就看下一句,仅仅设置了一下setProducer,最后返回了这个新创建的观察者, * *ok,获得了观察者,在回头看OnSubscribeLift的call方法,它执行了新观察者的MergeSubscriber的onStart方法,其实MergeSubscriber *是没有onStart方法的,所以,执行的是父类的,然而并没有什么卵用,父类啥也没做 * * * * *继续 parent.call(st); *parent是我们2手观察者的OnSubscribe,也就是OnSubscribeMap,终于到了关键的时候啦, *它将我们新创建的观察者放入了OnSubscribeMap的call方法内 *回顾一下这个方法: * @Override public void call(final Subscriber<? super R> o) { MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer); o.add(parent); 这里source还是源被观察者 source.unsafeSubscribe(parent); } *在这里,它使用MergeSubscriber又创建了一个MapSubscriber,transformer是使我们自己定义的FuncX, *他在前面merge(map(funcx))里已经传入,现在终于排上用场啦 *add方法不用管,lz也不知道有何意义,直接看source.unsafeSubscribe(parent);,前面map我们分析过这个东西, *在执行一遍流程。 *MapSubscriber 3手观察者:持有一个MergeSubscriber观察者,MergeSubscriber持有源观察者和其他两个参数maxConcurrent,delayErrors * * * public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) { try { // new Subscriber so onStart it subscriber.onStart(); // allow the hook to intercept and/or decorate RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types Exceptions.throwIfFatal(e); // if an unhandled error occurs executing the onSubscribe we will propagate it try { subscriber.onError(RxJavaHooks.onObservableError(e)); } catch (Throwable e2) { Exceptions.throwIfFatal(e2); // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); // TODO could the hook be the cause of the error in the on error handling. RxJavaHooks.onObservableError(r); // TODO why aren't we throwing the hook's return value. throw r; // NOPMD } return Subscriptions.unsubscribed(); } } *看下面这一句 *RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber); *我们在上一篇map的时候已经知道执行的是源被观察的onSubscribe的call方法, *subscriber就是上面持有我们MergeSubscriber观察者的MapSubscriber观察者, *既然执行我们源观察者的onSubscribe的call方法,那就看我们自己写的吧 * * 创建一个观察者 泛型类型为Student Observable<Student> observable = Observable .create(new OnSubscribe<Student>() { //自己写的这里面的subscriber是MapSubscriber包含MergeSubscriber的复合观察者,我们这里执行了三次 @Override public void call(Subscriber<? super Student> subscriber) { subscriber.onNext(students[0]); subscriber.onNext(students[1]); subscriber.onNext(students[2]); subscriber.onCompleted(); } }); *转了一圈又一圈,终于有回到了原点,只不过比map又多了一个MergeSubscriber,他是OperatorMerge的内部类 *下面我们看是怎么执行的 *我们知道subscriber是MapSubscriber,所以首先看MapSubscriber的onNext方法 * *@Override public void onNext(T t) { R result; try { mapper:我们的funcx result = mapper.call(t); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); unsubscribe(); onError(OnErrorThrowable.addValueAsLastCause(ex, t)); return; } actual:这里的就是我们调用OnSubscribeMap.call传过来的观察者,这里是MergeSubscriber actual.onNext(result); } * 还是和以前一样貌美如花, result = mapper.call(t);这里的t是源观察者调用onNext传的参数, * 我们使用funcx的call方法将这个参数转换(至于转换成什么,看你喽),这里得转换成一个被观察者对象 * ,得到返回结果后执行传过来的观察者MergeSubscriber的 * onNext方法,并传入我们自己生成的被观察者,这才是关键,so,看看MergeSubscriber里的onNext是怎么执行的, * * @Override public void onNext(Observable<? extends T> t) { if (t == null) { return; } if (t == Observable.empty()) { emitEmpty(); } else if (t instanceof ScalarSynchronousObservable) { 要是你声明的被观察者是ScalarSynchronousObservable就看着而吧 tryEmit(((ScalarSynchronousObservable<? extends T>)t).get()); } else { //主要看这儿 InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++); addInner(inner); //这里的t就是我们funcx返回的被观察者Observable t.unsafeSubscribe(inner); emit(); } } * *可以看出InnerSubscriber内部类,窝草,又一个观察者,fuck, *可以看出这个内部的观察者持有一个MergeSubscriber的实例,然后又有一个uniqueId:应该是一个记录的参数,先不管他 *下面到了addInner(inner);先不管 *再往下t.unsafeSubscribe(inner); *t.unsafeSubscribe(inner)这一句代码是不是有点熟悉,我们在分析map的时候,回顾一下吧 *在OnSubscribeMap里 * @Override public void call(final Subscriber<? super R> o) { MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer); o.add(parent); source.unsafeSubscribe(parent); } *好啦回顾完毕,看看这个鬼执行了神马,和map的一样,走进去看看,老样子 * * public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) { try { // new Subscriber so onStart it subscriber.onStart(); // allow the hook to intercept and/or decorate RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types Exceptions.throwIfFatal(e); // if an unhandled error occurs executing the onSubscribe we will propagate it try { subscriber.onError(RxJavaHooks.onObservableError(e)); } catch (Throwable e2) { Exceptions.throwIfFatal(e2); // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); // TODO could the hook be the cause of the error in the on error handling. RxJavaHooks.onObservableError(r); // TODO why aren't we throwing the hook's return value. throw r; // NOPMD } return Subscriptions.unsubscribed(); } } * *RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);主要看这句 *我们是用funcx返回的Obsevable来调用的unsafeSubscribe方法,而RxJavaHooks.onObservableStart(this, onSubscribe) *这一段就是进行了一下安全判断在返回这个Obserable的onSubscribe,所以我们使用Funcx的call方法获得的Obserable的 *onSubscribe执行了call方法,具体执行了什么,看你返回的Obserable喽,到此分析完毕 * * *好啦 到这里该总结一下流程了 * *1.源观察者执行 subscriber.onNext方法将参数传入 *2.上面的subscriber其实已经被转义为MapSubscriber的对象了,他持有。。。。 *3.在MapSubscriber的onNext方法里进行了转换并获得了一个Obserable *4.Obserable是我们的FuncX的call方法转换返回的call里面的值是我们源观察者的onNext的传入参数,不懂看1和2 *5.得到这个结果后,onNext方法里执行了actual.onNext(result);这一句actual是一个MergeSubscriber的实例 *6.所以actual.onNext(result);执行的是MergeSubscriber里的onNext方法,其实MergeSubscriber还会被包装一下 *成为一个InnerSubscriber楼主不做介绍 *7。t.unsafeSubscribe(inner);然后到了这里, *8.最后到了RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);这里执行了返回的被观察者的onSubscribe的call方法, */ /** * 设置参数 */ private Student[] getParams() { Course course1 = new Course(); course1.setName("红楼梦"); Course course2 = new Course(); course2.setName("水浒传"); Course course3 = new Course(); course3.setName("西游记"); Course course4 = new Course(); course4.setName("三国演义"); List<Course> courses = new ArrayList<Course>(); courses.add(course1); courses.add(course2); courses.add(course3); courses.add(course4); Student student1 = new Student(); student1.setName("小明"); student1.setCourses(courses); Student student2 = new Student(); student2.setName("小牧"); student2.setCourses(courses); Student student3 = new Student(); student3.setName("小段"); student3.setCourses(courses); Student[] students = { student1, student2, student3 }; return students; } }
标签:
原文地址:http://blog.csdn.net/sinat_32955803/article/details/51923288