标签:rocketmq 适配 代码 溢出 筛选 生产者 when flow card
为什么要响应式编程?
传统的Java服务器编程遵循的是J2EE的Servlet规范,是一种基于线程的模型:每一次http请求都由一个线程来处理。
线程模型的缺陷在于,每一条线程都要自行处理套接字的读写操作。对于大部分请求来讲,本地处理请求的速度很快,请求的读取和返回是最耗时间的。也就是说大量的线程浪费在了远程连接上,而没有发挥出计算能力。但是需要注意一点,线程的创建是有开销的,每一条线程都需要独立的内存资源。JVM里的-Xss参数就是用来调整线程堆栈大小的。而JVM堆的总大小局限在了-Xmx参数上,因此一个正在运行的JVM服务器能够同时运行的线程数是固定的。
即便通过调整JVM参数,使其能够运行更多线程。但是JVM的线程会映射成为操作系统的用户线程,而操作系统依然只能调度有限数量的线程。例如,Linux系统可以参考这里的讨论:Maximum number of threads per process in Linux?。
此外,大量线程在切换的时候,也会产生上下文加载卸载的开销,同样会降低系统的性能。
Doug Lea大神有一篇很经典的PPTScalable IO in Java讲述了一个更为优秀的服务器模型。
一个可伸缩的网络服务系统应当满足以下条件:
- 能够随着计算资源(CPU、内存、磁盘容量、网络带宽等)的增加提高负载能力。
- 当网络负载增加超过能力的时候,能够优雅降级,避免直接崩溃。例如,拒绝为超过能力范围的请求提供服务,但对于能力范围内的请求,依然提供服务。当流量洪峰过去之后,依然能够正常运行。
- 当然高可用、高性能依然是必须的:例如低响应延迟、随负载变化请求或释放计算资源等。
作者给出的解决方案就是Reactor模式。
Reactor模式将耗时的IO资源封装为handle对象。handle对象注册在操作系统的内核里,当对象满足一定的条件时(可读或者可写),才会处理handle对象。在Reactor模式中,同步多路复用器负责处理handle对象的状态变更,当满足条件时,会调用handle对象注册时提供的回调函数。
同步多路复用器在一个单独的线程里专门处理IO链接。当请求读取完毕之后,任务提交至工作线程池完成请求的解码、处理、编码等工作,最后将由多路复用器负责将结果返回给客户端,而池内线程继续处理下一个任务。相比JDK1.5之前的对每一次请求新建一个线程的方式,线程池能够实现线程复用,降低创建回收线程的开销,在应对密集计算负载的时候有更好的表现。同时,在多个线程上分别部署一个同步多路复用器,也可以更好地利用多核CPU的处理能力。
这样,线程的任务分工就很明确,分别专门处理IO密集任务和专门处理CPU密集任务。
从最早的select到后来Linux的epoll和BSD的Kqueue,操作系统的多路复用性能一直在不断增强。
JDK 1.4引入了NIO模块,可以屏蔽了操作系统层面的细节,将各个系统的多路复用API做了同一封装。JDK的NIO有以下几个核心组件:
在JVM之外的世界里,多路复用通过Nginx、基于V8引擎的Node.js早就大放异彩。但是Java NIO在生产环境里的发展却很慢。例如,Tomcat直到2016年发布8.5版本的时候,才彻底移除BIO连接器,完全拥抱NIO。
JDK NIO主要有这样几个问题比较麻烦:
作为一个第三方框架,Netty做到了JDK本应做到的事情。
Netty的数据容器ByteBuf更为优秀。
ByteBuf同时维护两个索引:读索引和写索引。从而保证容器对象能够同时适配读写同时进行的场景。而NIO的Buffer却需要执行一次flip操作来适应读写场景的切换。同时ByteBuf容器使用引用计数来手工管理,可以在引用计数归零时通过反射调用jdk.internal.ref.Cleaner来回收内存,避免泄露。在GC低效的时候,选择使用手工方式来管理内存,完全没问题。
Netty的API封装度更高。
观察一下Netty官网Tutorial给出的demo,只要几十行代码就完成了一个具备Reactor模式的服务器。ServerBootstrap的group方法定义了主套接字和子套接字的处理方式,例中使用的NioEventLoopGroup类为Java NIO + 多线程的实现方式。对于NIO的epoll bug,NioEventLoopGroup的解决方案是rebuildSelectors对象方法。这个方法允许在selector失效时重建新的selector,将旧的释放掉。此外,Netty还通过JNI实现了自己的EpollEventLoopGroup,规避了NIO版本的bug。
Netty使用责任链模式实现了对server进出站消息的处理,使得server的代码能够更好的扩展和维护。
Netty在生产领域得到大量应用,Hadoop Avro、Dubbo、RocketMQ、Undertow等广泛应用于生产领域的产品的通信组件都选择了Netty作为基础,并经受住了考验。
Netty是一个优秀的异步通信框架,但是主要应用在基础组件中。因为Netty向开发者暴露出大量的细节,对于业务系统的开发仍然形成了困扰,所以没法得到进一步的普及。
举个例子。Netty使用ChannelFuture来接收传入的请求。相比于其继承的java.util.concurrent.Future来说,ChannelFuture可以添加一组GenericFutureListener来管理对象状态,避免了反复对Future对象状态的询问。这是个进步。但是,这些Listener都带来了另一个问题——Callback hell。而嵌套的回调代码往往难以维护。
Netty做一个优秀的基础组件就很好了。业务层面的问题就让我们用业务层面的API来解决。
在JDK7以及之前,Java多线程的编程工具主要就是Thread、ExecutorService、Future以及相关的同步工具,实现出来的代码较为繁琐、且性能不高。
举个例子A,考虑一个场景有X、P、Q三个逻辑需要执行,其中X的执行需要在P、Q一起完成之后才启动执行。
如果使用Thread,那么代码会是这个样子:
/* 创建线程 */
Thread a = new Thread(new Runnable() {
@Override
public void run() {
/* P逻辑 */
}
});
Thread b = new Thread(new Runnable() {
@Override
public void run() {
/* Q逻辑 */
}
});
/* 启动线程 */
a.start();
b.start();
/* 等候a、b线程执行结束 */
try {
a.join();
b.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
/* 启动X逻辑的执行 */
Thread c = new Thread(new Runnable() {
@Override
public void run() {
/* X逻辑 */
}
});
c.start();
...
上面这个代码,先不论线程创建的开销,单从形式上看,线程内部的执行逻辑、线程本身的调度逻辑,还有必须捕获的InterruptedException的异常处理逻辑混杂在一起,整体很混乱。假想一下,当业务逻辑填充在其中的时候,代码更难维护。
ThreadPoolExecutor和Future有助于实现线程复用,但对于代码逻辑的规范没什么帮助。
ExecutorService pool = Executors.newCachedThreadPool();
Future<?> a = pool.submit(new Runnable() {
@Override
public void run() {
/* P逻辑 */
}
});
Future<?> b = pool.submit(new Runnable() {
@Override
public void run() {
/* Q逻辑 */
}
});
/* 获取线程执行结果
* 依然要捕获异常,处理逻辑
*/
try {
a.get();
b.get();
Future<?> c = pool.submit(new Runnable() {
@Override
public void run() {
/* X逻辑 */
}
});
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
JDK8借鉴了相当多的函数式编程的特点,提供了几样很称手的工具。
如果要用CompleteableFuture实现上一个例子,可以这样写。
CompletableFuture<?> a = CompletableFuture.runAsync(() -> {
/* P逻辑 */
}).exceptionally(ex -> {
/* 异常处理逻辑 */
return ...;
});
CompletableFuture<?> b = CompletableFuture.runAsync(() -> {
/* Q逻辑 */
});
CompletableFuture<?> c = CompletableFuture.allOf(a, b).thenRun(() -> {
/* X逻辑 */
});
有了lambda表达式的加持,例中的代码整体以线程内部逻辑为主,调度逻辑通过allOf()、thenRun()等方法名直观地展示出来。特别是可选的异常捕获逻辑,更是使得代码可读性得到了极大的提高。
需要注意的是,CompletableFuture是可以使用指定ExecutorService来执行的。如果像上例那样没有指定ExecutorService对象,那么会默认使用ForkJoinPool里的静态对象commonPool来执行。而ForkJoinPool.commonPool作为一个JVM实例中唯一的对象,也是Stream并发流的执行器,因此应当尽量保证CompletableFuture里的逻辑不会阻塞线程。如果无法规避,可以使用ManagedBlocker来降低影响。
ForkJoinPool是JDK1.7提供的并发线程池,可以很好地应对计算密集型并发任务,特别适用于可以“分-治”的任务。传统的ThreadPoolExecutor需要指定线程池里的线程数量,而ForkJoinPool使用了一个相似但更有弹性的概念——“并发度”。并发度指的是池内的活跃线程数。对于可能的阻塞任务,ForkJoinPool设计了一个ManagedBlocker接口。当池内线程执行到ForkJoinPool.managedBlock(ForkJoinPool.ManagedBlocker blocker)
方法时,线程池会新建一个线程去执行队列里的其他任务,并轮询该对象的isReleasable方法,决定是否恢复线程继续运行。JDK1.7里的Phaser类源码用到了这个方法。
关于CompleteableFuture的用法,推荐看看这篇博客:理解CompletableFuture,总结的很好。
而对于ForkJoinPool,可以看看这篇博客:Java 并发编程笔记:如何使用 ForkJoinPool 以及原理。
Stream流也是JDK8引入的一个很好的编程工具。
Stream对象通常通过Iterator、Collection来构造。也可以用StreamSupport的stream静态方法来创建自定义行为的实例。
Stream流对象采用链式编程风格,可以制定一系列对流的定制行为,例如过滤、排序、转化、迭代,最后产生结果。看个例子。
List<Integer> intList = List.of(1, 2, 3);
List<String> strList = intList.stream()
.filter(k -> k>1)
.map(String::valueOf)
.collect(Collectors.toList());
上面这段代码中,intList通过stream方法获取到流对象,然后筛选出大于1的元素,并通过String的valueOf静态方法生成String对象,最后将各个String对象收集为一个列表strList。就像CompletableFuture的方法名一样,Stream的方法名都是自描述的,使得代码可读性极佳。
除此之外,Stream流的计算还是惰性的。Stream流对象的方法大致分为两种:
只有在执行终结方法的时候,流的计算才会真正执行。之前的中间方法,都作为步骤记录下来,但没有实时地执行修改操作。
如果将例子里的stream方法修改为parallelStream,那么得到的流对象就是一个并发流,而且总在ForkJoinPool.commonPool中执行。
关于Stream,极力推荐Brian Goetz大神的系列文章Java Streams。
ForkJoinPool是一款强大的线程池组件,只要使用的得当,线程池总会保持一个合理的并发度,充分利用计算资源。
但是,CompleteableFuture也好,Stream也好,他们都存在一个相同的问题:无法通过后端线程池的负载变化,来调整前端的调用压力。打比方说,当后端的ForkJoinPool.commonPool在全力运算而且队列里有大量的任务排队时,新提交的任务很可能会有很高的响应延迟,但是前端的CompleteableFuture或者Stream没有途径去获取这样一个状态,来延缓任务的提交。这种情况就违背了“响应式系统”的“灵敏性”要求。
Reactive Streams是一套标准,定义了一个运行于JVM平台上的响应式编程框架实现所应该具备的行
为。
Reactive Streams规范衍生自“观察者模式”。将前后依赖的逻辑流,拆解为事件和订阅者。只有当事件发生变更时,感兴趣的观察者才随之执行随后的逻辑。所以说,Reactive Stream和JDK的Stream的理念有点接近,两者都是注重对数据流的控制。将紧耦合的逻辑流拆分为“订阅-发布”方式其实是一大进步。代码变得维护性更强,而且很容易随着业务的需要使用消息驱动模式拆解。
Reactive Streams规范定义了四种接口:
当Subscriber调用Publisher.subscribe方法订阅消息时,Publisher就会调用Subscriber的onSubscribe方法,回传一个Subscription菜单。
Subscription菜单包含两个选择:
一个Subscription对象只能由同一个Subscriber调用,所以不存在对象共享的问题。因此即便Subscription对象有状态,也不会危及逻辑链路的线程安全。
订阅者Subscriber还需要定义三种行为:
相比于Future、Thread那样将业务逻辑和异常处理逻辑混杂在一起,Subscriber将其分别定义在三个方法里,代码显得更为清晰。java.util.Observer(在JDK9中开始废弃)只定义了update方法,相当于这里的onNext方法,缺少对流整体的管理和对异常的处理。特别是,异常如果随着调用链传递出去,调试定位会非常麻烦。因此要重视onError方法,尽可能在订阅者内部就处理这个异常。
尽管Reactive Streams规范和Stream都关注数据流,但两者有一个显著的区别。那就是Stream是基于生产一方的,生产者有多大能力,Stream就制造多少数据流。而Reactive Streams规范是基于消费者的。逻辑链下游可以通过对request方法参数的变更,通知上游调整生产数据流的速度。从而实现了“响应式系统”的“灵敏性”要求。这在响应式编程中,用术语“背压”来描述。
Reactive Streams规范仅仅是一个标准,其实现又依赖其他组织的成果。其意义在于各家实现能够通过这样一个统一的接口,相互调用,有助于响应式框架生态的良性发展。Reactive Streams规范虽然是Netflix、Pivatol、RedHat等第三方大厂合作推出的,但已经随着JDK9的发布收编为官方API,位于java.util.concurrent.Flow之内。JDK8也可以在项目中直接集成相应的模块调用。
顺便吐槽一下,JDK9官方文档给出的demo里的数据流居然从Subscription里生产出来,吓得我反复确认了一下Reactive Streams官方规范。
RxJava由Netfilx维护,实现了ReactiveX API规范。该规范有很多语言实现,生态很丰富。
Rx范式最先是微软在.NET平台上实现的。2014年11月,Netfilx将Rx移植到JVM平台,发布了1.0稳定版本。而Reactive Streams规范是在2015年首次发布,2017年才形成稳定版本。所以RxJava 1.x和Reactive Streams规范有很大出入。1.x版本迭代至2018年3月的1.3.8版本时,宣布停止维护。
Netflix在2016年11月发布2.0.1稳定版本,实现了和Reactive Streams规范的兼容。2.x如今是官方的推荐版本。
RxJava框架里主要有这些概念:
RxJava 2.x在Zuul 2、Hystrix、Jersey等项目都有使用,在生产领域已经得到了应用。
Reactor3有Pivotal来开发维护,也就是Spring的同门师弟。
整体上,Reactor3框架里的概念和RxJava都是类似的。Mono和Flux都等同于RxJava的Single和Observable。Reactor3也使用自描述的操作符函数实现链式编程。
RxJava 2.x支持JVM 6+平台,对老旧项目很友好;而Reactor3要求必须是JVM8+。所以说,如果是新项目,使用Reactor3更好,因为它使用了很多新的API,支撑很多函数式接口,代码可读性维护性都更好。
背靠Spring大树,Reactor3的设计目标是服务器端的Java项目。Reactor社区针对服务器端,不断推出新产品,例如Reactor Netty、Reactor Kafka等等。但如果是Android项目,RxJava2更为合适(来自Reactor3官方文档的建议)。
老实讲,Reactor3的文档内容更丰富。
响应式宣言里面说的很清楚,一个响应式系统应当是:
Vert.X目前由Eclipse基金会维护,打造了一整套响应式Web系统开发环境,包括数据库管理、消息队列、微服务、权限认证、集群管理器、Devops等等,生态很丰富。
Vert.X Core框架基于Netty开发,是一种事件驱动框架:每当事件可行时都会调用其对应的handler。在Vert.X里,有专门的线程负责调用handler,被称作eventloop。每一个Vert.X实例都维护了多个eventloop。
Vert.X Core框架有两个重要的概念:Verticle和Event Bus。
Verticle类似于Actor模型的Actor角色。
Actor是什么?
这里泛泛的说一下吧。
Actor模型主要针对于分布式计算系统。Actor是其中最基本的计算单元。每一个Actor都有一个私有的消息队列。Actor之间的通信依靠发送消息。每一个Actor都可以并发地做到:
- 向其他Actor发送消息
- 创建新的Actor
- 指定当接收到下一个消息时的行为
Verticle之间的消息传递依赖于下面要说的Event Bus。
Vert.X为Verticle的部署提供了高可用特性:在Vert.X集群中,如果一个节点的上运行的Veticle实例失效,其他节点就会重新部署一份新的Verticle实例。
Verticle只是Vert.X提供的一种方案,并非强制使用。
Event Bus是Vert.X框架的中枢系统,能够实现系统中各组件的消息传递和handler的注册与注销。其消息传递既支持“订阅-发布”模式,也支持“请求-响应”模式。
当多个Vert.X实例组成集群的时候,各系统的Event Bus能够组成一个统一的分布式Event Bus。各Event Bus节点相互之间通过TCP协议通信,没有依赖Cluster Manager。这是一种可以实现节点发现,提供了分布式基础组件(锁、计数器、map)等的组件。
Spring5的亮点之一就是响应式框架Spring WebFlux,使用自家的Reactor3开发,但同样支持RxJava。
Spring WebFlux的默认服务端容器是Reactor Netty,也可以使用Undertow或者Tomcat、Jetty的实现了Servlet 3.1 非阻塞API接口的版本。Spring WebFlux分别为这些容器实现了与Reactive Streams规范实现框架交互的适配器(Adapter),没有向用户层暴露Servlet API。
Spring WebFlux的注解方式和Spring MVC很像。这有助于开发团队快速适应新框架。而且Spring WebFlux兼容Tomcat、Jetty,有助于项目运维工作的稳定性。
但如果是新的项目、新的团队,给我,我大概会选Vert.X,因为Event Bus确实很吸引人。
标签:rocketmq 适配 代码 溢出 筛选 生产者 when flow card
原文地址:https://www.cnblogs.com/rim99/p/9026500.html