码迷,mamicode.com
首页 > 其他好文 > 详细

Chapter 6-03

时间:2016-04-30 06:38:32      阅读:171      评论:0      收藏:0      [点我收藏+]

标签:

Please indicate the source: http://blog.csdn.net/gaoxiangnumber1
Welcome to my github: https://github.com/gaoxiangnumber1
6.6 详解muduo多线程模型
?本节以Sudoku Solver为例,回顾了并发网络服务程序的多种设计方案,并介绍了使用muduo网络库编写多线程服务器的两种最常用手法。本节代码参见:examples/sudoku/。
6.6.1 数独求解服务器
?假设有一个网络编程任务:写一个求解数独的程序,并把它做成一个网络服务。
?挑战在于怎样做才能发挥现在多核硬件的能力?在谈这个问题之前,让我们先写一个基本的单线程版。
协议
?一个简单的以\r\n分隔的文本行协议,使用 TCP 长连接,客户端在不需要服务时主动断开连接。
请求:[id:]<81digits>\r\n
响应:[id:]<81digits>\r\n
或者:[id:]NoSolution\r\n


‘\r’是回车,使光标到行首(carriage return)
‘\n’是换行,使光标下移一格(line feed)


?[id:]表示可选的id,用于区分先后的请求,以支持Parallel Pipelining,响应中会回显请求中的id。Parallel Pipelining见《以小见大——那些基于Protobuf的五花八门的RPC(2) 》26(http://blog.csdn.net/lanphaday/archive/2011/04/11/6316099.aspx),或者见《分布式系统的工程化开发方法》27 http://blog.csdn.net/solstice/article/details/5950190第54页。
?<81digits> 是 Sudoku 的棋盘,9 × 9 个数字,从左上角到右下角按行扫描,未知数字以 0 表示。如果 Sudoku 有解,那么响应是填满数字的棋盘;如果无解,则返回 NoSolution。
?例子 1 请求:
000000010400000000020000000000050407008000300001090000300400200050100000000806000\r\n
响应:
693784512487512936125963874932651487568247391741398625319475268856129743274836159\r\n
?例子 2 请求:
a:000000010400000000020000000000050407008000300001090000300400200050100000000806000\r\n
响应:
a:693784512487512936125963874932651487568247391741398625319475268856129743274836159\r\n
?例子 3 请求:
b:000000010400000000020000000000050407008000300001090000300400200050100000000806005\r\n
响应:b:NoSolution\r\n
?基于这个文本协议,我们可以用telnet模拟客户端来测试Sudoku Solver,不需要单独编写 Sudoku Client。Sudoku Solver 的默认端口号是 9981,因为它有 9 × 9 = 81个格子。
基本实现
?Sudoku 的算法见《谈谈数独(Sudoku)》28 http://blog.csdn.net/Solstice/archive/2008/02/15/2096209.aspx。假设已经有一个函数能求解,原型如下:
string solveSudoku(const string& puzzle);
?函数的输入是上文的“<81digits>” ,输出是“<81digits>”或“NoSolution”。这个函数是个pure function,同时也是线程安全的。


?纯函数(Pure Function)与外界交换数据只有唯一渠道——参数和返回值。其在逻辑上没有副作用(副作用就是潜在地和函数的外部环境交换数据)——这只是从抽象角度来说。从现实角度讲,任何函数在CPU级别上都存在副作用,大多数函数在堆级别上也有副作用。就算有这些副作用,这种在逻辑层面上的抽象也是有意义的。
?纯函数不读写全局变量,无状态、无IO,不改变传入的任何参数。理想情况下,不会给他传入任何外部数据,因为一旦传入一个诸如My_Globals的指针,上面的这些规则就都被打破了。
?纯函数具有线程安全性。只有传值参数的纯函数是完全线程安全的。如果有引用或者指针类型的参数,就算是const类型的,你也得注意:另一个会执行non-pure操作的线程可能会改变或者释放这些引用或者指针,这很危险。即使在这种情况下存在隐患,纯函数仍不失为一种安全编写多线程程序的有力工具。


?有了这个函数,以§ 6.4.2中EchoServer为蓝本,稍加修改就能得到SudokuServer。完整的代码见examples/sudoku/server_basic.cc。onMessage()的主要功能是处理协议格式,并调用solveSudoku()求解问题。这个函数应该能正确处理TCP分包。

    void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp)
    {
        LOG_DEBUG << conn->name();
        size_t len = buf->readableBytes();
        while (len >= kCells + 2)
        {
            const char* crlf = buf->findCRLF();
            if (crlf)
            {
                string request(buf->peek(), crlf);
                buf->retrieveUntil(crlf + 2);
                len = buf->readableBytes();
                if (!processRequest(conn, request))
                {
                    conn->send("Bad Request!\r\n");
                    conn->shutdown();
                    break;
                }
            }
            else if (len > 100) // id + ":" + kCells + "\r\n"
            {
                conn->send("Id too long!\r\n");
                conn->shutdown();
                break;
            }
            else
            {
                break;
            }
        }
    }

    bool processRequest(const TcpConnectionPtr& conn, const string& request)
    {
        string id;
        string puzzle;
        bool goodRequest = true;

        string::const_iterator colon = find(request.begin(), request.end(), ‘:‘);
        if (colon != request.end())
        {
            id.assign(request.begin(), colon);
            puzzle.assign(colon+1, request.end());
        }
        else
        {
            puzzle = request;
        }

        if (puzzle.size() == implicit_cast<size_t>(kCells))
        {
            LOG_DEBUG << conn->name();
            string result = solveSudoku(puzzle);
            if (id.empty())
            {
                conn->send(result+"\r\n");
            }
            else
            {
                conn->send(id+":"+result+"\r\n");
            }
        }
        else
        {
            goodRequest = false;
        }
        return goodRequest;
    }

