标签:exit sam apply 进程安全 缓冲区 focus accept 定义类 ast
day09
进程
是程序的一次执行操作,每个进程都有自己的地址空间,内存,数据栈,及其他记录运行轨迹的辅助
数据.
进程间通过 interprocess communication(IPC)来通讯,不能直接共享信息
线程
有时候被称为轻量级进程,跟进程有些相似,不同的是 所有的线程运行在同一个进程中,共享相同的运行环境,
线程有开始,顺序执行和结束三部分,有自己的指令指针,可以记录自己运行到生么状态,
运行可能被强占(中断),或被暂时的挂起(睡眠),让其的线程运行(让步).线程之间可以更加
方便的共享数据以及通讯.
在单cpu中,真正的并发是不可能的,每个线程会被安排每次只运行一小会,然后把cpu让出来,让其他的线程来运行.
线程也是有危险的,如果多个线程访问同一片数据,则由于数据访问的顺序不一样,有可能导致数据结果不一致的问题.这叫 竟态条件.
另一个需要注意的是: 由于有的函数会在完成之前阻塞住,在咩有特别为多线程做修改的情况下,这种’贪婪’ 的函数会让cpu的时间分配有所倾斜,导致各个线程分配到的运行时间可能不尽相同,不尽公平.
py的全局解释锁
在这里只是简单的略作说明
py的代码python解释器来执行, python在设计之初就考虑到要在主循环中,同时只有一个线程在运行,就想单cpu的系统中运行多个进程那样,内存中可以存放多个程序,但在任意时刻,只有一个程序在cpu中运行,虽然py解释器中可以’运行’多个线程,但是在任意时刻,只有一个线程在解释器中运行
对py虚拟机的访问有全局解释器所(global interpreter lock,GIL) 来控制,正是这个锁能保证在同一时刻只有一个线程在运行.多线程环境中,py虚拟机的执行方式
详细的看这篇文章
python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了非常好用的多进程包multiprocessing。
multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。
multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)
强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
参数介绍
group参数未使用,值始终为None
target表示调用对象,即子进程要执行的任务
args表示调用对象的位置参数元组,args=(1,2,‘egon‘,)
kwargs表示调用对象的字典,kwargs={‘name‘:‘egon‘,‘age‘:18}
name为子进程的名称
方法介绍
p.start():启动进程,并调用该子进程中的p.run()
p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
p.is_alive():如果p仍然运行,返回True
p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
属性介绍
p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
p.name:进程的名称
p.pid:进程的pid
p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)
一个简单的程序
第一种:
#!/usr/bin/env python
#coding:utf-8
from multiprocessing import Process
import os
import time
import random
def lala():
print(‘start......‘)
time.sleep(random.randint(1,3))
print(‘stop.....‘)
if __name__ == ‘__main__‘: #在Windows下一定要写这个额,要不然会报错,而且最好吧生成进场的东西写在下面
p1=Process(target=fuck,name=‘test‘)
p1.start() #这个star()会自动调用Process下的run方法,
print(‘主进程‘) #
结果:
‘‘‘
主进程
start......
stop.....
‘‘‘
第二种:
##自己定义类的方式实现
from multiprocessing import Process
import time
import random
class eat(Process):
def __init__(self,name):
super().__init__()
self.name=name
def run(self): # 真正运行的是这个地方
print(‘%s start eat‘ % self.name)
time.sleep(random.randint(1,3))
print(‘%s stop eat‘ % self.name)
if __name__ == ‘__main__‘:
p1 = eat(‘liu‘)
p1.start()
print(‘主进程‘)
sokcet 多并发
server端的
#!/usr/bin/env python
#coding:utf-8
from socket import *
from multiprocessing import Process
server= socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind((‘127.0.0.1‘,8080))
server.listen(5)
def talk(conn,client_addr):
while True:
try:
msg = conn.recv(1024)
if not msg:break
conn.send(msg.upper())
except Exception:
break
if __name__ == ‘__main__‘:
while True:
conn,client_addr=server.accept()
p= Process(target=talk,args=(conn,client_addr))
p.start()
client端的
#!/usr/bin/env python
#coding:utf-8
from socket import *
client = socket(AF_INET,SOCK_STREAM)
client.connect((‘127.0.0.1‘,8080))
while True:
msg = input(‘>>>‘).strip()
if not msg:break
client.send(msg.encode(‘utf-8‘))
msg=client.recv(1024)
print(msg.decode(‘utf-8‘))
但是这样就会有问题,如果要是有了上万个用户,我就得开始上万个进程,势必会对服务器造成很大的压力,
如何解决呢------>进程池
进程池解决
需要注意的是,这里开启的进程池,是根据你的cpu的核数来做的,可以使用os.cpu_count()来查看
我开了九个才看到效果 ->_->(fuck).
当我开到第九个的时候,输入字符,不会有任何的提示,会卡主,当我吧其中的一个client停掉的饿时候,
第九个立马接到返回值,而且第九个进程的id,是你停掉的那个进程的id.没错,就是这么神奇.
参数介绍
p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()
p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。
p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成5 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法
obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
obj.ready():如果调用完成,返回True
obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
obj.wait([timeout]):等待结果变为可用。
obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数
#!/usr/bin/python
# -*- coding:utf-8 -*-
from multiprocessing import Process,Pool #这个模块里有Pool这个东西,
from socket import *
import os
server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind((‘127.0.0.1‘,8080))
server.listen(5)
def talk(conn,addr):
print(os.getpid())
while True: #通讯循环
try:
msg=conn.recv(1024)
if not msg:break
conn.send(msg.upper())
except Exception:
break
if __name__ == ‘__main__‘:
pool=Pool()
res_l=[]
while True: #链接循环
conn,addr=server.accept()
# print(addr)
# pool.apply(talk,args=(conn,addr))
res=pool.apply_async(talk,args=(conn,addr)) #看到了这个async,你就知道了这个是异步的.
res_l.append(res)
# print(res_l)
要是想看生么时候用apply,生么时候用apply_async,看这篇文章 反正没啥用
可以看看这篇博客
简单的来说
进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数
我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。
from multiprocessing import Pool
import time,random,os
def get_page(url):
print(‘(进程 %s) 正在下载页面 %s‘ %(os.getpid(),url))
time.sleep(random.randint(1,3))
return url #用url充当下载后的结果
def parse_page(page_content):
print(‘<进程 %s> 正在解析页面: %s‘ %(os.getpid(),page_content))
time.sleep(1)
return ‘{%s 回调函数处理结果:%s}‘ %(os.getpid(),page_content)
if __name__ == ‘__main__‘:
urls=[
‘http://maoyan.com/board/1‘,
‘http://maoyan.com/board/2‘,
‘http://maoyan.com/board/3‘,
‘http://maoyan.com/board/4‘,
‘http://maoyan.com/board/5‘,
‘http://maoyan.com/board/7‘,
]
p=Pool()
res_l=[]
#异步的方式提交任务,然后把任务的结果交给callback处理
#注意:会专门开启一个进程来处理callback指定的任务(单独的一个进程,而且只有一个)
for url in urls:
res=p.apply_async(get_page,args=(url,),callback=parse_page)
res_l.append(res)
#异步提交完任务后,主进程先关闭p(必须先关闭),然后再用p.join()等待所有任务结束(包括callback)
p.close()
p.join()
print(‘{主进程 %s}‘ %os.getpid())
#收集结果,发现收集的是get_page的结果
#所以需要注意了:
#1. 当我们想要在将get_page的结果传给parse_page处理,那么就不需要i.get(),通过指定callback,就可以将i.get()的结果传给callback执行的任务
#2. 当我们想要在主进程中处理get_page的结果,那就需要使用i.get()获取后,再进一步处理
for i in res_l: #本例中,下面这两步是多余的
callback_res=i.get()
print(callback_res)
进程彼此之间互相隔离,要实现进程间通信,即IPC,multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的
创建队列的类(底层就是以管道和锁定的方式实现)
Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
参数介绍
maxsize是队列中允许最大项数,省略则无大小限制。
你要是想看的更加仔细,看这里
方法介绍
q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)
q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
一个简单的例子:
from multiprocessing import Queue,Process
import time
q = Queue(3)
q.put(1)
q.put(2)
q.put(3)
print(q.full()) ##是不是满了
q.get()
q.get()
q.get()
print(q.empty())
q.get() #如果多了一个,那么这里,就会一直等,等到程序终止,或者 断电
生产者消费者模型
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
基于队列实现生产者消费者模型
#!/usr/bin/env python
#coding:utf-8
from multiprocessing import Queue,Process
import time,random,os
def consumer(q):
while True:
time.sleep(random.randint(1,3))
res= q.get()
print(‘\033[45m消费者拿到了:%s\033[0m‘ %res)
def producter(seq,q):
for item in seq:
time.sleep(random.randint(1,3))
print(‘\033[46m生产者生产了:%s\033[0m‘ % item)
q.put(item)
if __name__ == ‘__main__‘:
q=Queue()
seq=(‘包子%s‘ %i for i in range(10))
c = Process(target=consumer,args=(q,))
c.start()
producter(seq,q)
print(‘主进程‘)
会看到生产不一定会立马被消费,但一定会被消费,这就是队列.
完美一些的写法:
from multiprocessing import Process,JoinableQueue
import time
import random
def consumer(q,name):
while True:
# time.sleep(random.randint(1,3))
res=q.get()
q.task_done()
print(‘\033[41m消费者%s拿到了%s\033[0m‘ %(name,res))
def producer(seq,q,name):
for item in seq:
# time.sleep(random.randint(1,3))
q.put(item)
print(‘\033[42m生产者%s生产了%s\033[0m‘ %(name,item))
q.join()
print(‘============>>‘)
if __name__ == ‘__main__‘:
q=JoinableQueue()
c=Process(target=consumer,args=(q,‘egon‘),)
c.daemon=True #设置守护进程,主进程结束c就结束
c.start()
seq=[‘包子%s‘ %i for i in range(10)]
p=Process(target=producer,args=(seq,q,‘厨师1‘))
p.start()
# master--->producer----->q--->consumer(10次task_done)
p.join() #主进程等待p结束,p等待c把数据都取完,c一旦取完数据,p.join就是不再阻塞,进
# 而主进程结束,主进程结束会回收守护进程c,而且c此时也没有存在的必要了
print(‘主进程‘)
标签:exit sam apply 进程安全 缓冲区 focus accept 定义类 ast
原文地址:http://www.cnblogs.com/liukang/p/7101770.html