标签:生产 [] 自己 多个 也会 str rlock 交互 信号
线程是进程内的独立的运行线路,是操作系统能够进行运算调度的最小单位,同时也是处理器调度的最小单位。线程被包含在进程之内,是进程中实际运作单位。
一个线程指的是进程中的一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。
首先要import threading
线程有两种调用方式,第一种是直接调用
import threading
import time
# 直接调用
def run(n):
print("task {0} {1}".format(n,threading.current_thread()))
time.sleep(2)
if __name__ == "__main__":
t1 = threading.Thread(target=run,args=("t1",)) # 参数括号内的逗号不能省略
t2 = threading.Thread(target=run,args=("t2",)) # 生成一个线程对象
t1.start() # 启动线程
t2.start()
# 现在同时启动50个线程
start = time.time()
for i in range(50):
t= threading.Thread(target=run,args=("t{0}".format(i),))
t.start()
end = time.time()
cost = end - start
print("cost time:",cost)
第二种是继承式
import threading
import time
‘‘‘继承式调用‘‘‘
class MyThread(threading.Thread):
def __init__(self,n):
super(MyThread,self).__init__()
self.n = n
def run(self):
print("running task{0} {1}".format(self.n,threading.current_thread()))
time.sleep(1)
if __name__ == "__main__":
start = time.time()
obj = []
for i in range(50):
t= MyThread("t{0}".format(i))
t.start()
end = time.time()
cost = end - start
print("cost time:",cost)
使用多线程对比函数调用,得出结论是:线程是并发执行的(同时执行),而函数调用只能是顺序执行,多线程执行大大提高了运行效率。
但是以上两个程序在运行的时候发现了一个共同的问题:拿继承式的代码来说,理论上50个线程执行完需要经过一秒多才会执行完毕,退出程序,但是实际情况却是开启50个线程之后立马就退出程序,执行时间不足0.01s。为什么呢?
答案是多线程。当前运行的线程是主线程,主线程启动了50个子线程,启动完毕后继续做自己的事情,每个线程之间互不干扰,并行运行。因此无法用此方式测定50个程序到底运行了多久。
主线程运行完毕即退出程序,如果不特殊处理,它不会等待子线程处理完毕。所以我们如果想要等待子线程的运行结果,需要加上join()语句。
import threading import time ‘‘‘继承式调用‘‘‘ class MyThread(threading.Thread): def __init__(self,n): super(MyThread,self).__init__() self.n = n def run(self): print("running task{0} {1}".format(self.n,threading.current_thread())) time.sleep(1) if __name__ == "__main__": start = time.time() obj = [] for i in range(50): t= MyThread("t{0}".format(i)) t.start() obj.append(t) # 为了不阻塞后面线程的启动,现将其加入列表 for j in obj: j.join() print(threading.current_thread()) # 证明主线程 end = time.time() cost = end - start print("cost time:",cost)
当线程被启动后,如果调用join()方法,则会在此线程执行完之前程序不往下走,也就是阻塞当前程序,使程序变为串行执行,这当然不是我们愿意看到的。在此程序中,我们希望它并发执行的同时,满足主线程等待所有子线程执行完毕后再结束这个条件。只需要在开启所有线程之后,一一地join()即可。
在线程锁之间讲一下GIL全局性解释器锁。
首先要明白GIL并不是Python的特性,它是在实现Python解释器(CPython)时引入的一个概念。简单来说,GIL锁的存在使得:无论你开启多少个线程,无论你的CPU有多少核,在执行程序的时候同一时间只会占用一个核。所以,你以为的同时占用多核只是假象。再说一遍,这不是Python的特性,这只是CPyhton解释器的特性,其他类型的解释器如JPyhon,pypy等没有这个特性。之所以我们在使用Python多线程时没有感觉是单线程是因为上下文的切换。
在这里不详细多说,关于GIL全局性解释器锁的详细信息,有兴趣可以参考:https://www.cnblogs.com/cjaaron/p/9166538.html
我们最重要的是需要理解,Python多线程适合IO密集型操作,但在计算密集型操作中,多线程甚至没有单线程快。
再来讲互斥锁(也就是线程锁)
当多个线程同时修改一个数据的时候,可能会发生无法预估的错误,所以这时候要上锁。在这里叫做互斥锁。
def run(): lock.acquire() # 获取锁 global num num += 1 lock.release() # 释放锁 time.sleep(1) if __name__ == "__main__": lock = threading.Lock() # 生成锁的实例 num = 0 obj = [] start = time.time() for i in range(1000): t= threading.Thread(target=run) t.start() obj.append(t) for j in obj: j.join() end = time.time() cost = end - start print("cost time:",cost) print("num:",num)
互斥锁之间可以嵌套,但是没有理清逻辑容易造成死锁,无法解开。这里使用递归锁,实现多层上锁和多层解锁
def run1():
print("grab the first part data")
lock.acquire()
global num
num += 1
lock.release()
return num
def run2():
print("grab the second part data")
lock.acquire()
global num2
num2 += 1
lock.release()
return num2
def run3():
lock.acquire()
res = run1()
print(‘--------between run1 and run2-----‘)
res2 = run2()
lock.release()
print(res, res2)
if __name__ == ‘__main__‘:
num, num2 = 0, 0
lock = threading.RLock() # 递归锁,设锁和解锁必须是成对的
for i in range(10):
t = threading.Thread(target=run3)
t.start()
while threading.active_count() != 1:
print(threading.active_count())
else:
print(‘----all threads done---‘)
print(num, num2)
需要注意的是:上锁和解锁必须是成对的。
信号量其实也是一种锁,用于限制线程的并发量,即同一时间只允许几个线程运行。
def run(n): semaphore.acquire() time.sleep(1) print("run the thread: {0}\n".format(n)) semaphore.release() if __name__ == ‘__main__‘: semaphore = threading.BoundedSemaphore(5) # 最多允许5个线程同时运行 for i in range(20): t = threading.Thread(target=run, args=(i+1,)) t.start() while threading.active_count() != 1: pass # print threading.active_count() else: print(‘----all threads done---‘)
创建一个有延迟的线程。
def hello(): print("hello, world") t = Timer(30.0, hello) t.start() # 30秒后执行此线程
threading.Event()用来实现两个或多个线程之间的交互
有四个方法:
set() 设置标志位
clear()清空标志位
wait()如果已经设置标志位不会阻塞,如果没有设置标志位则产生阻塞
isSet() 判定是否设置了标志位。
接下来是一个车等红绿灯的例子,车是停还是行需要根据红绿灯来判定。将实例化若干个车(线程)和一个红绿灯(线程),来实现线程间交互
import threading
import time
import random
def light():
"红绿灯"
while True:
count = 1
event.set() # 标志位设定了,wait就不阻塞 #绿灯状态
while True:
if count == 1:
print(‘\033[42;1m--green light on---\033[0m‘)
time.sleep(10)
count = 2
elif count == 2:
print(‘\033[43;1m--yellow light on---\033[0m‘)
time.sleep(2)
count = 3
elif count == 3:
event.clear() # 标志位被清空,wait()阻塞
print(‘\033[41;1m--red light on---\033[0m‘)
time.sleep(10)
break
def car(n):
while 1:
time.sleep(random.randrange(8))
if event.isSet(): # 判断标志位是否设定
print("car [%s] is running.." % n)
else:
print("car [%s] is waiting for the red light.." %n)
if __name__ == ‘__main__‘:
event = threading.Event()
Light = threading.Thread(target=light)
Light.start()
for i in range(3):
t = threading.Thread(target=car,args=(i,))
t.start()
第二个例子,员工过门。如果门是开的,员工直接过门;如果们是关的,员工需要刷卡开门再过门。
import threading
import time
import random
def door():
count = 0
while True:
if door_event.is_set():
print("\033[32;1mdoor opening....\033[0m")
count +=1
else:
print("\033[31;1mdoor closed...., swipe to open.\033[0m")
count = 0 #清空计时器
door_event.wait()
if count > 3:#门开了已经3s了,该关了
door_event.clear()
time.sleep(0.5)
def staff(n):
print("staff [%s] is comming..." % n )
while True:
if door_event.is_set():
print("\033[34;1mdoor is opened, passing.....\033[0m")
break
else:
print("staff [%s] sees door got closed, swipping the card....." % n)
door_event.set()
print("door is opening...")
time.sleep(0.5)
if __name__ == "__main__":
door_event = threading.Event() #设置事件
door_thread = threading.Thread(target=door) # 实例化“门”线程并启动
door_thread.start()
for i in range(5):
p = threading.Thread(target=staff,args=(i,))
time.sleep(random.randrange(3))
p.start()
线程间的数据通信。需要导入模块queue。
分为三种:
qsize() 查询队列中数据的个数
put(item,block=Ture,timeout=None) 往队列中放入数据
get(block=True,timeout=None) 取出数据。当队列中没有数据的时候,get将会造成阻塞,直到队列中又存入数据为止。参数block=False不会阻塞,timeout=1表示只卡1s
empty() 当队列为空返回True
full() 当队列已满返回True
def put(): #time.sleep(1) # 将执行此函数的线程滞后,更好地观摩多线程的运行效果 try: for i in range(50): cou = i q.put(i + 1, block=False) else: print("存入了{0}个".format(cou+1)) except queue.Full: print("队列已满,存入了{0}个".format(cou)) def get(): try: for i in range(50): print(q.get(block=False)) except queue.Empty: print("队列已空,无法取出数据") def qsize(): # time.sleep(1) print("此时队列中个数:",q.qsize()) put_thr = threading.Thread(target=put) get_thr = threading.Thread(target=get) qsi_thr = threading.Thread(target=qsize) q = queue.Queue(5) # 队列最多同时容纳5个数据 put_thr.start() qsi_thr.start() get_thr.start()
在代码中,有用到异常处理,是因为如果队列满了再存入数据会触发异常,队列空了再取数据也会触发异常。
方法与上同
import queue import threading ‘‘‘ 按照优先级取出数据。 ‘‘‘ q = queue.PriorityQueue() # 生成队列 q.put((5,"lwz")) q.put((10,"alex")) q.put((2,"chenronghua")) # 数字越小优先级越高 print(q.get()) print(q.get()) print(q.get())
该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。
在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。
同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模式?
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,
而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,
而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
import queue import threading import time def Producer(): count = 0 while True: count += 1 q.put("骨头{0}".format(count)) print("生产了骨头{0}".format(count)) time.sleep(1) def Customer(name): while 1: print("{0}取到并且吃了{1}".format(name,q.get())) time.sleep(0.1) if __name__ == "__main__": q = queue.Queue() c1 = threading.Thread(target=Customer,args=("alex",)) c2 = threading.Thread(target=Customer,args=("Chen",)) p = threading.Thread(target=Producer) c1.start() c2.start() p.start()
标签:生产 [] 自己 多个 也会 str rlock 交互 信号
原文地址:https://www.cnblogs.com/V587Chinese/p/9381653.html