?server_basic.cc是一个并发服务器,可以同时服务多个客户连接。但是它是单线程的,无法发挥多核硬件的能力。
?Sudoku是一个计算密集型的任务(见§ 7.4中关于其性能的分析),其瓶颈在CPU。为了让这个单线程 server_basic 程序充分利用 CPU 资源,一个简单的办法是在同一台机器上部署多个 server_basic 进程,让每个进程占用不同的端口,比如在一台 8 核机器上部署 8 个 server_basic 进程,分别占用 9981,9982,…,9988 端口。这样做是把难题推给了客户端,因为客户端 (s) 要自己做负载均衡。
?能不能在一个端口上提供服务,并且又能发挥多核处理器的计算能力呢?
6.6.2 常见的并发网络服务程序设计方案
?《UNIX网络编程-2nd》第27章 “Client-Server Design Alternatives”介绍了十来种当时(20世纪90年代末)流行的编写并发网络程序的方案。[UNP]第3版第30章还是这几种。以下简称 UNP CSDA方案。
?目前高性能 httpd 普遍采用的是单线程Reactor方式。
?可伸缩网络编程,POSA2进行了相当全面的总结,另外以下几篇文章也值得参考。
http://bulk.fefe.de/scalable-networking.pdf
http://www.kegel.com/c10k.html
http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

?“互通”指的是如果开发chat服务,多个客户连接之间是否能方便地交换数据。
?“顺序性”指的是在httpd/Sudoku这类请求响应服务中,如果客户连接顺序发送多个请求,那么计算得到的多个响应是否按相同的顺序发还给客户(这里指的是在自然条件下,不含刻意同步)。
?UNP CSDA方案归入 0 ~ 5。方案5是目前用得很多的单线程Reactor方案。方案6和方案7不是实用的方案,只是作为过渡品。


?当网络通信采用TCP协议时,在真正的读写操作之前,server与client之间必须建立一个连接,当读写操作完成后,双方不再需要这个连接时它们可以释放这个连接,连接的建立是需要三次握手的,而释放则需要4次握手,所以说每个连接的建立都是需要资源消耗和时间消耗的。
?长连接:指在一个TCP连接上可以连续发送多个数据包,在TCP连接保持期间,如果没有数据包发送,需要双方发检测包以维持此连接;一般需要自己做在线维持。
?短连接: 指通信双方有数据交互时,就建立一个TCP连接,数据发送完成后,则断开此TCP连接。优点是:管理起来比较简单,存在的连接都是有用的连接,不需要额外的控制手段。比如http的,只是连接、请求、关闭,过程时间较短,服务器若是一段时间内没有收到请求即可关闭连接。
?长连接与短连接的操作过程
短连接操作步骤是:连接→数据传输→关闭连接;
长连接:连接→数据传输→保持连接(心跳)→数据传输→保持连接(心跳) →……→关闭连接;
?长连接多用于操作频繁,点对点的通讯,而且连接数不能太多情况。每个TCP连接都需要三步握手,这需要时间,如果每个操作都是先连接,再操作的话那么处理速度会降低很多,所以每个操作完后都不断开,下次次处理时直接发送数据包就OK了,不用建立TCP连接。例如:数据库的连接用长连接, 如果用短连接频繁的通信会造成socket错误,而且频繁的socket 创建也是对资源的浪费。


