码迷,mamicode.com
首页 > 编程语言 > 详细

Python开发【第九篇】:协程、异步IO

时间:2017-07-18 01:54:15      阅读:353      评论:0      收藏:0      [点我收藏+]

标签:计数   locking   扩展模块   sock   unsigned   env   响应时间   threading   代码实现   

协程

协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是协程,协程是一种用户态的轻量级线程。

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切换回来的时候,恢复先前保存的寄存器上下文和栈。因此,协程能保留上一次调用的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法,进入上一次离开时所处逻辑流的位置。

子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后A执行完毕。

所以子程序调用时通过栈实现的,一个线程就是执行一个子程序。子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同。

协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。

注意,在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似CPU的中断。比如子程序A、B:

  1. def a():
  2.     print("1")
  3.     print("2")
  4.     print("3")
  5.  
  6. def b():
  7.     print("x")
  8.     print("y")
  9.     print("z")

假设由程序执行,在执行A的过程中,可以随时中断,去执行B,B也可能在执行过程中中断再去执行A,结果可能是:

  1. 1
  2. 2
  3. x
  4. y
  5. 3
  6. z

但是在A中是没有调用B的,所以协程的调用比函数调用理解起来要难一些。看起来A、B的执行有点像多线程,但协程的特点在是一个线程执行,和多线程比协程有何优势?

最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是有程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。

第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

因为协程是一个线程执行,那么怎么利用多核CPU呢?最简单的方法是多进程加协程,既充分利用多核,有充分发挥协程的高效率,可获得极高的性能。

协程的优点:

无需线程上下文切换的开销。

