标签:join() 并行 效率 tcp 实例 lex get 独立 效果
多道 、分时、实时
同步异步
阻塞和非阻塞
进程三状态:就绪 运行 阻塞
并发并行
子进程和主进程
多并发的tcp服务端
import socket
from multiprocessing import Process
def communicate(conn):
while True:
conn.send("hello".encode("utf-8"))
print(conn.recv(1024))
if __name__ == ‘__main__‘:
sk = socket.socket()
sk.bind((‘127.0.0.1‘,9001))
sk.listen()
while True:
conn,addr = sk.accept()
Process(target=communicate,args=(conn,)).start()
import socket
sk = socket.socket()
sk.connect((‘127.0.0.1‘,9001))
while True:
print(sk.recv(1024))
mv = input(">>>>>>>>>>:").strip()
sk.send(mv.encode("utf-8"))
进程是操作系统中最小的资源分配单位
进程
第二种开启子进程的方式
def func(index):
time.sleep(random.random())
print(‘第%s个邮件已经发送完毕‘%index)
if __name__ == ‘__main__‘:
p_lst = []
for i in range(10):
p = Process(target=func,args=(i,))
p.start()
p_lst.append(p)
for p in p_lst:
p.join()
print(‘全部发送完毕‘)
join控制子进程
#子进程同步,执行完毕后才执行主程序后面的程序
# import time
# from multiprocessing import Process
# def f(name):
# print("hello",name)
# time.sleep(1)
# if __name__ == ‘__main__‘:
# p_list = []
# for i in range(5):
# p = Process(target=f,args=(i,))
# p.start()
# p_list.append(p)
# p.join() #阻塞,
# print("主进程执行")
#子程序异步执行,执行完了阻塞结束
import time
from multiprocessing import Process
def f(name):
print("hello",name)
time.sleep(1)
if __name__ == ‘__main__‘:
p_list = []
for i in range(10):
p = Process(target=f,args=(i,))
p.start()
p_list.append(p)
for i in p_list:
i.join()
print("主进程执行完毕")
守护进程 daemon
守护进程会随着主进程代码执行完毕而结束
守护进程内无法再开启子进程,否则会抛出异常
注意:进程之间是相互独立的,主进程代码运行结束,守护进程也会随即终止
import time
from multiprocessing import Process
def func1():
count = 1
while True:
time.sleep(0.5)
print(count*"*")
count += 1
def func2():
print("func strat")
time.sleep(5)
print("func2 end")
if __name__ == ‘__main__‘:
p1 = Process(target=func1)
p1.daemon = True #定义为守护进程
p1.start() #执行
Process(target=func2).start()
time.sleep(3)
print("主进程")
#输出
# func strat
# *
# **
# ***
# ****
# *****
# 主进程
# func2 end
如果主进程执行完毕那么守护进程也会结束,但是其他子进程如果没执行完还会继续执行
锁
作业:在进程之间保证数据安全性
from multiprocessing import Process,Lock
lock= Lock()实例对象
lock.acquire() 取钥匙开门
lock.release() 关门放钥匙
例题 模拟抢票
import time
import json
from multiprocessing import Process,Lock
def search(person): #查票
with open("ticket") as f: #文件中保存着一个字典{"count":4}
dic = json.load(f) #读出文件中的字典
time.sleep(0.2)
print("%s查询余票"%person,dic["count"])
def get_ticket(person): #抢票
with open("ticket") as f:
dic = json.load(f)
time.sleep(0.2) #模拟延迟
if dic["count"] >0:
print("%s买到票了"%person)
dic["count"] -= 1
time.sleep(0.2)
with open("ticket","w") as f:
json.dump(dic,f) #写回文件
else:
print("%s没买到票"%person)
def ticket(person,lock):
search(person)
lock.acquire() #开门,一次只能进一个
get_ticket(person)
lock.release() #关门
if __name__ == ‘__main__‘:
lock = Lock()
for i in range(10):
p = Process(target=ticket,args=("person%s"%i,lock))
p.start()
为了保证数据的安全,在异步的情况下,多个进程又可能同时修改同一份数据的时候,需要给这个数据上锁
加锁的作用
同步控制
import time
from multiprocessing import Process,Lock
def func(num,lock):
time.sleep(1)
print("异步执行",num)
lock.acquire()
time.sleep(0.5)
print("同步执行",num)
lock.release() #同步执行是依次执行,间隔0.5秒
if __name__ == ‘__main__‘:
lock = Lock()
for i in range(10):
p = Process(target=func,args=(i,lock))
p.start()
信号量 机制:计数器+锁实现的 Semaphore
主程序控制一定数量的子程序同时执行,这些数量的子程序执行完一个就会有下一个子程序补充进来
import time
import random
from multiprocessing import Process,Semaphore
def ktv(person,sem):
sem.acquire() #进
print("%s走进KTV"%person)
time.sleep(random.randint(1,3)) #随机延迟一到三秒
print("%s走出ktv"%person)
sem.release() #出
if __name__ == ‘__main__‘:
sem = Semaphore(4) #信号量为4,默认为1
for i in range(10):
Process(target=ktv,args=(i,sem)).start()
事件 Event
阻塞事件 wait() 方法
控制这个属性的值
set()将这个属性的值改成True
clear() 将这个属性的值改成False
is_set() 判断当前属性是否为True
#模拟红绿灯,只有全部车通过后才停止
import time
import random
from multiprocessing import Process,Event
def traffic_light(e):
print("红灯亮")
while True:
if e.is_set():
time.sleep(2)
print("红灯亮")
e.clear()
else:
time.sleep(2)
print("绿灯亮")
e.set()
def car(e,i):
if not e.is_set():
print("car%s在等待"%i)
e.wait()
print("car%s通过了"%i)
if __name__ == ‘__main__‘:
e = Event()
p = Process(target=traffic_light,args=(e,))
p.daemon =True #变成守护进程
p.start()
p_list = []
for i in range(10):
time.sleep(random.randrange(0,3,2))
p = Process(target=car,args=(e,i))
p.start()
p_list.append(p)
for p in p_list:p.join()
多个进程之间有一些固定的通信内容
socket给予文件家族通信
进程之间虽然内存不共享,但是可以通信,
队列是基于管道实现的
管道是基于socket实现的
队列 + 锁 简便的IPC机制 使得进程之间的数据安全
def consume(q):
print(‘son-->‘,q.get())
q.put(‘abc‘)
if __name__ == ‘__main__‘:
q = Queue()
p = Process(target=consume,args=(q,))
p.start()
q.put({‘123‘:123})
p.join()
print(‘Foo-->‘,q.get())
简单的生产消费模型
def consume(q):
print(‘son-->‘,q.get())
q.put(‘abc‘)
if __name__ == ‘__main__‘:
q = Queue()
p = Process(target=consume,args=(q,))
p.start()
q.put({‘123‘:123})
p.join()
print(‘Foo-->‘,q.get())
相同的原理 JoinableQueue
task_done 通知队列已经有一个数据被处理了
q.join() 阻塞直到放入队列中所有的数据都被处理掉(有多少个数据就接受到多少taskdone)
import time
import random
from multiprocessing import Process,JoinableQueue
def consumer(q,name):
while True:
food = q.get()
time.sleep(random.uniform(0.3,0.8))
print("%s吃了一个%s"%(name,food))
q.task_done()
def producer(q,name,food):
for i in range(10):
time.sleep(random.uniform(0.3,0.8))
print("%s生产了%s%s"%(name,food,i))
q.put(food+str(i))
if __name__ == ‘__main__‘:
jq = JoinableQueue()
c1 = Process(target=consumer,args=(jq,"alex"))
c1.daemon = True
p1 = Process(target=producer,args=(jq,"libai","包子"))
c1.start()
p1.start()
p1.join()
jq.join()
管道 进程之间数据不安全 且存取数据复杂
开启过多的进程并不能提高你的效率,反而会降低效率
计算密集型 充分占用CPU 多进程可以充分利用多核 适合开启多进程,但是不适合开启很多多进程
IO密集型 大部分时间都在阻塞队列,而不是在运行状态 根本不太适合开启多进程
提交任务:
同步提交 apply
返回值:子进程对应函数的返回值
一个一个顺序执行的,并没有任何的并发效果
# import os
# import time
# from multiprocessing import Process,Pool
# def task(num):
# time.sleep(0.5)
# print("%s: %s"%(num,os.getpid()))
# return num ** 2
# if __name__ == ‘__main__‘:
# p = Pool(4)
# for i in range(20):
# res = p.apply(task,args=(i,)) #apply 提交任务方法,同步提交
# print("--->",res)
#四个任务依次执行,轮换
异步提交 apply_async
没有返回值,要想所有任务能够顺利的执行完毕
有返回值的情况下
res.get() #get不能再提交任务之后立刻执行,应该是先提交所有的任务再通过get获取结果
map()方法
import os
import time
from multiprocessing import Pool
def task(num):
time.sleep(1)
print("%s: %s"%(num,os.getpid()))
return num **2
if __name__ == ‘__main__‘:
p = Pool(4)
for i in range(20):
res = p.apply_async(task,args=(i,)) #apply_async 异步提交
p.close()
p.join()
#输出结果同时四个认识执行
标签:join() 并行 效率 tcp 实例 lex get 独立 效果
原文地址:https://www.cnblogs.com/yuncong/p/9683881.html