标签:message mon 意义 条件 事件处理 后台 task 定义 远程
Threading用于提供线程相关的操作,线程是应用程序中工作的最小单元。
1.1、threading模块
threading模块建立在_thread模块之上。thread模块以低级=原始的方式来处理和控制线程,而threading模块
通过对thread进行二次封装,提供了更方便的api来处理线程。
简单的线程实例:
创建了20个“前台”线程,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 """ 4 创建一个简单的threading线程实例 5 """ 6 import threading 7 import time 8 9 def to_worker(num): 10 """ 11 线程方法 12 :param num: 13 :return: 14 """ 15 time.sleep(1) 16 print("The num is %s" % num) 17 return 18 19 for i in range(5): 20 t = threading.Thread(target=to_worker, args=(i, )) 21 t.start() #激活线程
代码执行结果:
1.2、创建线程的构造方法
t = threading.Thread(group = None, target = None, name = Nome, args = 0, kwargs = {})
注释说明:
group --线程组
target --要执行的方法
name --线程名
args/kwargs -要传入方法的参数
Thread类提供了以下方法:
1、t.start() --激活线程
2、t.getName() --获取线程的名称
3、t.setName() --设置线程的名称
4、t.name() --获取或设置线程的名称
5、t.is_alive() --判断线程是否为激活状态
6、t.isAlive() --判断线程是否为激活状态
7、t.setDaemon() --设置为后台线程或前台线程(默认:False)
8、t.isDaemon() --判断是否为守护线程
9、t.ident() --获取线程的标识符。线程标识符是一个非零整数,只有在调用start()方法后,该属性才有效,否则它只返回None
10、t.join() --逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义。
11、t.run() --线程被cpu调度后自动执行线程对象的run方法
1.3、python线程锁
当有一个数据有多个线程对其进行修改的时候,任何一个线程改变他都会对其他线程造成影响,如果我们想某一个线程在使用完之前,其他线程不能对其修改,就需要对这个线程加一个线程锁。
简单的线程锁实例:
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 """ 4 线程锁小实例 5 """ 6 import threading 7 import time 8 9 globals_num = 0 10 11 lock = threading.RLock() 12 13 def Func(): 14 lock.acquire() #获取锁 15 global globals_num 16 globals_num += 1 17 time.sleep(1) 18 print(globals_num) 19 lock.release() #释放锁 20 21 for i in range(10): 22 t = threading.Thread(target=Func) 23 t.start()
代码执行结果:
threading.RLock和threading.Lock 的区别
RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。 如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐。
1 import threading 2 lock = threading.Lock() #Lock对象 3 lock.acquire() 4 lock.acquire() #产生了死琐。 5 lock.release() 6 lock.release() 7 8 9 10 import threading 11 rLock = threading.RLock() #RLock对象 12 rLock.acquire() 13 rLock.acquire() #在同一线程内,程序不会堵塞。 14 rLock.release() 15 rLock.release()
1.4、threading.Event
1、python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法:set、wait、clear。
2、事件处理的机制:全局定义了一个Flag,如果Flag值为False,那么当程序执行event.wait方法时就会阻塞,
如果Flag值为True,那么event.wait方法时便不再阻塞。
方法说明:
clear --将Flag设置为False
set -- 将Flag设置为True
Event.isSet() --判断标识符是否为True
threading.Event简单实例:
当线程执行的时候,如果flag为False,则线程会阻塞,当flag为True的时候,线程不会阻塞。它提供了本地和远程的并发性。
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 """ 4 threading.Event事件实例 5 """ 6 import threading 7 8 def do(event): 9 print("start.....") 10 event.wait() 11 print("execuse...") 12 13 event_obj = threading.Event() 14 15 for i in range(5): 16 t = threading.Thread(target=do, args=(event_obj,)) 17 t.start() 18 19 event_obj.clear() 20 inp = input(‘input:(true) ‘) 21 if inp == "true": 22 event_obj.set()
代码执行结果:
1.5、threading.Condition(条件变量)
示例说明:当小伙伴a在往火锅里面添加鱼丸,这个就是生产者行为;另外一个小伙伴b在吃掉鱼丸就是消费者行为。当火锅里面鱼丸达到一定数量加满后b才能吃,这就是一种条件判断了。
Condition(条件变量)通常与一个锁关联。需要在多个Contidion中共享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例。
可以认为,除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于状态图中的等待阻塞状态,直到另一个线程调用notify()/notifyAll()通知;得到通知后线程进入锁定池等待锁定。
Condition():
acquire(): 线程锁
release(): 释放锁
wait(timeout): 线程挂起,直到收到一个notify通知或者超时(可选的,浮点数,单位是秒s)才会被唤醒继续运行。wait()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。
notify(n=1): 通知其他线程,那些挂起的线程接到这个通知之后会开始运行,默认是通知一个正等待该condition的线程,最多则唤醒n个等待的线程。notify()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。notify()不会主动释放Lock。
notifyAll(): 如果wait状态线程比较多,notifyAll的作用就是通知所有线程。
生产者与消费者示例:
现实场景:当a同学王火锅里面添加鱼丸加满后(最多3个,加满后通知b去吃掉),通知b同学去吃掉鱼丸(吃到0的时候通知a同学继续添加)
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 """ 4 threading.Condition 5 """ 6 import threading 7 import time 8 9 con = threading.Condition() 10 11 num = 0 12 13 #生产者 14 class Producer(threading.Thread): 15 16 def __init__(self): 17 threading.Thread.__init__(self) 18 19 def run(self): 20 #锁定线程 21 global num 22 con.acquire() #获取锁 23 while True: 24 print("a同学开始添加......") 25 num += 1 26 print("锅里丸子个数为:%s" % str(num)) 27 time.sleep(1) 28 if num >= 3: 29 print("丸子个数已经达到3个了,无法添加。") 30 #唤醒等待的线程 31 con.notify() #唤醒同学开吃 32 #等待通知 33 con.wait() 34 35 #释放锁 36 con.release() 37 38 #消费者 39 class Consumers(threading.Thread): 40 def __init__(self): 41 threading.Thread.__init__(self) 42 43 def run(self): 44 con.acquire() 45 global num 46 while True: 47 print("我准备开吃了...") 48 num -= 1 49 print("锅里丸子数量为:%s" % str(num)) 50 time.sleep(2) 51 if num <= 0: 52 print("丸子吃完了,赶紧添加啦..") 53 con.notify() #唤醒等待的线程 54 #等待通知 55 con.wait() 56 con.release() #释放锁 57 58 p = Producer() 59 c = Consumers() 60 p.start() 61 c.start()
代码执行结果:
1.6、Queue模块
1.6.1、创建一个“队列”对象
import queue
q = queue.queue(maxsize = 10)
queue.queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。
1.6.2、将一个值放入队列中
q.put(10)
调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为
1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。
1.6.3、将一个值从队列中取出
q.get()
调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。
1.6.4、Python queue模块有三种队列及构造函数:
1、Python queue模块的FIFO队列先进先出。 class queue.queue(maxsize)
2、LIFO类似于堆,即先进后出。 class queue.Lifoqueue(maxsize)
3、还有一种是优先级队列级别越低越先出来。 class queue.Priorityqueue(maxsize)
1.6.5、queue常用方法(q =queue.queue()):
q.qsize() 返回队列的大小
q.empty() 如果队列为空,返回True,反之False
q.full() 如果队列满了,返回True,反之False
q.full 与 maxsize 大小对应
q.get([block[, timeout]]) 获取队列,timeout等待时间
q.get_nowait() 相当q.get(False)
非阻塞 q.put(item) 写入队列,timeout等待时间
q.put_nowait(item) 相当q.put(item, False)
q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
q.join() 实际上意味着等到队列为空,再执行别的操作
1.6.6、简单的queue实例:生产者-消费者模型
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 """ 4 Queue队列 5 """ 6 import queue 7 import threading 8 9 10 message = queue.Queue(10) 11 12 def producer(i): 13 while True: 14 message.put(i) 15 16 def consumer(i): 17 while True: 18 msg = message.get(i) 19 20 for i in range(12): 21 t = threading.Thread(target=producer, args=(i, )) 22 t.start() 23 for i in range(10): 24 t = threading.Thread(target=consumer, args=(i, )) 25 t.start()
1.7、自定义线程池
1.7.1、方法一:简单往队列中传输线程数
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 """ 4 自定义线程池 5 方法一:简单往队列中传输线程数 6 """ 7 import threading 8 import time 9 import queue 10 11 class ThreadingPool(): 12 def __init__(self, max_num = 10): 13 self.queue = queue.Queue(max_num) 14 for i in range(max_num): 15 self.queue.put(threading.Thread) 16 17 def getthreading(self): 18 return self.queue.get() 19 20 def addthreading(self): 21 self.queue.put(threading.Thread) 22 23 24 def func(p, i): 25 time.sleep(1) 26 print(i) 27 p.addthreading() 28 29 if __name__ == "__main__": 30 p = ThreadingPool() 31 for i in range(12): 32 thread = p.getthreading() 33 t = thread(target = func, args = (p, i)) 34 t.start()
代码执行结果:
1.7.2、方法二:往队列中无限添加任务
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 """ 4 自定义线程池 5 方法二:往队列中无限添加任务 6 """ 7 import queue 8 import threading 9 import contextlib 10 import time 11 12 StopEvent = object() 13 14 class ThreadPool(object): 15 16 def __init__(self, max_num): 17 self.q = queue.Queue() 18 self.max_num = max_num 19 20 self.treminal = False 21 self.generate_list = [] 22 self.free_list = [] 23 24 def run(self, func, args, callback=None): 25 """ 26 线程池执行一个任务 27 :param func: 任务函数 28 :param args: 任务函数所需参数 29 :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数:1、任务函数执行状态;2、任务函数返回值(默认为None,即不执行回调函数) 30 :return:如果线程池已经终止,则返回True,否则为None 31 """ 32 if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: 33 self.generate_thread() 34 w = (func, args, callback,) 35 self.q.put(w) 36 37 def generate_thread(self): 38 """ 39 创建一个线程 40 :param self: 41 :return: 42 """ 43 t = threading.Thread(target=self.call) 44 t.start() 45 46 def call(self): 47 """ 48 循环去获取任务函数并执行任务函数 49 :return: 50 """ 51 current_thread = threading.currentThread 52 self.generate_list.append(current_thread) 53 54 event = self.q.get() #获取线程 55 while event != StopEvent: #判断获取的线程数不等于全局变量 56 func, arguments, callback = event #拆分元组, 获取执行函数,参数, 回调函数 57 try: 58 result = func(*arguments) #执行函数 59 status = True 60 61 except Exception as e: #函数执行失败 62 status = False 63 result = e 64 65 if callback is not None: 66 try: 67 callback(status, result) 68 except Exception as e: 69 pass 70 71 with self.work_state(): 72 event = self.q.get() 73 74 else: 75 self.generate_list.remove(current_thread) 76 77 78 def close(self): 79 """ 80 关闭线程,给传输全局非元组的变量来进行关闭 81 :return: 82 """ 83 for i in range(len(self.generate_list)): 84 self.q.put(StopEvent) 85 86 87 def terminate(self): 88 """ 89 突然关闭线程 90 :return: 91 """ 92 self.terminal = True 93 while self.generate_list: 94 self.q.put(StopEvent) 95 self.q.empty() 96 97 98 def work_state(self): 99 self.free_list.append(threading.current_thread) 100 try: 101 yield 102 finally: 103 self.free_list.remove(threading.currentThread) 104 105 def work(i): 106 print(i) 107 return i + 1 #返回给回调函数 108 109 def callback(ret): 110 print(ret) 111 112 pool = ThreadPool(10) 113 for item in range(50): 114 pool.run(func=work, args=(item, ), callback=callback) 115 116 pool.terminate()
标签:message mon 意义 条件 事件处理 后台 task 定义 远程
原文地址:https://www.cnblogs.com/june-L/p/11795631.html