无需原子操作锁定及同步的开销。原子操作(atomic operation)是不需要synchronized,所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何context switch(切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。

方便切换控制流,简化编程模型。

高并发+高扩展性+低成本。一个CPU支持上万的协程都不是问题,所以很适合用于高并发处理。

协程的缺点:

无法利用多核资源。协程的本质是个单线程,它不能同时将单个CPU的多个核用上,协程需要和进程配合才能运行在多CPU上。当然我们日常所编写的绝大部分应用都没有这个必要,除非是CPU密集型应用。

进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序。

使用yield实现协程操作。

  1. import time,queue
  2.  
  3. def consumer(name):
  4.     print("-->starting eating xoxo")
  5.     while True:
  6.         new_xo = yield
  7.         print("%s is eating xoxo %s"%(name,new_xo))
  8.  
  9. def producer():
  10.     r = con.__next__()
  11.     r = con2.__next__()
  12.     n = 0
  13.     while n < 5:
  14.         n += 1
  15.         con.send(n)
  16.         con2.send(n)
  17.         print("\033[32;1mproducer\033[0m is making xoxo %s"%n)
  18.  
  19. if __name__ == "__main__":
  20.     con = consumer("c1")
  21.     con2 = consumer("c2")
  22.     p = producer()
  23. 输出:
  24. -->starting eating xoxo
  25. -->starting eating xoxo
  26. c1 is eating xoxo 1
  27. c2 is eating xoxo 1
  28. producer is making xoxo 1
  29. c1 is eating xoxo 2
  30. c2 is eating xoxo 2
  31. producer is making xoxo 2
  32. c1 is eating xoxo 3
  33. c2 is eating xoxo 3
  34. producer is making xoxo 3
  35. c1 is eating xoxo 4
  36. c2 is eating xoxo 4
  37. producer is making xoxo 4
  38. c1 is eating xoxo 5
  39. c2 is eating xoxo 5
  40. producer is making xoxo 5

协程的特点:

1、必须在只有一个单线程里实现并发。

2、修改共享数据不需加锁。

3、用户程序里自己保持多个控制流的上下文栈。

4、一个协程遇到IO操作自动切换到其它协程。

刚才yield实现的不能算是合格的协程。

Python对协程的支持是通过generator实现的。在generator中,我们不但可以通过for循环来迭代,还可以不断调用next()函数获取由yield语句返回到下一个值。但是python的yield不但可以返回一个值,它可以接收调用者发出的参数。

Greenlet

greenlet是一个用C实现的协程模块,相比于Python自带的yield,它可以在任意函数之间随意切换,而不需把这个函数声明为generator。

  1. from greenlet import greenlet
  2.  
  3. def f1():
  4.     print(11)
  5.     gr2.switch()
  6.     print(22)
  7.     gr2.switch()
  8.  
  9. def f2():
  10.     print(33)
  11.     gr1.switch()
  12.     print(44)
  13.  
  14. gr1 = greenlet(f1)
  15. gr2 = greenlet(f2)
  16. gr1.switch()
  17. 输出:
  18. 11
  19. 33
  20. 22
  21. 44

以上例子还有一个问题没有解决,就是遇到IO操作自动切换。

Gevent

Gevent是一个第三方库,可以轻松提供gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet,它是以C扩展模块形式接入Python的轻量级协程。Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

  1. import gevent
  2.  
  3. def foo():
  4.     print("Running in foo")
  5.     gevent.sleep()
  6.     print("Explicit contenxt switch to foo agin")
  7.  
  8. def bar():
  9.     print("Explicit context to bar")
  10.     gevent.sleep(1)
  11.     print("Implict context switch back to bar")
  12.  
  13. def func3():
  14.     print("running func3")
  15.     gevent.sleep(0)
  16.     print("running func3 again")
  17.  
  18. gevent.joinall([
  19.      gevent.spawn(foo),
  20.      gevent.spawn(bar),
  21.      gevent.spawn(func3),
  22.     ])
  23. 输出:
  24. Running in foo
  25. Explicit context to bar
  26. running func3
  27. Explicit contenxt switch to foo agin
  28. running func3 again
  29. Implict context switch back to bar

同步与异步的性能区别

  1. import gevent
  2.  
  3. def f1(pid):
  4.     gevent.sleep(0.5)
  5.     print("F1 %s done"%pid)
  6.  
  7. def f2():
  8.     for i in range(10):
  9.         f1(i)
  10.  
  11. def f3():
  12.     threads = [gevent.spawn(f1,i) for i in range(10)]
  13.     gevent.joinall(threads)
  14.  
  15. print("f2")
  16. f2()
  17. print("f3")
  18. f3()
  19. 输出:
  20. f2
  21. F1 0 done
  22. F1 1 done
  23. F1 2 done
  24. F1 3 done
  25. F1 4 done
  26. F1 5 done
  27. F1 6 done
  28. F1 7 done
  29. F1 8 done
  30. F1 9 done
  31. f3
  32. F1 0 done
  33. F1 4 done
  34. F1 8 done
  35. F1 7 done
  36. F1 6 done
  37. F1 5 done
  38. F1 1 done
  39. F1 3 done
  40. F1 2 done
  41. F1 9 done

上面程序的重要部分是将f1函数封装到Greenlet内部线程的gevent.spawn。初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall函数,后者阻塞当前流程,并执行所有给定的greenlet。执行流程只会在所有greenlet执行完后才会继续向下走。

IO阻塞自动切换任务

  1. from urllib import request
  2. import gevent,time
  3. from gevent import monkey
  4.  
  5. # 把当前程序的所有的id操作给单独的做上标记
  6. monkey.patch_all()
  7. def f(url):
  8.     print("GET:%s"%url)
  9.     resp = request.urlopen(url)
  10.     data = resp.read()
  11.     f = open("load.txt","wb")
  12.     f.write(data)
  13.     f.close()
  14.     print("%d bytes received from %s."%(len(data),url))
  15.  
  16. urls = [‘https://www.python.org/‘,
  17.         ‘http://www.cnblogs.com/yinshoucheng-golden/‘,
  18.         ‘https://github.com/‘]
  19. time_start = time.time()
  20. for url in urls:
  21.     f(url)
  22. print("同步cost",time.time() - time_start)
  23.  
  24. async_time_start = time.time()
  25. gevent.joinall([
  26.     gevent.spawn(f,‘https://www.python.org/‘),
  27.     gevent.spawn(f,‘http://www.cnblogs.com/yinshoucheng-golden/‘),
  28.     gevent.spawn(f,‘https://github.com/‘),
  29. ])
  30. print("异步cost",time.time() - async_time_start)

通过gevent实现单线程下的多socket并发

server side

  1. import sys,socket,time,gevent
  2.  
  3. from gevent import socket,monkey
  4. monkey.patch_all()
  5.  
  6. def server(port):
  7.     s = socket.socket()
  8.     s.bind(("0.0.0.0",port))
  9.     s.listen(500)
  10.     while True:
  11.         cli,addr = s.accept()
  12.         gevent.spawn(handle_request,cli)
  13.  
  14. def handle_request(conn):
  15.     try:
  16.         while True:
  17.             data = conn.recv(1024)
  18.             print("recv:",data)
  19.             if not data:
  20.                 conn.shutdown(socket.SHUT_WR)
  21.             conn.send(data)
  22.     except Exception as ex:
  23.         print(ex)
  24.     finally:
  25.         conn.close()
  26.  
  27. if __name__ == "__main__":
  28.     server(6969)

client side

  1. import socket
  2.  
  3. HOST = "localhost"
  4. PORT = 6969
  5. s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  6. s.connect((HOST,PORT))
  7. while True:
  8.     msg = bytes(input(">>:"),encoding="utf8")
  9.     s.sendall(msg)
  10.     data = s.recv(1024)
  11.     # print(data)
  12.     print("Received",repr(data))
  13.  
  14. s.close()

socket并发

  1. import socket,threading
  2.  
  3. def sock_conn():
  4.     client = socket.socket()
  5.     client.connect(("localhost",6969))
  6.     count = 0
  7.  
  8.     while True:
  9.         client.send(("hello %s"%count).encode("utf-8"))
  10.         data = client.recv(1024)
  11.         print("%s from server:%s"%(threading.get_ident(),data.decode()))
  12.         count += 1
  13.     client.close()
  14.  
  15. for i in range(100):
  16.     t = threading.Thread(target=sock_conn)
  17.     t.start()

事件驱动与异步IO

写服务器处理模型的程序时,有一下几种模型:

(1)每收到一个请求,创建一个新的进程,来处理该请求。

(2)每收到一个请求,创建一个新的线程,来处理该请求。

(3)每收到一个请求,放入一个事件列表,让主程序通过非阻塞I/O方式来处理请求。

上面的几种方式,各有千秋。

第一种方法,由于创建新的进程,内存开销比较大。所以,会导致服务器性能比较差,但实现比较简单。

第二种方法,由于要涉及到线程的同步,有可能会面临死锁等问题。

第三种方法,在写应用程序代码时,逻辑比前面两种都复杂。

综合考虑各方面因素,一般普遍认为第三种方式是大多数网络服务器采用的方式。

在UI编程中,常常要对鼠标点击进行相应响应,首先如何获得鼠标点击呢?

方式一:创建一个线程,该线程一直循环检测是否有鼠标点击,那么这个方式有以下几个缺点。

1、CPU资源浪费,可能鼠标点击的频率非常小,但是扫描线程还是会一直循环检测,这会造成很多的CPU资源浪费;如果扫描鼠标点击的接口是阻塞的呢?

2、如果是阻塞的,又会出现下面这样的问题。如果我们不但要扫描鼠标点击,还要扫描键盘是否按下,由于扫描鼠标时被阻塞了,那么可能永远不会去扫描键盘。

3、如果一个循环需要扫描的设备非常多,这又会引起响应时间的问题。

所以,这种方式非常不好。

方式二:事件驱动模型

目前大部分的UI编程都是事件驱动模型。如很多UI平台都会提供onClick()事件,这个事件就代表鼠标点击事件。事件驱动模型大体思路如下。

1、有一个事件(消息)队列。

2、鼠标按下时,往这个队列中增加一个点击事件(消息)。

3、有一个循环,不断从队列取出事件。根据不同的事件,调出不同的函数,如onClick()、onKeyDown()等。

4、事件(消息)一般都各自保存各自的处理函数指针,这样每个消息都有独立的处理函数。

技术分享

事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两个常见的编程范式是同步(单线程)以及多线程编程。

对比单线程、多线程以及事件驱动编程模型。下图表示随着时间的推移,这三种模式下程序所做的工作。这个程序有3个任务需要完成,每个任务都在等待I/O操作时阻塞自身。阻塞在I/O操作上所花费的时间用灰色框表示。

技术分享

在单线程同步模型中,任务按照顺序执行。如果某个任务因为I/O而阻塞,其他所有的任务必须等待,直到它完成之后才能依次执行其他操作。这种明确的执行顺序和串行化处理的行为可以看出,如果各任务之间并没有相互依赖的关系,但各任务执行仍然需要互相等待,就使得程序整体运行速度降低了。

在多线程版本中,这3个任务分别在独立的线程中执行。这些线程由操作系统来管理,在多处理器系统上可以并行处理,或者在单处理器系统上交替执行。这使得当某个线程阻塞在某个资源的同时其他线程得以继续执行。多线程程序更加难以判断,因为这类程序不得不通过线程同步机制加锁、可重入函数、线程局部存储或者其他机制来处理线程安全问题,如果实现不当就会导致出现微妙且令人痛不欲生的BUG。

在事件驱动版本的程序中,3个任务交错执行,但仍然在一个单独的线程控制中。当处理I/O或其他等待操作时,注册一个回调到事件循环中,然后当I/O操作完成时继续执行。回调描述了该如何处理某个事件。事件循环轮询所有的事件,当事件到来时将它们分配给等待处理事件的回调函数。这种方式让程序尽可能的得以执行而不需要用到额外的线程。事件驱动型程序比多线程程序更容易推断出行为,因为程序员不需要关心线程安全问题。

I/O多路复用

同步I/O和异步I/O,阻塞I/O和非阻塞I/O分别是什么,到底有什么区别?本文讨论的背景是Linux环境下的network I/O。

概念说明

用户空间与内核空间

现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操作系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对Linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。

进程切换

为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行。这种行为被称为进程切换。因此可以说,任何进程都是在操作系统内核的支持下运行的,是与内核紧密相关的。

从一个进程的运行转到另一个进程上运行,这个过程中经过下面过程:

1、保存处理机上下文,包括程序计数器和其他寄存器。

2、更新PCB信息。

3、把进程的PCB移入相应的队列,如就绪、在某事件阻塞等队列。

4、选择另一个进程执行,并更新其PCB。

5、更新内存管理的数据结构。

6、恢复处理机上下文。

进程控制块(Processing Control Block),是操作系统核心中一种数据结构,主要表示进程状态。其作用是使一个在多道程序环境下不能独立运行的程序(含数据),成为一个能独立运行的基本单位或与其它进程并发执行的进程。或者说,操作系统OS是根据PCB来对并发执行的进程进行控制和管理的。PCB通常是系统内存占用区中的一个连续存放区,它存放着操作系统用于描述进程情况及控制进程运行所需的全部信息。

进程的阻塞

正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新任务执行等,则由系统自动执行阻塞(Block),使自己由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行状态的进程(获得CPU),才能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的。

文件描述符fd

文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。

文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些设计底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。

缓存I/O

缓存I/O又被称作标准I/O,大多数文件系统的默认I/O操作都是缓存I/O。在Linux的缓存I/O机制中,操作系统会将I/O的数据缓存在文件系统的页缓存(page cache)中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。

缓存I/O的缺点:

数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所带来的CPU以及内存开销是非常大的。

IO模式

对于一次IO访问(以read为例),数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。当一个read操作发生时,会经历两个阶段:

1、等待数据准备(waiting for the data to be ready)。

2、将数据从内核拷贝到进程中(Copying the data from the kernel to the process)。

正是因为这两个阶段,Linux系统产生了下面五种网络模式的方案。

阻塞I/O(blocking IO)。

非阻塞I/O(nonblocking IO)

I/O多路复用(IO multiplexing)

信号驱动I/O(signal driven IO)

异步I/O(asynchronous IO)

由于信号驱动I/O(signal driven IO)在实际中并不常用,所以只剩下四种IO模式。

阻塞I/O(blocking IO)

在Linux中,默认情况下所有的Socket都是blocking,一个典型的读操作流程如下:

技术分享

当用户进程调用了recvfrom,kernel就开始了IO的第一个阶段,准备数据。对于网络IO来说,很多时候数据在一开始还没有到达。比如还没有收到一个完整的UDP包,这个时候kernel就要等待足够的数据到来。这个过程需要等待,也就是说数据被拷贝到操作系统内核的缓冲区中是需要一个过程的。而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。

所以,blocking IO的特点就是在IO执行的两个阶段都被block了。

非阻塞I/O(nonblocking IO)

Linux下,可以通过设置Socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程如下:

技术分享

当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上将数据拷贝到了用户内存,然后返回。

所以,nonblocking IO的特点是用户进程需要不断的主动询问kernel数据好了没有。

I/O多路复用(IO multiplexing)

IO multiplexing就是平时所说的select、poll、epoll,有些地方也称这种IO方式为event driven IO。select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select、poll、epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。

技术分享

当用户进程调用了select,那么整个进程会被block。而同时kernel会"监视"所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。

所以,I/O多了复用的特点是通过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就可以返回。

这个图和blocking IO的图其实并没有太大的不同。事实上还更差一些,因为这里需要使用两个system call(select和recvfrom),而blocking IO只调用了一个system call(recvfrom)。但是用select的优势在于它可以同时处理多个connection。

实际在IO multiplexing Model中,对于每一个socket一般都设置成为non-blocking。但是如上图所示整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

异步I/O(asynchronous IO)

Linux下的asynchronous IO其实用得很少。

技术分享

用户进程发起read操作之后,离开就可以开始去做其它的事。而另一个方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

总结

blocking和non-blocking的区别

调用blocking IO会一直block,直到对应的进程操作完成。而non-blocking IO在kernel还在准备数据的情况下就会立刻返回。

synchronous IO和asynchronous IO的区别

在说明synchronous IO和asynchronous IO的区别之前,需要先给出两者的定义。POSIX的定义:

synchronous IO会导致请求进程被阻塞,直到该输I/O操作完成。

asynchronous IO不会导致请求进程被阻塞。

两者的区别就在于synchronous IO做"IO operation"的时候会将process阻塞。按照这个定义之前所述的blocking IO、non-blocking IO、IO multiplexing都属于synchronous IO。

有人认为non-blocking IO并没有被block,这里是非常容易误解的地方。定义中所指的"IO operation"是指真实的IO操作,就是例子中的recvfrom这个system call。non-blocking IO在执行recvfrom这个system call的时候,如果kernel的数据没有准备好,这时候不会block进程。但是当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,这段时间内进程是被block的。

而asynchronous IO则不一样,当进程发起IO操作之后,就直接返回再也不理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程中进程完全没有被block。

各个IO model的比较如下图:

技术分享

通过上面的图片可以发现non-blocking IO和asynchronous IO的区别还是很明显的。在non-blocking IO中,虽然进程大部分时间都不会被block,但是它仍然要求进程主动的check,并且当数据准备完成之后,也需要进程主动的再次调用recvfrom来讲数据拷贝到用户内存。而asynchronous IO则完全不同,它就像是用户进程将整个IO操作交给了他人(kernel)完成,然后kernel做完后发信号通知。在此期间用户进程不需要去检查IO操作的状态,也不需要主动的去拷贝数据。

I/O多路复用select、poll、epoll详解

select、poll、epoll都是IO多路复用的机制。I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select、poll、epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。

select

  1. select(rlist,wlist,xlist,timeout=None)

select函数监视的文件描述符分3类,分别是writefds、readfds和execptfds。调用后select函数会阻塞,直到有描述符就绪(有数据可读、可写或有except)或者超时(timeout指定等待时间,如果立即返回设为null即可)函数返回。当select函数返回后,可以通过遍历fdset,来找到就绪的描述符。

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低。

poll

  1. int poll(struct pollfd *fds,unsigned,int nfds,int timeout)

select使用了三个位图来表示三个fdset的方式,poll使用一个pollfd的指针实现。

  1. struct pollfd{
  2.     int fd; # 文件描述符
  3.     short events; # 请求
  4.     short revents; # 响应
  5. }

pollfd结构包含了要监视的event和发生的event,不再使用select"参数-值"传递的方式。同时pollfd并没有最大数量限制(但是数量过多后性能也是会下降)。和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符。

从上面可以看出,select和poll都需要在返回后通过遍历文件描述符来获取已经就绪的socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。

epoll

epoll是在2.6内核中提出的,是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。

epoll操作过程需要三个接口。

  1. int epoll_create(int size); # 创建一个epoll的句柄,size用来告诉内核监听的数量
  2. int epoll_ctl(int epfd,int op,int fd,struct epoll_event *event);
  3. int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout);

int epoll_create(int size);

创建一个epoll的句柄,size用来告诉内核监听的数量,这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值,参数size并不是限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议。

当创建好epoll句柄后,它就会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

int epoll_ctl(int epfd,int op,int fd,struct epoll_event *event);

函数是对指定描述符fd执行op操作。

epfd:epoll_create()的返回值。

op:op操作,用三个宏来表示,添加EPOLL_CTL_ADD,删除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分别添加、删除和修改对fd的监听事件。

fd:需要监听的fd(文件描述符)。

epoll_event:内核需要监听的目标。

int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout);

等待epfd上的io事件,最多返回maxevents个事件。

参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定)。该函数返回需要处理的事件数目,如返回0表示已超时。

select、poll、epoll三者的区别

select

select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。

select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核方式提升这一限制。

另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增大。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。

poll

poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。

poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制与用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。

epoll

直到Linux 2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux 2.6下性能最好的多路I/O就绪通知方法。

epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它就不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但代码实现相当复杂。

epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。

另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行描述,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

Python select

Python的select()方法直接调用操作系统的IO接口,它监控sockets、open files、pipes(所有带fileno()方法的文件句柄)何时变成readable和writeable或者通信错误,select()使得同时监控多个连接变得简单,并且这比写一个长循环来等待和监控多客户端连接要高效,因为select直接通过操作系统提供的C的网络接口进行操作,而不是通过Python的解释器。

注意:Using Python‘s file objects with select() works for Unix, but is not supported under Windows.

select_socket_server

  1. __author__ = ‘Golden‘
  2. #!/usr/bin/env python3
  3. # -*- coding:utf-8 -*-
  4.  
  5. import select,socket,sys,queue
  6.  
  7. server = socket.socket()
  8. server.setblocking(0)
  9. server_addr = (‘localhost‘,6969)
  10. print(‘starting up on %s port %s‘%server_addr)
  11. server.bind(server_addr)
  12. server.listen(5)
  13.  
  14. # 监测自己,因为server本身也是个fd
  15. inputs = [server,]
  16. outputs = []
  17. message_queues = {}
  18. while True:
  19.     print(‘waiting for next event...‘)
  20.     # 如果没有任何fd就绪,程序会一直阻塞在这里
  21.     readable,writeable,exeptional = select.select(inputs,outputs,inputs)
  22.     # 每个s就是一个socket
  23.     for s in readable:
  24.         # 上面server自己也当做一个fd放在了inputs列表里,传给了select,如果s是server代表server这个fd就绪了,即新的连接进来
  25.         if s is server:
  26.             # 接收这个连接
  27.             conn,client_addr = s.accept()
  28.             print(‘new connection from‘,client_addr)
  29.             conn.setblocking(0)
  30.             """
  31.             为了不阻塞整个程序,不会立刻在这里开始接收客户端发来的数据,把它放到inputs里,下一次loop时,
  32.             这个新连接就会被交给select去监听,如果这个连接的客户端发来了数据,那么这个连接的fd在server端就会变成就绪的,
  33.             select就会把这个数据返回到readable列表里,然后就可以loop readable列表,取出这个连接,开始接收数据
  34.             """
  35.             inputs.append(conn)
  36.             # 接收到客户端的数据后,不立刻返回,暂存在队列里,以后发送
  37.             message_queues[conn] = queue.Queue()
  38.         # s不是server那就只会是一个与客户端建立的连接的fd
  39.         else:
  40.             # 接收客户端的数据
  41.             data = s.recv(1024)
  42.             if data:
  43.                 print(‘收到来自【%s】的数据:‘%s.getpeername()[0],data)
  44.                 # 收到的数据先放入queue里,一会返回给客户端
  45.                 message_queues[s].put(data)
  46.                 if s not in outputs:
  47.                     # 为了不影响处理与其它客户端的连接,这里不立刻返回数据给客户端
  48.                     outputs.append(s)
  49.             # 如果收不到data,代表客户端已断开
  50.             else:
  51.                 print(‘客户端已断开...‘,s)
  52.                 if s in outputs:
  53.                     # 清理已断开的连接
  54.                     outputs.remove(s)
  55.                 # 清理已断开的连接
  56.                 inputs.remove(s)
  57.                 # 清理已断开的连接
  58.                 del message_queues[s]
  59.     for s in writeable:
  60.         try:
  61.             next_msg = message_queues[s].get_nowait()
  62.         except queue.Empty:
  63.             print(‘client [%s]‘%s.getpeername()[0],‘queue is empty...‘)
  64.             outputs.remove(s)
  65.         else:
  66.             print(‘sending msg to [%s]‘%s.getpeername()[0],next_msg)
  67.             s.send(next_msg.upper())
  68.     for s in exeptional:
  69.         print(‘handling exception for‘,s.getpeername())
  70.         inputs.remove(s)
  71.         if s in outputs:
  72.             outputs.remove(s)
  73.         s.close()
  74.         del message_queues[s]

select_socket_client

  1. __author__ = ‘Golden‘
  2. #!/usr/bin/env python3
  3. # -*- coding:utf-8 -*-
  4.  
  5. import socket,sys
  6.  
  7. messages = [b‘This is the message.‘,
  8.             b‘It will be sent‘,
  9.             b‘in parts.‘,
  10.             ]
  11.  
  12. server_address = (‘localhost‘,6969)
  13. # 创建一个TCP/IP连接
  14. socks = [socket.socket(socket.AF_INET,socket.SOCK_STREAM),
  15.          socket.socket(socket.AF_INET,socket.SOCK_STREAM),
  16.          socket.socket(socket.AF_INET,socket.SOCK_STREAM),]
  17. print(‘connecting to %s port %s‘%server_address)
  18. for s in socks:
  19.     s.connect(server_address)
  20.  
  21. for message in messages:
  22.     # 发送数据
  23.     for s in socks:
  24.         print(‘%s:sending "%s"‘%(s.getsockname(),message))
  25.         s.send(message)
  26.     # 接收数据
  27.     for s in socks:
  28.         data = s.recv(1024)
  29.         print(‘%s:received "%s"‘%(s.getsockname(),data))
  30.         if not data:
  31.             print(sys.stderr,‘closing socket‘,s.getsockname())

selectors

selectors模块可以实现IO多路复用,它具有根据平台选出最佳的IO多路机制,例如在windows上默认是select模式,而在linux上默认是epoll。常分为三种模式select、poll和epoll。

selector_socket_server:

  1. __author__ = ‘Golden‘
  2. #!/usr/bin/env python3
  3. # -*- coding:utf-8 -*-
  4.  
  5. import selectors,socket
  6.  
  7. sel = selectors.DefaultSelector()
  8.  
  9. def accept(sock,mask):
  10.     conn,addr = sock.accept()
  11.     print(‘accrpted‘,conn,‘form‘,addr)
  12.     conn.setblocking(0)
  13.     sel.register(conn,selectors.EVENT_READ,read)
  14.  
  15. def read(conn,mask):
  16.     data = conn.recv(1024)
  17.     if data:
  18.         print(‘echoing‘,repr(data),‘to‘,conn)
  19.         conn.send(data)
  20.     else:
  21.         print(‘closing‘,conn)
  22.         sel.unregister(conn)
  23.         conn.close()
  24.  
  25. sock = socket.socket()
  26. sock.bind((‘localhost‘,6969))
  27. sock.listen(100)
  28. sock.setblocking(0)
  29. sel.register(sock,selectors.EVENT_READ,accept)
  30.  
  31. while True:
  32.     events = sel.select()
  33.     for key,mask in events:
  34.         callback = key.data
  35.         callback(key.fileobj,mask)

 

 

 

Python开发【第九篇】:协程、异步IO

标签:计数   locking   扩展模块   sock   unsigned   env   响应时间   threading   代码实现   

原文地址:http://www.cnblogs.com/yinshoucheng-golden/p/7197987.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!