方案0 accept+read/write
?这不是并发服务器,而是iterative服务器,因为它一次只能服务一个客户。代码见[UNP] Figure 1.9。这个方案不适合长连接,很适合daytime 这种 write-only 短连接服务。以下 Python代码展示用方案 0 实现 echo server 的大致做法(Python代码均没有考虑错误处理): recipes/python/echo-iterative.py

1.#!/usr/bin/python
2.
3.import socket
4.
5.def handle(client_socket, client_address):
6.    while True:
7.        data = client_socket.recv(4096)
8.        if data:
9.            sent = client_socket.send(data)
10.        else:
11.            print "disconnect", client_address
12.            client_socket.close()
13.            break
14.
15.if __name__ == "__main__":
16.    listen_address = ("0.0.0.0", 2007)
17.    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
18.    server_socket.bind(listen_address)
19.    server_socket.listen(5)
20.
21.    while True:
22.        (client_socket, client_address) = server_socket.accept()
23.        print "got connection from", client_address
24.        handle(client_socket, client_address)

?L6~L13是echo服务的“业务逻辑循环”,L21~L24可以看出它一次只能服务一个客户连接。
?后面列举的方案都是在保持这个循环的功能不变的情况下,设法能高效地同时服务多个客户端。
方案1 accept+fork
?[UNP]称之为child-per-client或fork()-per-client,也俗称process-per-connection。这种方案适合并发连接数不大的情况。这种方案适合“计算响应的工作量远大于fork()的开销”这种情况,比如数据库服务器。这种方案适合长连接,但不太适合短连接,因为 fork()开销大于求解Sudoku的用时。
?L9~L16是前面的业务逻辑循环,self.request代替了前面的client_socket。ForkingTCPServer会对每个客户连接新建一个子进程,在子进程中调用EchoHandler.handle(),从而同时服务多个客户端。在这种编程方式中,业务逻辑已经初步从网络框架分离出来,但是仍然和IO紧密结合。recipes/python/echo-fork.py

1.#!/usr/bin/python
2.
3.from SocketServer import BaseRequestHandler, TCPServer
4.from SocketServer import ForkingTCPServer, ThreadingTCPServer
5.
6.class EchoHandler(BaseRequestHandler):
7.    def handle(self):
8.        print "got connection from", self.client_address
9.        while True:
10.            data = self.request.recv(4096)
11.            if data:
12.                sent = self.request.send(data)    # sendall?
13.            else:
14.                print "disconnect", self.client_address
15.                self.request.close()
16.                break
17.
18.if __name__ == "__main__":
19.    listen_address = ("0.0.0.0", 2007)
20.    server = ForkingTCPServer(listen_address, EchoHandler)
21.    server.serve_forever()

方案 2 accept+thread
?这是thread-per-connection。它的初始化开销比方案1要小很多,但仍然不适合短连接服务。这种方案的伸缩性受到线程数的限制,一两百个还行,几千个的话对操作系统的scheduler是个不小的负担。
?Python示例如下,只改动了一行代码。ThreadingTCPServer 会对每个客户连接新建一个线程,在该线程中调用 EchoHandler.handle()。

xiang :~/Gao/Books/Muduo/recipes/python $ diff -U2 echo-fork.py echo-thread.py 
--- echo-fork.py    2015-04-02 15:36:58.000000000 +0800
+++ echo-thread.py  2015-04-02 15:36:58.000000000 +0800
@@ -184 +184 @@
 if __name__ == "__main__":
     listen_address = ("0.0.0.0", 2007)
