标签:函数 xiaoxiao llb 是的 foo 表示 man try call
############### 进程的启动方式1 ##############
"""
并发编程:
进程
1,运行中的程序,就是进程,程序是没有生命的实体,运行起来了就有生命了,
操作系统可以管理进程,进程是操作系统基本的执行单元,
2,每一个进程都有它自己的地址空间,进程之间是不会混的,比如qq不能访问微信的地址空间,
操作系统替你隔离开了,这也是操作系统引入进程这个概念的原因,
#######################################
进程的调度
1,先来先服务,有一个不好的,就是不利于短作业
2,短作业优先算法,但其对长作业不利;不能保证紧迫性作业(进程)被及时处理;作业的长短只是被估算出来的。
3,时间片轮转算法,就是轮流执行,已经很科学了,
4,多级反馈队列算法,有多个队列,有一个新任务来了放入第一个队列,这是优先级加上时间片轮转,第二个任务来了放入下一级,
#######################################
并发和并行:
进程的并行:这种只有在多核cpu才可以实现,
进程的并发:这是轮流执行,由于速度很快,看起来像是一起执行的,比如一遍听音乐,一遍写代码,
######################################
进程的三状态转换图:非常重要
1,进程一开始运行的时候,是就绪的状态,这是第一个状态,就是告诉cpu,我已经准备好可以运行了,进入排队了,
2,时间片轮转,轮到你了之后,你就运行了,这是第二个状态,
3,发生阻塞,这是第三个状态,比如你的程序让你输入内容,input方法, 这时候是阻塞的,你输入完毕了之后,就又畅通了,
这是等待I/O完成,input,sleep,文件的输入和输出,
事件处理之后,你还要进入就绪状态了,
全部处理完了,就结束了,
###########################################
同步和异步
1,同步,需要等待,需要排队,你什么也不能干,
2,异步,不需要等待,你可以去做其他事情,
###########################################
阻塞和非阻塞
1,阻塞,就是input,sleep这些,需要等待,这是阻塞,
2,非阻塞,就是跳过这些阻塞,但是程序中不可避免的需要阻塞,因为需要等待内容处理,
###########################################
同步异步和阻塞非阻塞:
同步阻塞,就是
同步非阻塞
异步阻塞
异步非阻塞,效率更高,
###############################################
多进程有一个内置的模块:
from multiprocessing import Process
要学习几个地方:
1,进程的注册
2,进程的开启
3,进程的异步,
4,进程的同步,join,
5,进程注册函数的参数,
6,启动多个子进程,
"""
import time
from multiprocessing import Process
import os
def f(args1,args2):
print("*"*args1)
time.sleep(1)
print("*"*args2)
# print("子进程号",os.getpid())
# print("子进程的父进程号",os.getppid())
# print(‘我是子进程‘)
if __name__ == ‘__main__‘:
# p = Process(target=f, args=(‘bob‘,))
# p = Process(target=f, args=(‘bob‘,"123")) # 注册,p是一个进程,还没有启动,这是主进程,
# args=(‘bob‘,),如果注册的函数是有参数的,就要传递参数,如果有一个参数括号内要有一个逗号,因为这是一个元组,
# p.start() # 启动进程,这是启动了一个子进程,
# 现在子进程和主进程之间是异步的,如果我想在子进程结束之后再执行下面的代码,变成同步,怎么办?
# p.join() # 这个join就是在感知一个子进程的一个结束,将异步改成同步,
# time.sleep(1)
# print("父进程号",os.getpid())
# print("父进程的父进程号",os.getppid()) # 这个就是pycharm的进程号,
# print(‘执行主进程的内容了‘) # 这一句的执行和子进程的执行内容是异步的,不是同步的,
# 开启10个子进程
p_list=[]
for i in range(10):
p = Process(target=f, args=(10*i, 20*i))
p_list.append(p)
p.start()
# print("第%d轮"%(i+1))
# p.join()
[p.join() for p in p_list ] # 保证前面的10个进程全部结束了,才会执行下面的代码,
print("运行完了")
# 这种开启了多进程,可以读多个进程去存文件,取文件内容,
# 进程的生命周期,
# 主进程没有开启子进程,就是执行完他的代码就结束了了
# 子进程也是执行完自己的代码就结束了,
# 开启了子进程的主进程,主进程执行完了,要等待子进程结束之后,主进程才可以结束,
############### 进程的启动方式2 和 进程之间是数据隔离的 ##############
# 进程的启动方式2
# 第一点,创建一个类,继承process
# 第二点,类中必须实现run方法,这个run方法里面就是子进程要执行的内容,
import os
from multiprocessing import Process
class MyProcess(Process): # 继承导入的process,
def __init__(self,name): # 为了进程能传递参数,
super().__init__() # 这是继承了父类所有的参数,
self.name=name
def run(self):
# print(os.getpid())
print("子进程号",self.pid)
print("参数",self.name) # print(os.getpid()) 这两句是一样的,
if __name__ == ‘__main__‘:
# p1=MyProcess() # 这是不传参数的
p1=MyProcess("name1") # 这是传参数的,这就是面向对象的实例化,
p2=MyProcess(‘name2‘)
p3=MyProcess(‘name3‘)
p1.start() #start会自动调用run
p2.start()
# p2.run()
p3.start()
# 三个进程之间是异步的,
p1.join()
p2.join()
p3.join()
# 三个进程都结束了才会执行下面的内容,这是把异步,变成异步,
print(‘主线程‘)
# 进程之间的数据隔离问题
# 进程和进程之间的数据是否是隔离的,比如qq和微信,之间的数据是隔离的,
# 几个进程之间,如果不通过特殊的手段,是不可能共享一个数据的,这个记住,没有什么可理解的,
# 下面是一个例子,证明主进程和子进程之间是没有
from multiprocessing import Process
def work():
global n # 声明了一个全局变量,
n=0
print(‘子进程内: ‘,n)
if __name__ == ‘__main__‘:
n = 100
p=Process(target=work)
p.start()
print(‘主进程内: ‘,n)
############### 守护进程 ##############
# 守护进程
# 子进程----守护进程
# 第一版:主进程结束了,子进程还没有结束,
# import time
# from multiprocessing import Process
#
# def func():
# while True:
# time.sleep(1)
# print("我还活着")
#
#
# if __name__ == ‘__main__‘:
# p=Process(target=func)
# p.start()
# i = 0
# while i<10:
# time.sleep(1)
# i+=1
# print("主进程结束")
# 守护进程,就是主进程代码结束了而结束,记住不是主进程彻底结束,而是代码结束,
import time
from multiprocessing import Process
def func():
while True:
time.sleep(1)
print("我还活着")
if __name__ == ‘__main__‘:
p=Process(target=func)
p.daemon=True # 设置子进程为守护进程,
p.start()
p2=Process(target=func) # 这个子进程没有设置为守护进程所以这个进程还在进行中,
p2.start()
# time.sleep(3)
# p2.is_alive() # 判断一个进程是否活着
# p2.terminate() # 结束一个进程,
i = 0
while i<5:
time.sleep(1)
i+=1
print("主进程代码结束")
############### 进程锁 ##############
# 进程锁
# 买票就是一个并发的过程,
# 文件db的内容为:{"ticket":1}
# 注意一定要用双引号,不然json无法识别
# 并发运行,效率高,但竞争写同一文件,数据写入错乱
# 买票不加锁,可能会有多个人买到票了。但是票只有一张,
# from multiprocessing import Process,Lock
# import time,json,random
#
# # 查询余票的函数
# def show(i):
# with open("db") as f :
# dic=json.load(f)
# print(‘余票%s‘%dic[‘ticket‘])
#
# # 买票的函数
# def bug_ticket(i):
# with open("db") as f :
# dic=json.load(f)
# time.sleep(0.1) # 模拟读数据的网络延迟
# if dic[‘ticket‘] >0:
# dic[‘ticket‘]-=1
# print(‘\033[32m%s购票成功\033[0m‘%i)
# else:
# print(‘\033[31m%s没买到票\033[0m‘%i)
# time.sleep(0.1)
# with open("db","w") as f:
# json.dump(dic,f)
#
#
# if __name__ == ‘__main__‘:
# for i in range(10): # 模拟并发10个客户端查询票
# p=Process(target=show,args=(i,))
# p.start()
# for i in range(10):
# p = Process(target=bug_ticket, args=(i,))
# p.start()
# 加锁
from multiprocessing import Process,Lock
import time,json,random
# 查询余票的函数
def show(i):
with open("db") as f :
dic=json.load(f)
print(‘余票%s‘%dic[‘ticket‘])
# 买票的函数
def bug_ticket(i,lock):
lock.acquire() # 这就是拿钥匙进门,有一个进程拿了钥匙,之后第二个进程进来就没有钥匙了,就会阻塞,
with open("db") as f :
dic=json.load(f)
time.sleep(0.1) # 模拟读数据的网络延迟
if dic[‘ticket‘] >0:
dic[‘ticket‘]-=1
print(‘\033[32m%s购票成功\033[0m‘%i)
else:
print(‘\033[31m%s没买到票\033[0m‘%i)
time.sleep(0.1)
with open("db","w") as f:
json.dump(dic,f)
lock.release() # 这是还钥匙,还了之后,别的进程就可以拿到了,就可以执行了,
if __name__ == ‘__main__‘:
for i in range(10): # 模拟并发10个客户端查询票
p=Process(target=show,args=(i,))
p.start()
lock=Lock()
for i in range(10):
p = Process(target=bug_ticket, args=(i,lock))
p.start()
############### 多进程的信号量 ##############
# 多进程的信号量
from multiprocessing import Process
import time,random
from multiprocessing import Semaphore
# ktv只有1个房间,1个房间只能装4个人,但是这样写就是20个人都进入到房间了,
# 假设ket门口有4把钥匙,一个进程来了那一把钥匙,然后关门,这样只有4个进程能拿到,剩下的之后1个进程出来了才可以继续其他的进程,
# 这个概念就叫做信号量,同一时间就只有四个人,
def ktv(i,sem):
sem.acquire() # 获取钥匙
print("%d进入ktv"%i)
time.sleep(random.randint(60,180)) # 这是每一个人唱歌1-3分钟
print("%d走出ktv"%i)
sem.release() # 还钥匙
if __name__ == "__main__":
sem = Semaphore(4) # 这就是设置有多少把钥匙, 信号量的英文就是:Semaphore
for i in range(20):
p=Process(target=ktv,args=(i,sem))
p.start()
############### 进程的事件 ##############
# 进程的事件
# 事件
import time
from multiprocessing import Event, Process
# 一个信号,可以使所有的进程都进入阻塞状态,也可以控制所有信号都解除阻塞,
# 一个事件创建之后,默认是阻塞状态,
# e = Event() # 创建一个事件
# print(e.is_set()) # 查看一个事件是否是阻塞状态,
# print(123445)
# e.set() # 这是把阻塞的状态改为true,
# print(e.is_set())
# e.wait() # 根据e.is_set()的结果,如果是false,就会阻塞,如果是true就会不阻塞
# print(12344)
# e.clear() # 这是把阻塞的状态改为false
# print(e.is_set())
# e.wait() # 虽然阻塞了,但是一定要有这个wait,才会阻塞后面的代码,
# print(444444)
# 举一个例子,红绿灯
# 每一个进程表示一辆车,
def car(e,i):
#e.is_set() 默认返回False 代表的是绿灯
if not e.is_set():
print("car%s在等待"%i)
e.wait()
print("car%s通行了"%i)
def light(e):
while True:
if e.is_set():
e.clear()
print(‘\033[31m红灯亮了\033[0m‘)
else:
e.set()
print(‘\033[32m绿灯亮了\033[0m‘)
time.sleep(2)
if __name__ == ‘__main__‘:
e=Event()
# 模拟启动交通灯
p1=Process(target=light,args=(e,))
p1.daemon=True
p1.start()
#模拟20辆小车
for i in range(20):
import random
time.sleep(random.uniform(0,2))
p2=Process(target=car,args=(e,i))
p2.start()
print("程序彻底结束!")
############### 进程间的通信---队列 ##############
# 进程间的通信,
# 队列和通道,
# 队列
# 之前学过一个模块是queue,现在进程间的通信不能使用这个
# import queue
# 基本的队列的方法:
# from multiprocessing import Queue
# q =Queue(5) # 创建共享的进程队列,maxsize是队列中允许的最大项数,如果省略此参数,则无大小限制,容量是5
# q.put(1) # 添加值,
# q.put(2)
# q.put(3)
# q.put(4)
# q.put(5)
# q.put(6) # 队列满了,就不能放了,这个就会阻塞,
# print(q.full()) # 这个队列是否满了,如果q已满,返回为True
# print(q.get()) # 返回q中的一个项目。如果q为空,此方法将阻塞
# print(q.get()) # 获取值,
# print(q.get()) # 获取值,
# print(q.get()) # 获取值,
# print(q.get()) # 获取值,
# print(q.get()) # 获取值,没有值了,第六次的时候就会阻塞,
#
# print(q.empty()) # 如果调用此方法时 q为空,返回True
#
# while True:
# try:
# q.get_nowait() # 如果有值就等
# except:
# print("队列已经空了")
# import time
# time.sleep(1)
# # 队列之间的通信
# from multiprocessing import Queue,Process
#
# # 生产数据的函数
# def produce(q):
# q.put("hello")
#
# def consume(q):
# print(q.get())
#
# if __name__ == ‘__main__‘:
# q = Queue()
# p = Process(target=produce,args=(q,))
# p.start()
# c = Process(target=consume,args=(q,))
# c.start()
#
# # 这就是两个子进程之间的通信,通过的队列,
#
# 队列的生产者和消费者模型
# 买包子的例子
# 有蒸包子的人,这就是生产者,有买包子的人,这就是消费者,
# 实际中,可能会有数据供需不平衡的问题,
# 就是数据生产的多了没有消费,所以我们要增加消费者,或者减少生产
# 数据消费的多了,我们要增加生产者,来解决这个问题,
# 我们把生产者作为一个进程,把消费者作为一个进程
from multiprocessing import Process,Queue,JoinableQueue
import time, random
def producer(name,food,q): # 三个参数,就是谁生产,生产了什么,放到哪里
for i in range(10):
time.sleep(random.randint(1,3)) # 1-3秒生产1个,
f = "%s生产了%s%s"%(name,food,i)
print(f)
q.put(f)
q.join() # 阻塞, 这是感知一个队列中的数据全部都处理完毕,
# 这种相当于把生产的生命周期拉长了,就是说你是生产完了还没有结束,你还要等待消费者把你生产的所有的东西都消费了,才能结束,
def consumer(q,name):
while True:
food =q.get()
if food is None: # 问题:有多个消费者的时候,只有一个消费者拿到这个值,然后结束了,但是拿不到的消费者,就还没有结束,
print("获取到一个空")
break
print("%s消费了%s"%(name,food))
time.sleep(random.randint(1,3)) # 1-3秒消费1个,
q.task_done() # 队列的计数器 -1
if __name__ == ‘__main__‘:
# q = Queue()
q = JoinableQueue()
p1 = Process(target=producer,args=("andy","包子",q))
p1.start()
p2 = Process(target=producer,args=("Lucy","油条",q))
p2.start()
c1 = Process(target=consumer,args=(q,"xiaoxiao"))
c1.daemon =True
# 意味着,主进程的代码执行结束之后,子进程就结束了,
# 而主进程又是依赖两个生产者结束才结束的,
# 而我在生产者的地方加了一个阻塞,直到消费者全都消费了之后才结束,
# 所以这个设计是非常的巧妙的,
c1.start()
# 只有一个消费者,两个生产者, 所以会有供给过大,需要加一个消费者,
c2 = Process(target=consumer,args=(q,"meimei"))
c2.daemon =True
c2.start()
# 因为只会生产10个,所以怎么能够,没有生产了,但是消费的地方还在get,怎么办?
p1.join()
p2.join()
# q.put(None)
# q.put(None)
# 为什么是两个none?
# 问题:有多个消费者的时候,只有一个消费者拿到这个值,然后结束了,但是拿不到的消费者,就还没有结束,
# 这种写法太麻烦了,如果有1000个还得了,怎么办?使用新的一个模块:JoinableQueue
# 做了三件事;
# 1,把c1,c2,改成守护进程
# 2,把生产者加一个q.join(),直到消费者全部消费结束
# 3,加了一个 q.task_done() # 队列的计数器 -1
############### 进程间通信--管道(了解) ##############
# 进程间通信
# 管道(管道只做了解)
from multiprocessing import Pipe,Process
# conn1,conn2 = Pipe() # pipe是一个函数,有两个返回值,这个地方我们是使用两个参数来接收这两个返回值,
# conn1.send("123")
# print(conn2.recv())
# 这就是管道,这是一个双向通信的,
# 你调用这个,就会给你一个左边,一个右边,你从左边传入,就可以从右边传出,
# 你从右边传入,就可以从左边传出,
# 怎么使用管道是的在进程间通信?
# 也可以使用生产者和消费者模型
import time,random
def producter(con,pro,name,food):
con.close()
for i in range(4):
time.sleep(random.random())
f = "%s生产了%s%s" % (name, food, i)
print(f)
pro.send(f)
def consumer(con,pro,name):
pro.close()
while True:
try:
food = con.recv()
print("%s消费了%s" % (name, food))
time.sleep(random.random())
except EOFError:
con.close()
break
if __name__ == ‘__main__‘:
con, pro = Pipe()
p= Process(target=producter,args=(con, pro,"andy","包子"))
p.start()
c1= Process(target=consumer,args=(con, pro,"li"))
c1.start()
c2= Process(target=consumer,args=(con, pro,"wang"))
c2.start()
con.close()
pro.close()
# pipe有一个数据不安全性,一个放一个取没有问题,
# 但两个消费者的时候会有问题,会出现两个消费者抢资源的问题,
# 怎么解决这个问题?通过加锁
# 所以我们还是使用队列,队列就是基于管道加锁的,管道就是基于socket的,
# 使用队列就不会有数据不安全的问题了,
# 自己加锁需要考虑很多问题,所以我们还是使用队列,管道作为了解,
# 我们工作中顶多就会用到队列,
############### 进程之间的数据共享 ##############
# 进程之间的数据共享
# 通过Manager模块
from multiprocessing import Manager,Process,Lock
# 单个进程:
# def work(dic):
# dic[‘count‘]-=1
# print(dic)
#
# if __name__ == ‘__main__‘:
# m = Manager()
# dic=m.dict({‘count‘:100})
# p_list=[]
# p = Process(target=work, args=(dic,))
# p_list.append(p)
# p.start()
# p.join()
# print("主进程",dic) # 主进程 {‘count‘: 99},这就是子进程的改变,影响到了主进程
# 多个进程,出现的问题:
# def work(dic):
# dic[‘count‘]-=1
# # 这个就是涉及到多个进程修改一个内容的情况,这种是不安全的,怎么办?加锁
# # 按理说是50,可能会结果不是50
#
# if __name__ == ‘__main__‘:
# m = Manager()
# dic=m.dict({‘count‘:100})
# p_list=[]
# for i in range(50): # 创建多个进程
# p = Process(target=work, args=(dic,))
# p_list.append(p)
# p.start()
# for i in p_list:
# p.join()
# print("主进程",dic) # 主进程 {‘count‘: 99},这就是子进程的改变,影响到了主进程
# 多个进程加锁:
def work(dic,lock):
lock.acquire()
dic[‘count‘]-=1
# 这个就是涉及到多个进程修改一个内容的情况,这种是不安全的,怎么办?加锁
lock.release()
if __name__ == ‘__main__‘:
m = Manager()
lock = Lock()
dic=m.dict({‘count‘:100})
p_list=[]
for i in range(50): # 创建多个进程
p = Process(target=work, args=(dic,lock))
p_list.append(p)
p.start()
for i in p_list:
p.join()
print("主进程",dic) # 主进程 {‘count‘: 99},这就是子进程的改变,影响到了主进程
# 这个数据共享,工作中也不会用到,
# 队列还有很多,kafak,rebbitmq,memcache
############### 进程池 ##############
"""
进程池的概念
为什么会有进程池?
1,因为没开启一个进程,都需要创建一个内存空间,这是耗时的
2,进程过多,操作熊的调度也会耗时,
所以会有非常大的性能问题,
所以我们不会让进程太大,我们会设计一个进程池,
进程池:
1,Python中先创建一个进程的池子,
2,这个进程池能存放多少个进程,比如有5个进程,
3,先把这些进程创建好,
4,比如有50个任务他们到进程池里面去找进程,找到的就执行,找不到的就等待,
5,进程执行结束之后,不会结束,而是返回进程池,等待下一个任务,
所以进程池,可以节省进程创建的时间,节省了操作系统的调度,而且进程不会过多的创建,
所以进程池和信号量有什么关系?
假设有200个任务,
信号量,信号量还是200个进程在排队,去拿钥匙,所以不能控制有多少进程,而是控制了同一时间有几个进程在执行,
也就是只允许5个进程让操作系统调度,节省了操作系统的调度时间,但是并没有节省进程的创建时间,
而进程池,是有200个任务去拿进程,所以进程池既是节省了操作系统的调度时间,也节省进程的创建时间,
更高级的进程池是比较智能的,
比如现在进程池有5个进程,就可以处理过来了,就不需要增加
但是如果处理等待的任务太多了,急需要往进程池里面加进程,一直到设置的进程池上限
如果任务减少了,就进程池里面减少,
这是比较智能的,
Python中没有高级的进程池,只有一个固定的进程数的进程池,没有弹性的那种,
"""
# from multiprocessing import Pool, Process
# def func(n):
# print(n + 1)
# if __name__ == ‘__main__‘:
# pool = Pool(5) # 进程池里面有5个进程,约定就是cpu的内核+1
# pool.map(func, range(100)) # 这是模拟100个任务,
# 这个map是一个异步的,而且自带close,和join功能,
# 上面是一个使用进程池的方法,还有其他的使用进程池的方法
# 使用了进程池之后,就不使用哪种创建进程的方式了,
from multiprocessing import Pool
import time,os
def func(n):
print("start func%s"%n,os.getpid())
time.sleep(1)
print("end func%s"%n,os.getpid())
if __name__ == ‘__main__‘:
p = Pool(5) # 不写就是默认cpu的核数,
for i in range(10):
# p.apply(func,args=(i,)) # p.apply这是同步,很慢,
p.apply_async(func,args=(i,)) # p.apply_async这是异步,这个一定是和close和join同时使用的,
p.close() # 结束进程池接收任务
p.join() # 这是感知进程池中的任务执行结束,
############### 进程池的返回值 ##############
# 进程池的返回值,
from multiprocessing import Pool, Process
def func(i):
return i
# if __name__ == ‘__main__‘:
# pool = Pool(5)
# res_list = []
# for i in range(10):
# # res = pool.apply(func,args=(i,)) # 所以这个结果接收,就是返回值,
# res = pool.apply_async(func,args=(i,)) # 所以这个结果接收,就是返回值,
# res_list.append(res)
# for res in res_list:
# print(res.get()) # get会阻塞等待结果
# 上面讲了apply和apply_async 的返回值的问题,
# 下面讲讲map的返回值的问题,比较简单
if __name__ == ‘__main__‘:
pool = Pool(5)
ret = pool.map(func,range(10))
print(ret) # 这是返回了一个列表,
# 使用的时候想用map,map搞不定就使用,apply_async
############### 进程池的回调函数 ##############
# 进程池的回调函数
from multiprocessing import Pool
def func1(n):
print(111)
return n
def func2(n):
print(222)
print(n*2)
if __name__ == ‘__main__‘:
p = Pool(5)
p.apply_async(func1,args=(10,),callback=func2)
p.close()
p.join()
# 回调函数都是在主进程中执行的,
标签:函数 xiaoxiao llb 是的 foo 表示 man try call
原文地址:https://www.cnblogs.com/andy0816/p/12289717.html