标签:开始 div 单位 信号量 不可 while 线程同步 strftime sum
导航
1、Thread类
2、线程同步
3、threading.Condition
4、threading.Event
5、threading.Semaphore 信号量
6、queue模块,线程队列
线程是cpu运行的最小单位,没有自己的内存空间,同一线程的多线程共享一个内存空间,同一线程下的多线程都可以访问全局变量,对全局变量进行操作时要注意同步问题。
threading模块是建立在 _thread 模块上的,对其进行封装,包含较多的功能。
1、Thread类
threading.Thread(group=None, target=None, name=None, args=(), kwargs=None, daemon=None)
1)创建多线程的方式
创建多线程主要有两种方式,①通过Thread类直接创建子线程,并指定子线程需要执行的函数逻辑。②通过继承Thread类创建实例对象,重写run方法,开启子线程
1 from threading import Thread 2 3 4 def fn(i): 5 # 子线程逻辑 6 print("--子线程--", i) 7 8 9 if __name__ == "__main__": 10 t = Thread(target=fn, args=(100,)) 11 t.start() 12 print("--主线程--") 13 14 15 # 输出结果 16 --子线程-- 100 17 --主线程--
1 from threading import Thread 2 3 4 class MyThread(Thread): 5 def __init__(self, name): 6 super().__init__() 7 self.name = name 8 9 def run(self): 10 # 子线程逻辑 11 print("--子线程--", self.name) 12 13 14 if __name__ == "__main__": 15 t = MyThread("my_thread_01") 16 t.start() 17 print("--主线程--") 18 19 20 # 输出结果 21 --子线程-- my_thread_01 22 --主线程--
2)下面来看一下使用多线程与不使用多线程的区别
1 from threading import Thread 2 import time 3 4 5 def fn(i): 6 # 子线程逻辑 7 print("+++", i) 8 time.sleep(1) 9 10 11 if __name__ == "__main__": 12 start_time = time.time() 13 t_l = [] 14 for i in range(5): 15 t = Thread(target=fn, args=(i,)) 16 t.start() 17 t_l.append(t) 18 for i in range(5): 19 t.join() 20 print("运行所需时间:", time.time() - start_time) 21 22 23 # 输出结果 24 +++ 0 25 +++ 1 26 +++ 2 27 +++ 3 28 +++ 4 29 运行所需时间: 1.0024588108062744
1 import time 2 3 4 def fn(i): 5 # 子线程逻辑 6 print("+++", i) 7 time.sleep(1) 8 9 10 if __name__ == "__main__": 11 start_time = time.time() 12 for i in range(5): 13 fn(i) 14 print("运行所需时间:", time.time() - start_time) 15 16 17 # 输出结果 18 +++ 0 19 +++ 1 20 +++ 2 21 +++ 3 22 +++ 4 23 运行所需时间: 5.0021538734436035
可以看到同样的需要执行5次fn函数的逻辑,使用多线程的效率却比单线程的要高
3)daemon守护线程的设置
设置守护线程有有两种方法 ① t.daemon = True 直接设置属性 ② t.setDaemon(True) 通过方法设置。
当子线程设置为守护线程是,主线程结束时程序将不再等待子线程是否执行完毕直接退出程序。如下边子线程设置为守护线程后,主线程结束了,子线程也将跟着结束,并未输出子线程的语句
1 from threading import Thread 2 import time 3 4 5 def fn(i): 6 # 子线程逻辑 7 time.sleep(1) 8 print("+++", i) 9 10 11 if __name__ == "__main__": 12 for i in range(5): 13 t = Thread(target=fn, args=(i,)) 14 t.setDaemon(True) 15 t.start() 16 print("---end---") 17 18 19 # 输出结果 20 ---end---
4)join(timeout=None)
join方法堵塞当前上下文环境的线程,直到调用join方法的子线程执行结束后当前线程才继续执行。在哪里调用就在哪堵塞,最多堵塞timeout秒
1 from threading import Thread 2 import time 3 4 5 def fn(): 6 # 子线程逻辑 7 time.sleep(5) 8 9 10 if __name__ == "__main__": 11 start_time = time.time() 12 t = Thread(target=fn) 13 t.start() 14 t.join() 15 print("所需时间-->", time.time() - start_time) 16 17 18 # 输出结果 19 所需时间--> 5.001126527786255
5)is_alive(),返回子线程存活状态
is_alive方法判断指定对象线程的存活状态。线程 start 后一直到该线程结束都返回True。线程 start 前 或线程已经结束返回False
2、线程同步
当多个线程的同时对一个变量进行操作时便会存在着安全的隐患问题,
如下:有2个线程同时对全局变量进行 + 1,000,000 为什么结果却不是 2,000,000。这是因为当线程一执行完11行时还未开始执行12行,全局变量a没有进行 +1,就已经切换了线程二执行;线程二从10行开始执行,取到的值依旧是还没有 +1 的值,执行完12行,切换回线程一;线程一继续上次的断点执行12行,这时就相当于两次线程总共才完成了一次 +1 操作。多次循环如此,那么全局变量最后的结果就不可能是2,000,000。这里为了看到明显的效果,我将 a += 1 变成了三条语句来实现
1 from threading import Thread 2 3 4 a = 0 5 6 7 def fn(): 8 global a 9 for i in range(1000000): 10 t = a 11 t += 1 12 a = t 13 14 15 if __name__ == "__main__": 16 t_l = [] 17 for i in range(2): 18 t = Thread(target=fn) 19 t_l.append(t) 20 t.start() 21 for t in t_l: 22 t.join() 23 print("全局变量 a-->", a) 24 25 26 # 输出结果 27 全局变量 a--> 1332605
1 from threading import Thread 2 3 def fn(): 4 for i in range(5): 5 print("1234") 6 7 8 for i in range(2): 9 t = Thread(target=fn) 10 t.start() 11 12 13 # 输出结果1234 14 1234 15 12341234 16 17 1234 18 1234 19 1234 20 12341234 21 22 1234
经过这个例子,就可以看出当多线程对共享的资源进行修改是就涉及到同步问题了,我们希望同一时刻只有一个线程t执行某段逻辑,其它线程必须等待这个线程t执行完才能执行。
1)threading.Lock()
Lock线程锁,在需要进行同步的语句块加上可以确保线程间的同步问题
1 from threading import Thread, Lock 2 3 4 def fn(): 5 global a 6 for i in range(1000000): 7 lock.acquire() 8 t = a 9 t += 1 10 a = t 11 lock.release() 12 13 14 a = 0 15 t_l = [] 16 lock = Lock() 17 for i in range(2): 18 t = Thread(target=fn) 19 t_l.append(t) 20 t.start() 21 for t in t_l: 22 t.join() 23 print("全局变量 a-->", a) 24 25 26 # 全局变量 27 全局变量 a--> 2000000
2)死锁
关于死锁的概念我就不再叙述,就简单的说说下边的代码,同时开启两个线程,线程t1中执行到8行,线程t2执行到20行的时候,t1发现锁lock2没有被释放便会在这一直等待,t2发现锁lock1没有被释放也在这一直等待,这样谁也不会先释放锁,就会造成死锁的现象,程序一直卡着。
1 from threading import Thread, Lock 2 import time 3 4 def fn1(): 5 lock1.acquire() 6 print("fn1-->lock1.acquire") 7 time.sleep(1) # 确保另一个线程已经执行fn2函数且lock2已经锁上 8 lock2.acquire() 9 print("fn1-->lock2.acquire") 10 lock2.release() 11 print("fn1-->lock2.release") 12 lock1.release() 13 print("fn1-->lock1.release") 14 15 16 def fn2(): 17 lock2.acquire() 18 print("fn1-->lock2.acquire") 19 time.sleep(1) # 确保另一个线程已经执行fn1函数且lock1已经锁上 20 lock1.acquire() 21 print("fn1-->lock1.acquire") 22 lock1.release() 23 print("fn1-->lock1.release") 24 lock2.release() 25 print("fn1-->lock2.release") 26 27 28 lock1 = Lock() 29 lock2 = Lock() 30 t1 = Thread(target=fn1) 31 t2 = Thread(target=fn2) 32 t1.start() 33 t2.start() 34 t1.join() 35 t2.join() 36 print("----end----") 37 38 39 # 输出结果 40 fn1-->lock1.acquire 41 fn1-->lock2.acquire
3、threading.Condition
1 from threading import Thread, Condition 2 import time 3 4 5 def producer(): 6 global cdt 7 while True: 8 time.sleep(1) 9 cdt.acquire() 10 print("+++++ 1个包子") 11 cdt.notify() 12 cdt.release() 13 14 15 def consumer(): 16 global cdt 17 while True: 18 cdt.acquire() 19 cdt.wait() 20 print("----- 1个包子, 时间:", time.strftime("%X")) 21 # cdt.release() 22 23 24 cdt = Condition() 25 prd = Thread(target=producer) 26 csm = Thread(target=consumer) 27 prd.start() 28 csm.start() 29 prd.join() 30 csm.join() 31 print("----end----") 32 33 34 # 输出结果 35 +++++ 1个包子 36 ----- 1个包子, 时间: 10:05:30 37 +++++ 1个包子 38 ----- 1个包子, 时间: 10:05:31 39 +++++ 1个包子 40 ----- 1个包子, 时间: 10:05:32 41 +++++ 1个包子 42 ----- 1个包子, 时间: 10:05:33 43 +++++ 1个包子 44 ----- 1个包子, 时间: 10:05:34
1 from threading import Thread, Condition 2 import time 3 4 5 def producer(): 6 global cdt 7 while True: 8 time.sleep(1) 9 cdt.acquire() 10 print("+++++ 1个包子") 11 cdt.notify() 12 cdt.release() 13 14 15 def consumer(name): 16 global cdt 17 while True: 18 cdt.acquire() 19 cdt.wait() 20 print("消费者:%s----- 1个包子, 时间:%s" % (name, time.strftime("%X"))) 21 # cdt.release() 22 23 24 cdt = Condition() 25 prd = Thread(target=producer) 26 prd.start() 27 for i in range(5): 28 csm = Thread(target=consumer, args=(i,)) 29 csm.start() 30 31 32 # 输出结果 33 +++++ 1个包子 34 消费者:0----- 1个包子, 时间:10:7:07 35 +++++ 1个包子 36 消费者:1----- 1个包子, 时间:10:7:08 37 +++++ 1个包子 38 消费者:2----- 1个包子, 时间:10:7:09 39 +++++ 1个包子 40 消费者:3----- 1个包子, 时间:10:7:10 41 +++++ 1个包子 42 消费者:4----- 1个包子, 时间:10:7:11 43 +++++ 1个包子 44 消费者:0----- 1个包子, 时间:10:7:12 45 +++++ 1个包子 46 消费者:1----- 1个包子, 时间:10:7:13 47 +++++ 1个包子 48 消费者:2----- 1个包子, 时间:10:7:14 49 +++++ 1个包子 50 消费者:3----- 1个包子, 时间:10:7:15 51 +++++ 1个包子 52 消费者:4----- 1个包子, 时间:10:7:16
4、threading.Event
Event内部定义了一个flag,当flag=Flase时,wait()将会一直堵塞线程;flag=True时,wait()不会堵塞线程,继续向下执行,这时的wait()相当与pass语句
1 from threading import Thread, Event 2 import time 3 4 5 def producer(): 6 global eve 7 while True: 8 time.sleep(1) 9 print("+++++ 1个包子") 10 eve.set() 11 12 13 def consumer(): 14 global eve 15 while True: 16 eve.wait() 17 print("----- 1个包子, 时间:%s" % (time.strftime("%X"))) 18 eve.clear() 19 20 21 eve = Event() 22 prd = Thread(target=producer) 23 csm = Thread(target=consumer) 24 prd.start() 25 csm.start() 26 27 28 # 输出结果 29 +++++ 1个包子 30 ----- 1个包子, 时间:10:12:00 31 +++++ 1个包子 32 ----- 1个包子, 时间:10:12:01 33 +++++ 1个包子 34 ----- 1个包子, 时间:10:12:02 35 +++++ 1个包子 36 ----- 1个包子, 时间:10:12:03 37 +++++ 1个包子 38 ----- 1个包子, 时间:10:12:04
1 from threading import Thread, Event 2 import time 3 4 5 def producer(): 6 global eve 7 while True: 8 time.sleep(1) 9 print("+++++ 3个包子") 10 eve.set() 11 12 13 def consumer(name): 14 global eve 15 while True: 16 eve.wait() # 在此堵塞,直到eve.set() 17 print("消费者:%s----- 1个包子, 时间:%s" % (name, time.strftime("%X"))) 18 eve.clear() # 将标识清除,需重新eve.set() 19 20 21 eve = Event() 22 prd = Thread(target=producer) 23 prd.start() 24 for i in range(5): 25 csm = Thread(target=consumer, args=(i,)) 26 csm.start() 27 28 29 # 输出结果 30 +++++ 3个包子 31 消费者:2----- 1个包子, 时间:10:10:25 32 消费者:1----- 1个包子, 时间:10:10:25 33 消费者:0----- 1个包子, 时间:10:10:25 34 +++++ 3个包子 35 消费者:0----- 1个包子, 时间:10:10:26 36 消费者:2----- 1个包子, 时间:10:10:26 37 消费者:1----- 1个包子, 时间:10:10:26 38 +++++ 3个包子 39 消费者:0----- 1个包子, 时间:10:10:27 40 消费者:2----- 1个包子, 时间:10:10:27 41 消费者:1----- 1个包子, 时间:10:10:27
5、threading.Semaphore 信号量
Semaphore内部管理着一个计数器,acquire时计数器减1,release时计数器加1,当计数器 =0 时,将在acquire语句处堵塞线程。
信号量简单来说,就是控制线程最大并发数的。将信号量比作一定数量的停车位,线程比作车,当有停车位空出来时车才能上去停泊,等停车位满了时,剩下的车只能等停车位上的车走了才能继续上去停泊。
1 from threading import Thread, Lock, Semaphore 2 import time 3 4 def fn(num): 5 global sp 6 sp.acquire() 7 time.sleep(1) 8 print("--->", num, time.strftime("%X")) 9 sp.release() 10 11 12 sp = Semaphore(3) 13 for i in range(15): 14 prd = Thread(target=fn, args=(i,)) 15 prd.start() 16 17 18 # 输出结果 19 ---> 0 14:03:29 20 ---> 2 14:03:29 21 ---> 1 14:03:29 22 ---> 3 14:03:30 23 ---> 4 14:03:30 24 ---> 5 14:03:30 25 ---> 6 14:03:31 26 ---> 7 14:03:31 27 ---> 8 14:03:31 28 ---> 9 14:03:32 29 ---> 10 14:03:32 30 ---> 11 14:03:32 31 ---> 12 14:03:33 32 ---> 13 14:03:33 33 ---> 14 14:03:33
这个例子可以看到信号量为3,每一秒最多执行三个线程
6、queue模块,线程队列
我们知道多线程操作全局变量的时候是不安全的,那么线程队列则没有这个问题。和进程队列一样的方法
在线程中队列有三种方式,① 先进先出 queue.Queue ② 后进先出 queue.LifoQueue (其实这个应该叫"栈") ③ 优先级队列 queue.PriorityQueue 这个队列 put一个元组,元组第一个元素为优先级,第二个元素为值,优先级数值越小优先级越高,越先出来
1 >>> import queue 2 >>> q = queue.Queue() 3 >>> q.put(1) 4 >>> q.put("a") 5 >>> q.put("{}") 6 >>> q.get() 7 1 8 >>> q.get() 9 ‘a‘ 10 >>> q.get() 11 ‘{}‘
1 >>> import queue 2 >>> q = queue.LifoQueue() 3 >>> q.put(1) 4 >>> q.put("a") 5 >>> q.put("[]") 6 >>> q.get() 7 ‘[]‘ 8 >>> q.get() 9 ‘a‘ 10 >>> q.get() 11 1
1 >>> import queue 2 >>> q = queue.PriorityQueue() 3 >>> q.put((1, "a")) 4 >>> q.put((2, "{}")) 5 >>> q.put((3, "[]")) 6 >>> q.put((3, "[]")) 7 >>> import queue 8 >>> q = queue.PriorityQueue() 9 >>> q.put((1, "a")) 10 >>> q.put((3, "{}")) 11 >>> q.put((2, "[]")) 12 >>> q.get() 13 (1, ‘a‘) 14 >>> q.get() 15 (2, ‘[]‘) 16 >>> q.get() 17 (3, ‘{}‘)
1 from threading import Thread 2 import queue 3 import time 4 5 6 def producer(): 7 global q 8 for i in range(100): 9 time.sleep(1) 10 q.put(i) 11 print("+++++ 1个包子") 12 13 14 def consumer(name): 15 global q 16 while True: 17 bz = q.get() 18 print("消费者:%s---得到1个包子%s, 时间:%s" % (name, bz, time.strftime("%X"))) 19 20 21 q = queue.Queue(10) # 最多能添加10包子 22 prd = Thread(target=producer) 23 prd.start() 24 for i in range(3): 25 csm = Thread(target=consumer, args=(i,)) 26 csm.start() 27 28 29 # 输出结果 30 +++++ 1个包子 31 消费者:0---得到1个包子0, 时间:15:11:30 32 +++++ 1个包子 33 消费者:1---得到1个包子1, 时间:15:11:31 34 +++++ 1个包子 35 消费者:2---得到1个包子2, 时间:15:11:32 36 +++++ 1个包子 37 消费者:0---得到1个包子3, 时间:15:11:33 38 +++++ 1个包子 39 消费者:1---得到1个包子4, 时间:15:11:34
标签:开始 div 单位 信号量 不可 while 线程同步 strftime sum
原文地址:https://www.cnblogs.com/yhongji/p/9735925.html