标签:subscript 联系 bool add override 专用 时间间隔 生产者和消费者 iat
RxJava2.0的使用详解
1,初识RxJava
RxJava就是一种用Java语言实现的响应式编程,来创建基于事件的异步程序
RxJava是一个基于事件订阅的异步执行的一个类库,目前比较火的一些技术框架!
参考资料:
Github上RxJava的项目地址:
https://github.com/ReactiveX/RxJava
技术文档Api:
http://reactivex.io/RxJava/javadoc/
RxAndroid,用于 Android 开发:
https://github.com/ReactiveX/RxAndroid
简书博客推荐:
http://www.jianshu.com/p/ba61c047c230
1.1使用前所添加的依赖(build.gradle):
compile ‘io.reactivex.rxjava2:rxjava:2.1.3‘
compile ‘io.reactivex.rxjava2:rxandroid:2.0.1‘
1.2作用:
RxJava的目的就是异步。
RxJava的特点就是可以非常简便的实现异步调用,可以在逻辑复杂的代码逻辑中以比较轻易的方式实现异步调用。随着逻辑的复杂,需求的更改,代码可依然能保持极强的阅读性
1.3概念:
RxJava是利用观察者模式来实现一些列的操作,所以对于观察者模式中的观察者,被观察者,以及订阅、事件需要有一个了解.
Observable:在观察者模式中称为“被观察者”;
Observer:观察者模式中的“观察者”,可接收Observable发送的数据;
subscribe:订阅,观察者与被观察者,通过Observable的subscribe()方法进行订阅;
Subscriber:也是一种观察者,在2.0中 它与Observer没什么实质的区别,不同的是 Subscriber要与Flowable(也是一种被观察者)联合使用,该部分 内容是2.0新增的,后续文章再介绍。Obsesrver用于订阅Observable,而Subscriber用于订阅Flowable.
1.4观察者模式的理解:
A 对象(观察者)对 B 对象(被观察者)的某种变化高度敏感,需要在 B 变化的一瞬间做出反应.
在程序的观察者模式,观察者不需要时刻盯着被观察者,而是采用注册或者称为订阅的方式,告诉被观察者:我需要你的某某状态,你要在它变化的时候通知我!
RxJava 有四个基本概念:
Observable (被观察者)、
Observer (观察者)、
subscribe (订阅)
Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在完成某些操作,获得一些结果后,回调触发事件,即 发出事件来通知 Observer。
关于回调,如果理解则可以跳过这一段,如果不理解,在RxJava中可以简单的理解为:为了方便Observable和Observer交互,在Observable中,将 Observer对象传入,在完成某些操作后调用Observer对象的方法,此时将触发Observer中具体实现的对应方法。
注意:Observer是个接口,Observable是个类。
RxJava中定义的事件方法:
onNext(),普通事件,按照队列依次进行处理.
onComplete(),事件队列完结时调用该方法
onError(),事件处理过程中出现异常时,onError()触发,同时队列终止,不再有事件发出.
onSubscribe(),RxJava 2.0 中新增的,传递参数为Disposable,可用于切断接收事件
让Observable (被观察者)开启子线程执行耗操作,完成耗时操作后,触发回调,通知Observer (观察者)进行主线程UI更新
2,简单使用步骤:
步骤:
创建数据发射源,上游Observable
创建数据接收处,下游Observer
数据源关联接收处,上游衔接下游!
3,Observable
数据发射源,可观察的,被观察的,
Observable有两种形式启动形式:
1热启动Observable任何时候都会发送消息,即使没有任何观察者监听它。
2冷启动Observable只有在至少有一个订阅者的时候才会发送消息
Observable的几种创建方式:
01,just()方式
使用just( ),将创建一个Observable并自动调用onNext( )发射数据。
也就是通过just( )方式 直接触发onNext(),just中传递的参数将直接在Observer的onNext()方法中接收到。
02,fromIterable()方式
使用fromIterable(),遍历集合,发送每个item.多次自动调用onNext()方法,每次传入一个item.
注意:Collection接口是Iterable接口的子接口,所以所有Collection接口的实现类都可以作为Iterable对象直接传入fromIterable() 方法。
03,defer()方式
当观察者订阅时,才创建Observable,并且针对每个观察者创建都是一个新的Observable.
通过Callable中的回调方法call(),决定使用以何种方式来创建这个Observable对象,当订阅后,发送事件.
04,interval( )方式
创建一个按固定时间间隔发射整数序列的Observable,可用作定时器。按照固定时间间隔来调用onNext()方法。
05,timer( )方式
通过此种方式创建一个Observable,它在一个给定的延迟后发射一个特殊的值,即表示延迟指定时间后,调用onNext()方法。
06,range( )方式,range(x,y)
创建一个发射特定整数序列的Observable,第一个参数x为起始值,第二个y为发送的个数,如果y为0则不发送,y为负数则抛异常。
range(1,5)
上述表示发射1到5的数。即调用5次Next()方法,依次传入1-5数字。
07,repeat( )方式
创建一个Observable,该Observable的事件可以重复调用。
部分方法介绍:
表示下游不关心任何事件,你上游尽管发你的数据
Disposable subscribe()
表示下游只关心onNext事件,其他不管
Disposable subscribe(Consumer<? super T> onNext)
表示下游只关心onNext事件,onError事件
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
表示只关心onNext事件,onError事件,onComplete事件
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete)
表示处理所有事件
subscribe(Observer<? super T> observer)
4,ObservableEmitter
Emitter是发射器的意思,就是用来发出事件的,它可以发出三种类型的事件
通过调用onNext(T value),发出next事件
通过调用onComplete(),发出complete事件
通过调用onError(Throwable error),发出error事件
注意事项:
onComplete和onError唯一并且互斥
发送多个onComplete, 第一个onComplete接收到,就不再接收了.
发送多个onError, 则收到第二个onError事件会导致程序会崩溃.
不可以随意乱七八糟发射事件,需要满足一定的规则:
上游可以发送无限个onNext, 下游也可以接收无限个onNext.
当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件.
上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件.
上游可以不发送onComplete或onError.
最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError
5,Disposable
一次性,它理解成两根管道之间的一个机关, 当调用它的dispose()方法时, 它就会将两根管道切断, 从而导致下游收不到事件.
在RxJava中,用它来切断Observer(观察者)与Observable(被观察者)之间的连接,当调用它的dispose()方法时, 它就会将Observer(观察者)与Observable(被观察者)之间的连接切断, 从而导致Observer(观察者)收不到事件。
注意: 调用dispose()并不会导致上游不再继续发送事件, 上游会继续发送剩余的事件
我们让上游依次发送1,2,3,complete,4,在下游收到第二个事件之后, 切断水管, 看看运行结果
Disposable的对象通过观察者获得,具体分为两种方式
1,Observer接口
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
//此方法接收到Disposable的实例!
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
通过创建Observer(观察者)接口,重写onSubscribe方法,当订阅后,建立与Observable(被观察者)的联系后,在onSubscribe(Disposable d)方法中便可以获得Disposable对象。
2.Consumer等其他函数式接口
Disposable disposable = Observable.just("你好").subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
}
});
当调用Observable的subscribe()方法后直接返回一个Disposable 对象
6,线程控制——Scheduler
在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。
Schedulers.immediate():
直接在当前线程运行,相当于不指定线程。这是默认的Scheduler。
Schedulers.newThread():
总是启用新线程,并在新线程执行操作。
Schedulers.io(): I/O
操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler。行为模式和newThread()差不多,区别在于io()的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下io()比newThread()更有效率。不要把计算工作放在io()中,可以避免创建不必要的线程。
Schedulers.computation():
计算所使用的Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个Scheduler使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在computation()中,否则 I/O 操作的等待时间会浪费 CPU。
AndroidSchedulers.mainThread(),
Android专用线程,指定操作在主线程运行。
如何切换线程呢?RxJava中提供了两个方法:
subscribeOn() 和 observeOn() ,
两者的不同点在于:
subscribeOn(): 指定subscribe()订阅所发生的线程,或者叫做事件产生的线程。
observeOn(): 指定Observer所运行在的线程,即onNext()执行的线程。或者叫做事件消费的线程。
7,以Consumer为例,我们可以实现简便式的观察者模式
Observable.just("hello").subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
其中Consumer中的accept()方法接收一个来自Observable的单个值。Consumer就是一个观察者。其他函数式接口可以类似应用
8,RxJava中的操作符
01,操作符就是用于在Observable和最终的Observer之间,通过转换Observable为其他观察者对象的过程,修改发出的事件,
最终将最简洁的数据传递给Observer对象.
每次调用一次操作符,就进行一次观察者对象的改变,同时将需要传递的数据进行转变,最终Observer对象获得想要的数据。
以网络加载为例,我们通过Observable开启子线程,进行一些网络请求获取数据的操作,获得到网络数据后,然后通过操作符进行转换,获得我们想要的形式的数据,然后传递给Observer对象
02,比较常用的操作符:
map()操作符
map()操作符,就是把原来的Observable对象转换成另一个Observable对象,同时将传输的数据进行一些灵活的操作,方便Observer获得想要的数据形式。
举例:
Observable<Integer> observable = Observable
.just("hello")
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return s.length();
}
});
flatMap()操作符
flatMap()对于数据的转换比map()更加彻底,如果发送的数据是集合,flatmap()重新生成一个Observable对象,并把数据转换成Observer想 要的数据形式。它可以返回任何它想返回的Observable对象。
举例:
Observable.just(list)
.flatMap(new Function<List<String>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(List<String> strings) throws Exception {
return Observable.fromIterable(strings);
}
});
filter()操作符
filter()操作符根据它的test()方法中,根据自己想过滤的数据加入相应的逻辑判断,返回true则表示数据满足条件,返回false则表示数据需要被过滤。
最后过滤出的数据将加入到新的Observable对象中,方便传递给Observer想要的数据形式。
举例:
Observable
.just(list)
.flatMap(new Function<List<String>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(List<String> strings) throws Exception {
return Observable.fromIterable(strings);
}
}).filter(new Predicate<Object>() {
@Override
public boolean test(Object s) throws Exception {
String newStr = (String) s;
if (newStr.charAt(5) - ‘0‘ > 5) {
return true;
}
return false;
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
System.out.println((String)o);
}
});
take()操作符
输出最多指定数量的结果.(接收指定数量的结果)
举例:
Observable.just(new ArrayList<String>(){
{
for (int i = 0; i < 8; i++) {
add("data"+i);
}
}
}).flatMap(new Function<List<String>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(List<String> strings) throws Exception {
return Observable.fromIterable(strings);
}
}).take(5).subscribe(new Consumer<Object>() {
@Override
public void accept(Object s) throws Exception {
DemonstrateUtil.showLogResult(s.toString());
}
});
doOnNext()
允许我们在每次输出一个元素之前做一些额外的事情
举例:
Observable.just(new ArrayList<String>(){
{
for (int i = 0; i < 6; i++) {
add("data"+i);
}
}
}).flatMap(new Function<List<String>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(List<String> strings) throws Exception {
return Observable.fromIterable(strings);
}
}).take(5).doOnNext(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
DemonstrateUtil.showLogResult("额外的准备工作!");
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object s) throws Exception {
DemonstrateUtil.showLogResult(s.toString());
}
});
8,Flowable的理解
Flowable是一个被观察者,与Subscriber(观察者)配合使用,解决Backpressure问题
Backpressure(背压)。所谓背压,即生产者的速度大于消费者的速度带来的问题。
什么情况下才会产生Backpressure问题?
1.如果生产者和消费者在一个线程的情况下,无论生产者的生产速度有多快,每生产一个事件都会通知消费者,等待消费者消费完毕,再生产下一个事件。
所以在这种情况下,根本不存在Backpressure问题。即同步情况下,Backpressure问题不存在。
2.如果生产者和消费者不在同一线程的情况下,如果生产者的速度大于消费者的速度,就会产生Backpressure问题。
即异步情况下,Backpressure问题才会存在。
现象演示说明:
被观察者是事件的生产者,观察者是事件的消费者.假如生产者无限生成事件,而消费者以很缓慢的节奏来消费事件,会造成事件无限堆积,形成背压,最后造成OOM!
Flowable悠然而生,专门用来处理这类问题。
Flowable是为了应对Backpressure而产生的。Flowable是一个被观察者,
与Subscriber(观察者)配合使用,解决Backpressure问题。
注意:处理Backpressure的策略仅仅是处理Subscriber接收事件的方式,并不影响Flowable发送事件的方法。
即使采用了处理Backpressure的策略,Flowable原来以什么样的速度产生事件,现在还是什么样的速度不会变化,主要处理的是Subscriber接收事件的方式。
处理Backpressure问题的策略,或者来解决Backpressure问题
BackpressureStrategy.ERROR
如果缓存池溢出,就会立刻抛出MissingBackpressureException异常
request()用来向生产者申请可以消费的事件数量,这样我们便可以根据本身的消费能力进行消费事件.
虽然并不限制向request()方法中传入任意数字,但是如果消费者并没有这么多的消费能力,依旧会造成资源浪费,最后产生OOM
at java.lang.OutOfMemoryError.<init>(OutOfMemoryError.java:33)
在异步调用时,RxJava中有个缓存池,用来缓存消费者处理不了暂时缓存下来的数据,缓存池的默认大小为128,即只能缓存128个事件。
无论request()中传入的数字比128大或小,缓存池中在刚开始都会存入128个事件。
当然如果本身并没有这么多事件需要发送,则不会存128个事件。
应用举例:
BackpressureStrategy.BUFFER
是把RxJava中默认的只能存128个事件的缓存池换成一个大的缓存池,支持存更多的数据.
消费者通过request()即使传入一个很大的数字,生产者也会生产事件,并将处理不了的事件缓存.
注意:
这种方式任然比较消耗内存,除非是我们比较了解消费者的消费能力,能够把握具体情况,不会产生OOM。
BUFFER要慎用
BackpressureStrategy.DROP
顾名思义,当消费者处理不了事件,就丢弃!
例如,当数据源创建了200个事件,先不进行消费临时进行缓存实际缓存128个,我们第一次申请消费了100个,再次申请消费100个,
那么实际只消费了128个,而其余的72个被丢弃了!
BackpressureStrategy.LATEST
LATEST与DROP功能基本一致,当消费者处理不了事件,就丢弃!
唯一的区别就是LATEST总能使消费者能够接收到生产者产生的最后一个事件。
例如,当数据源创建了200个事件,先不进行消费临时进行缓存,我们第一次申请消费了100个,再次申请消费100个,
那么实际只消费了129个,而其余的71个被丢弃了,但是第200个(最后一个)会被消费.
BackpressureStrategy.MISSING
生产的事件没有进行缓存和丢弃,下游接收到的事件必须进行消费或者处理!
在RxJava中会经常遇到一种情况就是被观察者发送消息十分迅速以至于观察者不能及时的响应这些消息
举例:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
while (true){
e.onNext(1);
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(2000);
System.out.println(integer);
}
});
被观察者是事件的生产者,观察者是事件的消费者。上述例子中可以看出生产者无限生成事件,而消费者每2秒才能消费一个事件,这会造成事件无限堆积,最后造成OOM。
Flowable就是由此产生,专门用来处理这类问题
代码实现:
1 public class RxJavaDemo1Activity extends AppCompatActivity implements View.OnClickListener { 2 3 protected Button btnSend1; 4 protected Button btnSend2; 5 protected Button btnSend3; 6 protected Button btnSend4; 7 protected Button btnSend5; 8 protected Button btnSend6; 9 10 @Override 11 protected void onCreate(Bundle savedInstanceState) { 12 super.onCreate(savedInstanceState); 13 super.setContentView(R.layout.activity_rx_java_demo1); 14 initView(); 15 } 16 17 @Override 18 public void onClick(View view) { 19 if (view.getId() == R.id.btn_send1) { 20 test1();//普通使用 21 } else if (view.getId() == R.id.btn_send2) { 22 test2();//链式调用 23 } else if (view.getId() == R.id.btn_send3) { 24 test3();//发送中,中断. 25 } else if (view.getId() == R.id.btn_send4) { 26 test4();//只关心onnext事件的操作 27 } else if (view.getId() == R.id.btn_send5) { 28 test5();//几种被观察者的创建方式 29 } else if (view.getId() == R.id.btn_send6) { 30 test6();//常用的操作符 31 } 32 } 33 34 private void test6() { 35 DialogUtil.showListDialog(this, "rxjava的操作符号使用", new String[]{ 36 "0map()操作符", 37 "1flatMap()操作符", 38 "2filter()操作符", 39 "3take()操作符", 40 "4doOnNext()操作符", 41 }, new DialogInterface.OnClickListener() { 42 @Override 43 public void onClick(DialogInterface dialog, int which) { 44 switch (which) { 45 case 0: 46 map0(); 47 break; 48 case 1: 49 map1(); 50 break; 51 case 2: 52 map2(); 53 break; 54 case 3: 55 map3(); 56 break; 57 case 4: 58 map4(); 59 break; 60 } 61 } 62 }); 63 } 64 65 private void map4() { 66 Observable.just(new ArrayList<String>(){ 67 { 68 for (int i = 0; i < 6; i++) { 69 add("data"+i); 70 } 71 } 72 }).flatMap(new Function<List<String>, ObservableSource<?>>() { 73 @Override 74 public ObservableSource<?> apply(List<String> strings) throws Exception { 75 return Observable.fromIterable(strings); 76 } 77 }).take(5).doOnNext(new Consumer<Object>() { 78 @Override 79 public void accept(Object o) throws Exception { 80 DemonstrateUtil.showLogResult("额外的准备工作!"); 81 } 82 }).subscribe(new Consumer<Object>() { 83 @Override 84 public void accept(Object s) throws Exception { 85 DemonstrateUtil.showLogResult(s.toString()); 86 } 87 }); 88 } 89 90 private void map3() { 91 Observable.just(new ArrayList<String>(){ 92 { 93 for (int i = 0; i < 8; i++) { 94 add("data"+i); 95 } 96 } 97 }).flatMap(new Function<List<String>, ObservableSource<?>>() { 98 @Override 99 public ObservableSource<?> apply(List<String> strings) throws Exception { 100 return Observable.fromIterable(strings); 101 } 102 }).take(10).subscribe(new Consumer<Object>() { 103 @Override 104 public void accept(Object s) throws Exception { 105 DemonstrateUtil.showLogResult(s.toString()); 106 } 107 }); 108 } 109 110 private void map2() { 111 Observable 112 .just(new ArrayList<String>(){ 113 { 114 for (int i = 0; i < 5; i++) { 115 add("data"+i); 116 } 117 } 118 }) 119 .flatMap(new Function<List<String>, ObservableSource<?>>() { 120 @Override 121 public ObservableSource<?> apply(List<String> strings) throws Exception { 122 return Observable.fromIterable(strings); 123 } 124 }).filter(new Predicate<Object>() { 125 @Override 126 public boolean test(Object s) throws Exception { 127 String newStr = (String) s; 128 if (newStr.contains("3")){ 129 return true; 130 } 131 return false; 132 } 133 }).subscribe(new Consumer<Object>() { 134 @Override 135 public void accept(Object o) throws Exception { 136 DemonstrateUtil.showLogResult((String)o); 137 } 138 }); 139 } 140 141 private void map1() { 142 Observable.just(new ArrayList<String>(){ 143 { 144 for (int i = 0; i < 3; i++) { 145 add("data"+i); 146 } 147 } 148 }).flatMap(new Function<List<String>, ObservableSource<?>>() { 149 @Override 150 public ObservableSource<?> apply(List<String> strings) throws Exception { 151 return Observable.fromIterable(strings); 152 } 153 }).subscribe(new Observer<Object>() { 154 @Override 155 public void onSubscribe(Disposable d) { 156 157 } 158 159 @Override 160 public void onNext(Object o) { 161 DemonstrateUtil.showLogResult("flatMap转换后,接收到的"+o); 162 } 163 164 @Override 165 public void onError(Throwable e) { 166 167 } 168 169 @Override 170 public void onComplete() { 171 172 } 173 }); 174 } 175 176 private void map0() { 177 Observable.just("hellorxjava") 178 .map(new Function<String, Integer>() { 179 @Override 180 public Integer apply(String s) throws Exception { 181 return s.length(); 182 } 183 }).subscribe(new Observer<Integer>() { 184 @Override 185 public void onSubscribe(Disposable d) { 186 187 } 188 189 @Override 190 public void onNext(Integer integer) { 191 DemonstrateUtil.showLogResult("接收到被转换的数据结果:"+integer); 192 } 193 194 @Override 195 public void onError(Throwable e) { 196 197 } 198 199 @Override 200 public void onComplete() { 201 202 } 203 }); 204 } 205 206 private void test5() { 207 DialogUtil.showListDialog(this, "rxjava的其他操作", new String[]{ 208 "0just()方式创建Observable", 209 "1fromIterable()方式创建Observable", 210 "2defer()方式创建Observable", 211 "3interval( )方式创建Observable", 212 "4timer( )方式创建Observable", 213 "5range( )方式创建Observable", 214 "6repeat( )方式创建Observable", 215 }, new DialogInterface.OnClickListener() { 216 @Override 217 public void onClick(DialogInterface dialog, int which) { 218 switch (which) { 219 case 0: 220 other0(); 221 break; 222 case 1: 223 other1(); 224 break; 225 case 2: 226 other2(); 227 break; 228 case 3: 229 other3(); 230 break; 231 case 4: 232 other4(); 233 break; 234 case 5: 235 other5(); 236 break; 237 case 6: 238 other6(); 239 break; 240 } 241 } 242 }); 243 } 244 245 private void other6() { 246 Observable.just(123).repeat().subscribe(new Observer<Integer>() { 247 @Override 248 public void onSubscribe(Disposable d) { 249 250 } 251 252 @Override 253 public void onNext(Integer integer) { 254 DemonstrateUtil.showLogResult("重复integer" + integer); 255 } 256 257 @Override 258 public void onError(Throwable e) { 259 260 } 261 262 @Override 263 public void onComplete() { 264 265 } 266 }); 267 } 268 269 private void other5() { 270 Observable.range(1, 5).subscribe(new Observer<Integer>() { 271 @Override 272 public void onSubscribe(Disposable d) { 273 274 } 275 276 @Override 277 public void onNext(Integer integer) { 278 DemonstrateUtil.showLogResult("连续收到:" + integer); 279 } 280 281 @Override 282 public void onError(Throwable e) { 283 284 } 285 286 @Override 287 public void onComplete() { 288 289 } 290 }); 291 } 292 293 private void other4() { 294 Observable.timer(5, TimeUnit.SECONDS).subscribe(new Observer<Long>() { 295 @Override 296 public void onSubscribe(Disposable d) { 297 298 } 299 300 @Override 301 public void onNext(Long aLong) { 302 DemonstrateUtil.showLogResult("延迟5s后调用了:onNext"); 303 } 304 305 @Override 306 public void onError(Throwable e) { 307 308 } 309 310 @Override 311 public void onComplete() { 312 313 } 314 }); 315 } 316 317 private void other3() { 318 Observable.interval(3, TimeUnit.SECONDS).subscribe(new Observer<Long>() { 319 @Override 320 public void onSubscribe(Disposable d) { 321 322 } 323 324 @Override 325 public void onNext(Long aLong) { 326 DemonstrateUtil.showLogResult("数字是:" + aLong); 327 //DemonstrateUtil.showToastResult(RxJavaDemo1Activity.this,"数字是:"+aLong); 328 } 329 330 @Override 331 public void onError(Throwable e) { 332 333 } 334 335 @Override 336 public void onComplete() { 337 338 } 339 }); 340 } 341 342 private void other2() { 343 Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() { 344 @Override 345 public ObservableSource<? extends String> call() throws Exception { 346 return Observable.just("hello,defer"); 347 } 348 }); 349 350 //上游衔接下游! 351 observable.subscribe(new Observer<String>() { 352 @Override 353 public void onSubscribe(Disposable d) { 354 355 } 356 357 @Override 358 public void onNext(String s) { 359 DemonstrateUtil.showLogResult(s); 360 DemonstrateUtil.showToastResult(RxJavaDemo1Activity.this, s); 361 } 362 363 @Override 364 public void onError(Throwable e) { 365 366 } 367 368 @Override 369 public void onComplete() { 370 371 } 372 }); 373 } 374 375 private void other1() { 376 Observable.fromIterable(new ArrayList<String>() { 377 { 378 for (int i = 0; i < 5; i++) { 379 add("Hello," + i); 380 } 381 } 382 }).subscribe(new Observer<String>() { 383 @Override 384 public void onSubscribe(Disposable d) { 385 386 } 387 388 @Override 389 public void onNext(String s) { 390 DemonstrateUtil.showToastResult(RxJavaDemo1Activity.this, s); 391 DemonstrateUtil.showLogResult(s); 392 } 393 394 @Override 395 public void onError(Throwable e) { 396 397 } 398 399 @Override 400 public void onComplete() { 401 402 } 403 }); 404 } 405 406 private void other0() { 407 Observable.just("hello,you hao!").subscribe(new Observer<String>() { 408 @Override 409 public void onSubscribe(Disposable d) { 410 411 } 412 413 @Override 414 public void onNext(String s) { 415 DemonstrateUtil.showLogResult(s); 416 DemonstrateUtil.showToastResult(RxJavaDemo1Activity.this, s); 417 } 418 419 @Override 420 public void onError(Throwable e) { 421 422 } 423 424 @Override 425 public void onComplete() { 426 427 } 428 }); 429 } 430 431 private void test4() { 432 Observable.create(new ObservableOnSubscribe<Integer>() { 433 @Override 434 public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { 435 DemonstrateUtil.showLogResult("emitter 1"); 436 emitter.onNext(1); 437 438 DemonstrateUtil.showLogResult("emitter 2"); 439 emitter.onNext(2); 440 441 DemonstrateUtil.showLogResult("emitter 3"); 442 emitter.onNext(3); 443 444 DemonstrateUtil.showLogResult("complete"); 445 emitter.onComplete(); 446 447 DemonstrateUtil.showLogResult("emitter 4"); 448 emitter.onNext(4); 449 } 450 }).subscribe(new Consumer<Integer>() { 451 @Override 452 public void accept(Integer integer) throws Exception { 453 DemonstrateUtil.showLogResult("accept:" + integer); 454 } 455 }); 456 } 457 458 private void test3() { 459 Observable.create(new ObservableOnSubscribe<Integer>() { 460 @Override 461 public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { 462 DemonstrateUtil.showLogResult("emitter 1"); 463 emitter.onNext(1); 464 465 DemonstrateUtil.showLogResult("emitter 2"); 466 emitter.onNext(2); 467 468 DemonstrateUtil.showLogResult("emitter 3"); 469 emitter.onNext(3); 470 471 DemonstrateUtil.showLogResult("complete"); 472 emitter.onComplete(); 473 474 DemonstrateUtil.showLogResult("emitter 4"); 475 emitter.onNext(4); 476 } 477 }).subscribe(new Observer<Integer>() { 478 private Disposable mDisposable; 479 private int i; 480 481 @Override 482 public void onSubscribe(Disposable d) { 483 DemonstrateUtil.showLogResult("subscribe"); 484 mDisposable = d; 485 } 486 487 @Override 488 public void onNext(Integer value) { 489 DemonstrateUtil.showLogResult("onNext:" + value); 490 i++; 491 if (i == 2) { 492 DemonstrateUtil.showLogResult("dispose:" + value); 493 mDisposable.dispose(); 494 DemonstrateUtil.showLogResult("isDisposed : " + mDisposable.isDisposed()); 495 } 496 } 497 498 @Override 499 public void onError(Throwable e) { 500 DemonstrateUtil.showLogResult("error:"); 501 } 502 503 @Override 504 public void onComplete() { 505 DemonstrateUtil.showLogResult("complete"); 506 } 507 }); 508 509 } 510 511 private void test2() { 512 //链式调用 513 Observable.create(new ObservableOnSubscribe<Integer>() { 514 @Override 515 public void subscribe(ObservableEmitter<Integer> e) throws Exception { 516 e.onNext(1); 517 e.onNext(2); 518 e.onNext(3); 519 } 520 }).subscribe(new Observer<Integer>() { 521 522 @Override 523 public void onSubscribe(Disposable d) { 524 DemonstrateUtil.showLogResult("onSubscribe"); 525 } 526 527 @Override 528 public void onNext(Integer integer) { 529 DemonstrateUtil.showLogResult("onNext-->integer" + integer); 530 } 531 532 @Override 533 public void onError(Throwable e) { 534 DemonstrateUtil.showLogResult("onError"); 535 } 536 537 @Override 538 public void onComplete() { 539 DemonstrateUtil.showLogResult("onComplete"); 540 } 541 }); 542 } 543 544 private void test1() { 545 546 //创建上游,数据发射源! 547 //ObservableOnSubscribe对象作为参数,它的作用相当于一个计划表,当 Observable被订阅的时候, 548 // ObservableOnSubscribe的subscribe()方法会自动被调用,事件序列就会依照设定依次触发 549 //ObservableEmitter,发射器,触发事件. 550 Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { 551 552 @Override 553 public void subscribe(ObservableEmitter<Integer> e) throws Exception { 554 e.onNext(1); 555 e.onNext(2); 556 e.onNext(3); 557 } 558 }); 559 560 //创建下游,数据接收处! 561 Observer<Integer> observer = new Observer<Integer>() { 562 563 @Override 564 public void onSubscribe(Disposable d) { 565 DemonstrateUtil.showLogResult("onSubscribe"); 566 } 567 568 @Override 569 public void onNext(Integer integer) { 570 DemonstrateUtil.showLogResult("onNext--integer" + integer); 571 } 572 573 @Override 574 public void onError(Throwable e) { 575 DemonstrateUtil.showLogResult("onError"); 576 } 577 578 @Override 579 public void onComplete() { 580 DemonstrateUtil.showLogResult("onComplete"); 581 } 582 }; 583 584 //数据源连接接收处,上游衔接下游! 585 //只有当上游和下游建立连接之后, 上游才会开始发送事件 586 observable.subscribe(observer); 587 } 588 589 private void initView() { 590 btnSend1 = (Button) findViewById(R.id.btn_send1); 591 btnSend1.setOnClickListener(RxJavaDemo1Activity.this); 592 btnSend2 = (Button) findViewById(R.id.btn_send2); 593 btnSend2.setOnClickListener(RxJavaDemo1Activity.this); 594 btnSend3 = (Button) findViewById(R.id.btn_send3); 595 btnSend3.setOnClickListener(RxJavaDemo1Activity.this); 596 btnSend4 = (Button) findViewById(R.id.btn_send4); 597 btnSend4.setOnClickListener(RxJavaDemo1Activity.this); 598 btnSend5 = (Button) findViewById(R.id.btn_send5); 599 btnSend5.setOnClickListener(RxJavaDemo1Activity.this); 600 btnSend6 = (Button) findViewById(R.id.btn_send6); 601 btnSend6.setOnClickListener(RxJavaDemo1Activity.this); 602 } 603 }
1 public class RxJavaDemo2Activity extends AppCompatActivity implements View.OnClickListener { 2 3 protected Button btn; 4 protected ImageView iv; 5 6 @Override 7 protected void onCreate(Bundle savedInstanceState) { 8 super.onCreate(savedInstanceState); 9 super.setContentView(R.layout.activity_rx_java_demo2); 10 initView(); 11 } 12 13 @Override 14 public void onClick(View view) { 15 if (view.getId() == R.id.btn) { 16 DialogUtil.showListDialog(this, "rxJava操作!", new String[]{ 17 "0发送事件io线程并变换主线程接收", 18 "1子线程发送事件主线程接收", 19 "2默认线程发送事件默认线程接收", 20 }, new DialogInterface.OnClickListener() { 21 @Override 22 public void onClick(DialogInterface dialog, int which) { 23 switch (which) { 24 case 0: 25 show0(); 26 break; 27 case 1: 28 show1(); 29 break; 30 case 2: 31 show2(); 32 break; 33 } 34 } 35 }); 36 } 37 } 38 39 private void show2() { 40 Observable.create(new ObservableOnSubscribe<Integer>() { 41 @Override 42 public void subscribe(ObservableEmitter<Integer> e) throws Exception { 43 DemonstrateUtil.showLogResult("发送的线程名称:" + Thread.currentThread().getName()); 44 DemonstrateUtil.showLogResult("发送的线程id:" + Thread.currentThread().getId()); 45 46 DemonstrateUtil.showLogResult("发送的数据:" + 1); 47 e.onNext(1); 48 } 49 }).subscribe(new Consumer<Integer>() { 50 @Override 51 public void accept(Integer integer) throws Exception { 52 DemonstrateUtil.showLogResult("接收的线程:" + Thread.currentThread().getName()); 53 DemonstrateUtil.showLogResult("接收的线程id:" + Thread.currentThread().getId()); 54 DemonstrateUtil.showLogResult("接收到的数据:-integer:" + integer); 55 } 56 }); 57 } 58 59 private void show1() { 60 Observable.create(new ObservableOnSubscribe<Integer>() { 61 @Override 62 public void subscribe(ObservableEmitter<Integer> e) throws Exception { 63 DemonstrateUtil.showLogResult("发送的线程名称:" + Thread.currentThread().getName()); 64 DemonstrateUtil.showLogResult("发送的线程id:" + Thread.currentThread().getId()); 65 66 DemonstrateUtil.showLogResult("发送的数据:" + 1); 67 e.onNext(1); 68 } 69 }).subscribeOn(Schedulers.newThread()) 70 .observeOn(AndroidSchedulers.mainThread()) 71 .subscribe(new Consumer<Integer>() { 72 @Override 73 public void accept(Integer integer) throws Exception { 74 DemonstrateUtil.showLogResult("接收的线程:" + Thread.currentThread().getName()); 75 DemonstrateUtil.showLogResult("接收的线程id:" + Thread.currentThread().getId()); 76 DemonstrateUtil.showLogResult("接收到的数据:-integer:" + integer); 77 } 78 }); 79 } 80 81 private void show0() { 82 Observable.create(new ObservableOnSubscribe<Integer>() { 83 @Override 84 public void subscribe(ObservableEmitter<Integer> e) throws Exception { 85 DemonstrateUtil.showLogResult("所在的线程:", Thread.currentThread().getName()); 86 DemonstrateUtil.showLogResult("发送的数据:", 1 + ""); 87 e.onNext(1); 88 } 89 }).subscribeOn(Schedulers.io()) 90 .observeOn(AndroidSchedulers.mainThread()) 91 .subscribe(new Consumer<Integer>() { 92 @Override 93 public void accept(Integer integer) throws Exception { 94 DemonstrateUtil.showLogResult("所在的线程:", Thread.currentThread().getName()); 95 DemonstrateUtil.showLogResult("接收到的数据:", "integer:" + integer); 96 } 97 }); 98 } 99 100 private void initView() { 101 btn = (Button) findViewById(R.id.btn); 102 btn.setOnClickListener(RxJavaDemo2Activity.this); 103 iv = (ImageView) findViewById(R.id.iv); 104 } 105 }
1 public class RxJavaDemo3Activity extends AppCompatActivity implements View.OnClickListener { 2 3 protected Button btnBackpressure; 4 private Flowable mFlowable; 5 private Subscriber mSubscriber; 6 private Subscription mSubscription; 7 private Flowable flowableLATEST; 8 private Subscriber subscriberLatest; 9 private Subscription subscriptionLatest; 10 11 @Override 12 protected void onCreate(Bundle savedInstanceState) { 13 super.onCreate(savedInstanceState); 14 super.setContentView(R.layout.activity_rx_java_demo3); 15 initView(); 16 init4(); 17 init6(); 18 } 19 20 private void init6() { 21 flowableLATEST = Flowable.create(new FlowableOnSubscribe<Integer>() { 22 @Override 23 public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { 24 25 for (int i = 1; i<=200; i++) { 26 emitter.onNext(i); 27 DemonstrateUtil.showLogResult("LATEST生产onNext:"+i); 28 } 29 } 30 }, BackpressureStrategy.LATEST); 31 32 //mSubscription = s; 33 subscriberLatest = new Subscriber<Integer>() { 34 @Override 35 public void onSubscribe(Subscription s) { 36 subscriptionLatest = s; 37 s.request(100); 38 } 39 40 @Override 41 public void onNext(Integer integer) { 42 DemonstrateUtil.showLogResult("Latest消费onNext:" + integer); 43 } 44 45 @Override 46 public void onError(Throwable t) { 47 DemonstrateUtil.showLogResult("onError"); 48 DemonstrateUtil.showLogResult(t.getMessage()); 49 DemonstrateUtil.showLogResult(t.toString()); 50 } 51 52 @Override 53 public void onComplete() { 54 DemonstrateUtil.showLogResult("onComplete"); 55 } 56 }; 57 } 58 59 private void init4() { 60 mFlowable = Flowable.create(new FlowableOnSubscribe<Integer>() { 61 @Override 62 public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { 63 64 for (int i = 1; i <= 200; i++) { 65 emitter.onNext(i); 66 DemonstrateUtil.showLogResult("生产onNext:"+i); 67 } 68 } 69 }, BackpressureStrategy.DROP); 70 71 //mSubscription = s; 72 mSubscriber = new Subscriber<Integer>() { 73 @Override 74 public void onSubscribe(Subscription s) { 75 mSubscription = s; 76 s.request(100); 77 } 78 79 @Override 80 public void onNext(Integer integer) { 81 DemonstrateUtil.showLogResult("消费onNext:" + integer); 82 } 83 84 @Override 85 public void onError(Throwable t) { 86 DemonstrateUtil.showLogResult("onError"); 87 DemonstrateUtil.showLogResult(t.getMessage()); 88 DemonstrateUtil.showLogResult(t.toString()); 89 } 90 91 @Override 92 public void onComplete() { 93 DemonstrateUtil.showLogResult("onComplete"); 94 } 95 }; 96 97 } 98 99 100 @Override 101 public void onClick(View view) { 102 if (view.getId() == R.id.btn_backpressure) { 103 DialogUtil.showListDialog(this, "Flowable的理解使用", new String[]{ 104 "0事件堆积现象", 105 "1正常使用策略ERROR!", 106 "2使用策略ERROR出现的异常!", 107 "3使用策略BUFFER,更大的缓存池", 108 "4使用策略DROP,事件关联100", 109 "5使用策略DROP,再申请100", 110 "6使用策略LATEST,事件关联100", 111 "7使用策略LATEST,再申请100", 112 "8使用策略MISSING", 113 }, new DialogInterface.OnClickListener() { 114 @Override 115 public void onClick(DialogInterface dialog, int which) { 116 switch (which) { 117 case 0: 118 show0(); 119 break; 120 case 1: 121 show1(); 122 break; 123 case 2: 124 show2(); 125 break; 126 case 3: 127 show3(); 128 break; 129 case 4: 130 show4(); 131 break; 132 case 5: 133 show5(); 134 break; 135 case 6: 136 show6(); 137 break; 138 case 7: 139 show7(); 140 break; 141 case 8: 142 show8(); 143 break; 144 } 145 } 146 }); 147 } 148 } 149 150 private void show8() { 151 Flowable.create(new FlowableOnSubscribe<Integer>() { 152 @Override 153 public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { 154 for (int i = 0; i < 200; i++) { 155 DemonstrateUtil.showLogResult("MISSING-生成emitter" + i); 156 emitter.onNext(i); 157 } 158 } 159 }, BackpressureStrategy.MISSING).subscribeOn(Schedulers.io()) 160 .observeOn(AndroidSchedulers.mainThread()) 161 .subscribe(new Subscriber<Integer>() { 162 @Override 163 public void onSubscribe(Subscription s) { 164 //mSubscription = s; 165 //s.request(0); 166 } 167 168 @Override 169 public void onNext(Integer integer) { 170 DemonstrateUtil.showLogResult("MISSING-消费onNext" + integer); 171 } 172 173 @Override 174 public void onError(Throwable t) { 175 DemonstrateUtil.showLogResult("onError" + t.getMessage()); 176 DemonstrateUtil.showLogResult("onError" + t.toString()); 177 t.printStackTrace(); 178 } 179 180 @Override 181 public void onComplete() { 182 DemonstrateUtil.showLogResult("onComplete"); 183 } 184 }); 185 } 186 187 private void show7() { 188 subscriptionLatest.request(100); 189 } 190 191 private void show6() { 192 flowableLATEST.subscribeOn(Schedulers.io()) 193 .observeOn(AndroidSchedulers.mainThread()) 194 .subscribe(subscriberLatest); 195 } 196 197 private void show5() { 198 //128-100-100= -72. 199 mSubscription.request(100); 200 } 201 202 private void show4() { 203 mFlowable.subscribeOn(Schedulers.io()) 204 .observeOn(AndroidSchedulers.mainThread()) 205 .subscribe(mSubscriber); 206 } 207 208 209 private void show3() { 210 Flowable.create(new FlowableOnSubscribe<Integer>() { 211 @Override 212 public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { 213 for (int i = 0; i < 200; i++) { 214 DemonstrateUtil.showLogResult("emitter" + i); 215 emitter.onNext(i); 216 } 217 } 218 }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io()) 219 .observeOn(AndroidSchedulers.mainThread()) 220 .subscribe(new Subscriber<Integer>() { 221 @Override 222 public void onSubscribe(Subscription s) { 223 //mSubscription = s; 224 //s.request(0); 225 } 226 227 @Override 228 public void onNext(Integer integer) { 229 DemonstrateUtil.showLogResult("onNext" + integer); 230 } 231 232 @Override 233 public void onError(Throwable t) { 234 DemonstrateUtil.showLogResult("onError" + t.getMessage()); 235 DemonstrateUtil.showLogResult("onError" + t.toString()); 236 t.printStackTrace(); 237 } 238 239 @Override 240 public void onComplete() { 241 DemonstrateUtil.showLogResult("onComplete"); 242 } 243 }); 244 } 245 246 private void show2() { 247 Flowable.create(new FlowableOnSubscribe<Integer>() { 248 @Override 249 public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { 250 for (int i = 0; i < 200; i++) { 251 DemonstrateUtil.showLogResult("emitter" + i); 252 emitter.onNext(i); 253 } 254 } 255 }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()) 256 .observeOn(AndroidSchedulers.mainThread()) 257 .subscribe(new Subscriber<Integer>() { 258 @Override 259 public void onSubscribe(Subscription s) { 260 //mSubscription = s; 261 //s.request(0); 262 } 263 264 @Override 265 public void onNext(Integer integer) { 266 DemonstrateUtil.showLogResult("onNext" + integer); 267 } 268 269 @Override 270 public void onError(Throwable t) { 271 DemonstrateUtil.showLogResult("onError" + t.getMessage()); 272 DemonstrateUtil.showLogResult("onError" + t.toString()); 273 t.printStackTrace(); 274 } 275 276 @Override 277 public void onComplete() { 278 DemonstrateUtil.showLogResult("onComplete"); 279 } 280 }); 281 } 282 283 private void show1() { 284 Flowable.create(new FlowableOnSubscribe<Integer>() { 285 @Override 286 public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { 287 for (int i = 0; i < 127; i++) {//128--- 0--->126 288 DemonstrateUtil.showLogResult("emitter " + i); 289 emitter.onNext(i); 290 } 291 DemonstrateUtil.showLogResult("emitter complete"); 292 emitter.onComplete(); 293 } 294 }, BackpressureStrategy.ERROR) //增加了一个参数,设置处理策略. 295 .subscribeOn(Schedulers.io()) 296 .observeOn(AndroidSchedulers.mainThread()) 297 .subscribe(new Subscriber<Integer>() { 298 @Override 299 public void onSubscribe(Subscription s) { 300 DemonstrateUtil.showLogResult("onSubscribe"); 301 //用来向生产者申请可以消费的事件数量,这样我们便可以根据本身的消费能力进行消费事件. 302 s.request(Long.MAX_VALUE); 303 } 304 305 @Override 306 public void onNext(Integer integer) { 307 DemonstrateUtil.showLogResult("onNext: " + integer); 308 } 309 310 @Override 311 public void onError(Throwable t) { 312 DemonstrateUtil.showLogResult("onError: " + t.getMessage()); 313 DemonstrateUtil.showLogResult("onError: " + t.toString()); 314 t.printStackTrace(); 315 } 316 317 @Override 318 public void onComplete() { 319 DemonstrateUtil.showLogResult("onComplete: "); 320 } 321 }); 322 } 323 324 private void show0() { 325 Observable.create(new ObservableOnSubscribe<Integer>() { 326 @Override 327 public void subscribe(ObservableEmitter<Integer> e) throws Exception { 328 while (true) { 329 for (int i = 0; i < 129; i++) { 330 e.onNext(1); 331 } 332 } 333 } 334 }).subscribeOn(Schedulers.io()) 335 .observeOn(AndroidSchedulers.mainThread()) 336 .subscribe(new Consumer<Integer>() { 337 @Override 338 public void accept(Integer integer) throws Exception { 339 Thread.sleep(5000); 340 DemonstrateUtil.showLogResult("接受到" + integer); 341 } 342 }); 343 } 344 345 private void initView() { 346 btnBackpressure = (Button) findViewById(R.id.btn_backpressure); 347 btnBackpressure.setOnClickListener(RxJavaDemo3Activity.this); 348 } 349 }
标签:subscript 联系 bool add override 专用 时间间隔 生产者和消费者 iat
原文地址:http://www.cnblogs.com/SongYongQian/p/7978653.html