标签:
IO:input和output的缩写,即输入/输出端口。每个设备都会有一个专用的I/O地址,用来处理自己的输入输出信息
对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另一个就是系统内核(kernel)。当一个read操作发生时,它会经历两个阶段:
1 等待数据准备 (Waiting for the data to be ready)
2 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)
记住这两点很重要,因为这些IO Model的区别就是在两个阶段上各有不同的情况
同步阻塞IO(Blocking IO):即传统的IO模型
?
当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来说,很多时候数据在一开始还没有到达(比如没有收到一个完整的TCP/UDP包),这个时候kernel就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除 block的状态,重新运行起来。
所以阻塞:blocking IO的特点是I/O执行时的两个操作(等待数据准备 (Waiting for the data to be ready)、将数据从内核拷贝到进程中(Copying the data from the kernel to the process))都是阻塞的。
python socket中:accept() recv() 是阻塞的
所以,所谓阻塞型接口是指系统调用(一般是IO接口)如果不返回结果就一直阻塞,就是socket经常说的,有发就有收,收发必相等.
从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。
所以,用户进程其实是需要不断的主动询问kernel数据好了没有
* IO多路复用(IO Multiplexing):即经典的Reactor设计模式,有时也称为异步阻塞IO,Java中的Selector和Linux中的epoll都是这种模型。
?
用户首先将需要进行IO操作的socket添加到select中,然后阻塞等待select系统调用返回。当数据到达时,socket被激活,select函数返回。用户线程正式发起read请求,读取数据并继续执行。
从流程上来看,使用select函数进行IO请求和同步阻塞模型没有太大的区别,甚至还多了添加监视socket,以及调用select函数的额外操作,效率更差。但是,使用select以后最大的优势是用户可以在一个线程内同时处理多个socket的IO请求。用户可以注册多个socket,然后不断地调用select读取被激活的socket,即可达到在同一个线程内同时处理多个IO请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的
* 异步IO(Asynchronous IO):即经典的Proactor设计模式,也称为异步非阻塞IO
?
用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。
IO多路复用是通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。
IO多路复用适用如下场合:
(1)当客户处理多个描述字时(一般是交互式输入和网络套接口),必须使用I/O复用。
(2)当一个客户同时处理多个套接口时,而这种情况是可能的,但很少出现。
(3)如果一个TCP服务器既要处理监听套接口,又要处理已连接套接口,一般也要用到I/O复用。
(4)如果一个服务器即要处理TCP,又要处理UDP,一般要使用I/O复用。
(5)如果一个服务器要处理多个服务或多个协议,一般要使用I/O复用。
与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。
epoll
概述
sokect基础中,默认服务端只能同时处理一个客户端的请求,当一个客户端连接到服务器之后,其他客户端只能处于等待状态,而IO多路复用可以实现服务端同时处理多个客户端请求,
Python中有一个select模块,其中提供了:select、poll、epoll三个方法,分别调用系统的 select,poll,epoll 从而实现IO多路复用
根据系统不同:支持的方法也不同
Windows Python:
提供: select
Mac Python:
提供: select
Linux Python:
提供: select、poll、epoll
注意:网络操作、文件操作、终端操作等均属于IO操作,对于windows只支持Socket操作,其他系统支持其他IO操作,但是无法检测 普通文件操作 自动上次读取是否已经变化。
普通文件操作所有系统都是完成不了的,普通文件是属于I/O操作!但是对于python来说文件变更python是监控不了的,所以我们能用的只有是“终端的输入输出,Socket的输入输出”
实现原理
先看一段同步阻塞型代码:
import socket ip_port = (‘127.0.0.1‘,8080) s = socket.socket() s.bind(ip_port) s.listen(5) conn,addr = s.accept() res = conn.recv(1024) conn.send(res) conn.close()
下面需要理清楚几个概念:
- IO多路复用主要作用是监听socket对象内部是否变化
- socket对象什么时候变化? 连接或和客户端交互收发消息的时候socket对象会发生变化
- socket对象变化的实质 服务端创建的socket对象sk发生变化--->表示有新连接过来 服务端和客户端建立的conn连接对象变化了--->表示客户端发送消息过来了。要收发数据
- 所以,IO多路复用是不断监听服务端sk对象和conn对象是否发生变化,变化之后,调用对应的线程或进程进行相应的操作。这里用到了select模块中select()方法。该方法支持四个参数: select(rlist, wlist, xlist, timeout=None)
- 第一个参数是select监听的对象列表,在此列表中的对象一旦发生变化,select会返回变化的对象。如rlist为[sk1,sk2,sk3],如果sk1和sk3发生变化。则返回[sk1对象,sk3对象]
- 第二个参数是select永远认为变化的对象列表,即一旦对象出现在此列表中,每次循环检查select会认为此对象一直发生变化,所以每次都会返回该对象,例如wlist为[sk1],那么,每次循环select都会返回[sk1对象]
- 第三个参数是select检测对象是否发生错误的列表,在该列表中的对象如果发生错误,则返回该对象
- 第四个参数是超时时间
下面通过python代码逐步实现IO多路复用
1.实现动态检测对象是否发生变化
import socket import select sk = socket.socket() ip_port = ("127.0.0.1",8080) sk.bind(ip_port) sk.listen(5) inputs = [sk,] #定义监听的对象列表。默认只监听服务端socket对象 while True: #循环检测对象 #监听sk(服务端)对象,如果sk发生变化,表示有新连接来了,此时rlist的值为sk rlist,wlist,e = select.select(inputs,[],[],1) print("inputs:",len(inputs),"rlist:",len(rlist),inputs) #打印inputs 便于查看效果 for i in rlist: #循环变化的列表,然后对每个对象进行操作 if i == sk: #如果是服务端socket对象发生变化,表示新连接建立了 conn,ip = sk.accept() #建立连接 inputs.append(conn) #将新建立的连接加入监听的列表中 else: #否则是客户端给服务端发消息了。所以需要收消息 recv_data = i.recv(1024) i.send(recv_data)
运行服务端程序,在没有新连接之前,输出是这样
inputs: 1 rlist: 0 [<socket.socket fd=280, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080)>]
一个客户端和服务器建立连接:
inputs: 1 rlist: 0 [<socket.socket fd=280, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080)>] inputs: 1 rlist: 1 [<socket.socket fd=280, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080)>] inputs: 2 rlist: 0 [<socket.socket fd=280, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080)>, <socket.socket fd=320, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080), raddr=(‘127.0.0.1‘, 52810)>]
发现,当一个客户端和服务端建立连接之后,select检测到sk发生变化,所以rlist等于1,然后将新建立的连接对象加入到监控的inputs列表中,所以第二行inputs长度变为2,而此次循环没有检测到变化的sk,所以rlist为0,最后打印inputs列表得知,列表里存储的是对象
第二个客户端和服务器建立连接:
inputs: 2 rlist: 0 [<socket.socket fd=280, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080)>, <socket.socket fd=320, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080), raddr=(‘127.0.0.1‘, 52810)>] inputs: 2 rlist: 1 [<socket.socket fd=280, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080)>, <socket.socket fd=320, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080), raddr=(‘127.0.0.1‘, 52810)>] inputs: 3 rlist: 0 [<socket.socket fd=280, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080)>, <socket.socket fd=320, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080), raddr=(‘127.0.0.1‘, 52810)>, <socket.socket fd=248, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080), raddr=(‘127.0.0.1‘, 52873)>]
发现情况和第一个客户端一样
客户端发多条信息:
客户端: >> adasdasasd adasdasasd >> sdfsd sdsdfsd 服务端: inputs: 3 rlist: 0 [<socket.socket fd=284, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080)>, <socket.socket fd=292, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080), raddr=(‘127.0.0.1‘, 52955)>, <socket.socket fd=276, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080), raddr=(‘127.0.0.1‘, 52957)>] inputs: 3 rlist: 1 [<socket.socket fd=284, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080)>, <socket.socket fd=292, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080), raddr=(‘127.0.0.1‘, 52955)>, <socket.socket fd=276, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080), raddr=(‘127.0.0.1‘, 52957)>] inputs: 3 rlist: 0 [<socket.socket fd=284, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080)>, <socket.socket fd=292, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080), raddr=(‘127.0.0.1‘, 52955)>, <socket.socket fd=276, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080), raddr=(‘127.0.0.1‘, 52957)>] inputs: 3 rlist: 1 [<socket.socket fd=284, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080)>, <socket.socket fd=292, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080), raddr=(‘127.0.0.1‘, 52955)>, <socket.socket fd=276, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080), raddr=(‘127.0.0.1‘, 52957)>] inputs: 3 rlist: 0 [<socket.socket fd=284, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080)>, <socket.socket fd=292, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080), raddr=(‘127.0.0.1‘, 52955)>, <socket.socket fd=276, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8080), raddr=(‘127.0.0.1‘, 52957)>]
利用第二个参数,如果客户端发消息,则将客户端放在outputs中,select每次循环都会将对象返回到wlist,最后循环wlist,处理消息,发送至客户端,再在outpu中移除该对象
import socket import select sk = socket.socket() ip_port = ("127.0.0.1",8080) sk.bind(ip_port) sk.listen(5) inputs = [sk,] #定义监听的对象列表。默认只监听服务端socket对象 outputs = [] #定义消息输出列表 message = {} #定义一个字典,用于记录客户端对象和收到的消息 key为客户端对象,value为消息 while True: #循环检测对象 #监听sk(服务端)对象,如果sk发生变化,表示有新连接来了,此时rlist的值为sk rlist,wlist,e = select.select(inputs,outputs,[],1) print("inputs:",len(inputs),"rlist:",len(rlist),"outputs:",len(outputs),"wlist:",len(wlist)) #打印inputs 便于查看效果 #读数据 for i in rlist: #循环变化的列表,然后对每个对象进行操作 if i == sk: #如果是服务端socket对象发生变化,表示新连接建立了 conn,ip = sk.accept() #建立连接 inputs.append(conn) #将新建立的连接加入监听的列表中 message[conn] = [] #创建关于新连接的key else: #否则是客户端给服务端发消息了。所以需要收消息 try: recv_data = i.recv(1024) #接受数据 if not recv_data: #判断数据是否为空 raise Exception("disconnect") #为空的话,主动抛出异常 else: outputs.append(i) #将发消息的对象存入outputs message[i].append(recv_data) #将发消息的对象和消息存入message except: inputs.remove(i) #捕捉到异常,客户端断开连接,则在监控列表中删除此对象 del message[i] #在消息字典中删除此对象key/value #发数据 for i in wlist: #循环wlist列表。如果wlist列表中有值的话,说明收到客户端消息 msg = message[i].pop() #取出该客户端对象发的消息,并在列表中删除 resp = ‘respond‘.encode() + msg #处理客户端的消息 i.send(resp) #发送给客户端 outputs.remove(i) #outputs列表中删除该对象,如果不删除,下次循环还会误认为其发消息了 sk.close()
客户端建立连接,并发送消息:
inputs: 1 rlist: 0 outputs: 0 wlist: 0 inputs: 1 rlist: 0 outputs: 0 wlist: 0 inputs: 1 rlist: 0 outputs: 0 wlist: 0 inputs: 1 rlist: 1 outputs: 0 wlist: 0 #第一个客户端建立连接 inputs: 2 rlist: 0 outputs: 0 wlist: 0 inputs: 2 rlist: 1 outputs: 0 wlist: 0 #第二个客户端建立连接 inputs: 3 rlist: 0 outputs: 0 wlist: 0 inputs: 3 rlist: 0 outputs: 0 wlist: 0 inputs: 3 rlist: 0 outputs: 0 wlist: 0 inputs: 3 rlist: 0 outputs: 0 wlist: 0 inputs: 3 rlist: 0 outputs: 0 wlist: 0 inputs: 3 rlist: 1 outputs: 0 wlist: 0 #第一个客户端发送一条消息 inputs: 3 rlist: 0 outputs: 1 wlist: 1 #消息记录至outpu inputs: 3 rlist: 0 outputs: 0 wlist: 0 #返回客户端消息,outputs清空 inputs: 3 rlist: 0 outputs: 0 wlist: 0 inputs: 3 rlist: 1 outputs: 0 wlist: 0 inputs: 3 rlist: 0 outputs: 1 wlist: 1 inputs: 3 rlist: 0 outputs: 0 wlist: 0 inputs: 3 rlist: 1 outputs: 0 wlist: 0 inputs: 3 rlist: 0 outputs: 1 wlist: 1 inputs: 3 rlist: 1 outputs: 0 wlist: 0 inputs: 3 rlist: 0 outputs: 1 wlist: 1 inputs: 3 rlist: 1 outputs: 0 wlist: 0 inputs: 3 rlist: 0 outputs: 1 wlist: 1 inputs: 3 rlist: 0 outputs: 0 wlist: 0 inputs: 3 rlist: 0 outputs: 0 wlist: 0
最后,这就根据select方法实现了IO多路复用,即一个简单的服务端和客户端伪并发的通信
在socket基础中,我们最后说的使用socketserver实现多线程并发处理客户端请求,下面进行深入剖析实现原理
import socketserver import subprocess class MyServer(socketserver.BaseRequestHandler): #继承 def handle(self): #handle方法。注意此时send和recv时调用的self.request方法 self.request.sendall(bytes(‘Welcome‘,encoding=‘utf-8‘)) while True: try: recv_data = self.request.recv(1024) if not recv_data: break p = subprocess.Popen(str(recv_data, encoding=‘utf-8‘), shell=True, stdout=subprocess.PIPE,stderr=subprocess.PIPE) res = p.stdout.read() if not res: send_data = p.stderr.read() else: send_data = res if not send_data: send_data = ‘no output‘.encode() data_size = len(send_data) self.request.send(bytes(str(data_size), encoding=‘utf-8‘)) self.request.recv(1024) self.request.send(send_data) except Exception: break if __name__ == ‘__main__‘: server = socketserver.ThreadingTCPServer((‘127.0.0.1‘,8080),MyServer) #启动server server.serve_forever()
下面开始从外到内进行剖析
2.执行ThreadingTCPServer类的构造方法__init__,根据类的多继承原则,子类不存在的话依次查找父类
查找 父类TreadingMinIn是否有__init__方法,结果没有,其没有父类,本段查询终止
```python class ThreadingMixIn: """Mix-in class to handle each request in a new thread.""" # Decides how threads will act upon termination of the # main process daemon_threads = False def process_request_thread(self, request, client_address): """Same as in BaseServer but as a thread. In addition, exception handling is done here. """ try: self.finish_request(request, client_address) self.shutdown_request(request) except: self.handle_error(request, client_address) self.shutdown_request(request) def process_request(self, request, client_address): """Start a new thread to process the request.""" t = threading.Thread(target = self.process_request_thread, args = (request, client_address)) t.daemon = self.daemon_threads t.start()
查找父类TCPServer是否有__init__方法,
class TCPServer(BaseServer): address_family = socket.AF_INET socket_type = socket.SOCK_STREAM request_queue_size = 5 allow_reuse_address = False def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True): """Constructor. May be extended, do not override.""" BaseServer.__init__(self, server_address, RequestHandlerClass) self.socket = socket.socket(self.address_family, self.socket_type) if bind_and_activate: try: self.server_bind() self.server_activate() except: self.server_close() raise
TCPServer有构造方法,但 BaseServer.__init__(self, server_address, RequestHandlerClass)实现了首先执行父类BaseServer的构造方法
class BaseServer: timeout = None def __init__(self, server_address, RequestHandlerClass): """Constructor. May be extended, do not override.""" self.server_address = server_address self.RequestHandlerClass = RequestHandlerClass self.__is_shut_down = threading.Event() self.__shutdown_request = False
从此构造方法中,server_address为定义的ip和端口,RequestHandlerClass为自己定义的class类MyServer
执行sever_bind()方法,实现绑定ip和端口
def server_bind(self): """Called by constructor to bind the socket. May be overridden. """ if self.allow_reuse_address: self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind(self.server_address) self.server_address = self.socket.getsockname()
执行server_activate()方法,实现监听
def server_activate(self): """Called by constructor to activate the server. May be overridden. """ self.socket.listen(self.request_queue_size)
此时socket已经创建完毕
3.构造方法执行完之后,开始执行下面的server的serve_forever()方法
def serve_forever(self, poll_interval=0.5): self.__is_shut_down.clear() try: with _ServerSelector() as selector: selector.register(self, selectors.EVENT_READ) while not self.__shutdown_request: ready = selector.select(poll_interval) if ready: self._handle_request_noblock() self.service_actions() finally: self.__shutdown_request = False self.__is_shut_down.set()
def _handle_request_noblock(self): try: request, client_address = self.get_request() except OSError: return if self.verify_request(request, client_address): try: self.process_request(request, client_address) except: self.handle_error(request, client_address) self.shutdown_request(request)
def process_request(self, request, client_address): t = threading.Thread(target = self.process_request_thread, args = (request, client_address)) t.daemon = self.daemon_threads t.start()
def process_request_thread(self, request, client_address): """Same as in BaseServer but as a thread. In addition, exception handling is done here. """ try: self.finish_request(request, client_address) self.shutdown_request(request) except: self.handle_error(request, client_address) self.shutdown_request(request)
def finish_request(self, request, client_address): """Finish one request by instantiating RequestHandlerClass.""" self.RequestHandlerClass(request, client_address, self)
class BaseRequestHandler: def __init__(self, request, client_address, server): self.request = request self.client_address = client_address self.server = server self.setup() try: self.handle() finally: self.finish() def setup(self): pass def handle(self): pass def finish(self): pass
总结
上述只是一个简单的查找源码实现多线程的过程,查找过程中一定要捋清楚每个父类的方法,根据多继承原则进行查找和执行,上述过程简单做成流程图如下:
?
标签:
原文地址:http://www.cnblogs.com/pycode/p/sokect2.html