标签:生产者 python roc rlock tar 直接 依赖 生产者消费者模型 子类
什么是线程?
程序的执行线路。每个进程默认有一条线程。线程包含了程序的具体步骤。
多线程就是一个进程中有除主线程(默认线程)外还有多个线程。
线程与进程的关系(进程包含线程,而线程依赖进程存在)
1.进程包含了运行该程序的所有资源,是一个资源单位。
2.线程是CPU的执行单位(最小的执行单位)。
3.进程一旦被创建,就默认开启了一条线程,称之为主线程。
4.线程共享创建它的进程的地址空间;进程有自己的地址空间。
5.线程可以直接访问其进程的数据段;进程有它们自己的父进程的数据段副本。
6.线程可以直接与进程的其他线程通信;进程必须使用进程间通信来与兄弟进程通信。
7.容易创建新线程;新进程需要父进程的复制。
8.线程可以对同一进程的线程进行相当大的控制;流程只能对子流程进行控制。
9.对主线程的更改(取消、优先级更改等)可能会影响进程中其他线程的行为;对父进程的更改不会影响子进程。
为什么使用多线程?
为了提高程序运行效率。与进程区别是线程对于系统资源的占用非常小。
1.多个线程共享一个进程的地址空间。
2.线程比进程对系统资源占用小,创建速度快10-100倍。
3.同一个进程中多个线程之间资源共享,不需要像进程一样需要进程间通信。
线程的两种开启方式
#第一种方式:导入threading模块中的Thread类来创建一个对象 from threading import Thread ? def task(): print(‘子线程 running。。。‘) t = Thread(target=task) t.start() print(‘over‘) # from threading import Thread # # def task(): # print(‘子线程 running。。。‘) # # if __name__ == ‘__main__‘: # t = Thread(target=task) # t.start() # print(‘over‘) ? ? #第二种方式:创建一个子类继承Thread类,可自定义run方法,但是不能改变run方法名。 from threading import Thread class MyThread(Thread): def __init__(self, name): super().__init__() self.name = name def run(self): print(‘%s say hi‘ % self.name) if __name__ == ‘__main__‘: t = MyThread(‘daidai‘) t.start()
进程和线程的对比
from multiprocessing import Process from threading import Thread import time ? def task(): pass #开启100个线程花费的时间 start_time = time.time() ts = [] for i in range(100): t = Thread(target=task) t.start() ts.append(t) print(os.getpid()) # 所有的进程编号都一样,同属一个进程中 for t in ts: t.join() print(time.time()-start_time) ### 0.018949270248413086 ? #开启100个进程花费的时间 if __name__ == ‘__main__‘: start = time.time() ps = [] for i in range(100): p = Process(target=task) p.start() ps.append(p) for p in ps: p.join() print(time.time()-start) ### 5.281934499740601
线程间的资源共享
from threading import Thread x = 100 def task(): print("run....") global x # 修改全局变量 x = 0 t = Thread(target=task) t.start() t.join() # join方法将子线程的优先级提高到主线程前 print(x) print("over") #### run.... 0 over
守护线程
无论是进程还是线程:都遵循守护一方等待主方运行完毕后被销毁。运行完毕不是终止运行。
1.对于主进程来说,运行完毕是主代码运行完毕。
2.对于主线程来说,运行完毕是主线程内所有非守护线程运行完毕。所以守护线程会在所有非守护线程结束后结束。
详细解释:
#1.主进程在其代码结束后就已经算运行完毕(守护进程在此时会被回收),然后著京城会一直等着非守护的子进程都运行完毕后回收子进程的资源(否则就会产生僵尸进程),才会结束。
#2.主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就会被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
?
from threading import Thread import time ? def task(): print(‘sub thread running‘) time.sleep(3) print(‘sub thread over‘) ? t = Thread(target=task) t.setDaemon(True) # 要在开启之前设置 t.start() print(‘main thread over‘) ### sub thread running main thread over
Thread类对象常用的属性和方法
对象的方法:
isAlive():返回线程是否活动
getName():返回线程名
setName():设置线程名
threading模块的一些方法:
current_thread():返回当前线程变量,获取当前线程
enumerate():返回一个包含正在运行线程的列表
active_count():返回正在进行的线程数量,与len(threading.enumerate())有相同的结果。
from threading import Thread,current_thread,enumerate,active_count import time import os def task(): time.sleep(3) print(current_thread().getName()) t = Thread(target = task) t.start() print(current_thread().getName()) print(current_thread()) print(enumerate()) print(active_count()) print(‘daidai‘) #运行结果 MainThread <_MainThread(MainThread, started 14652)> [<_MainThread(MainThread, started 14652)>, <Thread(Thread-1, started 15548)>] 2 daidai Thread-1 ? ? t.join() #主线程等待子线程结束
?
线程互斥锁
当多个进程或者多个线程需要同时修改同一份数据时,可能造成数据的错乱,所以需要给说句加上锁。
同样的线程中也有死锁和可重入锁RLock
import time from threading import Thread, Lock lock = Lock() a = 100 def task(): lock.acquire() global a # 修改全局变量a temp = a-1 time.sleep(0.05) a = temp lock.release() ts = [] for i in range(100): t = Thread(target=task) t.start() ts.append(t) ? for t in ts: t.join() ? print(a) # 全局中的a已经被修改为0
信号量Semaphore
semaphore管理一个内置的计数器,每当调用acquire()时内置计数器-1:
调用release()时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
信号量其实也是一种锁,特点是可以设置一个数据可以被几个线程(进程)共享。
与普通锁的区别:
普通锁一旦加锁就意味着这个数据在同一时间只能被一个线程或进程使用。
信号量可以让数据在同一时间能被多个线程使用。
#实例:开启10个线程,每次运行3个 from threading import Semaphore, Thread, current_thread import time ? sem = Semaphore(3) def task(): sem.acquire() print(‘%s task running‘ % current_thread()) time.sleep(3) sem.release() ? for i in range(10): t = Thread(target=task) t.start()
生产者消费者模型中的JoinableQueue
JoinableQueue类是一种队列,Queue的子类。但是实例化的对象可以明确直到队列中是否有数据及数据的使用量。
参数介绍:
maxsize是队列中允许最大项数,省略则无大小限制。
方法介绍:
JoinableQueue的实例除了与Queue实例对象相同方法之外还有:
task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发异常。
q.join():明确生产者不会再生产数据。加入数据到队列中。
import time import random from multiprocessing import Process, JoinableQueue def eat_hotdog(name, q): while True: res = q.get() if not res: print(‘吃完了。。。‘) break print(‘%s吃了%s‘ % (name, res)) time.sleep(random.randint(1, 2)) q.task_done() #向q.join()发送一次信号,证明一个数据已经被取走了 def make_hotdog(name, q): for i in range(1,6): time.sleep(random.randint(1, 2)) print(‘%s 生产了第%s个热狗‘ % (name,i)) res = ‘%s的%s个热狗‘% (name,i) q.put(res) ? if __name__ == ‘__main__‘: q = JoinableQueue() #生产者1 c1 = Process(target=make_hotdog, args=(‘万达‘, q)) c1.start() #生产者2 c2 = Process(target=make_hotdog, args=(‘呆呆‘, q)) c2.start() ? p2 = Process(target=eat_hotdog, args=(‘思聪‘, q)) p2.daemon = True p2.start() # 首先保证生产者全部产完成 c1.join() c2.join() # 保证队列中的数据全部被处理了 q.join() # 明确生产方已经不会再生成数据了
标签:生产者 python roc rlock tar 直接 依赖 生产者消费者模型 子类
原文地址:https://www.cnblogs.com/5j421/p/10597769.html