标签:人工智能 例子 cal follow server rac hub 消费 n+1
英文原文:https://github.com/ReactiveX/RxJava/wiki/Backpressure
在rxjava中会经常遇到一种情况就是被观察者发送消息太快以至于它的操作符或者订阅者不能及时处理相关的消息。那么随之而来的就是如何处理这些未处理的消息。
举个例子,使用zip
操作符将两个无限大的Observable压缩在一起,其中一个被观察者发送消息的速度是另一个的两倍。一个比较不靠谱的做法就是把发送比较快的消息缓存起来,当比较慢的Observable发送消息的时候取出来并将他们结合在一起。这样做就使得rxjava变得笨重而且十分占用系统资源。
在rxjava中有多重控制流以及背压(backpressure)策略用来应对当一个快速发送消息的被观察者遇到一个处理消息缓慢的观察者。下面的解释将会向你展示你应当怎么设计属于你自己的被观察者和操作符去应对流量控制(flow control)。
Observable 数据流有两种类型:hot 和 cold。这两种类型有很大的不同。本节介绍他们的区别,以及作为 Rx 开发者应该如何正确的使用他们。
只有当有订阅者订阅的时候, Cold Observable 才开始执行发射数据流的代码。并且每个订阅者订阅的时候都独立的执行一遍数据流代码。 Observable.interval 就是一个 Cold Observable。每一个订阅者都会独立的收到他们的数据流。
我们经常用到的Observable.create 就是 Cold Observable,而 just, range, timer 和 from 这些创建的同样是 Cold Observable。
Hot observable 不管有没有订阅者订阅,他们创建后就开发发射数据流。 一个比较好的示例就是 鼠标事件。 不管系统有没有订阅者监听鼠标事件,鼠标事件一直在发生,当有订阅者订阅后,从订阅后的事件开始发送给这个订阅者,之前的事件这个订阅者是接受不到的;如果订阅者取消订阅了,鼠标事件依然继续发射。
了解更多Hot and cold Observables,参考:
http://blog.csdn.net/jdsjlzx/article/details/51839090
当一个cold observable是multicast(多路广播)(当转换完成时或者方法被调用)的时候,为了应对背压,应当把cold observable转换成hot observable。
cold observable 相当于响应式拉(就是observer处理完了一个事件就从observable拉取下一个事件),hot observable通常不能很好的处理响应式拉模型,但它却是处理流量控制问题的不二候选人,例如使用onBackpressureBuffer或者onBackpressureDrop 操作符,和其他操作符比如operators, throttling, buffers, or windows
.
此段过于抽象,特提供原文如下,如有好的翻译建议请提出。
Cold Observables are ideal for the reactive pull model of backpressure described below. Hot Observables typically do not cope well with a reactive pull model, and are better candidates for some of the other flow control strategies discussed on this page, such as the use of the onBackpressureBuffer or onBackpressureDrop operators, throttling, buffers, or windows.
防止过度创建observable的第一道防线就是使用普通数组去减少observable发送消息的数量,在这一节会使用一些操作符去应对突发的observable发送爆发性数据(一会没有,一会很多)就像下面的这张图片所示:
这些操作符可以通过微调参数确保slow-consuming观察者不被生产可观测的。
操作符中比如 sample(?) 、 throttleLast(?)、 throttleFirst(?)、 throttleWithTimeout(?) 、 debounce(?) 允许你通过调节速率来改变Observable发射消息的速度。
以下图表展示如何使用这些操作符。
sample
操作符定期收集observable发送的数据items,并发射出最后一个数据item。
Observable<Integer> burstySampled = bursty.sample(500, TimeUnit.MILLISECONDS);
上面代码解释,定期且一次收集5个item,发射出最后一个item。
官网解释:http://reactivex.io/documentation/operators/sample.html
跟sample有点类似,但是并不是把观测到的最后一个item发送出去,而是把该时间段第一个item发送出去。
Observable<Integer> burstyThrottled = bursty.throttleFirst(500, TimeUnit.MILLISECONDS);
debounce操作符会只发送两个在规定间隔内的时间发送的序列的最后一个。
Observable<Integer> burstyDebounced = bursty.debounce(10, TimeUnit.MILLISECONDS);
可以使用操作符比如buffer(?) 或者window(?) 收集过度生成消息的Observable的数据items,然后发射出较少使用的数据。缓慢的消费者可以决定是否处理每个集合中的某一个特定的项目,或处理集合中的某种组合,或为集合中的每一项预定计划工作,这都要视情况处理。
以下图表展示如何使用这些操作符。
你可以定期关闭并释放突发性的 Observable 缓冲区。
Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);
在突发期间你可以得到的想要的,并在缓冲区收集数据和最终在突发结束的时候释放缓存。使用debounce操作符释放缓存并关闭指示器buffer操作符。
此段超过本人翻译水平,特提供原文如下,如有好的翻译建议请提出。
Or you could get fancy, and collect items in buffers during the bursty periods and emit them at the end of each burst, by using the debounce operator to emit a buffer closing indicator to the buffer operator:
处理过快生产item的其他策略就是使用线程阻塞,但是这么做违背了响应式设计和非阻塞模型设计,但是它的确是一个可行的选择。在rxJava中并没有操作符可以做到这一点。
如果observable发送消息,subscriber消耗消息都是在同一个线程这将很好的处理这个问题,但是你要知道,在rxJava中,很多时候生产者和消费者都不在同一个线程。
当subscribe订阅observable的时候可以通过调用subscribe.request(n),n是你想要的observable发送出来的量。
当在onNext()方法里处理完数据itme后,你能重新调用 request()方法,通知Observable发射数据items。下面是个例子。
someObservable.subscribe(new Subscriber<t>() {
@Override
public void onStart() {
request(1);
}
@Override
public void onCompleted() {
// gracefully handle sequence-complete
}
@Override
public void onError(Throwable e) {
// gracefully handle error
}
@Override
public void onNext(t n) {
// do something with the emitted item "n"
// request another item:
request(1);
}
});
你可以通过一个神奇数字request, request(Long.MAX_VALUE),禁用反应拉背力和要求Observable按照自己的步伐发射数据。request(0)是一个合法的调用,但没有奏效。请求值小于零的请求会导致抛出一个异常。
backpressure 不会使得过度生产的observable的问题消失,这只是提供了一种更好的解决问题的方法。 让我们更仔细的研究刚刚说到的zip操作符的问题。
这里有两个observable,a和b,b发射item比a更加的频繁,当你想zip这两个observable的时候,你需要把a发送出来的第n个和b发送出来的第n个对象处理,然而由于b发送出来的速率更快,这时候b已经发送出了n+1~n+m个消息了,这时候你要想要把a的n+1~n+m个消息结合的话,就必须持有b已经发送出来的n+1~n+m消息,同时,这意味着缓存的数量在不断的增长。
当然你可以给b添加操作符throttling,但是这意味着你将丢失某些从b发送出来的项,你真正想要做的其实就是告诉b:“b你需要慢下来,但是你要保持你给我的数据是完整的”。
响应式拉(reective pull)模型可以当你做到这一点,subscriber从observable那里拉取数据,这比较通常在observable那里推送数据这种模式形成鲜明的对比。
在rxJava中,zip操作符正是使用了这种技巧。它给每个源observable维护了一个小的缓存池,当它的缓存池满了以后,它将不会从源observable那里拉取item。每当zip发送一个item的时候,他从它的缓存池里面移除相应的项,并从源observable那里拉取下一个项。
在rxJava中,很多操作符都使用了这种模式(响应式拉),但是有的操作符并没有使用这种模式,因为他们也许执行的操作跟源observable处于相同的进程。在这种情况下,由于消耗事件会阻塞本进程,所以这一项的工作完成后,才有机会收到下一项。还有另外一种情况,backpressure也是不适合的,因为他们有指定的其他方式去处理流量控制,这些特殊的情况在rxJava的java文档里面都会有详细说明为毛。
但是,observable a和b必须正确的响应request()方法,如果一个observable还没有被支持响应式拉(并不是每个observable都会支持),你可以采取以下其中一种操作都可以达到backpressure的行为:
给observable发送出来的数据持有一个缓存,当request方法被调用的时候,给下层流发送一个item。
这个操作符还有一个实验性的版本允许去设置这个缓存池的大小,但当缓存池满了以后将会终止执行并抛出异常。
命令observable丢弃后来的事件,直到subscriber再次调用request(n)方法的时候,就发送给它的subscriber调用时间以后的n个事件。
源Observable的线程操作直到Subscriber发出请求,然后只要有挂起的请求就结束线程。
如果你不允许这些操作符操作不支持背压的Observable,或者Subscriber或一些操作符尝试申请活性拉反压力,你会遇到一个MissingBackpressureException,你将被告知通过onError()进行回调。
最后,为了大家更好的理解backpressure概念,这里补充说一下Flowable。
Observable在RxJava2.0中新的实现叫做Flowable, 同时旧的Observable也保留了。因为在 RxJava1.x 中,有很多事件不被能正确的背压,从而抛出MissingBackpressureException。
举个简单的例子,在 RxJava1.x 中的 observeOn, 因为是切换了消费者的线程,因此内部实现用队列存储事件。在 Android 中默认的 buffersize 大小是16,因此当消费比生产慢时, 队列中的数目积累到超过16个,就会抛出MissingBackpressureException, 初学者很难明白为什么会这样,使得学习曲线异常得陡峭。
而在2.0 中,Observable 不再支持背压,而Flowable 支持非阻塞式的背压。Flowable是RxJava2.0中专门用于应对背压(Backpressure)问题。所谓背压,即生产者的速度大于消费者的速度带来的问题,比如在Android中常见的点击事件,点击过快则经常会造成点击两次的效果。其中,Flowable默认队列大小为128。并且规范要求,所有的操作符强制支持背压。幸运的是, Flowable 中的操作符大多与旧有的 Observable 类似。
再分享一下我老师大神的人工智能教程吧。零基础!通俗易懂!风趣幽默!还带黄段子!希望你也加入到我们人工智能的队伍中来!https://blog.csdn.net/jiangjunshow
RxJava 2 0中backpressure 背压 概念的理解
标签:人工智能 例子 cal follow server rac hub 消费 n+1
原文地址:https://www.cnblogs.com/skiwnchhw/p/10349260.html