Semaphore线程同步机制,当调用acquire()时,内部计数器数值增加;调用release()时,内部计数器递减;计数器值不能小于0,如果等于0,acquire()方法被阻塞,需要等待其他线程调用release()方法。
BoundedSemaphore(value=1),对信号量的计数器设置界限,默认值为1,计数器的值不能大于设定的值,否则抛出ValueError;如果信号量释放过多,则表示程序有Bug。
信号量通常用来保护容量有限的资源,例如链接数据库服务器,官方示例伪代码:
maxconnections = 5 # ... pool_sema = BoundedSemaphore(value=maxconnections) with pool_sema: conn = connectdb() try: # ... use connection ... finally: conn.close()
正常样例:
1 from threading import Thread, BoundedSemaphore, currentThread 2 import time 3 4 sema = BoundedSemaphore(value=2) # 信号量界限为2 5 def run(): 6 sema.acquire() # 每次执行,线程都会调用acquire、release 7 print(‘Run....[ %s ]‘ % currentThread().getName()) 8 time.sleep(1) 9 sema.release() 10 11 threads = [] 12 for i in range(3): 13 t = Thread(target=run) 14 t.start() 15 threads.append(t) 16 17 for item in threads: 18 item.join()
正常输出:
Run....[ Thread-1 ] Run....[ Thread-2 ] Run....[ Thread-3 ]
异常样例:
1 from threading import Thread, BoundedSemaphore, currentThread 2 import time 3 4 sema = BoundedSemaphore(value=2) 5 def run(): 6 """ 7 第三个线程会报错,这里超时时间为0.5s,因为前两个线程需要在1s之后才可以调用release()方法, 8 所以在等待其他线程调用release()方法时超时,导致第三个线程调用acquire()方法失败, 9 信号量内部计数器没有减少,但是之后又去调用release()方法,计数器数值超出设定界限,所以抛出ValueError 10 """ 11 sema.acquire(timeout=0.5) 12 print(‘Run....[ %s ]‘ % currentThread().getName()) 13 time.sleep(1) 14 try: 15 sema.release() 16 except ValueError as e: 17 print(‘Error:‘, e) 18 19 threads = [] 20 for i in range(3): 21 t = Thread(target=run) 22 t.start() 23 threads.append(t) 24 25 for item in threads: 26 item.join()
异常输出结果:
Run....[ Thread-1 ] Run....[ Thread-2 ] Run....[ Thread-3 ] Error: Semaphore released too many times