1 import atexit 2 from random import randrange 3 from threading import Thread, Lock, current_thread # or currentThread 4 from time import ctime, sleep 5 6 7 # Use set to record the running thread 8 # If we print set directly, it will shows set([a, b, c]) 9 # So we reload the __str__ function to show it more clean(shows a, b, c as we want) 10 class CleanOutputSet(set): 11 def __str__(self): 12 return ‘, ‘.join(x for x in self) 13 14 # randrange(2, 5) will generate a num in range(2, 5) 15 # for x in range(randrange(3, 7)) will do "for" loop 3-7 times 16 # Below code generate a list which contains 3-6 numbers in range 2-4 17 # If not list(), it will return a generator, and it will be null after one time iteration 18 loops = list((randrange(2, 5) for x in range(randrange(3, 7)))) 19 20 remaining = CleanOutputSet() 21 lock = Lock() 22 23 def loop_without_lock(nsec): 24 myname = current_thread().name 25 remaining.add(myname) 26 print(‘[%s] Start %s‘ % (ctime(), myname)) 27 sleep(nsec) 28 remaining.remove(myname) 29 print(‘[%s] Completed %s (%d secs)‘ % (ctime(), myname, nsec)) 30 # Note: remaining or ‘NONE‘, will return ‘NONE‘ if remaining is Null 31 # Null including: None, False, (), [], {}, 0 32 print(‘ (remaining: %s)‘ % (remaining or ‘NONE‘)) 33 34 def loop_with_lock(nsec): 35 myname = current_thread().name 36 # When we need to modifiy public resource, acquire lock to block other threads 37 # Lock acquire and release can use ‘with lock‘ to simplify 38 lock.acquire() 39 remaining.add(myname) 40 print(‘[%s] Start %s‘ % (ctime(), myname)) 41 # After using resource, release lock for other threads to use 42 lock.release() 43 sleep(nsec) 44 45 lock.acquire() 46 remaining.remove(myname) 47 print(‘[%s] Completed %s (%d secs)‘ % (ctime(), myname, nsec)) 48 print(‘ (remaining: %s)‘ % (remaining or ‘NONE‘)) 49 lock.release() 50 51 def _main(): 52 print(‘-----Below threads without lock-----‘) 53 threads = [] 54 for pause in loops: 55 threads.append(Thread(target=loop_without_lock, args=(pause, ))) 56 for t in threads: 57 t.start() 58 for t in threads: 59 t.join() 60 print(‘-----Below threads with lock-----‘) 61 threads = [] 62 for pause in loops: 63 threads.append(Thread(target=loop_with_lock, args=(pause, ))) 64 for t in threads: 65 t.start() 66 for t in threads: 67 t.join() 68 69 # This is an exit function, when script exit, this function will be called 70 # You can use atexit.register(_atexit) to replace @atexit.register 71 # The function name ‘_atexit‘ can be change to others 72 @atexit.register 73 def _atexit(): 74 print(‘All DONE at:‘, ctime()) 75 76 if __name__ == ‘__main__‘: 77 _main()
第 1-4 行,首先导入需要的模块,atexit用于设置退出脚本时的处理函数,random用于产生随机数来增加线程的不确定性。
第 7- 12 行,定义一个新的集合类,用于输出当前运行线程的集合,新的集合类CleanOutputSet重载了__str__方法,使集合的显示由set([a, b, c])变为我们想要的a, b, c形式。
第 14-21行,利用随机数模块,产生一个生成器,包含3-6个大小在2-4之间的随机数,为了后续重复使用,此处对生成器进行list操作,否则生成器在迭代一次之后将变为空,无法复用。同时对全局锁和集合输出类进行实例化。
第 23-32 行,定义一个不加锁的线程函数,该函数会在进入时向集合添加线程名,sleep相应时间后,移除线程名,同时显示集合(共有资源)内剩余的线程名。
第 34-49 行,定义一个加锁的线程函数,该函数会在进入时获取线程锁,之后再向集合添加线程名,添加完成后释放线程锁,sleep相应时间后,获取线程锁,移除线程名,同时显示集合(共有资源)内剩余的线程名,最后释放线程锁。
第 51-67 行,主函数中分别对加锁和不加锁的两种线程方式进行调用,并利用join()方法挂起线程以区分开两种方式的运行。
第 69-77 行,利用atexit.register函数/@register装饰器定义脚本退出函数。
-----Below threads without lock----- [Tue Aug 1 11:01:57 2017] Start Thread-1 [Tue Aug 1 11:01:57 2017] Start Thread-2[Tue Aug 1 11:01:57 2017] Start Thread-3 [Tue Aug 1 11:01:57 2017] Start Thread-4 [Tue Aug 1 11:01:59 2017] Completed Thread-3 (2 secs) (remaining: Thread-1, Thread-4, Thread-2) [Tue Aug 1 11:02:00 2017] Completed Thread-1 (3 secs) (remaining: Thread-4, Thread-2) [Tue Aug 1 11:02:01 2017] Completed Thread-2 (4 secs)[Tue Aug 1 11:02:01 2017] Completed Thread-4 (4 secs) (remaining: NONE) (remaining: NONE) -----Below threads with lock----- [Tue Aug 1 11:02:01 2017] Start Thread-5 [Tue Aug 1 11:02:01 2017] Start Thread-6 [Tue Aug 1 11:02:01 2017] Start Thread-7 [Tue Aug 1 11:02:01 2017] Start Thread-8 [Tue Aug 1 11:02:03 2017] Completed Thread-7 (2 secs) (remaining: Thread-8, Thread-6, Thread-5) [Tue Aug 1 11:02:04 2017] Completed Thread-5 (3 secs) (remaining: Thread-8, Thread-6) [Tue Aug 1 11:02:05 2017] Completed Thread-6 (4 secs) (remaining: Thread-8) [Tue Aug 1 11:02:05 2017] Completed Thread-8 (4 secs) (remaining: NONE)
1 from threading import Thread, Lock 2 import time 3 lock = Lock() 4 COUNT = 0 5 6 def gentleCounter(): 7 name = ‘gentleCounter‘ 8 lock.acquire() 9 print(‘%s acquired the lock‘ % name) 10 global COUNT 11 COUNT += 1 12 print(‘%s made a count plus, now the COUNT is %d‘ % (name, COUNT)) 13 print(‘%s is taking a rest...‘ % name) 14 time.sleep(3) 15 COUNT += 1 16 print(‘%s made a count plus again, now the COUNT is %d‘ % (name, COUNT)) 17 lock.release() 18 print(‘%s released the lock‘ % name) 19 20 def wildCounter(): 21 time.sleep(1) 22 name = ‘wildCounter‘ 23 print(‘%s didn\‘t acquire the lock‘ % name) 24 global COUNT 25 COUNT += 1 26 print(‘%s made a count plus, now the COUNT is %d‘ % (name, COUNT)) 27 28 Thread(target=gentleCounter).start() 29 Thread(target=wildCounter).start()
第 1-4 行,导入线程和锁两个类,并定义一个锁对应的全局变量COUNT,赋予初值。
第 6-18 行,定义一个正常计数方法,该方法遵循规则,会在获取锁后对COUNT进行+1操作,随后休眠3秒再次进行COUNT+1操作,在这期间不会对锁释放,两次加值之后,正常计数方法会释放锁。
第 20-26 行,定义一个异常计数方法,该方法不会对锁进行获取,它会在进入时等待1秒,当正常计数方法的线程进入休眠状态时,不获取锁权限直接对COUNT进行+1操作。
第 28-29 行,将两种方法以两个线程分别启动。
gentleCounter acquired the lock gentleCounter made a count plus, now the COUNT is 1 gentleCounter is taking a rest... wildCounter didn‘t acquire the lock wildCounter made a count plus, now the COUNT is 2 gentleCounter made a count plus again, now the COUNT is 3 gentleCounter released the lock
Python threading模块有两类锁,互斥锁(threading.Lock )和可重用锁(threading.RLock)。两者的用法基本相同。但互斥锁只能被获取一次,若多次获取则会产生阻塞,需等待原锁释放后才能再次入锁。而可重入锁则可被本线程多次acquire入锁,但是要求入锁次数与释放次数相同,才能完全解锁,且锁的释放需要在同一个线程中进行。
1 from threading import Lock, RLock 2 3 def call(): 4 print(‘This is call() function‘) 5 with lock: 6 g() 7 h() 8 9 def g(): 10 if not lock.acquire(True, 1): 11 print(‘g() acquires lock failed‘) 12 else: 13 print(‘This is g() function‘) 14 lock.release() 15 h() 16 17 def h(): 18 if not lock.acquire(True, 1): 19 print(‘h() acquires lock failed‘) 20 else: 21 print(‘This is h() function‘) 22 lock.release() 23 24 print(‘\n-------Using Lock-------‘) 25 lock = Lock() 26 call() 27 print(‘\n-------Using RLock-------‘) 28 lock = RLock() 29 call()
第 3-22 行,定义call函数,在call函数中会利用with lock进行一次入锁,入锁后调用g()和h(),其中g和h函数的功能都是尝试对锁进行获取,获取失败或超时则输出失败语句,成功则显示函数调用并释放锁,而在g中会对h函数再进行一次调用。
第 24-29 行,运行程序,分别使用互斥锁和可重入锁进行试验。
-------Using Lock------- This is call() function g() acquires lock failed h() acquires lock failed h() acquires lock failed -------Using RLock------- This is call() function This is g() function This is h() function This is h() function
Note: 值得注意的是,对于互斥锁,当一个线程获取锁之后,可以在另一个线程中释放锁,而对于可重入锁,则必须在同一个线程中对锁的获取进行释放。
4.1 迭代死锁
1 from threading import Thread, Lock, RLock, current_thread 2 3 4 mutex = Lock() 5 reentrant = RLock() 6 class MyThread(Thread): 7 def __init__(self, lock): 8 Thread.__init__(self) 9 self.lock = lock 10 11 def run(self): 12 self.name = current_thread().name 13 print(‘-------This is %s-------‘ % self.name) 14 if self.lock.acquire(): 15 print(‘%s get lock one‘ % self.name, ‘\nTrying to get second lock‘) 16 self.lock.acquire() 17 print(‘Got second lock‘) 18 print(‘Trying to release lock...‘) 19 self.lock.release() 20 print(‘First lock released‘) 21 self.lock.release() 22 print(‘Second lock released‘) 23 print(‘Lock all released‘) 24 print(‘--------Exit %s---------‘ % 25 self.name) 26 27 t = MyThread(reentrant) 28 t.start() 29 t.join() 30 t = MyThread(mutex) 31 t.start() 32 t.join()
-------This is Thread-1------- Thread-1 get lock one Trying to get second lock Got second lock Trying to release lock... First lock released Second lock released Lock all released --------Exit Thread-1--------- -------This is Thread-2------- Thread-2 get lock one Trying to get second lock
4.2 互调死锁
1 from threading import Thread, Lock, RLock, current_thread 2 import time 3 4 mutex_1 = Lock() 5 mutex_2 = Lock() 6 reentrant_1 = RLock() 7 reentrant_2 = RLock() 8 COUNT = 0 9 class MyThread(Thread): 10 def __init__(self, lock_1, lock_2): 11 Thread.__init__(self) 12 self.lock_1 = lock_1 13 self.lock_2 = lock_2 14 15 def run(self): 16 self.name = current_thread().name 17 if self.lock_1.acquire(): 18 print(‘%s got its first lock‘ % self.name) 19 global COUNT 20 COUNT += 1 21 print(‘%s make COUNT plus one, now COUNT is %d‘ % (self.name, COUNT)) 22 time.sleep(2) 23 print(‘%s trying to get another one...‘ % self.name) 24 if self.lock_2.acquire(): 25 print(‘%s got its second lock‘ % self.name) 26 self.lock_2.release() 27 print(‘%s release its second lock‘ % self.name) 28 self.lock_1.release() 29 print(‘%s release its first lock‘ % self.name) 30 31 threads = [MyThread(mutex_1, mutex_2), MyThread(mutex_2, mutex_1)] 32 #threads = [MyThread(reentrant_1, reentrant_2), MyThread(reentrant_2, reentrant_1)] 33 for t in threads: 34 t.start() 35 for t in threads: 36 t.join()
第 8-29 行,在导入必须模块后,派生了一个线程子类,这个线程子类要求先后传入两把锁的实例,在run函数中,会对先传入的锁进行获取,获取之后将COUNT+1并休眠等待(以保证另一个线程有足够的时间获取它的第一把锁),随后尝试获取传入的第二把锁,获取成功后依次释放前面的两把锁。
第 31-36 行,此处对两个线程进行实例化,两个线程的区别在于,传入锁的顺序不同,因此线程1会先获取锁1,再获取锁2,而线程2则相反。传入的锁类型可选择互斥锁或可重入锁。
Thread-1 got its first lock Thread-1 make COUNT plus one, now COUNT is 1 Thread-1 trying to get another one... Thread-2 got its first lock Thread-2 make COUNT plus one, now COUNT is 2 Thread-2 trying to get another one...
Note: 值得注意的是,此处即便换成可重入锁,也不能解决互调死锁的问题,因为可重入锁仅对本线程支持可重入,对于其他线程依旧互斥。
1 import atexit 2 from random import randrange 3 import threading 4 from threading import Thread, Lock 5 from time import ctime, sleep 6 7 class CleanOutputSet(set): 8 def __str__(self): 9 return ‘, ‘.join(x for x in self) 10 11 # Use generator is much better than list 12 # loops = (randrange(1, 7) for x in range(2, 8)) 13 loops = list(randrange(1, 7) for x in range(2, 8)) 14 remaining = CleanOutputSet() 15 lock = Lock() 16 17 def loop(nsec): 18 myname = threading.current_thread().name 19 with lock: 20 remaining.add(myname) 21 print(‘[{0}] Start {1}‘.format(ctime(), myname)) 22 # loops is generator, after ‘for‘ iteration, remains [] 23 if len(remaining) == len(loops): 24 func_for_trial() 25 sleep(nsec) 26 with lock: 27 remaining.remove(myname) 28 print(‘[{0}] Completed {1} ({2} secs)‘.format(ctime(), myname, nsec)) 29 print(‘ (remaining: {0})‘.format(remaining or ‘NONE‘)) 30 31 def func_for_trial(): 32 count = threading.active_count() 33 active_thread = threading.enumerate() 34 print(‘There are %d active threads, \n%s‘ % (count, str(active_thread).replace(‘, ‘, ‘\n‘))) 35 36 def _main(): 37 threads = [] 38 for pause in loops: 39 threads.append(Thread(target=loop, args=(pause, ))) 40 for t in threads: 41 t.start() 42 for t in threads: 43 t.join() 44 45 @atexit.register 46 def _atexit(): 47 print(‘All DONE at:‘, ctime()) 48 49 if __name__ == ‘__main__‘: 50 _main()
1 from atexit import register 2 from random import randrange 3 from threading import Semaphore, BoundedSemaphore, Lock, Thread 4 from time import sleep, ctime 5 import threading 6 7 """ 8 # This Obj reload the __len__ method to return current number of semaphore 9 class MySemaphore(BoundedSemaphore): 10 def __len__(self): 11 return self._value 12 candy = MySemaphore(5) 13 print(len(candy)) 14 """ 15 lock = Lock() 16 MAX = 5 17 18 def refill(): 19 lock.acquire() 20 print(‘Refilling candy...‘) 21 try: 22 candyTray.release() 23 except ValueError: 24 print(‘Full, skipping‘) 25 else: 26 print(‘OK, current candy num is %d‘ % candyTray._value) 27 lock.release() 28 29 def buy(): 30 lock.acquire() 31 print(‘Buying candy...‘) 32 if candyTray.acquire(False): 33 print(‘OK, current candy num is %d‘ % candyTray._value) 34 else: 35 print(‘Empty, skipping‘) 36 lock.release() 37 38 def producer(loops): 39 for i in range(loops): 40 refill() 41 sleep(randrange(3)) 42 43 def consumer(loops): 44 for i in range(loops): 45 buy() 46 sleep(randrange(3)) 47 48 def _main(): 49 print(‘Starting at‘, ctime()) 50 nloops = randrange(2, 6) 51 print(‘THE CANDY MACHINE (full with %d bars)!‘ % MAX) 52 # Buyer Thread 53 buyer = Thread(target=consumer, args=(randrange(nloops, nloops+MAX+2), )) 54 buyer.start() 55 # Vendor Thread 56 vendor = Thread(target=producer, args=(nloops, )) 57 vendor.start() 58 for t in [buyer, vendor]: 59 t.join() 60 61 @register 62 def _atexit(): 63 print(‘All DONE at:‘, ctime()) 64 65 if __name__ == ‘__main__‘: 66 print(‘-------BoundedSemaphore-------‘) 67 candyTray = BoundedSemaphore(MAX) 68 _main() 69 print(‘-------Semaphore------‘) 70 candyTray = Semaphore(MAX) 71 _main()
第 7-14 行,在导入必须模块之后,派生的类重定义了一个__len__方法,调用len()方法后,会返回当前信号量的可用资源数。这样的方式使得查看资源的方式变得更加清晰。
第 15-16 行,初始化锁,设置初始信号量参数,用于模拟糖果机初始糖果数量,当使用有界信号量时,糖果数量不允许超过初始值,即糖果最多为5个。
第 18-27 行,定义一个refill函数,这个函数会在调用时进行加锁,然后释放一个信号量+1(模拟糖果机补充糖果),并在信号量将要超过初始值时捕获异常。
第 29-36 行,定义一个buy函数,这个函数会在调用时进行加锁,然后获取一个信号量-1(模拟用户购买一个糖果),并在信号量为0时处理阻塞。
第 38-46 行,定义生产者与购买者函数,分别以随机间隔时间执行糖果购买与糖果补充。
第 48-63 行,定义主函数,在主函数中启动购买者与糖果机线程,通过join挂起线程等待其余线程结束再退出主函数。
-------BoundedSemaphore------- Starting at Tue Aug 1 16:53:32 2017 THE CANDY MACHINE (full with 5 bars)! Buying candy... OK, current candy num is 4 Refilling candy... OK, current candy num is 5 Refilling candy... Full, skipping Refilling candy... Full, skipping Buying candy... OK, current candy num is 4 Buying candy... OK, current candy num is 3 Refilling candy... OK, current candy num is 4 Buying candy... OK, current candy num is 3 Refilling candy... OK, current candy num is 4 Buying candy... OK, current candy num is 3 -------Semaphore------ Starting at Tue Aug 1 16:53:38 2017 THE CANDY MACHINE (full with 5 bars)! Buying candy... OK, current candy num is 4 Refilling candy... OK, current candy num is 5 Buying candy... OK, current candy num is 4 Refilling candy... OK, current candy num is 5 Buying candy... OK, current candy num is 4 Refilling candy... OK, current candy num is 5 Refilling candy... OK, current candy num is 6 Buying candy... OK, current candy num is 5 Buying candy... OK, current candy num is 4 Buying candy... OK, current candy num is 3 Buying candy... OK, current candy num is 2 Buying candy... OK, current candy num is 1
1. 基本概念
2. 多线程的建立