标签:task 情况下 win pre 并发编程 文件系统 als 官方 经理
首先明确一点,无论是多进程还是多线程,主进程或主线程都会等待子进程或子线程退出才会退出。
无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁. 需要强调的是:运行完毕并非终止运行
1.对主进程来说,运行完毕指的是主进程代码运行完毕
2.对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕
也就是说:
import os
import time
from multiprocessing import Process
def task1():
while True:
print('task1', os.getpid())
time.sleep(1)
def task2():
while True:
print('task2')
time.sleep(1.5)
if __name__ == '__main__':
p1 = Process(target=task1)
p1.daemon = True
p1.start()
p2 = Process(target=task2)
p2.start()
# task1 不会被执行,因为进程的开启是比线程慢的,所以一般情况下是主进程代码执行完毕再执行子进程
print('main over')
GIL锁是一把cpython解释器帮我们加的互斥锁,加这把锁是基于内存管理机制考虑的。如果没有这把锁,再考虑多个线程同时只能被一个CPU处理的情况,因为没有锁的多线程的并发肯定会设计到资源抢占。垃圾回收机制的活还没干完就被另外一个线程抢走了CPU的执行权限,恰好这个线程又要对垃圾回收要处理的一个变量做相关操作(比如加1),那这种情况垃圾回收就没有意义了。所以,在进程空间的外面加了一把锁,如果垃圾回收机制抢到这把锁,把垃圾回收的活干完释放锁,这样垃圾回收机制才能实现。
GIL锁和我们自己在程序申明的锁有什么区别?GIL锁可以看做是整个进程空间出口上的锁,而我们自己申明的锁是用来锁住进程里面的的数据的。
多进程是数据隔离的,为啥还需要锁呢?因为多进程虽然是数据隔离,但是却共享文件系统和打印终端。如果是开启多个进程队文件进程读写操作,那么就需要用到锁
from multiprocessing import Process,Lock
import time,json
def search():
dic=json.load(open('ticket.txt'))
print('\033[43m剩余票数%s\033[0m' %dic['count'])
def get():
dic=json.load(open('ticket.txt'))
time.sleep(0.1) # 模拟读数据的网络延迟,这里是为了等待其他进程开启并完成load操作
if dic['count'] > 0:
dic['count'] -= 1
time.sleep(0.2) # 模拟写数据的网络延迟,以防dump的w模式清空文件但是另一个进程在search的时候load空文件会报错
json.dump(dic,open('ticket.txt','w'))
print('\033[43m购票成功\033[0m')
def task(lock):
search()
get()
if __name__ == '__main__':
lock=Lock()
for i in range(100): #模拟并发100个客户端抢票
p=Process(target=task,args=(lock,))
p.start()
加锁
from multiprocessing import Process,Lock
import time,json
from multiprocessing import Lock
def search():
dic=json.load(open('ticket.txt'))
print('\033[43m剩余票数%s\033[0m' %dic['count'])
def get():
dic=json.load(open('ticket.txt'))
time.sleep(0.1) # 模拟读数据的网络延迟,这里是为了等待其他进程开启并完成load操作
if dic['count'] > 0:
dic['count'] -= 1
time.sleep(0.2) # 模拟写数据的网络延迟,以防dump的w模式清空文件但是另一个进程在search的时候load空文件会报错
json.dump(dic,open('ticket.txt','w'))
print('\033[43m购票成功\033[0m')
def task(lock):
search()
lock.acquire()
get()
lock.release()
if __name__ == '__main__':
lock=Lock()
for i in range(100): #模拟并发100个客户端抢票
p=Process(target=task,args=(lock,))
p.start()
这里不是在get函数里面加锁,而是另外封装了一个task函数,在task函数里面给get() 加锁
无论是进程还是线程的队列,都已经帮我们做过加锁处理了,而且队列里可以丢进去自定义的对象,还可以丢None对象。线程可以共享全局变量,进程虽然不能共享全局变量,可以有可以共享数据的方式。同线程共享
数据就会有抢占资源的情况,进程如果使用共享数据的方式,也会出现资源竞争的情况。multiprocessing的Manager其实就是另外开一个进程,在这个进程里面开辟一块共享内存。
同步使用方式(不常用)
from multiprocessing import Pool
import os,time
def work(n):
print('%s run' %os.getpid())
time.sleep(3)
return n**2
if __name__ == '__main__':
p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
res_l=[]
for i in range(10):
res=p.apply(work,args=(i,)) #同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞,但不管该任务是否存在阻塞,同步调用都会在原地等着,只是等的过程中若是任务发生了阻塞就会被夺走cpu的执行权限
res_l.append(res)
print(res_l)
异步
from multiprocessing import Pool
import os,time
def work(n):
time.sleep(1)
return n**2
if __name__ == '__main__':
p=Pool(os.cpu_count()) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
res_l=[]
for i in range(10):
res=p.apply_async(work,args=(i,)) #同步运行,阻塞、直到本次任务执行完毕拿到res
res_l.append(res)
#异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
p.close()
p.join()
# join之后get() 就能立即拿到值,如果注释掉上面两句代码,那么是每3个打印一次,没有结果的get会出现阻塞
for res in res_l:
print(res.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get
回调
from multiprocessing import Pool
import os,time
def work(n):
time.sleep(1)
return n**2
def get_data(data):
# 会把work return的结果传给get_data 做参数
print('得到的数据是:', data)
if __name__ == '__main__':
p=Pool(os.cpu_count()) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
res_l=[]
# 回调函数是主进程执行的,如果主进程调用time.sleep(10000),当work执行完毕之后
# 操作系统就会跟主进程说,别睡了,快去处理任务,处理完再睡
for i in range(10):
p.apply_async(work,args=(i,), callback=get_data) #同步运行,阻塞、直到本次任务执行完毕拿到res
# 因为是异步的,如果不加下面两句,那么主进程退出,池子里的任务还没执行
p.close()
p.join()
对结果的处理方式一般就两种: 一种是拿到结果立即调用,另外一个是把结果拿到做统一处理,推荐使用第一种方式
import time
from concurrent.futures import ProcessPoolExecutor
def work(n):
time.sleep(1)
return n**2
def get_data(res):
# 传的参数res是一个对象
print('得到的数据是:', res.result())
if __name__ == '__main__':
executor = ProcessPoolExecutor(max_workers=5)
# futures = []
# for i in range(10):
# future = executor.submit(work, i).add_done_callback(get_data)
# futures.append(future)
# executor.shutdown()
# for future in futures:
# print(future.result())
for i in range(10):
executor.submit(work, i).add_done_callback(get_data)
# 下面不加shutdown也能执行池里的任务,只不过为了代码可读性,一般还是建议加上
map使用方式
from concurrent.futures import ProcessPoolExecutor
import os,time,random
def task(n):
print('%s is runing' %os.getpid())
time.sleep(random.randint(1,3))
return n**2
if __name__ == '__main__':
executor = ProcessPoolExecutor(max_workers=3)
# for i in range(11):
# # concurrent 模块不加shutdown 主进程执行完毕会等池里的任务执行完毕程序才会结束,不同于Pool
# # 而且 concurrent 是用异步,没有Pool的同步方式
# executor.submit(task,i)
# map 得到的是一个存储结果(不需要.result()) 的可迭代对象
data_obj = executor.map(task,range(1,12)) # map取代了for+submit
for data in data_obj:
print(data)
python中的协程就是greenlet,就是切换+保存状态。线程是操作系统调度的,但是协程的切换是程序员进行调度的,操作系统对此“不可见”。既然要保存状态,那么肯定就会涉及到栈,协程也有自己的栈,只不过这个开销比线程小。单纯的协程并不能帮我们提高效率,只能帮我们保存上次运行的状态并做来回切换,所以大家又基于协程的切换+保存状态做了进一步的封装,让程序能遇到IO阻塞自动切换,如gevent模块等。gevent模块就是利用事件驱动库 libev 加 greenlet实现的。我们知道,事件循环是异步编程的底层基石。如果用户关注的层次很低,直接操作epoll去构造维护事件的循环,从底层到高层的业务逻辑需要层层回调,造成callback hell,并且可读性较差。所以,这个繁琐的注册回调与回调的过程得以封装,并抽象成EventLoop。EventLoop屏蔽了进行epoll系统调用的具体操作。对于用户来说,将不同的I/O状态考量为事件的触发,只需关注更高层次下不同事件的回调行为。诸如libev, libevent之类的使用C编写的高性能异步事件库已经取代这部分琐碎的工作。综上所述,当事件发生的时候通知用户程序进行协程的切换。准确来说,gevent是一个第三方异步模块,这个模块能让我像使用线程的方式去使用协程,而且这个模块需要socket是非阻塞socket的,所以一般在程序最开头加上monkey.patch_all()
.
import greenlet
def task1():
print('task1 start')
g2.switch()
print('task1 end')
g2.switch()
def task2():
print('task2 start')
g1.switch()
print('task2 end')
g1 = greenlet.greenlet(task1)
g2 = greenlet.greenlet(task2)
g1.switch()
gevent模块的基本使用
from gevent import spawn, joinall, monkey
import time
monkey.patch_all()
def task(i):
time.sleep(1)
print('----', i)
return i * 2
if __name__ == '__main__':
# spawn 的时候会创建一个协程并立即执行
res = [spawn(task, i) for i in range(10)]
joinall(res)
for g in res:
print(g.value)
现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。
用户态和内核态讲述的是CPU的状态,用户态是CPU可以执行的指令比较少,内核态是CPU可以处理的指令比较多,可以先这么简单地理解。
同步和异步,阻塞和非阻塞这两组的关注点是不同的。
同步是程序员的程序"亲自"主动去等结果,当然在这个等结果的过程中程序可以干别的事情(进程或线程的状态是非阻塞),但是程序还得过一段时间来查看是否已经有结果了,程序需要干两件事,只有一个角色
异步是程序员的程序发了个口号去要结果,然后就等另外一个"东西"通知我结果已经好了,这时候角色就有两个了。当然,你可以在这个过程去干别的事,也可以不干(除非你傻)。在这里的消息通知也可以看作是回调,
还记得上面主进程在sleep的时候当work的任务完成之后操作系统会把主进程叫醒执行回调函数吗?
阻塞和非阻塞描述的是进程或者线程所处的状态。
上面的四种不同的IO模型是针对两个阶段的不同状态而言的:1. 等待数据到内核空间 2. 把数据从内核空间拷贝到用户程序的进程空间
1、输入操作:read、readv、recv、recvfrom、recvmsg共5个函数,如果会阻塞状态,则会经理wait data和copy data两个阶段,如果设置为非阻塞则在wait 不到data时抛出异常
2、输出操作:write、writev、send、sendto、sendmsg共5个函数,在发送缓冲区满了会阻塞在原地,如果设置为非阻塞,则会抛出异常
3、接收外来链接:accept,与输入操作类似
4、发起外出链接:connect,与输出操作类似
对IO多路复用的补充:
这三种技术都是"古人"开发的,是操作系统级别的程序开发。
IO多路复用中的两种触发方式:
边缘触发:如果文件描述符自上次状态改变后有新的IO活动到来,此时会触发通知.在收到一个IO事件通知后要尽可能 多的执行IO操作,因为如果在一次通知中没有执行完IO那么就需要等到下一次新的IO活动到来才能获取到就绪的描述 符.信号驱动式IO就属于边缘触发。
from socket import *
import selectors
sel=selectors.DefaultSelector()
def accept(server_fileobj,mask):
conn,addr=server_fileobj.accept()
sel.register(conn,selectors.EVENT_READ,read)
def read(conn,mask):
try:
data=conn.recv(1024)
if not data:
print('closing',conn)
sel.unregister(conn)
conn.close()
return
conn.send(data.upper()+b'_SB')
except Exception:
print('closing', conn)
sel.unregister(conn)
conn.close()
server_fileobj=socket(AF_INET,SOCK_STREAM)
server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server_fileobj.bind(('127.0.0.1',8088))
server_fileobj.listen(5)
server_fileobj.setblocking(False) #设置socket的接口为非阻塞
sel.register(server_fileobj,selectors.EVENT_READ,accept) #相当于网select的读列表里append了一个文件句柄server_fileobj,并且绑定了一个回调函数accept
while True:
events=sel.select() #检测所有的fileobj,是否有完成wait data的
for sel_obj,mask in events:
callback=sel_obj.data #callback=accpet
callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)
#客户端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8088))
while True:
msg=input('>>: ')
if not msg:continue
c.send(msg.encode('utf-8'))
data=c.recv(1024)
print(data.decode('utf-8'))
互斥锁只能acquire一次,可重复锁可以acquire多次,这里的acquire多次是针对单个线程而言的,如果A线程acquire了,在A线程内还能再次acquire(内部维护了一个计数器),但是其他线程就不能acquire
每个线程按照固定的顺序去获取锁就不会出问题,比如要想获取锁,必须遵循先A锁后B锁的顺序,那么一个线程获取A锁,那么另外的线程要想获取锁的时候也必须先拿A锁,以此来解决死锁问题。
import threading
import time
from contextlib import contextmanager
_local = threading.local()
# 统一接口,以后想要获取锁就用这个函数去获取,这个函数把锁做了排序
@contextmanager
def acquire(*locks):
locks = sorted(locks, key=lambda x: id(x))
# 这里用threading.local() 保存线程局部变量,主要用于嵌套去获取锁的情况
# 记录线程已经获取的锁,然后和要获取的锁的id做比较,顺序不符合就抛出异常
acquired_locks = getattr(_local, 'acquired', [])
if acquired_locks and max(id(lock) for lock in acquired_locks) >= id(locks[0]):
raise RuntimeError('出现死锁了,程序报错退出')
acquired_locks.extend(locks)
_local.acquired = acquired_locks
try:
for lock in locks:
lock.acquire()
yield
finally:
for lock in reversed(locks):
lock.release()
del acquired_locks[-len(locks):]
if __name__ == '__main__':
x_lock = threading.Lock()
y_lock = threading.Lock()
# def thread_1():
# with acquire(x_lock, y_lock):
# print('Thread-1')
#
# def thread_2():
# with acquire(y_lock, x_lock):
# print('Thread-2')
# t1 = threading.Thread(target=thread_1)
# t1.start()
# t2 = threading.Thread(target=thread_2)
# t2.start()
def thread_1():
with acquire(x_lock):
with acquire(y_lock):
print('Thread-1')
def thread_2():
with acquire(y_lock):
with acquire(x_lock):
print('Thread-2')
t1 = threading.Thread(target=thread_1)
t1.start()
t2 = threading.Thread(target=thread_2)
t2.start()
掌握python异步的起点是:epoll + callback + 事件循环
判断非阻塞调用是否就绪如果 OS 能做,是不是应用程序就可以不用自己去等待和判断了,就可以利用这个空闲去做其他事情以提高效率。
所以OS将I/O状态的变化都封装成了事件,如可读事件、可写事件。并且提供了专门的系统模块让应用程序可以接收事件通知。这个模块就是select。让应用程序可以通过select注册文件描述符和回调函数。当文件描述符的状态发生变化时,select 就调用事先注册的回调函数。
select因其算法效率比较低,后来改进成了poll,再后来又有进一步改进,BSD内核改进成了kqueue模块,而Linux内核改进成了epoll模块。这四个模块的作用都相同,暴露给程序员使用的API也几乎一致,区别在于kqueue 和 epoll 在处理大量文件描述符时效率更高。
Python标准库提供的selectors模块是对底层select/poll/epoll/kqueue的封装。DefaultSelector类会根据 OS 环境自动选择最佳的模块,那在 Linux 2.5.44 及更新的版本上都是epoll了
把I/O事件的等待和监听任务交给了 OS,那 OS 在知道I/O状态发生改变后(例如socket连接已建立成功可发送数据),它又怎么知道接下来该干嘛呢?只能回调。
需要我们将发送数据与读取数据封装成独立的函数,让epoll代替应用程序监听socket状态时,得告诉epoll:“如果socket状态变为可以往里写数据(连接建立成功了),请调用HTTP请求发送函数。如果socket 变为可以读数据了(客户端已收到响应),请调用响应处理函数。这里的回调是通知用户进程去做,而不是操作系统去执行回调函数。
这个循环是我们程序员在程序里写的循环,不是操作系统的epoll的循环,我们通过这个循环,去访问selector模块,等待它告诉我们当前是哪个事件发生了,应该对应哪个回调。这个等待事件通知的循环,称之为事件循环。
selector.select() 是一个阻塞调用,因为如果事件不发生,那应用程序就没事件可处理,所以就干脆阻塞在这里等待事件发生。那可以推断,如果只下载一篇网页,一定要connect()之后才能send()继而recv(),那它的效率和阻塞的方式是一样的。因为不在connect()/recv()上阻塞,也得在select()上阻塞。所以,selector机制是设计用来解决大量并发连接的。当系统中有大量非阻塞调用,能随时产生事件的时候,selector机制才能发挥最大的威力。
部分编程语言中,对异步编程的支持就止步于此(不含语言官方之外的扩展)。需要程序猿直接使用epoll去注册事件和回调、维护一个事件循环,然后大多数时间都花在设计回调函数上。
不论什么编程语言,但凡要做异步编程,上述的“事件循环+回调”这种模式是逃不掉的,尽管它可能用的不是epoll,也可能不是while循环。但是使用的异步方式基本都是 “等会儿告诉你” 模型的异步方式。
但是在asyncio异步编程中为什么没有看到 CallBack 模式呢?因为 Python 异步编程是用了协程帮我们取代了回调
标签:task 情况下 win pre 并发编程 文件系统 als 官方 经理
原文地址:https://www.cnblogs.com/longyunfeigu/p/9590722.html