-    server = ForkingTCPServer(listen_address, EchoHandler)
+    server = ThreadingTCPServer(listen_address, EchoHandler)
     server.serve_forever()

?这里体现了将“并发策略”与业务逻辑(EchoHandler.handle())分离的思路。用同样的思路重写方案 0 的代码,可得到:

xiang :~/Gao/Books/Muduo/recipes/python $ diff -U2 echo-fork.py echo-thread.py
--- echo-fork.py    2015-04-02 15:36:58.000000000 +0800
+++ echo-thread.py  2015-04-02 15:36:58.000000000 +0800
@@ -184 +184 @@
 if __name__ == "__main__":
     listen_address = ("0.0.0.0", 2007)
-    server = ForkingTCPServer(listen_address, EchoHandler)
+    server = ThreadingTCPServer(listen_address, EchoHandler)
     server.serve_forever()

方案 3 prefork
?这是针对方案 1 的优化,[UNP] 详细分析了几种变化,包括对accept(2)“惊群”问题(thundering herd)的考虑。
方案 4 pre threaded
?这是对方案 2 的优化,[UNP] 详细分析了它的几种变化。方案 3 和方案4 这两个方案都是 Apache httpd 长期使用的方案。
?以上几种方案都是阻塞式网络编程,程序流程(thread of control)通常阻塞在read()上,等待数据到达。但是 TCP 是个全双工协议,同时支持 read() 和 write()操作,当一个线程/进程阻塞在 read() 上,但程序又想给这个 TCP 连接发数据,那该怎么办?比如说echo client,既要从 stdin 读,又要从网络读,当程序正在阻塞地读网络的时候,如何处理键盘输入?
?一种方法是用两个线程/进程,一个负责读,一个负责写。[UNP] 在实现 echo client 时介绍了这种方案。§ 7.13 举了一个 Python 双线程 TCP relay 的例子,另外见Python Pinhole 的代码: http://code.activestate.com/recipes/114642/
?另一种方法是使用IO multiplexing,也就是select/poll/epoll/kqueue系列的“多路选择器”,让一个 thread of control 能处理多个连接。“IO 复用”其实复用的不是IO连接,而是复用线程。使用 select/poll 几乎肯定要配合 non-blocking IO,而使用 non-blocking IO 肯定要使用应用层 buffer,原因见 § 7.4。
?先用一小段Python代码回顾“以IO multiplexing方式实现并发echo server”的基本做法。以下代码没有开启non-blocking,也没有考虑数据发送不完整(L28)等情况。这个例子参照了http://scotdoyle.com/python-epoll-howto.html#async-examples
?首先定义一个从文件描述符到socket对象的映射(L14),程序的主体是一个事件循环(L15~L32) ,每当有 IO 事件发生时,就针对不同的文件描述符(fileno)执行不同的操作(L16, L17)。对于 listening fd,接受(accept)新连接,并注册到 IO 事件关注列表(watch list),然后把连接添加到connections 字典中(L18~L23)。对于客户连接,则读取并回显数据,并处理连接的关闭(L24~L32)。对于 echo 服务而言,真正的业务逻辑只有 L28:将收到的数据原样发回客户端。recipes/python/echo-poll.py

1.#!/usr/bin/python
2.
3.import socket
4.import select
5.
6.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
7.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
8.server_socket.bind((‘‘, 2007))
9.server_socket.listen(5)
10.# server_socket.setblocking(0)
11.poll = select.poll() # epoll() should work the same
12.poll.register(server_socket.fileno(), select.POLLIN)
13.
14.connections = {}
15.while True:
16.    events = poll.poll(10000)  # 10 seconds
17.    for fileno, event in events:
18.        if fileno == server_socket.fileno():
19.            (client_socket, client_address) = server_socket.accept()
20.            print "got connection from", client_address
21.            # client_socket.setblocking(0)
22.            poll.register(client_socket.fileno(), select.POLLIN)
23.            connections[client_socket.fileno()] = client_socket
24.        elif event & select.POLLIN:
25.            client_socket = connections[fileno]
26.            data = client_socket.recv(4096)
27.            if data:
28.                client_socket.send(data) # sendall() partial?
29.            else:
30.                poll.unregister(fileno)
31.                client_socket.close()
32.                del connections[fileno]

