标签:callback 数据 进程安全 数据传递 系统调用 用户交互 进程启动 僵尸 cal
http://www.cnblogs.com/linhaifeng/articles/6817679.html
理论:http://www.cnblogs.com/linhaifeng/articles/7430066.html
链接:http://www.cnblogs.com/linhaifeng/articles/7428874.html
1:隐藏丑陋复杂的硬件接口,提供良好的抽象接口
2:管理、调度进程,并且将多个进程对硬件的竞争变得有序
1.产生背景:针对单核,实现并发
ps:
现在的主机一般是多核,那么每个核都会利用多道技术
有4个cpu,运行于cpu1的某个程序遇到io阻塞,会等到io结束再重新调度,会被调度到4个cpu中的任意一个,具体由操作系统调度算法决定。
2.空间上的复用:如内存中同时有多道程序
3.时间上的复用:复用一个cpu的时间片
强调:遇到io切,占用cpu时间过长也切,核心在于切之前将进程的状态保存下来,这样才能保证下次切换回来时,能基于上次切走的位置继续运行
进程:正在进行的一个过程或者说一个任务。而负责执行任务则是cpu。
举例(单核+多道,实现多个进程的并发执行):
egon在一个时间段内有很多任务要做:python备课的任务,写书的任务,交女朋友的任务,王者荣耀上分的任务,但egon同一时刻只能做一个任务(cpu同一时间只能干一个活),如何才能玩出多个任务并发执行的效果?
egon备一会课,再去跟李杰的女朋友聊聊天,再去打一会王者荣耀....这就保证了每个任务都在进行中。
程序仅仅只是一堆代码而已,而进程指的是程序的运行过程。
并发:是伪并行,即看起来是同时运行。单个cpu+多道技术就可以实现并发,(并行也属于并发)
并行:同时运行,只有具备多个cpu才能实现并行
#阻塞调用是指调用结果返回之前,当前线程会被挂起(如遇到io操作)。函数只有在得到结果之后才会将阻塞的线程激活。有人也许会把阻塞调用和同步调用等同起来,实际上他是不同的。对于同步调用来说,很多时候当前线程还是激活的,只是从逻辑上当前函数没有返回而已。
#举例:
#1. 同步调用:apply一个累计1亿次的任务,该调用会一直等待,直到任务返回结果为止,但并未阻塞住(即便是被抢走cpu的执行权限,那也是处于就绪态);
#2. 阻塞调用:当socket工作在阻塞模式的时候,如果没有数据的情况下调用recv函数,则当前线程就会被挂起,直到有数据为止。
#非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前也会立刻返回,同时该函数不会阻塞当前线程。
阻塞与非阻塞针对的是进程或线程:阻塞是当请求不能满足的时候就将进程挂起,而非阻塞则不会阻塞当前进程
一个进程由三种状态:
1. 系统初始化(查看进程linux中用ps命令,windows中用任务管理器,前台进程负责与用户交互,后台运行的进程与用户无关,运行在后台并且只在需要时才唤醒的进程,称为守护进程,如电子邮件、web页面、新闻、打印)
2. 一个进程在运行过程中开启了子进程(如nginx开启多进程,os.fork,subprocess.Popen等)
3. 用户的交互式请求,而创建一个新进程(如用户双击暴风影音)
4. 一个批处理作业的初始化(只在大型机的批处理系统中应用)
进程都是由操作系统开启的,开进程时先给操作系统发信号,再由操作系统开启进程
1.相同的是:进程创建后,父进程和子进程有各自不同的地址空间(多道技术要求物理层面实现进程之间内存的隔离),任何一个进程的在其地址空间中的修改都不会影响到另外一个进程。
2.不同的是:在UNIX中,子进程的初始地址空间是父进程的一个副本,提示:子进程和父进程是可以有只读的共享内存区的。但是对于windows系统来说,从一开始父进程与子进程的地址空间就是不同的。
linux子进程和父进程的初始状态一样
windows子进程和父进程的初始状态就不同
1. 正常退出(自愿,如用户点击交互式页面的叉号,或程序执行完毕调用发起系统调用正常退出,在linux中用exit,在windows中用ExitProcess)
2. 出错退出(自愿,python a.py中a.py不存在)
3. 严重错误(非自愿,执行非法指令,如引用不存在的内存,1/0等,可以捕捉异常,try...except...)
4. 被其他进程杀死(非自愿,如kill -9)
定义一个函数
from multiprocessing import Process
import time
def work(name):
print(‘%s is piaoing‘ %name)
time.sleep(3)
print(‘%s piao end‘ %name)
if __name__ == ‘__main__‘: #在windows系统上必须要在__main__下调用
# Process(target=work,kwargs={‘name‘:‘alex‘})
p=Process(target=work,args=(‘alex‘,)) #target函数名,args参数
p.start()
print(‘主‘)
子进程结束后,子进程的资源由父进程回收掉,所以主进程要在子进程结束后再终止,如果子进程没有终止而主进程突然被终止,那么子进程的资源无法回收,会成为僵尸进程。
from multiprocessing import Process
import time
class Work(Process):
def __init__(self,name):
super().__init__() #重用父类的方法
self.name=name
def run(self): #类下面的run方法是固定的
print(‘%s is piaoing‘ %self.name)
time.sleep(2)
print(‘%s piao end‘ %self.name)
if __name__ == ‘__main__‘:
p=Work(‘wupeiqi‘)
p.start()
print(‘主‘)
from multiprocessing import Process
import time,random
def work(name):
print(‘%s is piaoing‘ %name)
time.sleep(random.randint(1,3))
print(‘%s piao end‘ %name)
if __name__ == ‘__main__‘:
p1=Process(target=work,args=(‘alex‘,))
p2=Process(target=work,args=(‘wupeiqi‘,))
p3=Process(target=work,args=(‘yuanhao‘,))
p1.start()
p2.start()
p3.start()
print(‘主‘)
os.getpid() 查看进程的id号
os.getppid() 查看进程的父进程的id号
from multiprocessing import Process
import time,random,os
def work():
print(‘子进程的pid:%s,父进程的pid:%s‘ %(os.getpid(),os.getppid()))
time.sleep(3)
if __name__ == ‘__main__‘:
p1=Process(target=work)
p2=Process(target=work)
p3=Process(target=work)
p1.start()
p2.start()
p3.start()
print(‘主‘,os.getpid(),os.getppid())
主进程的父进程是pycharm的进程号
from multiprocessing import Process
n=100
def work():
global n
n=0
print(‘子‘,n)
if __name__ == ‘__main__‘:
p=Process(target=work)
p.start()
print(‘主‘,n)
服务端:
import socket
from multiprocessing import Process
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
phone.bind((‘127.0.0.1‘,8012))
phone.listen(5)
print(‘starting...‘)
def talk(conn):
print(phone)
while True: #通信循环
try:
data=conn.recv(1024) #最大收1024
print(data)
if not data:break #针对linux
conn.send(data.upper())
except Exception:
break
conn.close()
if __name__ == ‘__main__‘:
while True:
conn,addr=phone.accept()
print(‘IP:%s,PORT:%s‘ %(addr[0],addr[1]))
p=Process(target=talk,args=(conn,))
p.start()
print(‘===?>‘)
phone.close()
客户端:
import socket
#1、买手机
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
#2、打电话
phone.connect((‘127.0.0.1‘,8012))
#3、发收消息
while True:
msg=input(‘>>: ‘).strip()
if not msg:continue
phone.send(msg.encode(‘utf-8‘))
data=phone.recv(1024)
print(data.decode(‘utf-8‘))
#4、挂电话
phone.close()
服务端开的进程数最好最多开的数目和cup的核数一样多
os.cpu_count() 查看cpu核数
from multiprocessing import Process
import time
def work(name,n):
print(‘%s is piaoing‘ %name)
time.sleep(n)
print(‘%s piao end‘ %name)
if __name__ == ‘__main__‘:
start_time=time.time()
p1=Process(target=work,args=(‘alex‘,1))
p2=Process(target=work,args=(‘wupeiqi‘,2))
p3=Process(target=work,args=(‘yuanhao‘,3))
# p1.start()
# p2.start()
# p3.start()
# p3.join() #主进程等,等待子进程结束后,主进程再执行后面的代码
# p2.join() #主进程等,等待子进程结束后,主进程再执行后面的代码
# p1.join() #主进程等,等待子进程结束后,主进程再执行后面的代码
p_l=[p1,p2,p3]
for p in p_l:
p.start()
for p in p_l:
p.join()#主进程等,等待子进程结束后,主进程再执行后面的代码
stop_time=time.time()
print(‘主‘,(stop_time-start_time))
terminate() 关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活,如果被关闭的进程有子进程,这个方法并不会把子进程也关闭,所以这个方法不要用
is_alive() 查看进程是否存活,True为存活,False为不存活
from multiprocessing import Process
import time
def work(name,n):
print(‘%s is piaoing‘ %name)
time.sleep(n)
print(‘%s piao end‘ %name)
if __name__ == ‘__main__‘:
p1=Process(target=work,args=(‘alex‘,1))
p1.start()
p1.terminate()
time.sleep(1)
print(p1.is_alive())
print(‘主‘)
name()获取进程名
pid()获取进程pid不要用,一般用os.getpid()
实现ftp server端和client端的交互
服务端:
import socketserver
class MyServer(socketserver.BaseRequestHandler):
def handle(self):
conn = self.request
conn.sendall(bytes(‘欢迎致电 10086,请输入1xxx,0转人工服务.‘,encoding=‘utf-8‘))
Flag = True
while Flag:
data = conn.recv(1024).decode(‘utf-8‘)
if data == ‘exit‘:
Flag = False
elif data == ‘0‘:
conn.sendall(bytes(‘通过可能会被录音.balabala一大推‘,encoding=‘utf-8‘))
else:
conn.sendall(bytes(‘请重新输入.‘,encoding=‘utf-8‘))
if __name__ == ‘__main__‘:
server = socketserver.ThreadingTCPServer((‘127.0.0.1‘,8008),MyServer)
server.serve_forever()
客户端:
import socket
ip_port = (‘127.0.0.1‘,8008)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)
while True:
data = sk.recv(1024).decode(‘utf-8‘)
print(‘receive:‘,data)
inp = input(‘please input:‘)
sk.sendall(bytes(inp,encoding=‘utf-8‘))
if inp == ‘exit‘:
break
sk.close()
主进程创建守护进程
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
#主进程代码运行完毕,守护进程就会结束
from multiprocessing import Process
import time
def foo():
print(123)
time.sleep(1)
print("end123")
def bar():
print(456)
time.sleep(3)
print("end456")
if __name__ == ‘__main__‘:
p1=Process(target=foo)
p2=Process(target=bar)
p1.daemon=True
p1.start()
p2.start()
print("main-------")
进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,
竞争带来的结果就是错乱,如何控制,就是加锁处理
并发运行,效率高,但竞争同一打印终端,带来了打印错乱
from multiprocessing import Process
import os,time
def work():
print(‘%s is running‘ %os.getpid())
time.sleep(2)
print(‘%s is done‘ %os.getpid())
if __name__ == ‘__main__‘:
for i in range(3):
p=Process(target=work)
p.start()
由并发变成了串行,牺牲了运行效率,但避免了竞争
from multiprocessing import Process,Lock
import os,time
def work(lock):
lock.acquire()
print(‘%s is running‘ %os.getpid())
time.sleep(2)
print(‘%s is done‘ %os.getpid())
lock.release()
if __name__ == ‘__main__‘:
lock=Lock()
for i in range(3):
p=Process(target=work,args=(lock,))
p.start()
文件当数据库,模拟抢票
#文件db的内容为:{"count":1}
#注意一定要用双引号,不然json无法识别
from multiprocessing import Process,Lock
import time,json,random
def search():
dic=json.load(open(‘db.txt‘))
print(‘\033[43m剩余票数%s\033[0m‘ %dic[‘count‘])
def get():
dic=json.load(open(‘db.txt‘))
time.sleep(0.1) #模拟读数据的网络延迟
if dic[‘count‘] >0:
dic[‘count‘]-=1
time.sleep(0.2) #模拟写数据的网络延迟
json.dump(dic,open(‘db.txt‘,‘w‘))
print(‘\033[43m购票成功\033[0m‘)
def task(lock):
search()
get()
if __name__ == ‘__main__‘:
lock=Lock()
for i in range(30): #模拟并发100个客户端抢票
p=Process(target=task,args=(lock,))
p.start()
购票行为由并发变成了串行,牺牲了运行效率,但保证了数据安全
#文件db的内容为:{"count":1}
#注意一定要用双引号,不然json无法识别
from multiprocessing import Process,Lock
import time,json,random
def search():
dic=json.load(open(‘db.txt‘))
print(‘\033[43m剩余票数%s\033[0m‘ %dic[‘count‘])
def get():
dic=json.load(open(‘db.txt‘))
time.sleep(0.1) #模拟读数据的网络延迟
if dic[‘count‘] >0:
dic[‘count‘]-=1
time.sleep(0.2) #模拟写数据的网络延迟
json.dump(dic,open(‘db.txt‘,‘w‘))
print(‘\033[43m购票成功\033[0m‘)
def task(lock):
search()
lock.acquire()
get()
lock.release()
if __name__ == ‘__main__‘:
lock=Lock()
for i in range(5): #模拟并发100个客户端抢票
p=Process(target=task,args=(lock,))
p.start()
#加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
虽然可以用文件共享数据实现进程间通信,但问题是:
1.效率低(共享数据基于文件,而文件是硬盘上的数据)
2.需要自己加锁处理
#因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
#1 队列和管道都是将数据存放于内存中
#2 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
参数:maxsize是队列中允许最大项数,省略则无大小限制。
用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
同q.get(False)
同q.put(False)
调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
from multiprocessing import Queue
q=Queue(3)
q.put({‘a‘:1})
q.put(‘bbbb‘)
q.put((3,2,1))
# q.put_nowait(1111111)
print(q.get())
print(q.get())
print(q.get())
# print(q.get_nowait())
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
生产者:生产数据
消费者:处理数据
生产者消费者模型:解耦,加入队列,解决生产者与消费者之间的速度差
from multiprocessing import Queue,Process
import time,random
def producer(name,q):
for i in range(10):
time.sleep(random.randint(1,3))
res=‘泔水%s‘ %i
q.put(res)
print(‘厨师 %s 生产了 %s‘ %(name,res))
def consumer(name,q):
while True:
res=q.get()
if res is None:break
time.sleep(random.randint(1,3))
print(‘%s 吃了 %s‘ %(name,res))
if __name__ == ‘__main__‘:
q=Queue()
p1=Process(target=producer,args=(‘egon‘,q))
c1=Process(target=consumer,args=(‘alex‘,q))
p1.start()
c1.start()
p1.join()
q.put(None)
from multiprocessing import JoinableQueue,Process
import time,random
def producer(name,q,food):
for i in range(1):
time.sleep(random.randint(1,3))
res=‘%s%s‘ %(food,i)
q.put(res)
print(‘厨师 %s 生产了 %s‘ %(name,res))
q.join()
def consumer(name,q):
while True:
res=q.get()
if res is None:break
time.sleep(random.randint(1,3))
print(‘%s 吃了 %s‘ %(name,res))
q.task_done() # 队列中减一个
if __name__ == ‘__main__‘:
q=JoinableQueue()
p1=Process(target=producer,args=(1,q,‘泔水‘))
p2=Process(target=producer,args=(2,q,‘骨头‘))
p3=Process(target=producer,args=(3,q,‘馒头‘))
c1=Process(target=consumer,args=(‘alex‘,q))
c2=Process(target=consumer,args=(‘wupeiqi‘,q))
c1.daemon=True
c2.daemon=True
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
from multiprocessing import Manager,Process,Lock
def work(d,lock):
with lock:
temp=d[‘count‘]
d[‘count‘]=temp-1
if __name__ == ‘__main__‘:
m=Manager()
d=m.dict({"count":100})
# m.list()
lock=Lock()
p_l=[]
for i in range(100):
p=Process(target=work,args=(d,lock))
p_l.append(p)
p.start()
for obj in p_l:
obj.join()
print(d)
如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程
Pool([numprocess [,initializer [, initargs]]]):创建进程池
#为什么要用进程池:为了实现并发,然后在并发的基础上对进程数目进行控制
1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
3 initargs:是要传给initializer的参数组
同步调用:提交完任务后,在原地等待任务结束,一旦结束可以立刻拿到结果
在一个池工作进程中执行func(*args,**kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()
异步调用:提交完任务后,不会在原地等待任务结束,会继续提交下一次任务,等到所有任务都结束后,才get结果
在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。
关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
等待所有工作进程退出。此方法只能在close()或teminate()之后调用
提交完任务后,在原地等待任务结束,一旦结束可以立刻拿到结果
正在运行的进程遇到io则进入阻塞状态
提交完任务后,不会在原地等待任务结束,会继续提交下一次任务,等到所有任务都结束后,才get结果
可能是运行状态,也可能是就绪状态
from multiprocessing import Pool
import os,time,random
def work(n):
print(‘%s is working‘ %os.getpid())
# time.sleep(random.randint(1,3))
return n**2
if __name__ == ‘__main__‘:
p=Pool(2)
objs=[]
for i in range(10):
# 同步调用:提交完任务后,在原地等待任务结束,一旦结束可以立刻拿到结果
# res=p.apply(work,args=(i,))
# print(res)
# 异步调用:提交完任务后,不会在原地等待任务结束,会继续提交下一次任务,等到所有任务都结束后,才get结果
obj=p.apply_async(work,args=(i,))
objs.append(obj)
p.close()
p.join()
for obj in objs:
print(obj.get())
print(‘主‘)
需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数
我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。
#obj=p.apply_async(get,args=(url,),callback=parse)
from multiprocessing import Pool,Process
import requests
import os
import time,random
def get(url):
print(‘%s GET %s‘ %(os.getpid(),url))
response=requests.get(url)
time.sleep(random.randint(1,3))
if response.status_code == 200:
print(‘%s DONE %s‘ % (os.getpid(), url))
return {‘url‘:url,‘text‘:response.text}
def parse(dic):
print(‘%s PARSE %s‘ %(os.getpid(),dic[‘url‘]))
time.sleep(1)
res=‘%s:%s\n‘ %(dic[‘url‘],len(dic[‘text‘]))
with open(‘db.txt‘,‘a‘) as f:
f.write(res)
if __name__ == ‘__main__‘:
urls=[
‘https://www.baidu.com‘,
‘https://www.python.org‘,
‘https://www.openstack.org‘,
‘https://help.github.com/‘,
‘http://www.sina.com.cn/‘
]
p=Pool(2)
start_time=time.time()
objs=[]
for url in urls:
obj=p.apply_async(get,args=(url,),callback=parse) #主进程负责干回调函数的活
objs.append(obj)
p.close()
p.join()
print(‘主‘,(time.time()-start_time))
标签:callback 数据 进程安全 数据传递 系统调用 用户交互 进程启动 僵尸 cal
原文地址:http://www.cnblogs.com/xiaojinyu/p/7662994.html