码迷,mamicode.com
首页 > 编程语言 > 详细

python--线程知识详解

时间:2019-11-04 23:20:23      阅读:160      评论:0      收藏:0      [点我收藏+]

标签:message   mon   意义   条件   事件处理   后台   task   定义   远程   

Threading用于提供线程相关的操作,线程是应用程序中工作的最小单元。

1.1、threading模块

threading模块建立在_thread模块之上。thread模块以低级=原始的方式来处理和控制线程,而threading模块
通过对thread进行二次封装,提供了更方便的api来处理线程。

简单的线程实例:
创建了20个“前台”线程,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 """
 4 创建一个简单的threading线程实例
 5 """
 6 import threading
 7 import time
 8 
 9 def to_worker(num):
10     """
11     线程方法
12     :param num:
13     :return:
14     """
15     time.sleep(1)
16     print("The num is %s" % num)
17     return
18 
19 for i in range(5):
20     t = threading.Thread(target=to_worker, args=(i, ))
21     t.start()  #激活线程

代码执行结果:

技术图片

 

1.2、创建线程的构造方法

t = threading.Thread(group = None, target = None, name = Nome, args = 0, kwargs = {})

注释说明:
group --线程组
target --要执行的方法
name --线程名
args/kwargs -要传入方法的参数

Thread类提供了以下方法:
1、t.start() --激活线程
2、t.getName() --获取线程的名称
3、t.setName() --设置线程的名称
4、t.name() --获取或设置线程的名称
5、t.is_alive() --判断线程是否为激活状态
6、t.isAlive() --判断线程是否为激活状态
7、t.setDaemon() --设置为后台线程或前台线程(默认:False)
8、t.isDaemon() --判断是否为守护线程
9、t.ident() --获取线程的标识符。线程标识符是一个非零整数,只有在调用start()方法后,该属性才有效,否则它只返回None
10、t.join() --逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义。
11、t.run() --线程被cpu调度后自动执行线程对象的run方法

1.3、python线程锁

当有一个数据有多个线程对其进行修改的时候,任何一个线程改变他都会对其他线程造成影响,如果我们想某一个线程在使用完之前,其他线程不能对其修改,就需要对这个线程加一个线程锁。

简单的线程锁实例:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 """
 4 线程锁小实例
 5 """
 6 import threading
 7 import time
 8 
 9 globals_num = 0
10 
11 lock = threading.RLock()
12 
13 def Func():
14     lock.acquire()  #获取锁
15     global globals_num
16     globals_num += 1
17     time.sleep(1)
18     print(globals_num)
19     lock.release()  #释放锁
20 
21 for i in range(10):
22     t = threading.Thread(target=Func)
23     t.start()

代码执行结果:

技术图片

 

threading.RLock和threading.Lock 的区别
RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。 如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐。

 1 import threading
 2 lock = threading.Lock()    #Lock对象
 3 lock.acquire()
 4 lock.acquire()  #产生了死琐。
 5 lock.release()
 6 lock.release() 
 7 
 8 
 9 
10 import threading
11 rLock = threading.RLock()  #RLock对象
12 rLock.acquire()
13 rLock.acquire()    #在同一线程内,程序不会堵塞。
14 rLock.release()
15 rLock.release()

 

1.4、threading.Event

1、python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法:set、wait、clear。
2、事件处理的机制:全局定义了一个Flag,如果Flag值为False,那么当程序执行event.wait方法时就会阻塞,
如果Flag值为True,那么event.wait方法时便不再阻塞。

方法说明:
clear --将Flag设置为False
set -- 将Flag设置为True
Event.isSet() --判断标识符是否为True


threading.Event简单实例:
当线程执行的时候,如果flag为False,则线程会阻塞,当flag为True的时候,线程不会阻塞。它提供了本地和远程的并发性。

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 """
 4 threading.Event事件实例
 5 """
 6 import threading
 7 
 8 def do(event):
 9     print("start.....")
10     event.wait()
11     print("execuse...")
12 
13 event_obj = threading.Event()
14 
15 for i in range(5):
16     t = threading.Thread(target=do, args=(event_obj,))
17     t.start()
18 
19 event_obj.clear()
20 inp = input(input:(true) )
21 if inp == "true":
22     event_obj.set()

代码执行结果:

技术图片

 

1.5、threading.Condition(条件变量)

