标签:tip 10个 range 创建 bind 访问 固定 str __name__
目录
计算机三大组成:应用程序、操作系统、硬件。
执行程序结构:硬盘、内存、CPU。
操作系统:协调、管理和控制计算机软硬件资源的控制程序。
操作系统作用:
特点:
优点:程序员独占资源,即时调试程序
缺点:浪费计算机资源,一个时间段只有一个人使用
特点:
优点:批处理节省机时
缺点:
多道技术:时间多路复用和空间多路复用+硬件上(物理层)支持隔离,以达到将一个单独的CPU变成多个虚拟的CPU。
分时操作系统:多个联机终端+多道技术
进程:一个程序的执行过程,而负责执行任务是CPU。
并发:伪并行,即看起来是同时运行,单个CPU+多道技术就可实现。
串行:一个进程执行完才执行下一个。
并行:同时运行,只有具备多个CPU才能实现。
阻塞:进程在等待输入(即I/O)时的状态。
进程的状态:运行、阻塞、就绪。
句柄:在创建进程时,父进程得到一个特别的令牌,该令牌可控制子进程,但是父进程有权把该句柄传给其他子进程。
进程的层次结构:无论UNIX还是windows,进程只有一个父进程,不同的是
# 方式一:利用Process类
from multiprocessing import Process
import time
import os
def task(name):
print(f'{name} is running,子进程号:{os.getpid()},子父进程号:{os.getppid()}')
time.sleep(2)
print(f'{name} is done')
if __name__ == '__main__':
p = Process(target=task, args=('allen',)) # 实例化
p.start() # 仅仅给操作系统发送一个开启子进程信号
print(f'主,主进程号:{os.getpid()},主父进程号:{os.getppid()}')
# 利用终端查看pycharm进程号
# E:\练习>tasklist |findstr pycharm
# 方式二:继承Process类
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, name): # 重写init方法
super().__init__() # 继承父类init方法
self.name = name
def run(self) -> None: # 固定写法
print(f'{self.name} is running')
time.sleep(2)
print(f'{self.name} is done')
if __name__ == '__main__':
p = MyProcess('allen') # 实例化
p.start() # 仅仅给操作系统发送一个开启子进程信号
print('主')
僵尸进程:当子进程比父进程先结束,而父进程又没有回收子进程,释放子进程占用的资源,此时子进程将成为一个僵尸进程。
孤儿进程:当父进程退出,而它的一个或多个子进程还在运行,那么子进程将成为孤儿进程。
from multiprocessing import Process
import time
def task(name):
print(f'{name} is running')
time.sleep(2)
print(f'{name} is done')
if __name__ == '__main__':
p1 = Process(target=task, args=('allen',), name='子进程1') # 让关键参数来指定进程名
p2 = Process(target=task, args=('nick',))
p3 = Process(target=task, args=('tank',))
p_list = [p1, p2, p3]
for p in p_list:
p.start()
p1.terminate() # 给操作系统发信号关闭进程,不会立即关闭
for p in p_list:
p.join() # 让主进程等待p的结束,这是并发并非串行
print(p1.is_alive()) # 判断子进程是否存活
print('主')
print(p1.pid) # 查看子进程号
print(p1.name) # 查看子进程名
# server.py
import socket
from multiprocessing import Process
def talk(conn): # 通信循环函数
while True:
try:
data = conn.recv(1024)
print(data)
conn.send(data.upper())
except Exception:
break
conn.close()
def server(ip, port): # 连接循环函数
server = socket.socket()
server.bind((ip, port))
server.listen(5)
while True:
conn, addr = server.accept()
p = Process(target=talk, args=(conn,)) # 实例化进程对象
p.start() # 开启进程
server.close()
if __name__ == '__main__':
server('127.0.0.1', 8000)
# client.py
import socket
client = socket.socket()
client.connect(('127.0.0.1', 8000))
while True:
msg = input('>>>:').strip()
if not msg: continue
client.send(bytes(msg, encoding='utf-8'))
data = client.recv(1024)
print(data)
# 每来一个客户端,服务端就开启一个新的进程来服务它,这种实现方式导致系统越来越卡。
# 进程之间的内存空间是隔离的。
定义:主进程创建子进程,然后将该进程设置成守护自己的进程。守护进程会在主进程代码执行结束后就终止。
from multiprocessing import Process
import time
def talk(name):
print(f'{name} is running')
time.sleep(2)
print(f'{name} is done')
if __name__ == '__main__':
p = Process(target=talk, kwargs={'name': 'allen'})
p.daemon = True # 一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
p.start()
print('主') # 主 #只要终端打印出这一行内容,那么守护进程p也就跟着结束掉了
由来:进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱。
# 并发运行,效率高,但竞争同一打印终端,带来了打印错乱
from multiprocessing import Process
import time, os
def task():
print(f'{os.getpid()} is running')
time.sleep(1)
print(f'{os.getpid()} is done')
if __name__ == '__main__':
for i in range(3):
p = Process(target=task)
p.start()
解决方案:加锁处理。互斥锁的工作原理就是把并发改成串行,降低了效率,但保证了数据安全不错乱。如果把多个进程比喻为多个人,多个人都要去争抢同一个资源:卫生间,一个人抢到卫生间后上一把锁,其他人都要等着,等到这个完成任务后释放锁。
# 由并发变成了串行,牺牲了运行效率,但避免了竞争
from multiprocessing import Process, Lock
import time, os
def task(lock):
lock.acquire() # 加锁
print(f'{os.getpid()} is running')
time.sleep(1)
print(f'{os.getpid()} is done')
lock.release() # 释放锁
if __name__ == '__main__':
lock = Lock() # 锁对象
for i in range(3):
p = Process(target=task, args=(lock,))
p.start()
方案一:多个进程共享同一文件,我们可以把文件当数据库,用多个进程模拟多个人执行抢票任务,并发运行,效率高,但竞争写同一文件,数据写入错乱,只有一张票,卖成功给了10个人,故不成立。
#文件db.txt的内容为:{"count":1}
#注意一定要用双引号,不然json无法识别
from multiprocessing import Process
import time
import json
def search(name):
dic = json.load(open('db.txt', 'r', encoding='utf-8'))
time.sleep(1) # 模拟网络延迟
print(f'路人{name}查询了还有{dic["count"]}张余票')
def get(name):
dic = json.load(open('db.txt', 'r', encoding='utf-8'))
time.sleep(1)
if dic["count"] > 0:
dic["count"] -= 1
time.sleep(1)
json.dump(dic, open('db.txt', 'w', encoding='utf-8'))
print(f'{name}抢票成功')
def task(name):
search(name)
get(name)
if __name__ == '__main__':
for i in range(8):
p = Process(target=task, args=(f'路人{i}',))
p.start()
方案二:加锁处理:购票行为由并发变成了串行,牺牲了运行效率,但保证了数据安全
from multiprocessing import Process, Lock
import time
import json
def search(name):
"""查票函数"""
dic = json.load(open('db.txt', 'r', encoding='utf-8'))
time.sleep(1) # 模拟网络延迟
print(f'路人{name}查询了还有{dic["count"]}张余票')
def get(name):
"""抢票函数"""
dic = json.load(open('db.txt', 'r', encoding='utf-8'))
time.sleep(1)
if dic["count"] > 0:
dic["count"] -= 1
time.sleep(1)
json.dump(dic, open('db.txt', 'w', encoding='utf-8'))
print(f'{name}抢票成功')
def task(name, lock):
search(name)
lock.acquire() # 加锁
get(name)
lock.release() # 释放锁
if __name__ == '__main__':
lock = Lock() # 在抢票环节加锁,而非查票加锁
for i in range(8):
p = Process(target=task, args=(f'路人{i}', lock))
p.start()
方案三:join方法处理,虽然保证了数据安全,但问题是连查票操作也变成只能一个一个人去查了,很明显大家查票时应该是并发地去查询而无需考虑数据准确与否。
from multiprocessing import Process, Lock
import time
import json
def search(name):
"""查票函数"""
dic = json.load(open('db.txt', 'r', encoding='utf-8'))
time.sleep(1) # 模拟网络延迟
print(f'路人{name}查询了还有{dic["count"]}张余票')
def get(name):
"""抢票函数"""
dic = json.load(open('db.txt', 'r', encoding='utf-8'))
time.sleep(1)
if dic["count"] > 0:
dic["count"] -= 1
time.sleep(1)
json.dump(dic, open('db.txt', 'w', encoding='utf-8'))
print(f'{name}抢票成功')
def task(name):
search(name)
get(name)
if __name__ == '__main__':
for i in range(8):
p = Process(target=task, args=(f'路人{i}', ))
p.start()
p.join()
综上所述:join是将一个任务整体串行,而互斥锁的好处则是可以将一个任务中的某一段代码串行,比如只让task函数中的get任务串行。
def task(name,):
search(name) # 并发执行
lock.acquire()
get(name) #串行执行
lock.release()
虽然可以用文件共享数据实现进程间通信,但问题是:效率低(共享数据基于文件,而文件是硬盘上的数据)和需要自己加锁处理
因此我们最好找寻一种解决方案能够兼顾:效率高(多个进程共享一块内存的数据)和帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC(inter-process-communication)通信机制:队列和管道。
队列和管道都是将数据存放于内存中,而队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,因而队列才是进程间通信的最佳选择。我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
定义:可创建共享的进程队列,可保证多进程安全的队列,实现多进程之间的数据传递。
from multiprocessing import Queue
q = Queue(3) # 创建共享的进程队列
q.put(1) # 插入数据到队列中
q.put('hello')
q.put([1, 2, 3])
print(q.full()) # True
# q.put(4) # 再放就阻塞了
q.get() # 从队列读取并且删除一个元素
q.get()
q.get()
print(q.empty()) # True
q.get() # 再取就阻塞了
定义:生产者是生产数据的任务,消费者是处理数据的任务,而生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。
# 基于Queue实现消费者和生产者模型
from multiprocessing import Process, Queue
import time
def producer(q):
for i in range(3):
res = f'包子{i}'
time.sleep(1)
print(f'生产者生产了{res}')
q.put(res)
def costumer(q):
while True:
res = q.get()
if res is None: break
time.sleep(1)
print(f'消费者吃了{res}包子')
if __name__ == '__main__':
# 容器
q = Queue()
# 生产者们
p1 = Process(target=producer, args=(q,))
p2 = Process(target=producer, args=(q,))
# 消费者们
c1 = Process(target=costumer, args=(q,))
p1.start()
p2.start()
c1.start()
p1.join()
p2.join()
q.put(None)
q.put(None)#有几个消费者就需要发送几次结束信号:相当low
print('主')
# 基于JoinableQueue实现生产者消费者模型
from multiprocessing import Process, JoinableQueue
import time
def producer(q):
for i in range(3):
res = f'包子{i}'
time.sleep(1)
print(f'生产者生产了{res}')
q.put(res)
q.join() # 等到消费者把自己放入队列中的所有的数据都取走之后,生产者才结束
def costumer(q):
while True:
res = q.get()
if res is None: break
time.sleep(1)
print(f'消费者吃了{res}包子')
q.task_done() # 发送信号给q.join(),说明已经从队列中取走一个数据并处理完毕了
if __name__ == '__main__':
# 容器
q = JoinableQueue()
# 生产者们
p1 = Process(target=producer, args=(q,))
p2 = Process(target=producer, args=(q,))
# 消费者们
c1 = Process(target=costumer, args=(q,))
c1.daemon = True
p1.start()
p2.start()
c1.start()
p1.join()
p2.join()
# 1、主进程等生产者p1、p2结束
# 2、而p1、p2是在消费者把所有数据都取干净之后才会结束
# 3、所以一旦p1、p2结束了,证明消费者也没必要存在了,应该随着主进程一块死掉,因而需要将生产者们设置成守护进程
print('主')
生产者消费者模型总结:
标签:tip 10个 range 创建 bind 访问 固定 str __name__
原文地址:https://www.cnblogs.com/daizongqi/p/11246619.html