标签:
进程:
优点:同时利用多个cpu,能够同时进行多个操作
缺点:耗费资源(重新开辟内存空间)
线程:
优点:共享内存,IO操作的时候,创造并发操作
缺点:抢占资源
进程不是越多越好,cpu个数 = 进程个数
线程也不是越多越好,具体案例具体分析,请求上下文切换耗时
计算机中执行任务的最小单元:线程
IO操作利用cpu
GIL 全局解释锁
IO密集型(不用cpu):
多线程
计算密集型(用cpu):
多进程
进程和线程的目的:提高执行效率
1,截止目前写的程序:单进程单线程,主进程、主线程
2,自定义线程:
主进程
子进程
创建线程
import time import threading def f0(): pass def f1(a1,a2): time.sleep(10) f0() t = threading.Thread(target=f1,args=(123,111,)) t.setDaemon(True) #默认是False表示等待,True后就表示不等待 t.start() t = threading.Thread(target=f1,args=(123,111,)) t.setDaemon(True) t.start() t = threading.Thread(target=f1,args=(123,111,)) t.setDaemon(True) t.start()
#t.setDaemon(True) 表示不等待
#t.setDaemon(False) 表示等待
threading 模块通过对 thread 进行二次封装,提供了更方便的 api 来处理线程。
import threading import time def worker(num): time.sleep(1) print("Thread %d" % num) return for i in range(20): t = threading.Thread(target=worker, args=(i,), name="t.%d" % i) #括号内部的:函数名,参数,线程起名 t.start()
thread方法
1)t.start() : 激活线程,
2)t.getName() : 获取线程的名称
3)t.setName() : 设置线程的名称
4)t.name : 获取或设置线程的名称
5)t.is_alive() : 判断线程是否为激活状态
6)t.isAlive() : 判断线程是否为激活状态
7)t.setDaemon(): 设置为后台线程或前台线程(默认:False);通过一个布尔值设置线程是否为守护线程,必须在执行start()方法之后才可以使用。
如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;
如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
8)t.isDaemon() : 判断是否为守护线程
9)t.ident : 获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。
10)t.join(): 等待执行 逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
11)t.run(): 线程被cpu调度后自动执行线程对象的run方法
线程锁 threading.RLock
import threading import time globals_num = 0 lock = threading.RLock() def Func(): lock.acquire() #获得锁 global globals_num #声明全局变量 globals_num += 1 time.sleep(1) print(globals_num) lock.release() #释放锁 for i in range(10): t = threading.Thread(target=Func) t.start()
threading.Lock
import threading lock = threading.Lock() #Lock对象 lock.acquire() lock.acquire() #产生了死琐。 lock.release() lock.release()
threading.RLock
import threading rLock = threading.RLock() #RLock对象 rLock.acquire() rLock.acquire() #在同一线程内,程序不会堵塞。 rLock.release() rLock.release() # 使用RLock,那么acquire和release必须成对出现, # 即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐。 # threading.Lock与threading.RLock差别
threading.Event 线程间通信的机制
Event.wait([timeout]) : 堵塞线程,直到Event对象内部标识位被设为True或超时(如果提供了参数timeout)。
Event.set() :将标识位设为Ture
Event.clear() : 将标识位设为False。
Event.isSet() :判断标识位是否为Ture。
import threading def do(event): print(‘start‘) event.wait() print(‘execute‘) event_obj = threading.Event() for i in range(10): t = threading.Thread(target=do, args=(event_obj,)) t.start() event_obj.clear() inp = input(‘input:‘) if inp == ‘true‘: event_obj.set()
Queue 就是队列,它是线程安全的
队列的特性:先进先出
import queue q = queue.Queue(maxsize=0) #构造一个先进先出的队列,maxsize指定队列长度,为0时,表示队列长度无限。 q.join() #等到队列为None的时候,再执行别的操作 q.qsize() #返回队列的大小(不可靠) q.empty() #当队列为空的时候,返回True 否则返回False(不可靠) q.full() #当队列满的时候,返回True,否则返回False(不可靠) q.put(item, block=True, timeout=None) #将item放入Queue尾部,item必须存在,可以参数block默认为True,表示当队列满时,会等待队列给出可用位置 #为Flase时为非阻塞,此时如果队列已经满,会引发queue.Full异常。可以选参数timeout,表示会阻塞的时间, #如果队列无法给出放入item的位置,则引发queue.Full异常。 q.get(block=True, timeout=None) #等 移除并返回队列头部的一个值,可选参数block默认为True,表示获取值的时候,如果队列为空,则阻塞,为False时,不阻塞, #若此时队列为空,则引发 queue.Empty异常。可选参数timeout,表示会阻塞设置的时候,过后,如果队列为空,则引发Empty异常。 q.put_nowait(item) #等效put(item,block=False) q.get_nowait() #不等 等效于 get(item,block=False)
生产者将数据依次存入队列,消费者依次从队列中取出数据。
实例
import queue import threading message = queue.Queue(10) def producer(i): while True: message.put(i) def consumer(i): while True: msg = message.get() print(msg) for i in range(12): t = threading.Thread(target=producer, args=(i,)) t.start() for i in range(10): t = threading.Thread(target=consumer, args=(i,)) t.start()
from multiprocessing import Process import time li = [] def foo(i): li.append(i) print(‘say hi ‘, li) if __name__ == "__main__": for i in range(10): p = Process(target=foo, args=(i,)) p.start()
multiprocessing是python的多进程管理包,和threading.Thread类似。直接从侧面用subprocesses替换线程使用GIL的方式,由于这一点,multiprocessing模块可
以让程序员在给定的机器上充分的利用CPU。
在multiprocessing中,通过创建Process对象生成进程,然后调用它的start()方法,
from multiprocessing import Process def f(name): print(‘hello‘, name) if __name__ == ‘__main__‘: p = Process(target=f, args=(‘bob‘,)) #括号里的两个值,调用函数 给函数传一个参数 p.start() # 激活线程 p.join() #相当于‘wait’等待
进程各自持有一份数据,默认无法共享数据
from multiprocessing import Process from multiprocessing import Manager import time li = [] def foo(i): li.append(i) print(‘say hi‘,li) if __name__ == ‘__main__‘: for i in range(10): p = Process(target=foo,args=(i,)) p.start() print(‘ending‘,li)
两种共享方法:
方法一:Array()
from multiprocessing import Process, Array temp = Array(‘i‘, [11, 22, 33, 44]) def Foo(i): temp[i] = 100+i for item in temp: print(i, ‘----->‘, item) if __name__ == ‘__main__‘: for i in range(2): p = Process(target=Foo, args=(i,)) p.start()
方法二:manage.dict()
from multiprocessing import Process,Manager def Foo(i,dic): dic [i] = 100+i print(len(dic)) if __name__ == ‘__main__‘: manage = Manager() dic = manage.dict() #定义字典 for i in range(2): p = Process(target=Foo,args=(i,dic,)) p.start() p.join() # 输出: # 1 # 2
#上面代码实现进程数据共享,与上面对比。 from multiprocessing import Process,Manager def Foo(i,dic): dic [i] = 100+i print(len(dic)) if __name__ == ‘__main__‘: manage = Manager() dic = {} #普通的字典 for i in range(2): p = Process(target=Foo,args=(i,dic,)) p.start() p.join() # 输出: # 1 # 1
&
‘c‘: ctypes.c_char, ‘u‘: ctypes.c_wchar, ‘b‘: ctypes.c_byte, ‘B‘: ctypes.c_ubyte, ‘h‘: ctypes.c_short, ‘H‘: ctypes.c_ushort, ‘i‘: ctypes.c_int, ‘I‘: ctypes.c_uint, ‘l‘: ctypes.c_long, ‘L‘: ctypes.c_ulong, ‘f‘: ctypes.c_float, ‘d‘: ctypes.c_double 类型对应表
在使用并发设计的时候最好尽可能的避免共享数据,尤其是在使用多进程的时候。 如果你真有需要 要共享数据,
multiprocessing提供了两种方式:
1)Shared memory
from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == ‘__main__‘: num = Value(‘d‘, 0.0) arr = Array(‘i‘, range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:]) print(arr[:2])
#创建num和arr时,“d”和“i”参数由Array模块使用的typecodes创建:
#“d”表示一个双精度的浮点数,
#“i”表示一个有符号的整数,这些共享对象将被线程安全处理。
2)Server process
from multiprocessing import Process, Manager def f(d, l): d[1] = ‘1‘ d[‘2‘] = 2 d[0.25] = None l.reverse() #反转 if __name__ == ‘__main__‘: with Manager() as manager: #上下文管理 d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) print(l)
Pool类描述一个工作进程池
p = Pool(5) #一次最多执行5个进程
p.apply 每一个任务是排队进行的;进程.join()
p.apply_async 每一个任务都并发进行,可以设置回调函数;进程.无join();进程daemon = True
1)p.apply 每一个任务是排队进行的 进程,join()
from multiprocessing import Pool import time def f1(a): time.sleep(1) print(a) return 1000 def f2(arg): print(arg) if __name__ == ‘__main__‘: pool = Pool(5) for i in range(10): pool.apply(func=f1, args=(i,)) #apply 每一个任务是排队进行的 print(‘8888‘) pool.close() pool.join()
2)p.apply_async 每一个任务都并发进行,可以设置回调函数; 进程.无join();进程daemon =True
from multiprocessing import Pool import time def f1(a): time.sleep(1) print(a) return 1000 def f2(arg): print(arg) if __name__ == ‘__main__‘: pool = Pool(5) for i in range(10): pool.apply_async(func=f1, args=(i,), callback=f2) #apply_async 每一个任务都并发进行 print(‘8888‘) pool.close() pool.join()
import queue import threading import time class ThreadPool(object): def __init__(self, max_num=20): self.queue = queue.Queue(max_num) #创建一个最大长度为20的队列 for i in range(max_num): #循环把线程对象加入到队列中 self.queue.put(threading.Thread) #把线程的类名放进去,执行完这个Queue def get_thread(self): #定义方法从队列里获取线程 return self.queue.get() #在队列中获取值 def add_thread(self): #线程执行完任务后,在队列里添加线程 self.queue.put(threading.Thread) def func(pool,a1): time.sleep(1) print(a1) pool.add_thread() #线程执行完任务后,队列里再加一个线程 p = ThreadPool(10) #执行init方法; 一次最多执行10个线程 for i in range(100): thread = p.get_thread() #线程池10个线程,每一次循环拿走一个拿到类名,没有就等待 t = thread(target=func, args=(p, i,)) #创建线程; 线程执行func函数的这个任务;args是给函数传入参数 t.start() #激活线程
¥
标签:
原文地址:http://www.cnblogs.com/kongqi816-boke/p/5597161.html