?以上代码不是功能完善的 IO multiplexing 范本,它没有考虑错误处理,也没有实现定时功能,而且只适合侦听(listen)一个端口的网络服务程序。如果需要侦听多个端口,或者要同时扮演客户端,那么代码的结构需要推倒重来。
?这个代码骨架可用于实现多种TCP服务器。例如写一个聊天服务只需改动3行代码,如下。业务逻辑是L28~L30:将本连接收到的数据转发给其他客户连接。

xiang :~/Gao/Books/Muduo/recipes/python $  diff echo-poll.py chat-poll.py -U4
--- echo-poll.py    2015-04-02 15:36:58.000000000 +0800
+++ chat-poll.py    2015-04-02 15:36:58.000000000 +0800
@@ -249 +2411 @@
         elif event & select.POLLIN:
             client_socket = connections[fileno]
             data = client_socket.recv(4096)
             if data:
-                client_socket.send(data) # sendall() partial?
+                for (fd, other_socket) in connections.iteritems():
+                    if other_socket != client_socket:
+                        other_socket.send(data) # sendall() partial?
             else:
                 poll.unregister(fileno)
                 client_socket.close()
                 del connections[fileno]

?但是把业务逻辑隐藏在一个大循环中的做法不利于将来功能的扩展,能不能把业务逻辑抽取出来,与网络基础代码分离呢?
?Doug Schmidt(Reactor模式之父)指出,网络编程中有很多是事务性(routine)的工作,可以提取为公用的框架或库,而用户只需要填上关键的业务逻辑代码,并将回调注册到框架中,就可以实现完整的网络服务,这正是Reactor模式的主要思想。
?Reactor的意义在于将消息(IO事件)分发到用户提供的处理函数,并保持网络部分的通用代码不变,独立于用户的业务逻辑。
?单线程Reactor的程序执行顺序如图6-11 (左图)所示。在没有事件的时候,线程等待在select/poll/epoll_wait等函数上。事件到达后由网络库处理IO,再把消息通知(回调)客户端代码。Reactor事件循环所在的线程通常叫 IO 线程。通常由网络库负责读写 socket,用户代码负载解码、计算、编码。
?由于只有一个线程,因此事件是顺序处理的,一个线程同时只能做一件事情。在这种协作式多任务中,事件的优先级得不到保证,因为从“poll返回之后”到“下一次调用 poll 进入等待之前”这段时间内,线程不会被其他连接上的数据或事件抢占(图6-11右图)。如果我们想要延迟计算(把 compute() 推迟 100ms),那么也不能用 sleep() 之类的阻塞调用,而应该注册超时回调,以避免阻塞当前 IO 线程。

方案 5 poll(reactor)
?基本的单线程 Reactor 方案(见图 6-11) ,即前面的 server_basic.cc 程序。
?这种方案的优点是由网络库搞定数据收发,程序只关心业务逻辑;缺点:适合IO密集的应用,不太适合CPU密集的应用,因为较难发挥多核的威力。另外,与方案 2(accept+thread)相比,方案 5 处理网络消息的延迟可能要略大一些,因为方案2直接一次 read(2) 系统调用就能拿到请求数据,而方案 5 要先 poll(2) 再 read(2),多了一次系统调用。
?这里用Python代码展示 Reactor 模式的雏形。直接使用了全局变量,没有处理异常。程序的核心仍然是事件循环(L42~L46),与前面不同的是,事件的处理通过handlers转发到各个函数中,不再集中在一坨。例如listening fd 的处理函数是handle_accept,它会注册客户连接的 handler。普通客户连接的处理函数是 handle_request,其中又把连接断开和数据到达这两个事件分开,后者由 handle_input 处理。业务逻辑位于单独的 handle_input 函数,实现了分离。

