1,守护线程
import time from threading import Thread def func(): print(‘开始执行子线程‘) time.sleep(3) print(‘子线程执行完毕‘) t = Thread(target=func) t.setDaemon(True) # 进程设置守护进程 是一个属性 daemon = True t.start() t2 = Thread(target=func) t2.start() t2.join() # 等待t2结束
# 守护线程 守护进程 都是等待主进程或者主线程中的代码 执行完毕
# t2 = Thread(target=func)
# t2.start() ---> 代码执行完毕
# 守护线程就结束了
# 主线程还没结束 等待t2继续执行
# t2执行完毕 主线程结束
# t2 = Thread(target=func)
# t2.start()
# t2.join() # 等待t2结束 执行完这句话代码才执行完毕
# t2线程执行完毕
# 主线程中没有代码了,守护线程结束
2,锁
import time from threading import Thread from threading import Lock def func(): global n time.sleep(2) lock.acquire() temp = n # 从进程中获取n time.sleep(0.01) n = temp-1 # 得到结果,再存储回进程 lock.release() n = 100 lock = Lock() t_lst = [] for i in range(100): t = Thread(target=func) t.start() t_lst.append(t) [t.join() for t in t_lst] print(n) # GIL 不是锁数据 而是锁线程 # 在多线程中 特殊情况 仍然要加锁 对数据
3,死锁
import time from threading import RLock from threading import Thread m = kz = RLock() # def eat(name): kz.acquire() # 拿到钥匙 print(‘%s拿到筷子了‘%name) m.acquire() print(‘%s拿到面了‘%name) print(‘%s吃面‘%name) m.release() kz.release() def eat2(name): m.acquire() # 没有钥匙 print(‘%s拿到面了‘ % name) time.sleep(1) kz.acquire() print(‘%s拿到筷子了‘ % name) print(‘%s吃面‘ % name) kz.release() m.release()
在不同的线程中 恰好要对这两个数据进行操作所以就出现了死锁。
4,信号量
1 import time 2 import random 3 from threading import Thread 4 from threading import Semaphore 5 def func(n,sem): 6 sem.acquire() 7 print(‘thread -%s start‘%n) 8 time.sleep(random.random()) 9 print(‘thread -%s done‘ % n) 10 sem.release() 11 sem = Semaphore(5) # 一把锁有5把钥匙 12 for i in range(20): 13 Thread(target=func,args=(i,sem)).start() 14 # 信号量 和 线程池 有什么区别? 15 # 相同点 在信号量acquire之后,和线程池一样 同时在执行的只能有n个 16 # 不同点 17 # 开的线程数不一样 线程池来说 一共就只开5个线程 信号量有几个任务就开几个线程 18 # 对有信号量限制的程序来说 可以同时执行很多线程么? 19 # 实际上 信号量并不影响线程或者进程的并发,只是在加锁的阶段进行流量限制
5,事件
import time import random from threading import Event from threading import Thread def conn_mysql(): # 连接数据库 count = 1 while not e.is_set(): # 当事件的flag为False时才执行循环内的语句 if count>3: raise TimeoutError print(‘尝试连接第%s次‘%count) count += 1 e.wait(0.5) # 一直阻塞变成了只阻塞0.5 print(‘连接成功‘) # 收到check_conn函数内的set指令,让flag变为True跳出while循环,执行本句代码 def check_conn(): ‘‘‘ 检测数据库服务器的连接是否正常 ‘‘‘ time.sleep(random.randint(1,2)) # 模拟连接检测的时间 e.set() # 告诉事件的标志数据库可以连接 e = Event() check = Thread(target=check_conn) check.start() conn = Thread(target=conn_mysql) conn.start()
6,条件
import threading def run(n): con.acquire() con.wait() # 等着 print("run the thread: %s" % n) con.release() if __name__ == ‘__main__‘: con = threading.Condition() # 条件 = 锁 + wait的功能 for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start() while True: inp = input(‘>>>‘) if inp == ‘q‘: break con.acquire() # condition中的锁 是递归锁 if inp == ‘all‘: con.notify_all() else: con.notify(int(inp)) # 传递信号 notify(1) --> 可以放行一个线程 con.release()
7,计时器
from threading import Timer def hello(): print("hello, world") while True: # 每隔一段时间要开启一个线程 t = Timer(10, hello) # 定时开启一个线程,执行一个任务 # 定时 : 多久之后 单位是s # 要执行的任务 :函数名 t.start()
8,队列
import queue pq = queue.PriorityQueue() # 值越小越优先,值相同就asc码小的先出 pq.put((1,‘z‘)) pq.put((1,‘b‘)) pq.put((15,‘c‘)) pq.put((2,‘d‘)) # print(pq.get()) print(pq.get())
9,concurrent与线程池和回调函数
import time import random from concurrent import futures def funcname(n): print(n) time.sleep(random.randint(1,3)) return n*‘*‘ def call(args): print(args.result()) thread_pool = futures.ThreadPoolExecutor(5)#线程池 thread_pool.map(funcname,range(10)) # map,天生异步,接收可迭代对象的数据,不支持返回值 f_lst = [] for i in range(10): f = thread_pool.submit(funcname,i) # submit 合并了创建线程对象和start的功能 f_lst.append(f) thread_pool.shutdown() # close() join() for f in f_lst: # 一定是按照顺序出结果 print(f.result()) #f.result()阻塞 等f执行完得到结果 # 回调函数 add_done_callback(回调函数的名字) thread_pool.submit(funcname,1).add_done_callback(call)