标签:roc 多进程 机器 index pre 情况下 解锁 range strong
阻塞主进程,等待子进程执行完毕再放开阻塞
import time import random from multiprocessing import Process # 单个子进程 def func(index): time.sleep(random.randint(1, 3)) print(‘发送完毕‘) if __name__ == ‘__main__‘: p = Process(target=func, args=(1,)) p.start() p.join() # 阻塞 直到p进程执行完毕就结束阻塞 print(‘邮件已经发送完毕‘) ‘‘‘ 发送完毕 邮件已经发送完毕 ‘‘‘ # 多个子进程 def func(index): time.sleep(random.randint(1,3)) print(‘第%s个邮件已经发送完毕‘ % index) if __name__ == ‘__main__‘: p_lst = [] for i in range(10): p = Process(target=func, args=(i,)) p.start() p_lst.append(p) for p in p_lst: p.join() # 等待每个子进程执行完毕 print(‘10个邮件已经发送完毕‘) ‘‘‘ 第2个邮件已经发送完毕 第4个邮件已经发送完毕 第7个邮件已经发送完毕 第5个邮件已经发送完毕 第1个邮件已经发送完毕 第9个邮件已经发送完毕 第0个邮件已经发送完毕 第3个邮件已经发送完毕 第8个邮件已经发送完毕 第6个邮件已经发送完毕 10个邮件已经发送完毕 ‘‘‘
以继承Process类的形式开启进程的方式
import os from multiprocessing import Process class MyProcess(Process): def run(self): print(‘子进程: ‘, os.getpid(), os.getppid()) if __name__ == ‘__main__‘: p = MyProcess() p.start() # 开启一个子进程,让这个子进程执行run方法 print(‘主进程:‘, os.getpid()) ‘‘‘ 主进程: 6852 子进程: 6644 6852 ‘‘‘ # 给子进程传参数 class MyProcess(Process): def __init__(self, arg): super().__init__() self.arg = arg def run(self): time.sleep(1) print(‘子进程: ‘, os.getpid(), os.getppid(), self.arg) if __name__ == ‘__main__‘: # 开启单个子进程 p = MyProcess(‘参数‘) p.start() # 开启一个子进程,让这个子进程执行run方法 p.join() print(‘主进程:‘, os.getpid()) ‘‘‘ 子进程: 6552 7784 参数 主进程: 7784 ‘‘‘ if __name__ == ‘__main__‘: # 开启多个子进程 for i in range(10): p = MyProcess(‘参数%s‘ % i) p.start() # 开启一个子进程,让这个子进程执行run方法 print(‘主进程:‘, os.getpid()) ‘‘‘ 主进程: 7340 子进程: 6540 7340 参数0 子进程: 6512 7340 参数3 子进程: 7648 7340 参数1 子进程: 7460 7340 参数2 子进程: 8048 7340 参数4 子进程: 5108 7340 参数8 子进程: 7868 7340 参数7 子进程: 7892 7340 参数6 子进程: 4172 7340 参数9 子进程: 7224 7340 参数5 ‘‘‘
主要功能:每隔一段时间就向一台机器汇报自己的状态(程序的报活)
特点:会随着主进程的结束而结束。
import time from multiprocessing import Process def func(): print(‘子进程 start‘) time.sleep(3) print(‘子进程 end‘) if __name__ == ‘__main__‘: p = Process(target=func) p.daemon = True # 设置p为一个守护进程,必须在start之前完成 p.start() time.sleep(2) print(‘主进程‘) ‘‘‘ 子进程 start 主进程 ‘‘‘ # 主进程会等待子进程完全结束才结束 # 守护进程会随着主进程的代码执行完毕而结束 def func1(): count = 1 while 1: print(count * ‘*‘) time.sleep(0.5) count += 1 def func2(): print(‘func2 start‘) time.sleep(5) print(‘func2 end‘) if __name__ == ‘__main__‘: p1 = Process(target=func1) p1.daemon = True p1.start() # p1是守护进程 p2 = Process(target=func2) p2.start() time.sleep(3) print(‘主进程‘) ‘‘‘ func2 start * ** *** **** ***** ****** 主进程 func2 end ‘‘‘ # 如果主进程代码已经执行完毕,但是子进程还没执行完,守护进程都不会继续执行 # 守护进程会随着主进程的代码执行完毕而结束 # 主进程会等待子进程结束,守护进程只等待主进程代码结束就结束了
加锁降低了程序的效率,让原来能够同时执行的代码变成顺序执行了,异步变同步的过程
好处:保证了数据的安全
import time import json from multiprocessing import Process, Lock # 当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。 # 多进程抢占输出资源 def search(person): with open(‘ticket‘) as f: dic = json.load(f) time.sleep(0.2) # 模拟网络延迟 print(‘%s查询余票:‘ % person, dic[‘count‘]) def get_ticket(person): with open(‘ticket‘) as f: dic = json.load(f) time.sleep(0.2) # 模拟网络延迟 if dic[‘count‘] > 0: print(‘%s买到票了‘ % person) dic[‘count‘] -= 1 # 买到票,数量减1 time.sleep(0.2) # 模拟网络延迟 with open(‘ticket‘, ‘w‘) as f: json.dump(dic, f) # 把剩余票数写回文件 else: print(‘%s没买到票‘ % person) def ticket(person): search(person) # 查票 get_ticket(person) # 抢票 if __name__ == ‘__main__‘: for i in range(10): p = Process(target=ticket, args=(‘person%s‘ % i,)) p.start() ‘‘‘ person0查询余票: 5 person4查询余票: 5 person3查询余票: 5 person0买到票了 person8查询余票: 5 person2查询余票: 5 person7查询余票: 5 person4买到票了 person1查询余票: 5 person5查询余票: 5 person3买到票了 person9查询余票: 5 person6查询余票: 5 person8买到票了 person2买到票了 person7买到票了 person1买到票了 person9买到票了 person6买到票了 person5买到票了 ‘‘‘ # 使用锁维护执行顺序 def search(person): with open(‘ticket‘) as f: dic = json.load(f) time.sleep(0.2) # 模拟网络延迟 print(‘%s查询余票:‘ % person, dic[‘count‘]) def get_ticket(person): with open(‘ticket‘) as f: dic = json.load(f) time.sleep(0.2) # 模拟网络延迟 if dic[‘count‘] > 0: print(‘%s买到票了‘ % person) dic[‘count‘] -= 1 # 买到票,数量减1 time.sleep(0.2) # 模拟网络延迟 with open(‘ticket‘, ‘w‘) as f: json.dump(dic, f) # 把剩余票数写回文件 else: print(‘%s没买到票‘ % person) def ticket(person, lock): search(person) lock.acquire() # 加锁 get_ticket(person) lock.release() # 解锁 if __name__ == ‘__main__‘: lock = Lock() for i in range(10): p = Process(target=ticket, args=(‘person%s‘ % i, lock)) p.start() ‘‘‘ person0查询余票: 5 person1查询余票: 5 person2查询余票: 5 person0买到票了 person5查询余票: 5 person6查询余票: 5 person3查询余票: 5 person9查询余票: 5 person4查询余票: 5 person8查询余票: 5 person7查询余票: 5 person1买到票了 person2买到票了 person5买到票了 person6买到票了 person3没买到票 person9没买到票 person4没买到票 person8没买到票 person7没买到票 ‘‘‘ # 为了保证数据的安全 # 在异步的情况下,多个进程有可能同时修改同一份资源 # 就给这个修改的过程加上锁
import time from multiprocessing import Process, Lock # 加了锁就把异步变成同步了 def func(num, lock): time.sleep(1) print(‘异步执行‘, num) # 异步会同时开始 lock.acquire() time.sleep(0.5) print(‘同步执行‘, num) # 同步要一个结束才开始下一个 lock.release() if __name__ == ‘__main__‘: lock = Lock() for i in range(10): p = Process(target=func, args=(i, lock)) p.start()
from multiprocessing import Process, Lock # 互斥锁 lock = Lock() lock.acquire() print(‘456‘) lock.acquire() print(‘123‘) # 只打印456,123不打印
信号量的实现机制:计数器 + 锁 实现的
import time import random from multiprocessing import Process, Semaphore def ktv(person, sem): sem.acquire() print(‘\033[32m%s走进ktv\033[0m‘ % person) time.sleep(random.randint(1,5)) print(‘\033[31m%s走出ktv\033[0m‘ % person) sem.release() if __name__ == ‘__main__‘: sem = Semaphore(4) # 限定4个 for i in range(10): p = Process(target=ktv, args=(‘person%s‘ % i, sem)) p.start() ‘‘‘ person2走进ktv person1走进ktv person3走进ktv person0走进ktv person1走出ktv person6走进ktv person6走出ktv person7走进ktv person2走出ktv person5走进ktv person3走出ktv person9走进ktv person0走出ktv person8走进ktv person7走出ktv person4走进ktv person8走出ktv person9走出ktv person5走出ktv person4走出ktv ‘‘‘
阻塞事件:wait()方法
wait 是否阻塞是看 event 对象内部的一个属性
控制这个属性的值
set() 将这个属性的值改成True
clear() 将这个属性的值改成False
is_set() 判断当前的属性是否为True
import time import random from multiprocessing import Process, Event def traffic_light(e): print(‘\033[31m红灯亮\033[0m‘) while 1: if e.is_set(): time.sleep(2) print(‘\033[31m红灯亮\033[0m‘) e.clear() else: time.sleep(2) print(‘\033[32m绿灯亮\033[0m‘) e.set() def car(e, i): if not e.is_set(): print(‘car %s 在等待‘ % i) e.wait() print(‘car %s 通过了‘ % i) if __name__ == ‘__main__‘: e = Event() p = Process(target=traffic_light, args=(e,)) p.daemon = True p.start() p_lst = [] for i in range(20): time.sleep(random.randrange(0, 3, 2)) p = Process(target=car, args=(e, i)) p.start() p_lst.append(p) for p in p_lst: p.join() ‘‘‘ 红灯亮 car 0 在等待 car 1 在等待 绿灯亮 car 1 通过了 car 0 通过了 car 2 通过了 car 3 通过了 car 4 通过了 红灯亮 car 7 在等待 car 6 在等待 car 5 在等待 car 8 在等待 绿灯亮 car 6 通过了 car 7 通过了 car 8 通过了 car 5 通过了 红灯亮 car 9 在等待 绿灯亮 car 9 通过了 car 11 通过了 car 10 通过了 红灯亮 car 12 在等待 car 13 在等待 绿灯亮 car 13 通过了 car 12 通过了 car 15 通过了 car 14 通过了 红灯亮 car 17 在等待 car 16 在等待 car 18 在等待 绿灯亮 car 16 通过了 car 17 通过了 car 18 通过了 car 19 通过了 ‘‘‘
《python》join、守护进程、锁/信号量/事件、进程队列
标签:roc 多进程 机器 index pre 情况下 解锁 range strong
原文地址:https://www.cnblogs.com/zyling/p/12787669.html