1.#!/usr/bin/python
2. 
3.import socket
4.import select
5. 
6.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
7.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
8.server_socket.bind((‘‘, 2007))
9.server_socket.listen(5)
10.# serversocket.setblocking(0)
11. 
12.poll = select.poll() # epoll() should work the same
13.connections = {}
14.handlers = {}
15. 
16.def handle_input(socket, data):
17.    socket.send(data) # sendall() partial?
18. 
19.def handle_request(fileno, event):
20.    if event & select.POLLIN:
21.        client_socket = connections[fileno]
22.        data = client_socket.recv(4096)
23.        if data:
24.            handle_input(client_socket, data)
25.        else:
26.            poll.unregister(fileno)
27.            client_socket.close()
28.            del connections[fileno]
29.            del handlers[fileno]
30. 
31.def handle_accept(fileno, event):
32.    (client_socket, client_address) = server_socket.accept()
33.    print "got connection from", client_address
34.    # client_socket.setblocking(0)
35.    poll.register(client_socket.fileno(), select.POLLIN)
36.    connections[client_socket.fileno()] = client_socket
37.    handlers[client_socket.fileno()] = handle_request
38. 
39.poll.register(server_socket.fileno(), select.POLLIN)
40.handlers[server_socket.fileno()] = handle_accept
41. 
42.while True:
43.    events = poll.poll(10000)  # 10 seconds
44.    for fileno, event in events:
45.        handler = handlers[fileno]
46.        handler(fileno, event)

?如果要改成聊天服务,重新定义handle_input函数即可,程序的其余部分保持不变。

xiang :~/Gao/Books/Muduo/recipes/python $  diff echo-reactor.py chat-reactor.py -U1
--- echo-reactor.py 2015-04-02 15:36:58.000000000 +0800
+++ chat-reactor.py 2015-04-02 15:36:58.000000000 +0800
@@ -163 +165 @@
 def handle_input(socket, data):
-    socket.send(data) # sendall() partial?
+    for (fd, other_socket) in connections.iteritems():
+        if other_socket != socket:
+            other_socket.send(data) # sendall() partial?

?完善的非阻塞IO网络库远比上面的玩具代码复杂,需要考虑各种错误场景。特别是要真正接管数据的收发,而不是像上面的示例那样直接在事件处理回调函数中发送网络数据。
?注意在使用非阻塞 IO +事件驱动方式编程的时候,一定要注意避免在事件回调中执行耗时的操作,包括阻塞 IO 等,否则会影响程序的响应。
方案 6 reactor + thread-per-task
?这是一个过渡方案,收到 Sudoku 请求之后,不在 Reactor 线程计算,而是创建一个新线程去计算,以充分利用多核 CPU。这是初级的多线程应用,因为它为每个请求(而不是每个连接)创建了一个新线程。这个开销可以用线程池来避免,即方案8。
?这个方案有一个特点是out-of-order,即同时创建多个线程去计算同一个连接上收到的多个请求,那么算出结果的次序是不确定的,可能第 2 个Sudoku 比较简单,比第 1 个先算出结果。这也是我们在一开始设计协议的时候使用了 id 的原因,以便客户端区分 response 对应的是哪个 request。
方案 7 reactor + worker thread
?为了让返回结果的顺序确定,可以为每个连接创建一个计算线程,每个连接上的请求固定发给同一个线程去算,先到先得。这也是一个过渡方案,因为并发连接数受限于线程数目,这个方案不如直接使用阻塞IO的thread-per-connection方案2。
?方案7与方案6的另外一个区别是单个client的最大CPU占用率。在方案 6 中,一个TCP连接上发来的一长串突发请求(burst requests)可以占满全部 8 个 core;而在方案 7 中,由于每个连接上的请求固定由同一个线程处理,那么它最多占用 12.5%的 CPU 资源。这两种方案各有优劣,取决于应用场景的需要(到底是公平性重要还是突发性能重要)。这个区别在方案 8 和方案 9 中同样存在,需要根据应用来取舍。
方案 8 reactor + thread poll
?为了弥补方案 6 中为每个请求创建线程的缺陷,我们使用固定大小线程池,程序结构如图 6-12 所示。

