标签:之间 场景 优雅 方式 xxx 文件中 code 单元 操作系统
上篇主要对多任务从生活上来认识, 同时引入对 进程 的认识, 即操作系统资源分配的基本单元. 然后通过对 并发, 并行 概念的认识, 去理解 任务调度. 然后用内置的 multiprocessing 模块来实现多任务的基本操作过程, 大致为:
创建多个任务 --> 为每个任务创建一个进程 --> 启动进程
注意参数的两种传递方式: args=(a,b, c...) 和 kwargs={‘a‘:123, ‘b‘: 456}
还留了一个问题, 关于多进程之前 不能共享全局变量, 而其实要解决这个问题, 就通过一个消息队列(Queue) 来实现消息的传递哦.
即多线程的一个默认特性是, 主进程会等待所有的子进程结束后, 才会结束哦.
import os
import time
import multiprocessing
def task_01(n):
for i in range(n):
print("task_01 is working...")
time.sleep(0.5)
if __name__ == '__main__':
# 创建子进程
p1 = multiprocessing.Process(target=task_01, args=(3,))
p1.start()
# 主进程等待
time.sleep(1)
print('主进程执行完毕...')
task_01 is working...
task_01 is working...
主进程执行完毕...
task_01 is working...
这里发现, 主进程是要等待所有的子进程都执行结束后, 才会真正结束哦,
即当主进程结束的时候, 杀掉所有的子进程, 即设置进程的属性 de
import os
import time
import multiprocessing
def task_01(n):
for i in range(n):
print("task_01 is working...")
time.sleep(0.5)
def task_02(n):
for i in range(n):
print('task_02 is working..')
time.sleep(0.2)
if __name__ == '__main__':
# 创建子进程
p1 = multiprocessing.Process(target=task_01, args=(3,))
p2 = multiprocessing.Process(target=task_02, args=(3,))
p1.daemon = True # 设置为守护主进程
# p2.daemon = True
p1.start()
p2.start()
# 主进程等待
time.sleep(1)
print('主进程执行完毕...')
# p2.terminate()
task_01 is working...
task_02 is working..
task_02 is working..
task_02 is working..
task_01 is working...
主进程执行完毕...
设置为守护主进程, 这样就比较灵活一下, 能适应各种业务场景.
用来解决, 进程之间的通信的哦.
import time
import multiprocessing
def add_data(queue):
for i in range(5):
if queue.full():
print("queue is full")
break
queue.put(i)
time.sleep(0.1)
print('add cur_data:', i)
def get_data(queue):
while True:
if queue.qsize() == 0:
print('queue is None')
break
print(f"get cur_data: {queue.get()}")
if __name__ == '__main__':
# 创建消息队列
queue = multiprocessing.Queue(3)
# 创建各自的进程
p_add = multiprocessing.Process(target=add_data, args=(queue,))
p_get = multiprocessing.Process(target=get_data, args=(queue,))
p_add.start()
p_add.join() # 主进程等 p_add 执行完后再执行
p_get.start()
add cur_data: 0
add cur_data: 1
add cur_data: 2
queue is full
get cur_data: 0
get cur_data: 1
get cur_data: 2
queue is None
之前的方式是, 可以为任务手动用 Process 来创建多个进程, 但当任务量是, 几千几万个是, 手动来弄, 似乎有手写难搞. 进程池就是基于这样的场景下, 自动根据任务数量而创建最合理的进程数, 这样不仅不仅场景变多了, 而且能合理利用资源, 更为重要一点, 我感觉是写代码上, 真的非常简洁优雅.
而基本的流程, 在我们初始化 Pool 时候, 可以手动指定一个最大的进程数 (超参数), 当新的请求提交到 Pool 的时候, 如果池没有满, 则创建一个新的进程来执行该请求;
而当池子是满的, 即池子中的进程数已经达到了最大值, 则 请求会自动等待, 直到池子中有空闲资源, 然后就该任务就加入进程池. 这感觉还挺智能的, 说实话.
任务一个接一个地完成. 即下一个任务要等上一个任务跑完后才能再继续执行哦.
同步 vs 异步
- 生活中, 同步, 多个任务一起跑;
- 编程中, 同步, 多个任务按顺序跑; 异步同时执行多任务, 不等待
import time
import multiprocessing
def send_email():
print("send_email ...", multiprocessing.current_process().pid)
time.sleep(1)
if __name__ == '__main__':
# 创建Pool 并设置最大进程数
pool = multiprocessing.Pool(3)
# 大批量任务哦
for i in range(6):
pool.apply(send_email)
# 同步执行任务,要等一个任务完,再执行另一个, 别混淆跟 多进程.
send_email ... 15348
send_email ... 14928
send_email ... 7156
send_email ... 15348
send_email ... 14928
send_email ... 7156
这厉害了, 就是多个任务同时执行, 且多进程, 这个效率就非常高呀.
import time
import multiprocessing
def send_email(n):
print("send_email ...", multiprocessing.current_process().pid)
time.sleep(1)
if __name__ == '__main__':
# 创建Pool 并设置最大进程数
pool = multiprocessing.Pool(3)
# 大批量任务哦
for i in range(6):
# apply_async: 异步, 执行顺序是Pool自己调度哦
pool.apply_async(send_email, args=(i, ))
pool.close() # 关闭进程池
pool.join() # 等待所有任务执行完成
send_email ... 8096
send_email ... 14520
send_email ... 9324
send_email ... 8096
send_email ... 14520
send_email ... 9324
我是感觉, 这个异步任务, 其顺序自己调度, 有点东西哦. 在工作中, 用异步的情景有, 之前有批量处理文件, 几千个嘛, 表字段差不多的, 需求是对每个文件进行判断, 里面的某个字段的值是否为 xxx . 在这 1000 个文件中, 先处理谁, 后处理谁是没有关系的, 因此就异步呗. 还有一个异步场景是, 爬虫 , 大批量的 url 等着被请求解析, 当然用异步啦.
同步任务. 目前我好像是很少用哦. 主要是我的一些数据处理, 分析的一些活吧, 大多都比较简单, 就读取个数据, 写写简单的 sql, 用 用pandas啥的, 就能满足了, 用的不是太多哦.
感觉多进程, 到这差不多了, 基本概念理清楚, 然后能用就行, 也是做个笔记, 主要是为方便后面 copy 代码而已, 嗯, 后面是可以添加一几个小案例. 多任务, 除了多进程, 还有多线程, 协程这样的概念, 一个个都会有的.
哦, 今天是2020年2月29日, 闰年哦, 希望4年后的今天, 我已经变成了数据大牛, 知识大牛. 也需个愿, 并坚持每日学习不怠.
标签:之间 场景 优雅 方式 xxx 文件中 code 单元 操作系统
原文地址:https://www.cnblogs.com/chenjieyouge/p/12386892.html