标签:用法 exe 个人 syn down ssi .com 线程池 rand
一、定义:
线程顾名思义,就是一条流水线工作的过程,一条流水线必须属于一个车间,一个车间的工作过程是一个进程,进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是执行单位
二、线程定义方式:
1、使用替换threading模块提供的Thread
from threading import Thread from multiprocessing import Process def task(): print(‘is running‘) if __name__ == ‘__main__‘: t=Thread(target=task,) # t=Process(target=task,) t.start() print(‘主‘)
2、自定义类,继承Thread
from threading import Thread from multiprocessing import Process class MyThread(Thread): def __init__(self,name): super().__init__() self.name=name def run(self): print(‘%s is running‘ %self.name) if __name__ == ‘__main__‘: t=MyThread(‘egon‘) # t=Process(target=task,) t.start() print(‘主‘)
三、多线程共享同一个进程内的资源
因为线程间的数据是共享的所以都会用同一个资源
from threading import Thread from multiprocessing import Process n=100 def work(): global n n=0 if __name__ == ‘__main__‘: # p=Process(target=work,) # p.start() # p.join() # print(‘主‘,n) t=Thread(target=work,) t.start() t.join() print(‘主‘,n)
四、其它相关函数
Thread实例对象的方法 # isAlive(): 返回线程是否活动的。 # getName(): 返回线程名。 # setName(): 设置线程名。 threading模块提供的一些方法: # threading.currentThread(): 返回当前的线程变量。 # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
from threading import Thread,activeCount,enumerate,current_thread import time def task(): print(‘%s is running‘ %current_thread().getName()) time.sleep(2) if __name__ == ‘__main__‘: t=Thread(target=task,) t.start() t.join() print(t.is_alive()) print(t.getName()) print(enumerate()) print(‘主‘) print(activeCount())
current_thread的用法 from threading import Thread,activeCount,enumerate,current_thread from multiprocessing import Process import time def task(): print(‘%s is running‘ %current_thread().getName()) time.sleep(2) if __name__ == ‘__main__‘: p=Process(target=task) p.start() print(current_thread())
from threading import Thread,activeCount,enumerate,current_thread from multiprocessing import Process import time def task(): print(‘%s is running‘ %current_thread().getName()) time.sleep(2) if __name__ == ‘__main__‘: t1=Thread(target=task) t2=Thread(target=task) t3=Thread(target=task) t1.start() t2.start() t3.start() print(current_thread())
五、守护线程
守护线程则是主线程等待其它非守护线程结束,主线程结束则守护线程结束
#再看:守护线程 from threading import Thread import time def task1(): print(‘123‘) time.sleep(10) print(‘123done‘) def task2(): print(‘456‘) time.sleep(1) print(‘456done‘) if __name__ == ‘__main__‘: t1=Thread(target=task1) t2=Thread(target=task2) t1.daemon=True t1.start() t2.start() print(‘主‘)
六、线程互斥锁
即:线程中谁抢到了锁谁去执行,没有抢到的则在等待
from threading import Thread,Lock import time n=100 def work(): global n mutex.acquire()#抢到锁加锁 temp=n time.sleep(0.1) n=temp-1 mutex.release()#解锁 if __name__ == ‘__main__‘: mutex=Lock() l=[] start=time.time() for i in range(100): t=Thread(target=work) l.append(t) t.start() for t in l: t.join() print(‘run time:%s value:%s‘ %(time.time()-start,n))
七:互斥锁与join的区别
互斥锁只是在重要的代码阶段加上谁抢到谁处理,而join则是一个一个的全部把所有的代码都执行,大大加大执行代码的时间
join实例:
from threading import Thread,Lock import time n=100 def work(): time.sleep(0.05) global n temp=n time.sleep(0.1) n=temp-1 if __name__ == ‘__main__‘: start=time.time() for i in range(100): t=Thread(target=work) t.start() t.join() print(‘run time:%s value:%s‘ %(time.time()-start,n))
互斥锁实例:
#互斥锁 from threading import Thread,Lock import time n=100 def work(): time.sleep(0.05) global n mutex.acquire() temp=n time.sleep(0.1) n=temp-1 mutex.release() if __name__ == ‘__main__‘: mutex=Lock() l=[] start=time.time() for i in range(100): t=Thread(target=work) l.append(t) t.start() for t in l: t.join() print(‘run time:%s value:%s‘ %(time.time()-start,n))
八:线程死锁与递规锁
死锁:则是几个人在抢几把锁,但是一个人抢一把锁,在没有解这把锁,则是去抢另一把,则永远无法抢到,也没法解除当前的锁,由为死锁
from threading import Thread,Lock,RLock import time mutexA=Lock() mutexB=Lock() class Mythread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print(‘\033[45m%s 抢到A锁\033[0m‘ %self.name) mutexB.acquire() print(‘\033[44m%s 抢到B锁\033[0m‘ %self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print(‘\033[44m%s 抢到B锁\033[0m‘ %self.name) time.sleep(1) mutexA.acquire() print(‘\033[45m%s 抢到A锁\033[0m‘ %self.name) mutexA.release() mutexB.release()
递归锁:
则需要threading导入RLock,用这个每一个人拿到的都是这把锁,解除这把锁之后才能拿到下把锁,这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:
#递归锁 from threading import Thread,Lock,RLock import time mutex=RLock() class Mythread(Thread): def run(self): self.f1() self.f2() def f1(self): mutex.acquire() print(‘\033[45m%s 抢到A锁\033[0m‘ %self.name) mutex.acquire() print(‘\033[44m%s 抢到B锁\033[0m‘ %self.name) mutex.release() mutex.release() def f2(self): mutex.acquire() print(‘\033[44m%s 抢到B锁\033[0m‘ %self.name) time.sleep(1) mutex.acquire() print(‘\033[45m%s 抢到A锁\033[0m‘ %self.name) mutex.release() mutex.release()
九:信号量
同进程的一样
Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()
from threading import Thread,current_thread,Semaphore import time,random sm=Semaphore(5) def work(): sm.acquire() print(‘%s 上厕所‘ %current_thread().getName()) time.sleep(random.randint(1,3)) sm.release() if __name__ == ‘__main__‘: for i in range(20): t=Thread(target=work) t.start()
十:Event
同进程的一样
线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行
event.isSet():返回event的状态值; event.wait():如果 event.isSet()==False将阻塞线程; event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度; event.clear():恢复event的状态值为False。
from threading import Thread,current_thread,Event import time event=Event() def conn_mysql(): count=1 while not event.is_set(): if count > 3: raise ConnectionError(‘链接失败‘) print(‘%s 等待第%s次链接mysql‘ %(current_thread().getName(),count)) event.wait(0.5) count+=1 print(‘%s 链接ok‘ % current_thread().getName()) def check_mysql(): print(‘%s 正在检查mysql状态‘ %current_thread().getName()) time.sleep(1) event.set() if __name__ == ‘__main__‘: t1=Thread(target=conn_mysql) t2=Thread(target=conn_mysql) check=Thread(target=check_mysql) t1.start() t2.start() check.start()
十一:定时器
定义:指定n秒后执行某操作
from threading import Timer def hello(n): print("hello, world",n) t = Timer(3, hello,args=(11,))#3秒后执行 t.start() # after 1 seconds, "hello, world" will be printed
十二:线程queue
定义:线程的队列,使用import queue,用法与进程Queue一样
import queue q=queue.Queue(3) #队列:先进先出 q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get()) q=queue.LifoQueue(3) #堆栈:后进先出 q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get()) q=queue.PriorityQueue(3) #数字越小优先级越高 q.put((10,‘data1‘)) q.put((11,‘data2‘)) q.put((9,‘data3‘)) print(q.get()) print(q.get()) print(q.get())
十三、线程池
定义:则是同时开启多少线程,如果并发则用的线程名则还是已开启的
#线程池 import requests #pip3 install requests import os,time,threading from multiprocessing import Pool from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor def get_page(url): print(‘<%s> get :%s‘ %(threading.current_thread().getName(),url)) respone = requests.get(url) if respone.status_code == 200: return {‘url‘:url,‘text‘:respone.text} def parse_page(obj): dic=obj.result() print(‘<%s> parse :%s‘ %(threading.current_thread().getName(),dic[‘url‘])) time.sleep(0.5) res=‘url:%s size:%s\n‘ %(dic[‘url‘],len(dic[‘text‘])) #模拟解析网页内容 with open(‘db.txt‘,‘a‘) as f: f.write(res) if __name__ == ‘__main__‘: # p=Pool(4) p=ThreadPoolExecutor(3) #同时开始3个线程 urls = [ ‘http://www.baidu.com‘, ‘http://www.baidu.com‘, ‘http://www.baidu.com‘, ‘http://www.baidu.com‘, ‘http://www.baidu.com‘, ‘http://www.baidu.com‘, ‘http://www.baidu.com‘, ] for url in urls: # p.apply_async(get_page,args=(url,),callback=parse_page) p.submit(get_page,url).add_done_callback(parse_page) p.shutdown() print(‘主进程pid:‘,os.getpid())
标签:用法 exe 个人 syn down ssi .com 线程池 rand
原文地址:http://www.cnblogs.com/liuxiaowei/p/7493386.html