?全部的IO工作都在一个Reactor线程完成,而计算任务交给thread pool。如果计算任务彼此独立,而且IO的压力不大,那么这种方案是非常适用的。Sudoku Solver 正好符合。代码参见:examples/sudoku/server_threadpool.cc。
?方案8使用线程池的代码与单线程Reactor的方案5相比变化不大,只是把原来onMessage()中涉及计算和发回响应的部分抽出来做成一个函数,然后交给ThreadPool去计算。方案8有乱序返回的可能,客户端要根据id来匹配响应。
?线程池的另外一个作用是执行阻塞操作。比如有的数据库的客户端只提供同步访问,那么可以把数据库查询放到线程池中,可以避免阻塞IO线程,不会影响其他客户连接。另外也可以用线程池来调用一些阻塞的IO函数,如 fsync(2)/fdatasync(2),这两个函数没有非阻塞的版本30。
?如果IO的压力比较大,一个Reactor处理不过来,可以用方案9,它采用多个Reactor来分担负载。
方案 9 reactors in threads
?这是muduo的多线程方案。特点是one loop per thread,有一个main Reactor负责accept(2)连接,然后把连接挂在某个sub Reactor中(muduo采用 round-robin的方式来选择sub Reactor),这样该连接的所有操作都在那个sub Reactor所处的线程中完成。多个连接可能被分派到多个线程中,以充分利用CPU。
?muduo采用固定大小的Reactor pool,池子的大小根据CPU数目确定,就是说线程数是固定的,这样程序的总体处理能力不会随连接数增加而下降。另外,由于一个连接完全由一个线程管理,那么请求的顺序性有保证,突发请求也不会占满全部8个核(如果需要优化突发请求,可以考虑方案11)。这种方案把IO分派给多个线程,防止出现一个Reactor的处理能力饱和。
?与方案8的线程池相比,方案9减少了进出thread pool的两次上下文切换,在把多个连接分散到多个Reactor线程之后,小规模计算可以在当前IO线程完成并发回结果,从而降低响应的延迟。这是一个适应性很强的多线程 IO 模型,因此把它作为muduo的默认线程模型(见图 6-13)。

?方案9代码见:examples/sudoku/server_multiloop.cc。与server_basic.cc的区别很小,最关键的只有一行代码:server_.setThreadNum(numThreads);

