码迷,mamicode.com
首页 > 编程语言 > 详细

RxJava1.0 flatMap方法的源码分析

时间:2016-07-16 16:10:56      阅读:415      评论:0      收藏:0      [点我收藏+]

标签:

RxJava1.0 flatMap方法的源码分析


  • package com.yue.test;
    
    import java.awt.Cursor;
    import java.util.ArrayList;
    import java.util.List;
    
    import com.yue.bean.Course;
    import com.yue.bean.Student;
    
    import rx.Observable;
    import rx.Subscription;
    import rx.Observable.OnSubscribe;
    import rx.Observable.Operator;
    import rx.exceptions.Exceptions;
    import rx.exceptions.OnErrorFailedException;
    import rx.exceptions.OnErrorThrowable;
    import rx.functions.Func1;
    import rx.internal.operators.OnSubscribeLift;
    import rx.internal.operators.OperatorMerge;
    import rx.internal.operators.OnSubscribeMap.MapSubscriber;
    import rx.internal.operators.OperatorMerge.InnerSubscriber;
    import rx.internal.util.ScalarSynchronousObservable;
    import rx.internal.util.UtilityFunctions;
    import rx.plugins.RxJavaHooks;
    import rx.plugins.RxJavaPlugins;
    import rx.subscriptions.Subscriptions;
    import rx.Subscriber;
    
    /**
     * flatmap 一对多的转换
     * 
     * RxJava的其他方式的观察者模式的实现都和普通的一样,所以楼主用最普通的方式实现实例 ,而没有用just、from 或ActionX等方式
     * 
     * @ClassName: RxText3
     * @Description: TODO
     * @author shimingyue
     * @date 2016-7-14 上午10:25:57
     * 
     */
    public class RxText3 {
    
    	public static void main(String[] args) {
    		RxText3 rxText3 = new RxText3();
    		rxText3.method3();
    	}
    
    	/**
    	 * method1
    	 */
    	private void method1() {
    		final Integer[] student = { 110, 120, 119 };
    		/**
    		 * 泛型为String的观察者
    		 */
    		Subscriber<String> subscriber = new Subscriber<String>() {
    
    			@Override
    			public void onNext(String t) {
    				System.out.println(t);
    			}
    
    			@Override
    			public void onError(Throwable e) {
    			}
    
    			@Override
    			public void onCompleted() {
    			}
    		};
    
    		/**
    		 * 泛型为Integer的源被观察者
    		 */
    		Observable<Integer> observable = Observable
    				.create(new OnSubscribe<Integer>() {
    
    					@Override
    					public void call(Subscriber<? super Integer> subscriber) {
    						// 这里和from的效果是一样的 Observable.from()
    						subscriber.onNext(student[0]);
    						subscriber.onNext(student[1]);
    						subscriber.onNext(student[2]);
    						subscriber.onCompleted();
    					}
    				});
    		/**
    		 * 转换后的被观察者observable2 泛型为String ,我们要在这里进行类型
    		 */
    		Observable<String> observable2 = observable
    				.map(new Func1<Integer, String>() {
    
    					@Override
    					public String call(Integer t) {
    						System.out.println("funcx 源被传的值:" + t);
    						return "转换后:" + t;
    					}
    				});
    		observable2.subscribe(subscriber);
    	}
    
    	/**
    	 * 使用map和for循环 一对一 打印
    	 */
    	private void method2() {
    		final Student[] students = getParams();
    		/**
    		 * 定义一个观察者
    		 */
    		Subscriber<Student> subscriber = new Subscriber<Student>() {
    
    			@Override
    			public void onNext(Student student) {
    				// 接收被观察者的消息通知,被观察者会传过来student这个值
    				System.out.println("--------------------");
    				System.out.println(student.getName());
    				List<Course> courses = student.getCourses();
    				for (Course course : courses) {
    					System.out.println(course.getName());
    				}
    			}
    
    			@Override
    			public void onError(Throwable e) {
    			}
    
    			@Override
    			public void onCompleted() {
    			}
    		};
    
    		/**
    		 * 定义一个被观察者
    		 */
    		Observable<Student> observable = Observable
    				.create(new OnSubscribe<Student>() {
    
    					/**
    					 * 这个方法将在订阅时被调用,
    					 */
    					@Override
    					public void call(Subscriber<? super Student> subscriber) {
    						subscriber.onNext(students[0]);
    						subscriber.onNext(students[1]);
    						subscriber.onNext(students[2]);
    						subscriber.onCompleted();
    					}
    				});
    
    		// 观察者订阅被观察者
    		observable.subscribe(subscriber);
    	}
    
    	/**
    	 * 上面的方法(method1)是用for循环逐个打印每个学生对应的书籍的
    	 */
    
    	private void method3() {
    		final Student[] students = getParams();
    		/**
    		 * 创建一个观察者 泛型类型为Course
    		 */
    		Subscriber<Course> subscriber = new Subscriber<Course>() {
    
    			@Override
    			public void onNext(Course course) {
    				System.out.println(course.getName());
    			}
    
    			@Override
    			public void onError(Throwable e) {
    			}
    
    			@Override
    			public void onCompleted() {
    			}
    		};
    
    		/**
    		 * 创建一个观察者 泛型类型为Student
    		 */
    		Observable<Student> observable = Observable
    				.create(new OnSubscribe<Student>() {
    					//自己写的
    					@Override
    					public void call(Subscriber<? super Student> subscriber) {
    						subscriber.onNext(students[0]);
    						subscriber.onNext(students[1]);
    						subscriber.onNext(students[2]);
    						subscriber.onCompleted();
    					}
    				});
    
    		/**
    		 * 使用flatmap进行类型转换 将泛型为Student的被观察者转换为泛型为Course
    		 */
    		Observable<Course> observable2 = observable
    				.flatMap(new Func1<Student, Observable<? extends Course>>() {
    
    					@Override
    					public Observable<? extends Course> call(Student student) {
    						System.out.println("------------------");
    						System.out.println(student.getName());
    						return Observable.from(student.getCourses());
    					}
    				});
    		// 两个不同类型的观察者和被观察者定义好啦
    		observable2.subscribe(subscriber);
    	}
    
    	/**
    	 * flatMap源码分析
    	 * 
    	 * 首先打开flatMap 方法
    	 *  public final <R> Observable<R> flatMap(Func1<? super T, ?
    	 * extends Observable<? extends R>> func) { if (getClass() ==
    	 * ScalarSynchronousObservable.class) { return
    	 * ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func); } return
    	 * merge(map(func)); }
    	 * 
    	 * 可以看到一个merge方法里有一个map方法的执行,map方法我们前面分析过,但是里面的FuncX有所不同,
    	 * 它的call方法里返回的是一个Observable被观察者对象,所以我们要重新审视一下map这个方法。
    	 * 
    	 * 原来正常的map 如果观察者订阅被观察者导致执行的话
    	 * 我们知道一起方法的执行都是起源于subscribe订阅方法的
    	 * 而subscribe最后又会执行被观察者的OnSubscribe里的call(Subscriber)方法,
    	 * call里又执行观察者的onNext方法进行通知,而在map转换里面,问题就出在OnSubscribe.call
    	 * 里啦。因为经过map转换后会产生一个新的被观察者,而这个新的被观察者会持有一个新的OnSubscribe
    	 * 监听实例OnSubscribeMap[继承于OnSubscribe],
    	 * 所以在我们的观察者订阅被观察者的时候,在被观察者的订阅方法subscribe里执行的OnSubscribe.call
    	 * 实际上是OnSubscribeMap的call方法,在这个call方法里会初始化一个观察者的实例MapSubscriber,
    	 * 它持有源被观察者和funcx,然后兜兜圈圈又神奇的执行到源Observable的OnSubscribe实例的call方法
    	 * ,然而此时这个源call方法里subscriber对象俨然变成了MapSubscriber,所以后续执行的是MapSubscriber
    	 * 的onNext方法,进行转换
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 创建
    	 * 
    	 * flatMap方法,上面的map是经过了执行的,不执行其实就是上面的新观察者的创建过程
    	 * 
    	 * 我们知道map返回了一个上述的被观察者,如果不使用flatMap的话会按照套路走,那使用flatmap后呢,
    	 * 我们看到它吧map生成的被观察者对象放入了一个叫merge的方法,进去看看
    	 * 
    	 *     @SuppressWarnings({"unchecked", "rawtypes"})
        public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
            if (source.getClass() == ScalarSynchronousObservable.class) {
                return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
            }
            /**
             * 含有使用源被观察者和FuncX实例的新被观察者source
             * OperatorMerge:Operator:经营者,操作者 Merge:合并
             * 他创建了一个合并的类,但这个类合并的是什么暂时不知道
             * 但可以到我们新的被观察者的lift里去看看
            return source.lift(OperatorMerge.<T>instance(false));
        }
    	 * 
    	 * 我们看到执行lift后又反悔了一个Observable,说明在经过map转换后又进行了一次调整
    	 * 
    	 * public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
            return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
        }
    	 * 在看看create方法 OnSubscribeLift,这是什么鬼,
    	 * 可以看出这也是一个OnSubscribe的子类,和OnSubscribeMap一样,看看声明时里面传的参数
    	 * onSubscribe:因为传入lift的是一个经过map转换的新被观察者对象[提示merge],所以这个参数是OnSubscribeMap监听实例
         * operator:暂时不知道什么作用的一个对象
         * 到这里生成了一个持有新被观察者OnSubscribeMap和一个operator参数的OnSubscribeLift监听实例
         * 
         * 继续向下,
         *     public static <T> Observable<T> create(OnSubscribe<T> f) {
            return new Observable<T>(RxJavaHooks.onCreate(f));
        }
    	 *
    	 * 果然,OnSubscribeLift被用来创建一个2级的新的被观察者,这样源被观察者被动了两次手术,但有一个手术刀使我们不知道怎么用的,就是operator
    	 * 
    	 * 这个参数: 没关系我们可以百度:
    	 * http://www.tuicool.com/articles/VrQRvur
    	 * Observable -> Operator 1 -> Operator 2 -> Operator 3 -> Subscriber
    	 * 因为subscriber通常在主线程中执行,因此设计上要求其代码尽可能简单,只对事件进行响应,而修改事件的工作全部由operator执行。
    	 * Observable生产事件,Operator修改事件,最后由Subscriber去及时响应事件,现在我们知道这个参数是干啥的啦,就可以继续往下看啦
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 执行
    	 * 
    	 * 
    	 * 创建完啦,转换也完啦,就剩执行啦,拉莫还是回头看subscribe()订阅方法
    	 * 我们知道在里面执行了 OnSubscribe.call方法,那这个OnSubscribe是啥呢。
    	 * 没有错,他就是我们flatMap转换后返回的Observable实例,经过上面的分析,我们知道flatMap返回的实例是一个
    	 * 含有OnSubscribeLift监听实例的3手的被观察者,
    	 * 所以会执行OnSubscribeLift.call(),进去看看做了什么
    	 * 
    	 *     @Override
        public void call(Subscriber<? super R> o) {
            try {
            	创建一个新的观察者,还记得map的MapSubscriber吗,它也是个新的观察者,基于源观察者o的,在map里会兜兜圈圈执行这个新观察者
            	onNext,MapSubscriber持有FuncX对象和源观察者。
                Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
                try {
                    // 看这里
                    st.onStart();
                    parent.call(st);
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    st.onError(e);
                }
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                o.onError(e);
            }
        }
    	 * 
    	 * 其实走到这里我们应该捋一捋
    	 * 1.我们首先使用map创建了一个新的被观察者,他持有OnSubscribeMap:他持有源被观察者和FuncX
    	 * 2.然后我们用又使用这个新的被观察者创建了一个3手的被观察者,而它持有一个OnSubscribeLift:它持有OnSubscribeMap和一个operator[事件操作者]
    	 * 4.然后我们调用3手被观察者的OnSubscribeLift.call()方法,到这里OnSubscribeLift又持有一个观察者对象
    	 * 
    	 * 好啦,到了关键时候啦,我们看到OnSubscribeLift只有几行重要的代码
    	 * 先看新观察者的创建过程,回顾一下前面的operator:在merge中  return source.lift(OperatorMerge.<T>instance(false));
    	 * instance 里的返回 T是源被观察者的泛型
    	 * static final OperatorMerge<Object> INSTANCE = new OperatorMerge<Object>(false, Integer.MAX_VALUE);
    	 * operator原来是这个样子的,都快忘啦。
    	 * 
    	 * 好,回顾完啦。我们看RxJavaHooks.onObservableLift(operator);这一句做了什么
    	 * 它传进去了一个operator,那是我们的operator,它用它做了什么呢,其实很前面一样,啥都没做,
    	 * 就是判断了一下,然后就将这个operator又原路返回啦,源码如下:
    	 * @SuppressWarnings({ "rawtypes", "unchecked" })
        public static <T, R> Operator<R, T> onObservableLift(Operator<R, T> operator) {
            Func1<Operator, Operator> f = onObservableLift;
            if (f != null) {
                return f.call(operator);
            }
            return operator;
        }
    	 *
    	 * 
    	 * onObservableLift = new Func1<Operator, Operator>() {
                @Override
                public Operator call(Operator t) {
                    return RxJavaPlugins.getInstance().getObservableExecutionHook().onLift(t);
                }
            };
            
            所以Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
            这儿调用的call方法是我们前面OperatorMerge的call方法,进去看看
    	 * 
    	 * 
    	 * maxConcurrent:Integer.MAX_VALUE 65535
    	 * delayErrors:延迟错误,传过来的是false OperatorMerge.<T>instance(false)
    	 * child:源观察者
    	 *    @Override
        public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
            MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
            MergeProducer<T> producer = new MergeProducer<T>(subscriber);
            subscriber.producer = producer;
            
            //这两句楼主也没明白啥意思,有知道的请告知哦
            child.add(subscriber);
            child.setProducer(producer);
            
            return subscriber;
        }
    
    	 *
    	 *可以看出创建了一个MergeSubscriber新观察者,它持有上面三个参数,是不是和OnSubscribeMap里的内部类MapSubscriber差不多,
    	 *没错,他也是一个内部类
    	 *可以看到除了一个MergeSubscriber对象外,还有一个MergeProducer对象,他把MergeSubscriber传进了构造方法,到这里,我们的
    	 *源观察者放进了MergeSubscriber,而MergeSubscriber放进了MergeProducer,仅仅是放了进去,啥也没有做呢,再向下看
    	 *subscriber.producer = producer;将新创建的MergeProducer赋值。再往下看。又有add方法,与上面一样将我们的新观察者放进
    	 *了list里面,还不知道有什么卵用,那就看下一句,仅仅设置了一下setProducer,最后返回了这个新创建的观察者,
    	 *
    	 *ok,获得了观察者,在回头看OnSubscribeLift的call方法,它执行了新观察者的MergeSubscriber的onStart方法,其实MergeSubscriber
    	 *是没有onStart方法的,所以,执行的是父类的,然而并没有什么卵用,父类啥也没做
    	 *
    	 *
    	 *
    	 *
    	 *继续 parent.call(st);
    	 *parent是我们2手观察者的OnSubscribe,也就是OnSubscribeMap,终于到了关键的时候啦,
    	 *它将我们新创建的观察者放入了OnSubscribeMap的call方法内
    	 *回顾一下这个方法:
    	 * @Override
        public void call(final Subscriber<? super R> o) {
            MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
            o.add(parent);
            这里source还是源被观察者
            source.unsafeSubscribe(parent);
        }
    	 *在这里,它使用MergeSubscriber又创建了一个MapSubscriber,transformer是使我们自己定义的FuncX,
    	 *他在前面merge(map(funcx))里已经传入,现在终于排上用场啦
    	 *add方法不用管,lz也不知道有何意义,直接看source.unsafeSubscribe(parent);,前面map我们分析过这个东西,
    	 *在执行一遍流程。
    	 *MapSubscriber 3手观察者:持有一个MergeSubscriber观察者,MergeSubscriber持有源观察者和其他两个参数maxConcurrent,delayErrors
    	 *
    	 *
    	 *    public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
            try {
                // new Subscriber so onStart it
                subscriber.onStart();
                // allow the hook to intercept and/or decorate
                RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
                return RxJavaHooks.onObservableReturn(subscriber);
            } catch (Throwable e) {
                // special handling for certain Throwable/Error/Exception types
                Exceptions.throwIfFatal(e);
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    RxJavaHooks.onObservableError(r);
                    // TODO why aren't we throwing the hook's return value.
                    throw r; // NOPMD 
                }
                return Subscriptions.unsubscribed();
            }
        }
    
    	 *看下面这一句
    	 *RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
    	 *我们在上一篇map的时候已经知道执行的是源被观察的onSubscribe的call方法,
    	 *subscriber就是上面持有我们MergeSubscriber观察者的MapSubscriber观察者,
    	 *既然执行我们源观察者的onSubscribe的call方法,那就看我们自己写的吧
    	 *
    		 * 创建一个观察者 泛型类型为Student
    		Observable<Student> observable = Observable
    				.create(new OnSubscribe<Student>() {
    					//自己写的这里面的subscriber是MapSubscriber包含MergeSubscriber的复合观察者,我们这里执行了三次
    					@Override
    					public void call(Subscriber<? super Student> subscriber) {
    						subscriber.onNext(students[0]);
    						subscriber.onNext(students[1]);
    						subscriber.onNext(students[2]);
    						subscriber.onCompleted();
    					}
    				});
    	 *转了一圈又一圈,终于有回到了原点,只不过比map又多了一个MergeSubscriber,他是OperatorMerge的内部类
    	 *下面我们看是怎么执行的
    	 *我们知道subscriber是MapSubscriber,所以首先看MapSubscriber的onNext方法
    	 *
    	 *@Override
            public void onNext(T t) {
                R result;
                
                try {
                mapper:我们的funcx
                    result = mapper.call(t);
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    unsubscribe();
                    onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                    return;
                }
                actual:这里的就是我们调用OnSubscribeMap.call传过来的观察者,这里是MergeSubscriber
                actual.onNext(result);
            }
    	 * 还是和以前一样貌美如花, result = mapper.call(t);这里的t是源观察者调用onNext传的参数,
    	 * 我们使用funcx的call方法将这个参数转换(至于转换成什么,看你喽),这里得转换成一个被观察者对象
    	 * ,得到返回结果后执行传过来的观察者MergeSubscriber的
    	 * onNext方法,并传入我们自己生成的被观察者,这才是关键,so,看看MergeSubscriber里的onNext是怎么执行的,
    	 * 
    	 *  @Override
            public void onNext(Observable<? extends T> t) {
                if (t == null) {
                    return;
                }
                if (t == Observable.empty()) {
                    emitEmpty();
                } else
                if (t instanceof ScalarSynchronousObservable) {
                要是你声明的被观察者是ScalarSynchronousObservable就看着而吧
                    tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
                } else {
                //主要看这儿
                    InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
                    addInner(inner);
                    //这里的t就是我们funcx返回的被观察者Observable
                    t.unsafeSubscribe(inner);
                    emit();
                }
            }
         *
         *可以看出InnerSubscriber内部类,窝草,又一个观察者,fuck,
         *可以看出这个内部的观察者持有一个MergeSubscriber的实例,然后又有一个uniqueId:应该是一个记录的参数,先不管他
         *下面到了addInner(inner);先不管
         *再往下t.unsafeSubscribe(inner);
         *t.unsafeSubscribe(inner)这一句代码是不是有点熟悉,我们在分析map的时候,回顾一下吧
         *在OnSubscribeMap里
         *  @Override
        public void call(final Subscriber<? super R> o) {
            MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
            o.add(parent);
            source.unsafeSubscribe(parent);
        }
         *好啦回顾完毕,看看这个鬼执行了神马,和map的一样,走进去看看,老样子
         *
         *   public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
            try {
                // new Subscriber so onStart it
                subscriber.onStart();
                // allow the hook to intercept and/or decorate
                RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
                return RxJavaHooks.onObservableReturn(subscriber);
            } catch (Throwable e) {
                // special handling for certain Throwable/Error/Exception types
                Exceptions.throwIfFatal(e);
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    RxJavaHooks.onObservableError(r);
                    // TODO why aren't we throwing the hook's return value.
                    throw r; // NOPMD 
                }
                return Subscriptions.unsubscribed();
            }
        }
         *
         *RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);主要看这句
         *我们是用funcx返回的Obsevable来调用的unsafeSubscribe方法,而RxJavaHooks.onObservableStart(this, onSubscribe)
         *这一段就是进行了一下安全判断在返回这个Obserable的onSubscribe,所以我们使用Funcx的call方法获得的Obserable的
         *onSubscribe执行了call方法,具体执行了什么,看你返回的Obserable喽,到此分析完毕
         *
         *
         *好啦 到这里该总结一下流程了
         *
         *1.源观察者执行 subscriber.onNext方法将参数传入
         *2.上面的subscriber其实已经被转义为MapSubscriber的对象了,他持有。。。。
         *3.在MapSubscriber的onNext方法里进行了转换并获得了一个Obserable
         *4.Obserable是我们的FuncX的call方法转换返回的call里面的值是我们源观察者的onNext的传入参数,不懂看1和2
         *5.得到这个结果后,onNext方法里执行了actual.onNext(result);这一句actual是一个MergeSubscriber的实例
         *6.所以actual.onNext(result);执行的是MergeSubscriber里的onNext方法,其实MergeSubscriber还会被包装一下
         *成为一个InnerSubscriber楼主不做介绍
         *7。t.unsafeSubscribe(inner);然后到了这里,
         *8.最后到了RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);这里执行了返回的被观察者的onSubscribe的call方法,
    	 */
    
    	/**
    	 * 设置参数
    	 */
    	private Student[] getParams() {
    		Course course1 = new Course();
    		course1.setName("红楼梦");
    		Course course2 = new Course();
    		course2.setName("水浒传");
    		Course course3 = new Course();
    		course3.setName("西游记");
    		Course course4 = new Course();
    		course4.setName("三国演义");
    		List<Course> courses = new ArrayList<Course>();
    		courses.add(course1);
    		courses.add(course2);
    		courses.add(course3);
    		courses.add(course4);
    
    		Student student1 = new Student();
    		student1.setName("小明");
    		student1.setCourses(courses);
    
    		Student student2 = new Student();
    		student2.setName("小牧");
    		student2.setCourses(courses);
    
    		Student student3 = new Student();
    		student3.setName("小段");
    		student3.setCourses(courses);
    
    		Student[] students = { student1, student2, student3 };
    		return students;
    	}
    
    }
    


RxJava1.0 flatMap方法的源码分析

标签:

原文地址:http://blog.csdn.net/sinat_32955803/article/details/51923288

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!