示例说明:当小伙伴a在往火锅里面添加鱼丸,这个就是生产者行为;另外一个小伙伴b在吃掉鱼丸就是消费者行为。当火锅里面鱼丸达到一定数量加满后b才能吃,这就是一种条件判断了。

Condition(条件变量)通常与一个锁关联。需要在多个Contidion中共享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例。

可以认为,除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于状态图中的等待阻塞状态,直到另一个线程调用notify()/notifyAll()通知;得到通知后线程进入锁定池等待锁定。

Condition():

acquire(): 线程锁
release(): 释放锁
wait(timeout): 线程挂起,直到收到一个notify通知或者超时(可选的,浮点数,单位是秒s)才会被唤醒继续运行。wait()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。
notify(n=1): 通知其他线程,那些挂起的线程接到这个通知之后会开始运行,默认是通知一个正等待该condition的线程,最多则唤醒n个等待的线程。notify()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。notify()不会主动释放Lock。
notifyAll(): 如果wait状态线程比较多,notifyAll的作用就是通知所有线程。


生产者与消费者示例:
现实场景:当a同学王火锅里面添加鱼丸加满后(最多3个,加满后通知b去吃掉),通知b同学去吃掉鱼丸(吃到0的时候通知a同学继续添加)

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 """
 4 threading.Condition
 5 """
 6 import threading
 7 import time
 8 
 9 con = threading.Condition()
10 
11 num = 0
12 
13 #生产者
14 class Producer(threading.Thread):
15 
16     def __init__(self):
17         threading.Thread.__init__(self)
18 
19     def run(self):
20         #锁定线程
21         global  num
22         con.acquire()  #获取锁
23         while True:
24             print("a同学开始添加......")
25             num += 1
26             print("锅里丸子个数为:%s" % str(num))
27             time.sleep(1)
28             if num >= 3:
29                 print("丸子个数已经达到3个了,无法添加。")
30                 #唤醒等待的线程
31                 con.notify()  #唤醒同学开吃
32                 #等待通知
33                 con.wait()
34 
35         #释放锁
36         con.release()
37 
38 #消费者
39 class Consumers(threading.Thread):
40     def __init__(self):
41         threading.Thread.__init__(self)
42 
43     def run(self):
44         con.acquire()
45         global num
46         while True:
47             print("我准备开吃了...")
48             num -= 1
49             print("锅里丸子数量为:%s" % str(num))
50             time.sleep(2)
51             if num <= 0:
52                 print("丸子吃完了,赶紧添加啦..")
53                 con.notify()  #唤醒等待的线程
54                 #等待通知
55                 con.wait()
56         con.release()  #释放锁
57 
58 p = Producer()
59 c = Consumers()
60 p.start()
61 c.start()

代码执行结果:

技术图片

 

1.6、Queue模块

1.6.1、创建一个“队列”对象
import queue
q = queue.queue(maxsize = 10)
queue.queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。

1.6.2、将一个值放入队列中
q.put(10)
调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为
1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。

1.6.3、将一个值从队列中取出
q.get()
调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。

1.6.4、Python queue模块有三种队列及构造函数:
1、Python queue模块的FIFO队列先进先出。 class queue.queue(maxsize)
2、LIFO类似于堆,即先进后出。 class queue.Lifoqueue(maxsize)
3、还有一种是优先级队列级别越低越先出来。 class queue.Priorityqueue(maxsize)

1.6.5、queue常用方法(q =queue.queue()):
q.qsize() 返回队列的大小
q.empty() 如果队列为空,返回True,反之False
q.full() 如果队列满了,返回True,反之False
q.full 与 maxsize 大小对应
q.get([block[, timeout]]) 获取队列,timeout等待时间
q.get_nowait() 相当q.get(False)
非阻塞 q.put(item) 写入队列,timeout等待时间
q.put_nowait(item) 相当q.put(item, False)
q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
q.join() 实际上意味着等到队列为空,再执行别的操作

1.6.6、简单的queue实例:生产者-消费者模型

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 """
 4 Queue队列
 5 """
 6 import queue
 7 import threading
 8 
 9 
10 message = queue.Queue(10)
11 
12 def producer(i):
13     while True:
14         message.put(i)
15 
16 def consumer(i):
17     while True:
18         msg = message.get(i)
19 
20 for i in range(12):
21     t = threading.Thread(target=producer, args=(i, ))
22     t.start()
23 for i in range(10):
24     t = threading.Thread(target=consumer, args=(i, ))
25     t.start()

 