--- server_basic.cc 2016-04-06 01:42:50.000000000 +0800
+++ server_multiloop.cc 2016-04-06 01:42:50.000000000 +0800
@@ -2018 +2021 @@ using namespace muduo::net;
 class SudokuServer
 {
  public:
-  SudokuServer(EventLoop* loop, const InetAddress& listenAddr)
+  SudokuServer(EventLoop* loop, const InetAddress& listenAddr, int numThreads)
     : server_(loop, listenAddr, "SudokuServer"),
+      numThreads_(numThreads),
       startTime_(Timestamp::now())
   {
     server_.setConnectionCallback(
         boost::bind(&SudokuServer::onConnection, this, _1));
     server_.setMessageCallback(
         boost::bind(&SudokuServer::onMessage, this, _1, _2, _3));
+    server_.setThreadNum(numThreads);
   }

方案 10 reactors in processes
?这是 Nginx 的内置方案。如果连接之间无交互,这种方案也是很好的选择。工作进程之间相互独立,可以热升级。
方案 11 reactors + thread pool
?把方案8和9混合,既使用多个Reactor来处理IO,又使用线程池来处理计算。这种方案适合既有突发 IO (利用多线程处理多个连接上的 IO),又有突发计算的应用(利用线程池把一个连接上的计算任务分配给多个线程去做),见图 6-14。

?这种方案写起来只要把方案8的代码加一行server_.setThreadNum(numThreads); 就行。
?一个程序到底是使用一个event loop还是使用多个event loops呢?ZeroMQ的手册给出的建议是31http://www.zeromq.org/area:faq#toc3按照每千兆比特每秒的吞吐量配一个event loop的比例来设置event loop的数目,即muduo::TcpServer::setThreadNum()的参数。依据这条经验规则,在编写运行于千兆以太网上的网络程序时,用一个 event loop 就足以应付网络IO。如果程序本身没有多少计算量,而主要瓶颈在网络带宽,那么可以按这条规则来办,只用一个 event loop。另一方面,如果程序的 IO 带宽较小,计算量较大,而且对延迟不敏感,那么可以把计算放到 thread pool 中,也可以只用一个 event loop。
?注意,以上假定了TCP连接是同质的,没有优先级之分,我们看重的是服务程序的总吞吐量。但是如果 TCP 连接有优先级之分,那么单个 event loop 可能不适合,正确的做法是把高优先级的连接用单独的 event loop 来处理。
?在muduo中,属于同一个event loop的连接之间没有事件优先级的差别。原因是为了防止优先级反转。比方说一个服务程序有10个心跳连接,有10个数据请求连接,都归属同一个event loop,我们认为心跳连接有较高的优先级,心跳连接上的事件应该优先处理。但是由于事件循环的特性,如果数据请求连接上的数据先于心跳连接到达(早到1ms),那么这个event loop就会调用相应的event handler去处理数据请求,而在下一次epoll_wait()的时候再来处理心跳事件。因此在同一个event loop中区分连接的优先级并不能达到预想的效果。我们应该用单独的event loop来管理心跳连接,这样就能避免数据连接上的事件阻塞了心跳事件,因为它们分属不同的线程。
?总结起来, 我推荐的C++多线程服务端编程模式为: one loop per thread + thread pool。
? event loop用作non-blocking IO和定时器。
? thread pool用来做计算,具体可以是任务队列或生产者消费者队列。
?实用的方案有5种,muduo直接支持后4种,见表6-2。此表参考了《Characteristics of multithreading models for high-performance IO driven network applications》(http://arxiv.org/ftp/arxiv/papers/0909/0909.4934.pdf) 。

?用银行柜台办理业务为比喻,简述各种模型的特点。
?银行有旋转门,办理业务的客户人员从旋转门进出(IO) ;银行也有柜台,客户在柜台办理业务(计算)。要想办理业务,客户要先通过旋转门进入银行;办理完之后,客户要再次通过旋转门离开银行。一个客户可以办理多次业务,每次都必须从旋转门进出(TCP 长连接)。另外,旋转门一次只允许一个客户通过(无论进出) ,因为read()/write()只能同时调用其中一个。
方案5
?这间银行有一个旋转门、一个柜台,每次只允许一名客户办理业务。而且当有人在办理业务时,旋转门是锁住的(计算和 IO 在同一线程)。为了维持工作效率,银行要求客户应该尽快办理业务,最好不要在取款的时候打电话去问家里人密码,这会阻塞其他堵在门外的客户。如果客户很少,这是很经济且高效的方案;但是如果场地较大(多核),则这种布局就浪费了不少资源,只能并发(concurrent)不能并行(parallel)。如果确实一次办不完,应该离开柜台,到门外等着,等银行通知再来继续办理(分阶段回调)。
方案8
?这间银行有一个旋转门,一个或多个柜台。银行进门之后有一个队列,客户在这里排队到柜台(线程池)办理业务。即在单线程Reactor后面接了一个线程池用于计算,可以利用多核。旋转门基本是不锁的,随时都可以进出。但是排队会消耗一点时间,相比之下,方案5中客户一进门就能立刻办理业务。另外一种做法是线程池里的每个线程有自己的任务队列,而不是整个线程池共用一个任务队列。这样的好处是避免全局队列的锁争用,坏处是计算资源有可能分配不平均,降低并行度。
方案9
?这间银行相当于包含方案5中的多家小银行,每个客户进大门的时候就被固定分配到某一间小银行中,他的业务只能由这间小银行办理,他每次都要进出小银行的旋转门。但总体来看,大银行可以同时服务多个客户。这时同样要求办理业务时不能空等(阻塞),否则会影响分到同一间小银行的其他客户。而且必要的时候可以为VIP客户单独开一间或几间小银行,优先办理VIP业务。这跟方案5不同,当普通客户在办理业务的时候,VIP客户也只能在门外等着(见图 6-11 的右图)。这是一种适应性很强的方案,也是 muduo 原生的多线程 IO 模型。
方案11
?这间银行有多个旋转门,多个柜台。旋转门和柜台之间没有一一对应关系,客户进大门的时候就被固定分配到某一旋转门中(奇怪的安排,易于实现线程安全的IO,见§ 4.6) ,进入旋转门之后,有一个队列,客户在此排队到柜台办理业务。这种方案的资源利用率可能比方案9更高,一个客户不会被同一小银行的其他客户阻塞,但延迟也比方案9略大。
Please indicate the source: http://blog.csdn.net/gaoxiangnumber1
Welcome to my github: https://github.com/gaoxiangnumber1

Chapter 6-03

标签:

原文地址:http://blog.csdn.net/gaoxiangnumber1/article/details/51285041

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!