标签:read name 优先 producer 用户 并发编程 res 意义 bytecode
线程是属于进程的,线程运行在进程空间内,同一进程所产生的线程共享同一内存空间。当进程退出时该进程所产生的线程都会被强制退出并清除。线程可与属于同一进程的其他线程共享进程所拥有的全部资源,但是起本身基本上不拥有系统资源,只拥有一点在运行中必不可少的信息(如程序计数器,一组寄存器和栈)。
进程是正在执行的程序实例。进程至少由一个线程组成,线程是最小的执行单元。
Threading方法: t.start() # 激活线程 t.getName() # 获取线程的名称 t.setName() # 设置线程的名称 t.name # 获取或设置线程的名称 t.is_alive() # 检查线程是否在运行中 t.isAlive() # 检查线程是否在运行中 t.setDaemon() # 设置为后台线程或前台线程(默认:False);通过一个布尔值设置线程是否为守护线程,必须在执行start()方法之后才可以使用。如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止 t.isDaemon() # 判断是否为守护线程 t.ident # 获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。 t.join() # 逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义 t.run() # 线程被cpu调度后自动执行线程对象的run方法
线程调用的两种方式:
# -*- coding:utf-8 -*- import threading import time def run(num): print("Running on Number: %s" % num) time.sleep(1) if __name__ == ‘__main__‘: t1 = threading.Thread(target=run, args=(1,)) t2 = threading.Thread(target=run, args=(2,)) t1.start() # 启动线程 t2.start() start_time = time.time() t1.join() # 等待线程t1执行完毕返回结果 t2.join() end_time = time.time() cost_time = end_time - start_time print(cost_time) print(t1.getName()) # 获取线程名 print(t2.getName()) t1.setName(‘Number 1‘) # 设置线程名 t2.setName(‘Number 2‘) print(t1.getName()) print(t2.getName()) print(t1.ident) # 获取线程ID print(t2.ident)
# -*- coding:utf-8 -*- import threading class MyThread(threading.Thread): def __init__(self, num): super(MyThread, self).__init__() self.num = num def run(self): print("Running on Number: %s" % self.num) time.sleep(1) if __name__ == ‘__main__‘: t1 = MyThread(1) t2 = MyThread(2) t1.start() t2.start()
# _*_coding:utf-8_*_ import time import threading def run(n): print(‘[%s]------running----\n‘ % n) time.sleep(2) print(‘--done--‘) def main(): for i in range(5): t = threading.Thread(target=run, args=[i, ]) t.start() t.join(1) print(‘starting thread‘, t.getName()) m = threading.Thread(target=main, args=[]) m.setDaemon(True) # 将main线程设置为Daemon线程,它做为程序主线程的守护线程,当主线程退出时,m线程也会退出,由m启动的其它子线程会同时退出,不管是否执行完任务 m.start() m.join(timeout=2) print("---main thread done----")
用线程将main函数启动,作为主线程,主线程作为守护线程时,当主线程退出,main函数中启动的子线程也会退出,无论run函数是否执行结束。
一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,此时,如果2个线程同时要修改同一份数据,会出现什么状况?在Python2.x中,如果不加锁,修改后的数据不一定为0,而Python3.x中,即使不加锁,结果总是为0。有可能是Python3.x中自动加锁。
# _*_coding:utf-8_*_ import time import threading def addNum(): global num # 在每个线程中都获取这个全局变量 print(‘--get num:‘, num) time.sleep(1) lock.acquire() # 修改数据前请求锁 num -= 1 # 对此公共变量进行-1操作 lock.release() # 修改数据后释放锁 num = 100 # 设定一个共享变量 thread_list = [] lock = threading.Lock() # 生成全局锁 for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: # 等待所有线程执行完毕 t.join() print(‘final num:‘, num)
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
上面的核心意思就是,无论你启动多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行。
首先需要明确的一点是GIL
并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL
归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL。
Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock?注意啦,这里的lock是用户级的lock,跟那个GIL没关系。
说白了就是在一个大锁中还要再包含子锁。RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。 如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐。
# _*_coding:utf-8_*_ import time import threading def run1(): print("grab the first part data") lock.acquire() # 5、请求锁(小锁) global num num += 1 lock.release() # 6、释放锁(小锁) return num def run2(): print("grab the second part data") lock.acquire() global num2 num2 += 1 lock.release() return num2 def run3(): lock.acquire() # 3、请求锁(大锁) res = run1() # 4、运行run1() print(‘--------between run1 and run2-----‘) res2 = run2() lock.release() # 7、释放锁(大锁) print(res, res2) if __name__ == ‘__main__‘: num, num2 = 0, 0 lock = threading.RLock() # 1、生成递归锁 for i in range(10): t = threading.Thread(target=run3) # 2、启动线程 t.start() while threading.active_count() != 1: # 返回当前活跃的Thread对象数量 print(threading.active_count()) else: print(‘----all threads done---‘) print(num, num2)
互斥锁同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据。
# _*_coding:utf-8_*_ import threading, time def run(n): semaphore.acquire() time.sleep(1) print("run the thread: %s\n" % n) semaphore.release() if __name__ == ‘__main__‘: num = 0 semaphore = threading.BoundedSemaphore(5) # 最多允许5个线程同时运行 for i in range(20): t = threading.Thread(target=run, args=(i,)) t.start() while threading.active_count() != 1: pass # print threading.active_count() else: print(‘----all threads done---‘) print(num)
python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
# _*_coding:utf-8_*_ import threading def do(event): print(‘start‘) event.wait() # 等待“Flag”设置为True print(‘execute‘) event_obj = threading.Event() # 生成事件 for i in range(10): t = threading.Thread(target=do, args=(event_obj,)) # 将事件作为参数传进do函数 t.start() event_obj.clear() # 将“Flag”设置为False inp = input(‘input:‘) if inp == ‘true‘: event_obj.set() # 将“Flag”设置为True
红绿灯例子:
# _*_coding:utf-8_*_ import threading, time import random def light(): if not event.isSet(): event.set() # wait就不阻塞 # 绿灯状态 count = 0 while True: if count < 10: print(‘--green light on---‘) elif count < 13: print(‘--yellow light on---‘) elif count < 20: if event.isSet(): event.clear() print(‘--red light on---‘) else: count = 0 event.set() # 打开绿灯 time.sleep(1) count += 1 def car(n): while 1: time.sleep(random.randrange(10)) if event.isSet(): # 绿灯 print("car [%s] is running.." % n) else: print("car [%s] is waiting for the red light.." % n) if __name__ == ‘__main__‘: event = threading.Event() Light = threading.Thread(target=light) Light.start() for i in range(3): t = threading.Thread(target=car, args=(i,)) t.start()
这里还有一个event使用的例子,员工进公司门要刷卡, 我们这里设置一个线程是“门”, 再设置几个线程为“员工”,员工看到门没打开,就刷卡,刷完卡,门开了,员工就可以通过。
# _*_coding:utf-8_*_ import threading import time import random def door(): door_open_time_counter = 0 while True: if door_swiping_event.is_set(): # 判断标识位是否为True print("door opening....") door_open_time_counter += 1 else: print("door closed...., swipe to open.") door_open_time_counter = 0 # 清空计时器 door_swiping_event.wait() if door_open_time_counter > 3: # 门开了已经3s了,该关了 door_swiping_event.clear() time.sleep(0.5) def staff(n): print("staff [%s] is comming..." % n) while True: if door_swiping_event.is_set(): print("door is opened, passing.....") break else: print("staff [%s] sees door got closed, swipping the card....." % n) print(door_swiping_event.set()) door_swiping_event.set() print("after set ", door_swiping_event.set()) time.sleep(0.5) door_swiping_event = threading.Event() # 设置事件 door_thread = threading.Thread(target=door) door_thread.start() for i in range(5): p = threading.Thread(target=staff, args=(i,)) time.sleep(random.randrange(3)) p.start()
Python3.5中,队列是线程间最常用的交换数据的形式。Queue模块是提供队列操作的模块,虽然简单易用,但是不小心的话,还是会出现一些意外。
Python queue模块有三种队列及构造函数:
1、Python Queue模块的FIFO队列先进先出。 class Queue.Queue(maxsize)
2、LIFO类似于堆,即先进后出。 class Queue.LifoQueue(maxsize)
3、还有一种是优先级队列级别越低越先出来。 class Queue.PriorityQueue(maxsize)
# _*_coding:utf-8_*_ # 创建一个“队列”对象 import queue q = queue.Queue(maxsize=10) # 将10个值放入队列中 for i in range(10): q.put(i) # 调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为 # 1、如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。 print(q.maxsize) # 可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。 # 将10个值从队列中取出 for i in range(11): print(q.get()) # 调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。 # 如果队列为空且block为False,队列将引发Empty异常。 # 此包中的常用方法(q = Queue.Queue()): # q.qsize() # 返回队列的大小 # q.empty() # 如果队列为空,返回True,反之False # q.full() # 如果队列满了,返回True,反之False # q.full 与 q.maxsize 大小对应 # q.get([block[, timeout]]) # 获取队列,timeout等待时间 # q.get_nowait() # 相当q.get(False) # 非阻塞 q.put(item) 写入队列,timeout等待时间 # q.put_nowait(item) # 相当q.put(item, False) # q.task_done() # 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号 # q.join() # 实际上意味着等到队列为空,再执行别的操作
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
为什么要使用生产者和消费者模式?
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模式?
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
下面来学习一个最基本的生产者消费者模型的例子:
# _*_coding:utf-8_*_ import threading import queue def producer(): for i in range(10): q.put("骨头 %s" % i) print("开始等待所有的骨头被取走...") q.join() print("所有的骨头被取完了...") def consumer(name): while q.qsize() > 0: print("%s 取到" % name, q.get()) q.task_done() # 告知这个任务执行完了 q = queue.Queue() p = threading.Thread(target=producer, ) p.start() c1 = consumer("狗")
再来一个吃包子的例子:
# _*_coding:utf-8_*_ import time, random import queue, threading q = queue.Queue() def Producer(name): count = 0 while count < 20: time.sleep(random.randrange(3)) q.put(count) print(‘Producer %s has produced %s baozi..‘ %(name, count)) count += 1 def Consumer(name): count = 0 while count < 20: time.sleep(random.randrange(4)) if not q.empty(): data = q.get() print(data) print(‘\033[32;1mConsumer %s has eat %s baozi...\033[0m‘ %(name, data)) else: print("-----no baozi anymore----") count += 1 p1 = threading.Thread(target=Producer, args=(‘A‘,)) c1 = threading.Thread(target=Consumer, args=(‘B‘,)) p1.start() c1.start()
标签:read name 优先 producer 用户 并发编程 res 意义 bytecode
原文地址:http://www.cnblogs.com/kirusx/p/7113626.html