1.7、自定义线程池

1.7.1、方法一:简单往队列中传输线程数

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 """
 4 自定义线程池
 5 方法一:简单往队列中传输线程数
 6 """
 7 import threading
 8 import time
 9 import queue
10 
11 class ThreadingPool():
12     def __init__(self, max_num = 10):
13         self.queue = queue.Queue(max_num)
14         for i in range(max_num):
15             self.queue.put(threading.Thread)
16 
17     def getthreading(self):
18         return self.queue.get()
19 
20     def addthreading(self):
21         self.queue.put(threading.Thread)
22 
23 
24 def func(p, i):
25     time.sleep(1)
26     print(i)
27     p.addthreading()
28 
29 if __name__ == "__main__":
30     p = ThreadingPool()
31     for i in range(12):
32         thread = p.getthreading()
33         t = thread(target = func, args = (p, i))
34         t.start()

代码执行结果:

技术图片

 

1.7.2、方法二:往队列中无限添加任务

  1 #!/usr/bin/env python
  2 # -*- coding:utf-8 -*-
  3 """
  4 自定义线程池
  5 方法二:往队列中无限添加任务
  6 """
  7 import queue
  8 import threading
  9 import contextlib
 10 import time
 11 
 12 StopEvent = object()
 13 
 14 class ThreadPool(object):
 15 
 16     def __init__(self, max_num):
 17         self.q = queue.Queue()
 18         self.max_num = max_num
 19 
 20         self.treminal = False
 21         self.generate_list = []
 22         self.free_list = []
 23 
 24     def run(self, func, args, callback=None):
 25         """
 26         线程池执行一个任务
 27         :param func: 任务函数
 28         :param args: 任务函数所需参数
 29         :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数:1、任务函数执行状态;2、任务函数返回值(默认为None,即不执行回调函数)
 30         :return:如果线程池已经终止,则返回True,否则为None
 31         """
 32         if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
 33             self.generate_thread()
 34             w = (func, args, callback,)
 35             self.q.put(w)
 36 
 37     def generate_thread(self):
 38         """
 39         创建一个线程
 40         :param self:
 41         :return:
 42         """
 43         t = threading.Thread(target=self.call)
 44         t.start()
 45 
 46     def call(self):
 47         """
 48         循环去获取任务函数并执行任务函数
 49         :return:
 50         """
 51         current_thread = threading.currentThread
 52         self.generate_list.append(current_thread)
 53 
 54         event = self.q.get()  #获取线程
 55         while event != StopEvent: #判断获取的线程数不等于全局变量
 56             func, arguments, callback = event  #拆分元组, 获取执行函数,参数, 回调函数
 57             try:
 58                 result = func(*arguments) #执行函数
 59                 status = True
 60 
 61             except Exception as e: #函数执行失败
 62                 status = False
 63                 result = e
 64 
 65             if callback is not None:
 66                 try:
 67                     callback(status, result)
 68                 except Exception as e:
 69                     pass
 70 
 71             with self.work_state():
 72                 event = self.q.get()
 73 
 74         else:
 75             self.generate_list.remove(current_thread)
 76 
 77 
 78     def close(self):
 79         """
 80         关闭线程,给传输全局非元组的变量来进行关闭
 81         :return:
 82         """
 83         for i in range(len(self.generate_list)):
 84             self.q.put(StopEvent)
 85 
 86 
 87     def terminate(self):
 88         """
 89         突然关闭线程
 90         :return:
 91         """
 92         self.terminal = True
 93         while self.generate_list:
 94             self.q.put(StopEvent)
 95         self.q.empty()
 96 
 97 
 98     def work_state(self):
 99         self.free_list.append(threading.current_thread)
100         try:
101             yield
102         finally:
103             self.free_list.remove(threading.currentThread)
104 
105 def work(i):
106     print(i)
107     return i + 1 #返回给回调函数
108 
109 def callback(ret):
110     print(ret)
111 
112 pool = ThreadPool(10)
113 for item in range(50):
114     pool.run(func=work, args=(item, ), callback=callback)
115 
116 pool.terminate()

 

python--线程知识详解

标签:message   mon   意义   条件   事件处理   后台   task   定义   远程   

原文地址:https://www.cnblogs.com/june-L/p/11795631.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!