标签:star abstract protected 结束 pool 优雅 list ext 数据封装
Thrift
提供的网络服务模型:单线程、多线程、事件驱动,从另一个角度划分为:阻塞服务模型、非阻塞服务模型。
阻塞服务模型:TSimpleServer
、TThreadPoolServer
。
非阻塞服务模型:TNonblockingServer
、THsHaServer
和TThreadedSelectorServer
。
TServer
类的层次关系:
TServer
定义了静态内部类Args
,Args
继承自抽象类AbstractServerArgs
。AbstractServerArgs
采用了建造者模式,向TServer
提供各种工厂:
工厂属性 | 工厂类型 | 作用 |
---|---|---|
ProcessorFactory | TProcessorFactory | 处理层工厂类,用于具体的TProcessor对象的创建 |
InputTransportFactory | TTransportFactory | 传输层输入工厂类,用于具体的TTransport对象的创建 |
OutputTransportFactory | TTransportFactory | 传输层输出工厂类,用于具体的TTransport对象的创建 |
InputProtocolFactory | TProtocolFactory | 协议层输入工厂类,用于具体的TProtocol对象的创建 |
OutputProtocolFactory | TProtocolFactory | 协议层输出工厂类,用于具体的TProtocol对象的创建 |
下面是TServer
的部分核心代码:
TServer
的三个方法:serve()
、stop()
和isServing()
。serve()
用于启动服务,stop()
用于关闭服务,isServing()
用于检测服务的起停状态。
TServer
的不同实现类的启动方式不一样,因此serve()
定义为抽象方法。不是所有的服务都需要优雅的退出, 因此stop()
方法没有被定义为抽象。
TSimpleServer
的工作模式采用最简单的阻塞IO,实现方法简洁明了,便于理解,但是一次只能接收和处理一个socket
连接,效率比较低。它主要用于演示Thrift
的工作过程,在实际开发过程中很少用到它。
服务端:
客户端:
查看上述流程的源代码,即TSimpleServer.java
中的serve()
方法如下:
serve()
方法的操作:
TServerSocket
的listen()
方法启动连接监听。TTransport
对象。TServerEventHandler
对象处理具体的业务请求。TThreadPoolServer
模式采用阻塞socket
方式工作,主线程负责阻塞式监听是否有新socket
到来,具体的业务处理交由一个线程池来处理。
服务端:
客户端:
ThreadPoolServer
解决了TSimpleServer
不支持并发和多连接的问题,引入了线程池。实现的模型是One Thread Per Connection
。查看上述流程的源代码,先查看线程池的代码片段:
TThreadPoolServer.java
中的serve()
方法如下:
serve()
方法的操作:
TServerSocket
的listen()
方法启动连接监听。WorkerProcess
对象(WorkerProcess
实现了Runnabel
接口),并提交到线程池。WorkerProcess
的run()
方法负责业务处理,为客户端创建了处理器对象、输入传输通道对象、输出传输通道对象、输入协议对象和输出协议对象。TServerEventHandler
对象处理具体的业务请求。WorkerProcess
的run()
方法:
拆分了监听线程(Accept Thread
)和处理客户端连接的工作线程(Worker Thread
),数据读取和业务处理都交给线程池处理。因此在并发量较大时新连接也能够被及时接受。
线程池模式比较适合服务器端能预知最多有多少个客户端并发的情况,这时每个请求都能被业务线程池及时处理,性能也非常高。
线程池模式的处理能力受限于线程池的工作能力,当并发请求数大于线程池中的线程数时,新请求也只能排队等待。
TNonblockingServer
模式也是单线程工作,但是采用NIO
的模式,借助Channel/Selector
机制, 采用IO
事件模型来处理。
所有的socket
都被注册到selector
中,在一个线程中通过seletor
循环监控所有的socket
。
每次selector
循环结束时,处理所有的处于就绪状态的socket
,对于有数据到来的socket
进行数据读取操作,对于有数据发送的socket则进行数据发送操作,对于监听socket
则产生一个新业务socket
并将其注册到selector
上。
注意:TNonblockingServer要求底层的传输通道必须使用TFramedTransport。
服务端:
客户端:
TNonblockingServer
继承于AbstractNonblockingServer
,这里我们更关心基于NIO
的selector
部分的关键代码。
相比于TSimpleServer
效率提升主要体现在IO
多路复用上,TNonblockingServer
采用非阻塞IO
,对accept/read/write
等IO
事件进行监控和处理,同时监控多个socket
的状态变化。
TNonblockingServer
模式在业务处理上还是采用单线程顺序来完成。在业务处理比较复杂、耗时的时候,例如某些接口函数需要读取数据库执行时间较长,会导致整个服务被阻塞住,此时该模式效率也不高,因为多个调用请求任务依然是顺序一个接一个执行。
鉴于TNonblockingServer
的缺点,THsHaServer
继承于TNonblockingServer
,引入了线程池提高了任务处理的并发能力。THsHaServer
是半同步半异步(Half-Sync/Half-Async
)的处理模式,Half-Aysnc
用于IO
事件处理(Accept/Read/Write
),Half-Sync
用于业务handler
对rpc
的同步处理上。
注意:THsHaServer和TNonblockingServer一样,要求底层的传输通道必须使用TFramedTransport。
服务端:
客户端:
THsHaServer
继承于TNonblockingServer
,新增了线程池并发处理工作任务的功能,查看线程池的相关代码:
任务线程池的创建过程:
下文的TThreadedSelectorServer囊括了THsHaServer的大部分特性,源码分析可参考TThreadedSelectorServer。
THsHaServer
与TNonblockingServer
模式相比,THsHaServer
在完成数据读取之后,将业务处理过程交由一个线程池来完成,主线程直接返回进行下一次循环操作,效率大大提升。
主线程仍然需要完成所有socket
的监听接收、数据读取和数据写入操作。当并发请求数较大时,且发送数据量较多时,监听socket
上新连接请求不能被及时接受。
TThreadedSelectorServer
是对THsHaServer
的一种扩充,它将selector
中的读写IO
事件(read/write
)从主线程中分离出来。同时引入worker
工作线程池,它也是种Half-Sync/Half-Async
的服务模型。
TThreadedSelectorServer
模式是目前Thrift
提供的最高级的线程服务模型,它内部有如果几个部分构成:
AcceptThread
线程对象,专门用于处理监听socket
上的新连接。SelectorThread
对象专门用于处理业务socket
的网络I/O
读写操作,所有网络数据的读写均是有这些线程来完成。SelectorThreadLoadBalancer
对象,主要用于AcceptThread
线程接收到一个新socket
连接请求时,决定将这个新连接请求分配给哪个SelectorThread
线程。ExecutorService
类型的工作线程池,在SelectorThread
线程中,监听到有业务socket
中有调用请求过来,则将请求数据读取之后,交给ExecutorService
线程池中的线程完成此次调用的具体执行。主要用于处理每个rpc
请求的handler
回调处理(这部分是同步的)。
服务端:
客户端:
以上工作流程的三个组件AcceptThread
、SelectorThread
和ExecutorService
在源码中的定义如下:
TThreadedSelectorServer
模式中有一个专门的线程AcceptThread
用于处理新连接请求,因此能够及时响应大量并发连接请求;另外它将网络I/O操作分散到多个SelectorThread
线程中来完成,因此能够快速对网络I/O
进行读写操作,能够很好地应对网络I/O
较多的情况。
TThreadedSelectorServer
默认参数定义如下:
创建、初始化并启动AcceptThread
和SelectorThreads
,同时启动selector
线程的负载均衡器(selectorThreads
)。
AcceptThread
继承于Thread
,可以看出包含三个重要的属性:非阻塞式传输通道(TNonblockingServerTransport
)、NIO
选择器(acceptSelector
)和选择器线程负载均衡器(threadChooser
)。
查看AcceptThread
的run()
方法,可以看出accept
线程一旦启动,就会不停地调用select()
方法:
查看select()
方法,acceptSelector
选择器等待IO
事件的到来,拿到SelectionKey
即检查是不是accept
事件。如果是,通过handleAccept()
方法接收一个新来的连接;否则,如果是IO
读写事件,AcceptThread
不作任何处理,交由SelectorThread
完成。
在handleAccept()
方法中,先通过doAccept()
去拿连接通道,然后Selector
线程负载均衡器选择一个Selector
线程,完成接下来的IO
读写事件。
接下来继续查看doAddAccept()
方法的实现,毫无悬念,它进一步调用了SelectorThread
的addAcceptedConnection()
方法,把非阻塞传输通道对象传递给选择器线程做进一步的IO
读写操作。
SelectorThreadLoadBalancer
如何创建?
SelectorThreadLoadBalancer
是一个基于轮询算法的Selector
线程选择器,通过线程迭代器为新进来的连接顺序分配SelectorThread
。
SelectorThread
和AcceptThread
一样,是TThreadedSelectorServer
的一个成员内部类,每个SelectorThread
线程对象内部都有一个阻塞式的队列,用于存放该线程被接收的连接通道。
阻塞队列的大小可由构造函数指定:
上面看到,在AcceptThread
的doAddAccept()
方法中调用了SelectorThread
的addAcceptedConnection()
方法。
这个方法做了两件事:
SelectorThread
线程接收的连接通道放入阻塞队列中。wakeup()
方法唤醒SelectorThread
中的NIO
选择器selector
。
既然SelectorThread
也是继承于Thread
,查看其run()
方法的实现:
SelectorThread
方法的select()
监听IO
事件,仅仅用于处理数据读取和数据写入。如果连接有数据可读,读取并以frame
的方式缓存;如果需要向连接中写入数据,缓存并发送客户端的数据。且在数据读写处理完成后,需要向NIO
的selector
清空和注销自身的SelectionKey
。
rpc
调用过程也就结束了,handleWrite()
方法如下:
Thrift
会利用已读数据执行目标方法,handleRead()
方法如下:
handleRead
方法在执行read()
方法,将数据读取完成后,会调用requestInvoke()
方法调用目标方法完成具体业务处理。requestInvoke()
方法将请求数据封装为一个Runnable
对象,提交到工作任务线程池(ExecutorService
)进行处理。
select()
方法完成后,线程继续运行processAcceptedConnections()
方法处理下一个连接的IO
事件。
这里比较核心的几个操作:
SelectorThread
的阻塞队列acceptedQueue
中获取一个连接的传输通道。如果获取成功,调用registerAccepted()
方法;否则,进入下一次循环。registerAccepted()
方法将传输通道底层的连接注册到NIO
的选择器selector
上面,获取到一个SelectionKey
。FrameBuffer
对象,并绑定到获取的SelectionKey
上面,用于数据传输时的中间读写缓存。本文对Thrift
的各种线程服务模型进行了介绍,包括2种阻塞式服务模型:TSimpleServer
、TThreadPoolServer
,3种非阻塞式服务模型:TNonblockingServer
、THsHaServer
和TThreadedSelectorServer
。对各种服务模型的具体用法、工作流程、原理和源码实现进行了一定程度的分析。
标签:star abstract protected 结束 pool 优雅 list ext 数据封装
原文地址:https://www.cnblogs.com/sea520/p/12165778.html