码迷,mamicode.com
首页 > 编程语言 > 详细

RxJava模仿

时间:2015-07-11 12:06:53      阅读:231      评论:0      收藏:0      [点我收藏+]

标签:

希望,通过动手模仿,了解:
1. RxJava如何实现子流程的串行及其思想
2. Java泛型的使用
 
初步接触RxJava,被它的Observable和Subscriber分解流程所吸引。
举一个例子:泡茶的步骤
1. 烧开水
2. 洗器皿和茶叶
3. 倒水,等一段时间
最后,喝茶
用RxJava来描述
技术分享
 1 Observable<Boolean> obs = Observable.just((Boolean)null)
 2                   .flatMap(new Func1<Boolean, Observable<Boolean>>() {
 3                       @Override
 4                       public Observable<Boolean> call(Boolean v) {
 5                           // 烧开水
 6                           return Observable.just(Boolean.TRUE);
 7                       }
 8                   .flatMap(new Func1<Boolean, Observable<Boolean>>() {
 9                       @Override
10                       public Observable<Boolean> call(Boolean v) {
11                           // 洗器皿和茶叶
12 
13                           return Observable.just(Boolean.TRUE);
14                       }
15                   .flatMap(new Func1<Boolean, Observable<Boolean>>() {
16 
17                       @Override
18                       public Observable<Boolean> call(Boolean v) {
19                           // 倒水
20 
21                           return Observable.just(Boolean.TRUE);
22                       }
23                   }
24 // 一个人
25 obs.subscribe(new Action1<Boolean>() {
26                            // 喝茶
27                        },
28                        new Action1<Throwable>() {
29                            // 中途出现的特殊情况
30                        },
31                        new Action0() {
32                            // 喝完
33                        }
34                   });
35 // 另一个人
36 obs.subscribe(…)
View Code

吸引我的,有如下:
1. 很好的描述步骤
2. 类似生产者与消费者关系
3. Java的泛型提供编译时检查
4. 线程的控制
 
感觉不足,就是错误要统一处理,可以用不同类型Throwable来区分。
 
在阅读Observable.java代码中,发现关键函数 lift 和 compose,它们是串起子流程。
实现后,发现:
1. 代码简短
2. 关键思想,流程分解后,是从前到后串起子流程的(compose),还是从后到前串起(lift)。
开始使用RxJava,迷惑,原因在它是从后到前串起,(代码上就是lift函数),使用闭包,从后面的subscriber,一层层封装,得到全部流程。
3. java泛型 ? super T和 ? extends T的使用,前者说明是只能set,后者说明只能get
4. 从前到后串起子流程的(compose),和 从后到前串起(lift),都能实现相同功能,compose更符合思维,容易接收,lift更简洁,少一层闭包
5. 采用边处理边传递,如同管道;也可以处理后,缓存,再传递。
 
以上仅仅RxJava核心的简单部分,它还有Producer来控制数据量。
 1 public interface OnSubscriber<T> {
 2     public void call(Sub<? super T> sub);
 3 }
 4 
 5 public class Obs<T> {
 6     // 思考点:产生源方式的多样性
 7     private OnSubscriber<T> mOnSubscriber;
 8 
 9     private Obs(OnSubscriber<T> onSubscriber) {
10         mOnSubscriber = onSubscriber;
11     }
12 
13     // 思考点:Sub只有一个
14     // Sub的数据类型只需是Obs的父类
15     public void subscribe(Sub<? super T> sub) {
16         mOnSubscriber.call(sub);
17     }
18 
19     // 思考点:Obs的串行流程,Obs1, Obs2的顺序有 Obs1-->Obs2,Obs2-->Obs1,由功能决定顺序
20     // Sub的串行流程,主要用于数据的整理,跟Obs的串行流程有什么差别呢;----,函数“转换”,与流程串行不相同的操作
21     // 串行逻辑:f2(f1(data)),函数转换:f(method),前者是数据,后者是回调;前者也看作对Obs的转换
22     //
23 
24     // Obs的转换的实现,Obs<R>-->Obs<T>
25     public interface Transform<R, T> {
26         Obs<R> call(Obs<T> obs);
27     }
28     public final <R> Obs<R> compose(Transform<R, T> tr) {
29         return tr.call(this);
30     }
31 
32     // Sub的转换的实现 Sub<R>-->Sub<T>,Sub<R>在Obs<R>中,Sub<T>在Obs<T>,所以需要把Obs转换;
33     public interface Operator<R, T> {
34         // super,由于需onNext,设置数据
35         Sub<? super T> call(Sub<? super R> sub);
36     }
37 
38     // 关键函数
39     // 形象讲,是外面封装多一层,如  Obs<R1>.lift<R2>(sub<R1>).lift<T>(sub<R2>).Sub<T>,其中sub<R1>是Obs<R1>到Obs<R2>流程, sub<R2>是Obs<R2>到Obs<T>流程
40     public final <R> Obs<R> lift(final Operator<R, T> op) {
41         return create(new OnSubscriber<R>() {
42 
43             @Override
44             public void call(Sub<? super R> sub) {
45                 Sub<? super T> tSub = op.call(sub);
46                 mOnSubscriber.call(tSub);
47             }
48         });
49     }
50 
51     // 类似工厂模式
52     public static <T> Obs<T> create(OnSubscriber<T> onSubscriber) {
53         return new Obs<T>(onSubscriber);
54     }
55 
56     public static <T> Obs<T> just(final T t) {
57         return new Obs<T>(new OnSubscriber<T>() {
58 
59             @Override
60             public void call(Sub<? super T> sub) {
61                 sub.onNext(t);
62                 sub.onComplete();
63             }
64         });
65     }
66 
67 }
68 
69 // 思考点,使用interface
70 public class Sub<T> {
71     public void onNext(T data) {
72     }
73 
74     public void onError(Throwable t) {
75     }
76 
77     public void onComplete() {
78     }
79 }

 lift的使用

 1 Obs.just("s1").lift(new Obs.Operator<Boolean, String>() {
 2 
 3     @Override
 4     public Sub<? super String> call(final Sub<? super Boolean> sub) {
 5         // 逻辑代码
 6         return new Sub<String>() {
 7 
 8             @Override
 9             public void onNext(String data) {
10                 // 上面流程的结果,处理,传给下个流程
11                 sub.onNext(data != null);
12             }
13 
14             @Override
15             public void onError(Throwable t) {
16                 sub.onError(t);
17             }
18 
19             @Override
20             public void onComplete() {
21                 sub.onComplete();
22             }
23 
24         };
25 }).subscribe(new Sub<Boolean>() {
26 
27     @Override
28     public void onNext(Boolean data) {
29         // onNext:TRUE
30     }
31 
32     @Override
33     public void onComplete() {
34         // onComplete
35     }
36 
37 });

 

compose的使用

 1 Obs.just("s1").compose(new Obs.Transform<Boolean, String>() {
 2 
 3     @Override
 4     public Obs<Boolean> call(final Obs<String> obs) {
 5         // 与lift效果一样,觉得,compose是从前到后思考,lift是从后到前
 6         // 比lift多一个闭包
 7         return Obs.create(new OnSubscriber<Boolean>() {
 8 
 9             @Override
10             public void call(final Sub<? super Boolean> sub) {
11                 obs.subscribe(new Sub<String>() {
12 
13                     @Override
14                     public void onNext(String data) {
15                         //  边处理数据,边传递
16                         sub.onNext(data != null);
17                     }
18 
19                     @Override
20                     public void onError(Throwable t) {
21                         sub.onError(t);
22                     }
23 
24                     @Override
25                     public void onComplete() {
26                         sub.onComplete();
27                     }
28 
29                 });
30             }
31 
32         });
33     }
34 }).subscribe(new Sub<Boolean>() {
35 
36     @Override
37     public void onNext(Boolean data) {
38         // onNext:TRUE
39     }
40 
41     @Override
42     public void onComplete() {
43         // onComplete
44     }
45 });

 

 

RxJava模仿

标签:

原文地址:http://www.cnblogs.com/shifting/p/4638208.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!