标签:spool pen 而在 ali count done 比较 异常处理 return
进程知识回顾
multiprocessing 处理进程
from multiprocessing import Process
开启子进程
p = Process(target=某个函数, args=(参数1,参数2))
p.start()
子进程和主进程
1.数据隔离
2.主进程等待子进程结束之后再结束
3.子进程和主进程之间默认是异步的
4.守护进程:在开启子进程之前可以设置子进程为守护进程,p.daemon = True
守护进程随着主进程的代码执行结束而结束
5.start/terminate 都是非阻塞的
锁——互斥锁, 保证数据安全
from multiprocessing import Lock
什么时候用到锁?
多个进程使用同一份数据资源的时候/操作文件、操作共享数据、操作数据库
acquire/release
队列
from multiprocessing import Queue
IPC进程之间的通信
使用队列的原因是它可以完成进程之间的消息传递
队列:在进程之间数据安全(进程安全)
队列:socket + pickle + 锁 == 管道 + 锁
因为没有锁,所以管道进程不安全
管道:socket + pickle
生产者与消费者模型
解决数据的获取与处理之间时间不对等的情况
能够自由的添加生产者与消费者的数量,以达到在相同的事件中,数据的处理速度能达到一个最大值
queue = Queue(5)
q.get()\q.put()
from multiprocessing import Manager
提供了很多能够实现数据共享的工具 dict/list
dict 能够在进程之间进行数据共享,导致进程不安全
不管是进程还是线程之间,进行 += -= 操作数据不安全,必须加锁
from multiprocessing import Pool
进程池
作用:
节省资源和操作系统的调度成本
一般情况下:开启的进程个数为 cpu个数的1-2倍之间
p = Pool() 如果括号里面不填,则默认为cpu的核数
p.map(func, iterable)/p.apply_async(func, args=(1,2,3))
进程池的特点:
同一时间最多有多少个进程能够同时执行任务与进程池中进程的个数有关系
p.apply_async 要与 close/join配合使用,保证主进程等待所有进程池中的任务执行完毕
ret = p.apply_async(...) ---> 在提交所有任务之后,从ret中获取结果(函数的返回值)也能够让主进程等待池中任务完成
回调函数callback参数
用来保证一个任务在执行完毕之后,立刻发出callback回调函数中的内容
并且子进程的函数返回值作为callback函数的参数
1.进程是一个开销比较大的机制
2.进程之间数据隔离
3.阻塞会影响进程的执行效率
4.过多的进程会给操作系统带来不小的压力
5.在有多个任务的时候要注意控制进程的数量——进程池
线程
1.线程理论
2.在Python中开启线程
模型
锁
队列
3.进程池
线程
什么是线程
为什么要有线程
进程与线程之间的区别
进程:计算机中最小的资源分配单位
线程:进程中的一员,同一个进程之间的几个线程共享一个进程的资源
但线程可以直接被CPU调度,因此线程是计算机中能被CPU调度的最小单位
比如一个qq是一个进程
两个好友可以同时给你发送消息,你可以同时接收,还可以和多个人聊天等等(并行)
比如视频app, 在线观看的同时可以缓存,这种情况下可以看作是两条线程
为什么要有线程:
线程的开启和销毁的速度都比进程要快
并且cpu在进程之间切换和在线程之间切换的效率,线程更高
进程与线程之间的区别:(当然,都是为了解决并发)
进程:数据隔离(不同的业务,应放在不同的进程中,比如qq与微信)
线程:数据共享、效率高
进程可以利用多个cpu
理论上,线程也能利用多个cpu
但是Python的线程是不能利用多核的
比如一个进程里有一个列表 l = []
进程里有三个线程,如果三个线程同时利用多个cpu给列表添加数字1
即三个线程都执行l.append(1)操作,它们很有可能一开始都认为这个列表是空的
那么结果很可能 l = [1],这样就造成数据丢失了
而如果不能利用多个cpu,即如果一个线程利用了一个cpu
其他的线程只能等它从cpu返回后再一个一个利用cpu
这样列表l就能每次都添加数字1,结果就是 l = [1, 1, 1]
全局解释器锁(GIL)——Cpython解释器设置的锁
早期python刚出现时,由于没有多核的概念,所以没有考虑给数据加锁这件事
后来有了多核的概念,而python是解释型语言,考虑到数据的安全及各种数据的记录
为了少考虑数据的安全行,便设置了全局解释器锁
这个锁导致了同一个进程之间的多个线程同一时刻只能有一个线程访问cpu
线程即然不能同时利用多个cpu,它的影响有:
cpu主要是用来做计算的
程序中除了计算还有IO操作
网络延迟的本质是IO操作
from threading import Thread def func(): print("我是子线程") t = Thread(target=func) t.start() print("我是主线程") # 我是子线程 # 我是主线程 # 运行结果与进程的相反,进程的是先打印主进程结果,再打印子进程
import os import time from threading import Thread def func(): time.sleep(1) print("我是子线程", os.getpid()) for i in range(10): t = Thread(target=func) t.start() print("我是主线程") # 我是主线程 # 我是子线程 92 # 我是子线程 92 # 我是子线程 92 # 我是子线程 92 # 我是子线程 92 # 我是子线程 92 # 我是子线程 92 # 我是子线程 92 # 我是子线程 92 # 我是子线程 92 # 这段代码可以说明以下几个问题: # 1.线程只能利用单个cpu(通过os.getpid())可证明 # 2.子线程的10次打印是一次性全部出来的,说明线程效率很高 # 线程也有三个状态:就绪 阻塞 运行 # 运行是在cpu里面的,全局锁也是针对这里。 # 第一个子线程去cpu里面运行的时候,很快会到阻塞状态,在这个状态里要sleep 1s # 而在这个1s时间里,第一个线程已经从cpu返回,然后第二个、第三个...又依次去cpu # 然后返回,速度非常快,1s内10个子线程已经都到阻塞状态了 # 所以运行发现10个子线程几乎同一时间内打印出结果 # 程序中IO操作是不占用全局解释器锁和cpu的
# 线程传参 # join # 守护线程 # 数据共享 # 开启线程的第二种方式 # 线程传参 import time # 为了看并发效果 from threading import Thread def func(i): time.sleep(1) print("我是子线程%s" % i) for i in range(10): t = Thread(target=func, args=(i,)) t.start() # 我是子线程0 # 我是子线程3 # 我是子线程2 # 我是子线程1 # 我是子线程4 # 我是子线程7 # 我是子线程8 # 我是子线程6 # 我是子线程5 # 我是子线程9 # 运行发现是异步的
# join import time # 为了看并发效果 from threading import Thread def func(i): time.sleep(1) print("我是子线程%s" % i) t_lst = [] for i in range(10): t = Thread(target=func, args=(i,)) t.start() t_lst.append(t) for t in t_lst: t.join() # 阻塞,直到子线程中的代码执行 print("所有的线程都执行完了") # 我是子线程1 # 我是子线程0 # 我是子线程2 # 我是子线程4 # 我是子线程5 # 我是子线程3 # 我是子线程9 # 我是子线程8 # 我是子线程6 # 我是子线程7 # 所有的线程都执行完了
# 数据共享 from threading import Thread n = 100 def func(): global n n -= 1 t_lst = [] for i in range(100): t = Thread(target=func) t.start() t_lst.append(t) for t in t_lst: t.join() print(n) # 0 # 运行速度非常快
# 守护进程 import time from threading import Thread def main(): print("主线程开始运行") time.sleep(3) print("主线程运行结束") def daemon_func(): while 1: time.sleep(1) print("守护线程") t = Thread(target=daemon_func) t.setDaemon(True) t.start() main() # 主线程开始运行 # 守护线程 # 守护线程 # 主线程运行结束 import time from threading import Thread def main(): print("主线程开始运行") time.sleep(3) print("主线程运行结束") def daemon_func(): while 1: time.sleep(1) print("守护线程") def thread_son(): print("子线程开始运行") time.sleep(5) print("子线程运行结束") t = Thread(target=daemon_func) t.setDaemon(True) t.start() Thread(target=thread_son).start() # 子线程 main() # 直接调用就表示是主线程,这里必须放最下面 # 否则主线程在上面先运行的话,就不会实现异步了,相当于没有子线程的事 # 子线程开始运行 # 主线程开始运行 # 守护线程 # 守护线程 # 主线程运行结束 # 守护线程 # 守护线程 # 子线程运行结束 # 注意,上面的Thread(target=thread_son).start()如果改为 # Thread(target=thread_son()).start() # 那么会先执行 thread_son(),运行结果就会是同步效果了 # 运行发现守护线程和守护进程不同 # 守护线程会守护主线程直到主线程结束 # 如果主线程要等待其他子线程,那么守护线程在这段时间中仍然发挥守护作用
# 开启线程的第二种方式及查看线程id的第一种方式 from threading import Thread class MyThread(Thread): def run(self): print("子线程", self.ident) for i in range(10): t = MyThread() t.start() # 子线程 8408 # 子线程 10776 # 子线程 2356 # 子线程 6208 # 子线程 896 # 子线程 5676 # 子线程 1956 # 子线程 12880 # 子线程 7776 # 子线程 6936 # 查看线程id的第二种方法 import time from threading import Thread, currentThread, enumerate, active_count def func(i): time.sleep(1) print("我是子线程%s" % i, currentThread().ident) for i in range(10): t = Thread(target=func, args=(i, )) t.start() print(enumerate()) print(active_count()) # [<_MainThread(MainThread, started 9640)>, <Thread(Thread-1, started 6404)>, # <Thread(Thread-2, started 9452)>, <Thread(Thread-3, started 12116)>, # <Thread(Thread-4, started 10412)>, <Thread(Thread-5, started 7680)>, # <Thread(Thread-6, started 2268)>, <Thread(Thread-7, started 2360)>, # <Thread(Thread-8, started 8788)>, <Thread(Thread-9, started 6300)>, # <Thread(Thread-10, started 5756)>] # 11 # 我是子线程1 9452 # 我是子线程0 6404 # 我是子线程2 12116 # 我是子线程5 2268 # 我是子线程3 10412 # 我是子线程7 8788 # 我是子线程4 7680 # 我是子线程6 2360 # 我是子线程8 6300 # 我是子线程9 5756
# 用多线程实现socket server 基于tcp的并发 # server.py import socket from threading import Thread sk = socket.socket() sk.bind(("127.0.0.1", 8080)) sk.listen() def talk(conn): while 1: conn.send("我会一直向客户端发送这个信息".encode()) while 1: conn, addr = sk.accept() Thread(target=talk, args=(conn, )).start() # 因为有 accept、send等各种阻塞,因此实现socket server 基于tcp的并发 # 只用多线程来实现即可,因为运行效率快。 # 可创建多个一样的客户端同时执行 # client.py import socket sk = socket.socket() sk.connect(("127.0.0.1", 8080)) while True: msg = sk.recv(1024) print(msg.decode()) # 我会一直向客户端发送这个信息 # 我会一直向客户端发送这个信息 # 我会一直向客户端发送这个信息 # ...
# 线程锁 # 线程队列 # 池的概念 # 线程的Lock锁 # GIL:从一定程度上保证了数据的安全,但对于数据的 += -= *= /= 操作无法保证 import time from threading import Thread n = 0 def func(): global n tmp = n time.sleep(0.1) # 延迟,相当于时间片轮转 n = tmp + 1 t_lst = [] for i in range(100): t = Thread(target=func) t.start() t_lst.append(t) for t in t_lst: t.join() print(n) # 1 # 如果不加上时间延迟,结果就是100 # 为了保证数据安全,应该加锁 import time from threading import Thread, Lock # 互斥锁 n = 0 def func(lock): global n with lock: n += 1 lock = Lock() t_lst = [] for i in range(100): t = Thread(target=func, args=(lock, )) t.start() t_lst.append(t) for t in t_lst: t.join() print(n) # 100 # 1.如果没有多个线程操作同一变量的时候,就可以不用加锁(在这里就是不使用全局变量) # 因此写程序代码时,为了保证数据的安全,尽量不要使用全局变量 # 2.如果是执行基础数据类型的内置方法,都是线程安全的 # list.append, list.pop, list.extend, list.remove, dic.get["key"]等
# 科学家吃面问题 from threading import Lock, Thread import time noodle_lock = Lock() fork_lock = Lock() def eat1(name): noodle_lock.acquire() print("%s拿到面条了" % name) fork_lock.acquire() print("%s拿到叉子了" % name) print("%s开始吃面了" % name) time.sleep(0.2) fork_lock.release() print("%s将叉子放回" % name) noodle_lock.release() print("%s将面条放回" % name) def eat2(name): fork_lock.acquire() print("%s拿到叉子了" % name) noodle_lock.acquire() print("%s拿到面条了" % name) print("%s开始吃面了" % name) time.sleep(0.2) noodle_lock.release() print("%s将面条放回" % name) fork_lock.release() print("%s将叉子放回" % name) Thread(target=eat1, args=("alex", )).start() Thread(target=eat2, args=("wusir", )).start() Thread(target=eat1, args=("taibai", )).start() Thread(target=eat2, args=("pengpeng", )).start() # alex拿到面条了 # alex拿到叉子了 # alex开始吃面了 # alex将叉子放回 # alex将面条放回 # wusir拿到叉子了 # taibai拿到面条了 # 运行发现程序没有停止,而是卡在某一点,仔细研究逻辑就能得出问题所在 # 这就是死锁现象 # 死锁现象 # 形成该现象的本质原因: # 两个锁,锁了两个资源,我要做某件事,需要同时拿到这两个资源 # 递归锁 from threading import Thread, RLock # 递归锁 import time noodle_lock = fork_lock = RLock() def eat1(name): noodle_lock.acquire() print("%s拿到面条了" % name) fork_lock.acquire() print("%s拿到叉子了" % name) print("%s开始吃面了" % name) time.sleep(0.2) fork_lock.release() print("%s将叉子放回" % name) noodle_lock.release() print("%s将面条放回" % name) def eat2(name): fork_lock.acquire() print("%s拿到叉子了" % name) noodle_lock.acquire() print("%s拿到面条了" % name) print("%s开始吃面了" % name) time.sleep(0.2) noodle_lock.release() print("%s将面条放回" % name) fork_lock.release() print("%s将叉子放回" % name) Thread(target=eat1, args=("alex", )).start() Thread(target=eat2, args=("wusir", )).start() Thread(target=eat1, args=("taibai", )).start() Thread(target=eat2, args=("pengpeng", )).start() # alex拿到面条了 # alex拿到叉子了 # alex开始吃面了 # alex将叉子放回 # alex将面条放回 # wusir拿到叉子了 # wusir拿到面条了 # wusir开始吃面了 # wusir将面条放回 # wusir将叉子放回 # taibai拿到面条了 # taibai拿到叉子了 # taibai开始吃面了 # taibai将叉子放回 # taibai将面条放回 # pengpeng拿到叉子了 # pengpeng拿到面条了 # pengpeng开始吃面了 # pengpeng将面条放回 # pengpeng将叉子放回
# 单线程演示 from threading import Lock lock = Lock() lock.acquire() print(1) lock.acquire() print(2) # 1 然后程序没终止 # 递归锁可以快速解决死锁问题,不好的地方是占用资源 # 不用递归锁的解决方案 from threading import Lock, Thread import time lock = Lock() def eat1(name): lock.acquire() print("%s拿到面条了" % name) print("%s拿到叉子了" % name) print("%s开始吃面了" % name) time.sleep(0.2) print("%s将叉子放回" % name) print("%s将面条放回" % name) lock.release() def eat2(name): lock.acquire() print("%s拿到叉子了" % name) print("%s拿到面条了" % name) print("%s开始吃面了" % name) time.sleep(0.2) print("%s将叉子放回" % name) lock.release() Thread(target=eat1, args=("alex", )).start() Thread(target=eat2, args=("wusir", )).start() Thread(target=eat1, args=("taibai", )).start() Thread(target=eat2, args=("pengpeng", )).start() # alex拿到面条了 # alex拿到叉子了 # alex开始吃面了 # alex将叉子放回 # alex将面条放回 # wusir拿到叉子了 # wusir拿到面条了 # wusir开始吃面了 # wusir将叉子放回 # taibai拿到面条了 # taibai拿到叉子了 # taibai开始吃面了 # taibai将叉子放回 # taibai将面条放回 # pengpeng拿到叉子了 # pengpeng拿到面条了 # pengpeng开始吃面了 # pengpeng将叉子放回 # 死锁现象——使用了多把锁在一个线程内进行了多次Acquire导致了不可恢复的阻塞 # 形成原因——两个锁锁了两个资源,要做某件事需要同时拿到这两个资源,多个线程同时执行这个步骤 # 递归锁、互斥锁: # 递归锁——不容易发生死锁现象 # 互斥锁——使用不当容易发生死锁 # 递归锁可以快速帮我们解决死锁问题 # 死锁的真正问题不在于互斥锁,而在于对互斥锁的混乱使用 # 要想真正的解决死锁问题,还是要找出互斥锁的问题进行修正才能解决根本问题
# 为什么线程之间要有队列,它不像进程,进程是需要数据共享时才用到队列 # 但是线程之间用队列是为了在多个线程之间维持一个数据先后的秩序 # 线程模块的队列是线程之间数据安全的 import queue q = queue.Queue() # q.put() —— 队列满时会阻塞 # q.get() —— 队列空时会阻塞 q.put(1) print(q.get_nowait()) # 1 # 异常情况 import queue q = queue.Queue() print(q.get_nowait()) # 报错,raise Empty ---> queue.Empty # 因此进行异常处理 import queue try: print(q.get_nowait()) # 在队列为空时也不阻塞,这时会抛异常 except queue.Empty: pass import queue q = queue.Queue(3) try: print(q.get_nowait()) except queue.Empty: pass try: q.put_nowait(1) except queue.Full: # 这样会造成数据丢失 pass # q.qsize() # 当前队列中有多少个值 # q.empty() # 当前队列是否为空 # q.full() # 当前队列是否为满 # 这三个没什么意义,不够准确 # 比如线程队列中有一个进程使用q.empty()询问队列是否为空,这个时候队列是空的,所以返回信息是空的 # 但是在这个信息返回到这个线程之前,又有一个线程往队列里添加东西了,那么队列不为空了,但是之后第一个线程却以为队列是空的 # 其他两个的道理一样
# 队列——先进先出 # 栈——后进先出 # 堆 # 树 # 栈 import queue q = queue.LifoQueue() q.put("a") q.put("b") q.put("c") print(q.get()) print(q.get()) print(q.get()) # c # b # a # 栈——一般只用在算法中 # 比如 2*3+4/5*6 在一个栈中是这样排列的(从上往下): # a = 2*3 # c = 4/5 # b = c*6 # a+b
# 优先级队列 import queue q = queue.PriorityQueue() q.put((2, "a")) q.put((1, "b")) q.put((3, "ac")) print(q.get()) print(q.get()) print(q.get()) # (1, ‘b‘) # (2, ‘a‘) # (3, ‘ac‘) # 元组的第一个元素的数值越小,越先取出来(按ascii码的值) # 如果第一个元素一样,则比较第二个元素的ascii码的值 # 总结:一共有三个队列 # queue——一般的队列,先进先出 # queue.LifoQueue()——后进先出 # queue.PriorityQueue()——优先级队列
# 线程池 import time from threading import currentThread from concurrent.futures import ThreadPoolExecutor def func(i): time.sleep(1) print(‘子线程%s‘ % i,currentThread().ident) tp = ThreadPoolExecutor(5) for i in range(20): tp.submit(func,i) tp.shutdown() import time import os from concurrent.futures import ProcessPoolExecutor def func(i): time.sleep(1) print(‘子进程%s‘%i,os.getpid()) if __name__ == ‘__main__‘: tp = ProcessPoolExecutor(5) for i in range(20): tp.submit(func,i) tp.shutdown() import time from threading import currentThread from concurrent.futures import ThreadPoolExecutor def func(i): time.sleep(1) print(‘子线程%s‘ % i,currentThread().ident) tp = ThreadPoolExecutor(5) tp.map(func,range(20)) # 使用result获取子线程中的返回值 import time from threading import currentThread from concurrent.futures import ThreadPoolExecutor def func(i): time.sleep(1) print(‘子线程%s‘ % i,currentThread().ident) return i**2 tp = ThreadPoolExecutor(5) ret_l = [] for i in range(20): ret = tp.submit(func,i) ret_l.append(ret) for ret in ret_l: print(ret.result()) # 使用回调函数来处理子线程中代码的执行结果 import time from threading import currentThread from concurrent.futures import ThreadPoolExecutor def func(i): time.sleep(1) print(‘子线程%s‘ % i,currentThread().ident) return i**2 def callback(ret): print(ret.result()) tp = ThreadPoolExecutor(5) for i in range(20): tp.submit(func,i).add_done_callback(callback) tp.shutdown()
标签:spool pen 而在 ali count done 比较 异常处理 return
原文地址:https://www.cnblogs.com/shawnhuang/p/10331628.html