标签:roc 顺序 交互 leo register 清理 互斥锁 stream 编写
回顾:
进程
一个程序需要运行所需的资源的集合
每个进程数据是独立的
每个进程里至少有一个线程
进程里可以有多个线程
线程数据是共享的
一个进程的多个线 6程可以充分利用多核cpu
multiprocessing
pipe
queue
实现的是进程间的数据传递,通信
manager 实现了多进程间的数据共享
进程间共享数据的代价是高昂的,所以要尽量避免进程间的数据共享
线程间的数据本来就是共享的
线程要修改同一份数据,必须加锁,互斥锁mutex
event
线程间交互
生产者消费者模型
解耦 (降低进程间的依赖性)
提高程序运行效率
queue
FIFO
LIFO
优先级queue
适用场景:
线程:
I/O密集型(I/O不占用cpu),socket 爬虫 web
进程:cpu运算密集型,金融分析
协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程。
协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:
协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
协程的好处:
缺点:
协程标准定义,即符合什么条件就能称之为协程:
1 # -*- coding:utf-8 -*- 2 3 4 from greenlet import greenlet 5 6 7 def test1(): 8 print(12) 9 gr2.switch() 10 print(34) 11 gr2.switch() 12 13 14 def test2(): 15 print(56) 16 gr1.switch() 17 print(78) 18 19 20 gr1 = greenlet(test1) 21 gr2 = greenlet(test2) 22 gr1.switch()
1 import gevent 2 3 def func1(): 4 print(‘\033[31;1m李闯在跟海涛搞...\033[0m‘) 5 gevent.sleep(2) 6 print(‘\033[31;1m李闯又回去跟继续跟海涛搞...\033[0m‘) 7 8 def func2(): 9 print(‘\033[32;1m李闯切换到了跟海龙搞...\033[0m‘) 10 gevent.sleep(1) 11 print(‘\033[32;1m李闯搞完了海涛,回来继续跟海龙搞...\033[0m‘) 12 13 14 gevent.joinall([ 15 gevent.spawn(func1), 16 gevent.spawn(func2), 17 #gevent.spawn(func3), 18 ])
import gevent def task(pid): """ Some non-deterministic task """ gevent.sleep(0.5) print(‘Task %s done‘ % pid) def synchronous(): for i in range(1,10): task(i) def asynchronous(): threads = [gevent.spawn(task, i) for i in range(10)] gevent.joinall(threads) print(‘Synchronous:‘) synchronous() print(‘Asynchronous:‘) asynchronous()
select 多并发socket 例子
server
1 #_*_coding:utf-8_*_ 2 __author__ = ‘Alex Li‘ 3 4 import select 5 import socket 6 import sys 7 import queue 8 9 10 server = socket.socket() 11 server.setblocking(0) 12 13 server_addr = (‘localhost‘,10000) 14 15 print(‘starting up on %s port %s‘ % server_addr) 16 server.bind(server_addr) 17 18 server.listen(5) 19 20 21 inputs = [server, ] #自己也要监测呀,因为server本身也是个fd 22 outputs = [] 23 24 message_queues = {} 25 26 while True: 27 print("waiting for next event...") 28 29 readable, writeable, exeptional = select.select(inputs,outputs,inputs) #如果没有任何fd就绪,那程序就会一直阻塞在这里 30 31 for s in readable: #每个s就是一个socket 32 33 if s is server: #别忘记,上面我们server自己也当做一个fd放在了inputs列表里,传给了select,如果这个s是server,代表server这个fd就绪了, 34 #就是有活动了, 什么情况下它才有活动? 当然 是有新连接进来的时候 呀 35 #新连接进来了,接受这个连接 36 conn, client_addr = s.accept() 37 print("new connection from",client_addr) 38 conn.setblocking(0) 39 inputs.append(conn) #为了不阻塞整个程序,我们不会立刻在这里开始接收客户端发来的数据, 把它放到inputs里, 下一次loop时,这个新连接 40 #就会被交给select去监听,如果这个连接的客户端发来了数据 ,那这个连接的fd在server端就会变成就续的,select就会把这个连接返回,返回到 41 #readable 列表里,然后你就可以loop readable列表,取出这个连接,开始接收数据了, 下面就是这么干 的 42 43 message_queues[conn] = queue.Queue() #接收到客户端的数据后,不立刻返回 ,暂存在队列里,以后发送 44 45 else: #s不是server的话,那就只能是一个 与客户端建立的连接的fd了 46 #客户端的数据过来了,在这接收 47 data = s.recv(1024) 48 if data: 49 print("收到来自[%s]的数据:" % s.getpeername()[0], data) 50 message_queues[s].put(data) #收到的数据先放到queue里,一会返回给客户端 51 if s not in outputs: 52 outputs.append(s) #为了不影响处理与其它客户端的连接 , 这里不立刻返回数据给客户端 53 54 55 else:#如果收不到data代表什么呢? 代表客户端断开了呀 56 print("客户端断开了",s) 57 58 if s in outputs: 59 outputs.remove(s) #清理已断开的连接 60 61 inputs.remove(s) #清理已断开的连接 62 63 del message_queues[s] ##清理已断开的连接 64 65 66 for s in writeable: 67 try : 68 next_msg = message_queues[s].get_nowait() 69 70 except queue.Empty: 71 print("client [%s]" %s.getpeername()[0], "queue is empty..") 72 outputs.remove(s) 73 74 else: 75 print("sending msg to [%s]"%s.getpeername()[0], next_msg) 76 s.send(next_msg.upper()) 77 78 79 for s in exeptional: 80 print("handling exception for ",s.getpeername()) 81 inputs.remove(s) 82 if s in outputs: 83 outputs.remove(s) 84 s.close() 85 86 del message_queues[s]
client
1 #_*_coding:utf-8_*_ 2 __author__ = ‘Alex Li‘ 3 4 5 import socket 6 import sys 7 8 messages = [ b‘This is the message. ‘, 9 b‘It will be sent ‘, 10 b‘in parts.‘, 11 ] 12 server_address = (‘localhost‘, 10000) 13 14 # Create a TCP/IP socket 15 socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM), 16 socket.socket(socket.AF_INET, socket.SOCK_STREAM), 17 ] 18 19 # Connect the socket to the port where the server is listening 20 print(‘connecting to %s port %s‘ % server_address) 21 for s in socks: 22 s.connect(server_address) 23 24 for message in messages: 25 26 # Send messages on both sockets 27 for s in socks: 28 print(‘%s: sending "%s"‘ % (s.getsockname(), message) ) 29 s.send(message) 30 31 # Read responses on both sockets 32 for s in socks: 33 data = s.recv(1024) 34 print( ‘%s: received "%s"‘ % (s.getsockname(), data) ) 35 if not data: 36 print(sys.stderr, ‘closing socket‘, s.getsockname() ) 37 复制代码
selectors模块
1 import selectors 2 import socket 3 4 sel = selectors.DefaultSelector() 5 6 def accept(sock, mask): 7 conn, addr = sock.accept() # Should be ready 8 print(‘accepted‘, conn, ‘from‘, addr) 9 conn.setblocking(False) 10 sel.register(conn, selectors.EVENT_READ, read) 11 12 def read(conn, mask): 13 data = conn.recv(1000) # Should be ready 14 if data: 15 print(‘echoing‘, repr(data), ‘to‘, conn) 16 conn.send(data) # Hope it won‘t block 17 else: 18 print(‘closing‘, conn) 19 sel.unregister(conn) 20 conn.close() 21 22 sock = socket.socket() 23 sock.bind((‘localhost‘, 10000)) 24 sock.listen(100) 25 sock.setblocking(False) 26 sel.register(sock, selectors.EVENT_READ, accept) 27 28 while True: 29 events = sel.select() 30 for key, mask in events: 31 callback = key.data 32 callback(key.fileobj, mask)
标签:roc 顺序 交互 leo register 清理 互斥锁 stream 编写
原文地址:http://www.cnblogs.com/heshaochuan/p/6216652.html