标签:缓冲 一个 info extends ace 接受 tst mis 需要
在学习webflux 之前我们先要了解一个概念
对我们以前使用的发布订阅模式来说.我们的以前的模式是 消费只能通过提供者提供的数据来持续消费 如果一直发送消息,那么就只能一直消费
我们对背压做一个详细的比喻
比如我们每家每户,都有一个水龙头.自来水公司相当于发布者者,我们家庭就是个订阅者,水就是数据,在之前的模式我们的订阅者是一个被动接受的概念
背压就是相当于我们家里安装了一个水龙头,我们需要的时候就把他打开.不需要的时候就把他关闭
响应式流.这是jdk9 引入的一套标准,他能够很好的实现背压,但是我去官网的时候,发现jdk9已经结束.我们看看jdk11吧
jdk11有一个flow接口 里面有4个方法
1.publisher 就是发布者
subscribe:就是和订阅者产生一个关系
2.subscribe 就是订阅者
onSubscribe:签署一个订阅关系传入subscription
onNext(): 接受到一条数据
onError(): 就是出错
onComplete(): 完成
3.Subscription接口中就是其中实现背压的关键 其中request方法就是告诉发布者我需要多少资源,发布者那里 就会发布多少资源
4.Processor 既可以做发布者,也可以做订阅者,具体是用来中间环节的数据处理工作
简单的例子我们来运行下
每次处理完之后告诉发布者我还可以处理的数据是多少
public static void main(String[] args) throws InterruptedException { //1.定义发布者 SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(); //2. 定义订阅者 Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<Integer>() { private Flow.Subscription subscription; int total = 0; @Override public void onSubscribe(Flow.Subscription subscription) { //保存订阅关系 this.subscription = subscription; //请求一个数据 subscription.request(1); } @Override public void onNext(Integer item) { System.out.println("接受到: "+ item); total++; System.out.println("接受的条数为 : "+ total); this.subscription.request(1); //或者到达一定数量告诉发布者不接受数据了 if(total ==10){ this.subscription.cancel(); System.out.println("接受数据已经足够"); } } @Override public void onError(Throwable throwable) { throwable.printStackTrace(); //抛出异常就返回 this.subscription.cancel(); } @Override public void onComplete() { System.out.println("数据处理完了."); } }; //3发布和订阅 建立订阅关系 publisher.subscribe(subscriber); //4.生产数据 for (int i = 0; i < 100; i++) { publisher.submit(i); } //5.关闭发布者 publisher.close(); Thread.currentThread().join(5000); }
public static void main(String[] args) throws InterruptedException { //1.定义发布者 SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(); //2.定义一个处理器 对数据进行过滤,并转化为string的类型 MyProcessor myProcessor = new MyProcessor(); //3.发布者与处理器建立关系 publisher.subscribe(myProcessor); //4. 定义最终订阅者 Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() { private Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { //保存订阅关系 this.subscription = subscription; //请求一个数据 subscription.request(1); } LinkedList<String > list = new LinkedList<>(); @Override public void onNext(String item) { list.add(item); this.subscription.request(1); //或者到达一定数量告诉发布者不接受数据了 System.out.println(item); if(list.size() == 10){ this.subscription.cancel(); System.out.println("接受数据已经足够"); this.onComplete(); } } @Override public void onError(Throwable throwable) { throwable.printStackTrace(); //抛出异常就返回 this.subscription.cancel(); } @Override public void onComplete() { System.out.println("数据处理完了."+list.toString()); } }; //5 处理器和最终的订阅者建立关系 myProcessor.subscribe(subscriber); //4.生产数据 for (int i = 0; i < 100; i++) { publisher.submit(i); } //5.关闭发布者 publisher.close(); Thread.currentThread().join(5000); } static class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<Integer,String> { private Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(Integer item) { // System.out.println("processor-> 处理器接收到的数据.."+item); if(item % 2 ==0){ this.submit("转->" +item); } this.subscription.request(1); } @Override public void onError(Throwable throwable) { throwable.printStackTrace(); this.subscription.cancel(); } @Override public void onComplete() { System.out.println("processor 处理器已经处理完成!"); } }
里面的运行机制
publiser.submit():是一个阻塞方法
订阅者有一个缓冲池.当缓冲池满了之后 submit()方法就会被阻塞.这样就不会再去生产数据了
subscription 缓冲的capacity默认是256个.
标签:缓冲 一个 info extends ace 接受 tst mis 需要
原文地址:https://www.cnblogs.com/bj-xiaodao/p/11044732.html