标签:threading tip 运行 解决办法 set 不用 closed 非阻塞 解释器
概念:指的是一条流水线的工作过程的总称,是一个抽象的概念,是CPU基本执行单位。
进程和线程之间的区别:
1. 进程仅仅是一个资源单位,其中包含程序运行所需的资源,而线程就相当于车间的流水线,负责执行具代码。
2. 每个进程至少包含一个线程,由操作系统自动创建,称之为主线程
3. 每个进程可以有任意数量的线程
4.创建进程的开销要比创建进程小得多
5. 同一进程的线程间数据是共享的
6.线程之间是平等的,没有子父级关系,同一进程下的各线程的PID相同
7. 创建线程的代码可以写在任意位置,不一定非要在main函数下。
为什么使用线程:
提高程序执行效率
和进程类似,但是开启方式不一定非要建在main函数下。
# 第一种方式,实例化 Thread # from threading import Thread # # def task(): # print("subthread is running....") # # t = Thread(target=task) # t.start() # print(‘main is over....‘) # 第二种方式,继承Thread类 from threading import Thread class MyThread(Thread): def run(self): print("subthread is running....")
1. 主线程任务执行完毕后,主线程会等待所有子线程全部执行完毕后结束
2. 在同一进程中,所有线程都是平等的,没有子父级关系
# 验证主线程代码执行完后会不会立即结束, import random import time import threading from threading import Thread def task(name): print("%s is running..." % name) time.sleep(random.randint(1, 3)) print(threading.enumerate()) print("%s is over....." % name) t = Thread(target=task, args=(‘aaa‘,)) t.start() print(‘main over....‘)
from threading import Thread import time def task(): global num time.sleep(1) num -= 1 num = 10 t = Thread(target=task,) t.start() t.join() print(num)
from multiprocessing import Process from threading import Thread import time def task(): pass def expense(cls): """用来测试线程或进程创建开销""" lis = [] start = time.time() for i in range(50): p = cls(target=task,) p.start() lis.append(p) for p in lis: p.join() return time.time()-start
数据共享必然会造成竞争,竞争就会造成数据错乱问题。
解决办法:和进程一样,加互斥锁。
from threading import Thread, Lock import time num = 10 def task(lock): global num lock.acquire() a = num time.sleep(0.5) num = a-1 lock.release() ts = [] lock = Lock() for i in range(10): t = Thread(target=task,args=(lock,)) t.start() ts.append(t) for t in ts: t.join() print(num)
死锁不是一种锁,而是一种锁的状态,
一般出现死锁的情况有两种:
1. 对同一把锁多次acquire.(使用RLOCK锁,代替LOCK)
2. 两个或两个以上的进程或线程在执行过程中,因争夺资源造成的相互等待现象。(解决办法:能不加最好不加,要加就只加一把)
from threading import Thread, Lock import time def task1(name, locka, lockb): locka.acquire() print("%s拿到a锁"%name) time.sleep(0.3) lockb.acquire() print(‘%s拿到b锁‘%name) lockb.release() locka.release() def task2(name, locka, lockb): lockb.acquire() print("%s拿到b锁"%name) time.sleep(0.3) locka.acquire() print(‘%s拿到a锁‘%name) locka.release() lockb.release() locka = Lock() lockb = Lock() t1 = Thread(target=task1, args=(‘t1‘, locka, lockb)) t2 = Thread(target=task2, args=(‘t2‘, locka, lockb)) t1.start() t2.start()
只能解决同一线程多次执行acquire情况。
只有一个线程所有的acquire都被释放,其他线程才能拿到这个锁。
也会发生死锁现象。
from threading import Thread, RLock lock = RLock() lock.acquire() lock.acquire() lock.acquire() lock.acquire() print("over") lock = RLock() def task1(): lock.acquire() print(‘task1‘) def task2(): lock.acquire() print(‘task2‘) Thread(target=task1).start() Thread(target=task2).start()
也是一种锁,用来控制同一时间,有多少线程可以提供并发访问,不是用来处理线程安全问题
from threading import Semaphore, Thread import time s_lock = Semaphore(3) def task(): s_lock.acquire() time.sleep(1) print("run.....") s_lock.release() for i in range(20): t = Thread(target=task) t.start()
守护线程在所有非守护线程结束后结束。
import threading from threading import Thread import time def task1(): print(‘thread-1 is running...‘) time.sleep(3) print(‘thread-1 over....‘) def task2(): print(‘thread-2 is running...‘) time.sleep(1) print(‘thread-2 over....‘) if __name__ == ‘__main__‘: t1 = Thread(target=task1,) t2 = Thread(target=task2,) t1.setDaemon(True) t1.start() t2.start() print(t1.ident) print(threading.enumerate()) print("main over...")
全局解释器锁,是一互斥锁,只有在Cpython解释器存在。
为什么需要:因为一个python.exe进行运行只有一份解释器,如果这个进程开启的多个线程都要执行代码,多线程之间就要竞争解释器,一旦竞争就有可能出现问题。
带来的好处:保证了多线程同时访问解释器时的数据安全问题。
带来的问题:同一时间只有一个线程访问解释器,使得多线程无法真正的并发
出现的原因:默认情况下,一个进程只有一个线程不会是不会出问题,但不要忘了还有GC线程,一旦出现多个线程就可能出现问题,所以当初就简单粗暴的加上了GIL锁
GIL加锁和解锁时机:
加锁:在调用解释器时立即加锁
解锁:当前线程遇到IO时释放,或者当前线程执行超过设定值释放(py2计算的是执行代码的行数,py3中计算的是时间)
解决办法:使用多进程或使用其他的python解释器
一种容器,本质十一存储线程或进程的列表
为什么使用? 因为服务器不能无限开启线程或进程,所以需要对线程数量加以控制,线程池就是帮我们完成线程/进程的创建、销毁以及任务分配
特点:
线程池在创建时不会开启线程,
等到任务提交时,如果没有空闲线程,并且已存在的线程数量小于最大值,开启新线程,
线程开启后不会关闭,直到进程全部结束为止
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor pool= ProcessPoolExecutor(maxsize),创建进程池,maxsize为最大进程个数 res = pool.submit(task, ‘a‘), 提交任务 res.result(timeout),接收调用的返回值,timeout为超时时间,超时报错 该函数是阻塞函数,会一直等待任务执行完毕 pool.shutdown(wait),所有任务执行完毕,阻塞函数 wait=True, 等待池内所有任务执行完毕后回收资源才继续 wait=False,立即返回,并不会等待池内的任务执行完毕
from concurrent.futures import ThreadPoolExecutor import time def task(num): time.sleep(0.5) print("%s is running....."%num) return num**2 pool = ThreadPoolExecutor() ress = [] for i in range(10): res = pool.submit(task, i) ress.append(res) pool.shutdown(wait=False) for i in ress: print(i.result()) print(‘over‘)
阻塞和非阻塞都是指程序的运行状态
阻塞:当程序执行遇到IO操作,无法继续执行代码
非阻塞:程序执行没有遇到IO操作,或通过某种方式,使程序遇到了也不会停在原地,还可以继续执行
同步异步指的是提交任务的方式
同步:发起任务后必须原地等待任务执行完成,才可以继续执行
异步:发起任务后不用等待任务执行,可以立即执行其他操作
异步效率高于同步,发起异步任务方式:就是多线程和多进程
同步和阻塞的不同:阻塞一定使CPU已经切换,同步虽然在等待,但CPU没有切走,还在当前进程中执行其他任务
标签:threading tip 运行 解决办法 set 不用 closed 非阻塞 解释器
原文地址:https://www.cnblogs.com/ywsun/p/10493770.html