标签:原型 jpg rtti str art eve ceshi queue ==
死锁:只上锁不解锁容易造成死锁现象
互斥锁:加一把锁就对应解一把锁,形成互斥锁
递归锁:用于解决死锁,只是一种应急的处理方法
from threading import RLock
从语法上讲,锁可以互相嵌套,但不要使用
不要因为逻辑问题让上锁分成两次,导致死锁
from queue import Queue
put 存
get 取
put_nowait 存,超出了队列长度,报错
get_nowait 取,没数据取不出来,报错
linux windows 线程中put_nowait,get_nowait都支持
(1) Queue
先进先出,后进后出
(2)LifoQueue
先进后出,后进先出(按照栈的特点设计)
from queue import LifoQueue
lq = LifoQueue(3)
lq.put(11)
lq.put(22)
lq.put(33)
# print(lq.put_nowait(444))
print(lq.get())
print(lq.get())
print(lq.get())
(3)PriorityQueue
按照优先级顺序排序(默认从小到大排序)
from queue import PriorityQueue
# 1.如果都是数字,默认从小到大排序
pq = PriorityQueue()
pq.put(13)
pq.put(3)
pq.put(20)
print(pq.get())
print(pq.get())
print(pq.get())
# 2.如果都是字符串
"""如果是字符串,按照ascii编码排序"""
pq1 = PriorityQueue()
pq1.put("chinese")
pq1.put("america")
pq1.put("latinos")
pq1.put("blackman")
print(pq1.get())
print(pq1.get())
print(pq1.get())
print(pq1.get())
# 3.要么全是数字,要么全是字符串,不能混合 error
"""
pq2 = PriorityQueue()
pq2.put(13)
pq2.put("aaa")
pq2.put("拟稿")
"""
pq3 = PriorityQueue()
# 4.默认按照元组中的第一个元素排序
pq3.put( (20,"wangwen") )
pq3.put( (18,"wangzhen") )
pq3.put( (30,"weiyilin") )
pq3.put( (40,"xiechen") )
print(pq3.get())
print(pq3.get())
print(pq3.get())
print(pq3.get())
# 线程池
# 实例化线程池 ThreadPoolExcutor (推荐5*cpu_count)
# 异步提交任务 submit / map
# 阻塞直到任务完成 shutdown
# 获取子线程的返回值 result
# 使用回调函数 add_done_callback
# 回调函数
就是一个参数,将这个函数作为参数传到另一个函数里面.
函数先执行,再执行当参数传递的这个函数,这个参数函数是回调函数
# 线程池 是由子线程实现的
# 进程池 是由主进程实现的
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time
def func(i):
print("任务执行中... start" , os.getpid())
time.sleep(10)
print("任务结束... end" , i)
return i
(1) ProcessPoolExecutor 进程池基本使用
"""默认如果一个进程短时间内可以完成更多的任务,就不会创建额外的新的进程,以节省资源"""
if __name__ == "__main__":
lst = []
# print(os.cpu_count()) # 8 cpu逻辑核心数
# (1) 创建进程池对象
"""进程池里面最多创建os.cpu_count()这么多个进程,所有任务全由这几个进程完成,不会额外创建进程"""
p = ProcessPoolExecutor()
# (2) 异步提交任务
for i in range(10):
res = p.submit(func,i)
lst.append(res)
# (3) 获取当前进程池返回值
# for i in lst:
# print(i.result())
# (4) 等待所有子进程执行结束
p.shutdown() # join
print("主程序执行结束....")
(2) ThreadPoolExecutor 线程池的基本用法
"""默认如果一个线程短时间内可以完成更多的任务,就不会创建额外的新的线程,以节省资源"""
from threading import current_thread as cthread
def func(i):
print("thread ... start" , cthread().ident,i)
time.sleep(3)
print("thread ... end" , i )
return cthread().ident
if __name__ == "__main__":
lst = []
setvar = set()
# (1) 创建线程池对象
"""限制线程池最多创建os.cpu_count() * 5 = 线程数,所有任务全由这几个线程完成,不会额外创建线程"""
tp = ThreadPoolExecutor()# 我的电脑40个线程并发
# (2) 异步提交任务
for i in range(100):
res = tp.submit(func,i)
lst.append(res)
# (3) 获取返回值
for i in lst:
setvar.add(i.result())
# (4) 等待所有子线程执行结束
tp.shutdown()
print(len(setvar) , setvar)
print("主线程执行结束 ... ")
(3)线程池 map
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread as cthread
from collections import Iterator
def func(i):
# print("thread start ... ",cthread().ident)
# print("thread end ... ",i)
time.sleep(0.5)
return "*" * i
if __name__ == "__main__":
setvar = set()
lst = []
tp = ThreadPoolExecutor(5)
# map(自定义函数,可迭代性数据) 可迭代性数据(容器类型数据,迭代器,range对象)
it = tp.map(func,range(20))
# 判定返回值是否是迭代器
print(isinstance(it,Iterator))
tp.shutdown()
for i in it:
print(i)
把函数当成参数传递给另外一个函数
在当前函数执行完毕之后,最后调用一下该参数(函数),这个函数就是回调函数
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import current_thread as cthread
import os,time
def func1(i):
print("Process start ... ",os.getpid())
time.sleep(0.5)
print("Process end ... ",i)
return "*" * i
def func2(i):
print("thread start ... ",cthread().ident)
time.sleep(0.5)
print("thread end ... ",i)
return "*" * i
def call_back1(obj):
print("<==回调函数callback进程号:===>",os.getpid())
print(obj.result())
def call_back2(obj):
print("<==回调函数callback线程号:===>",cthread().ident)
print(obj.result())
(1) 进程池的回调函数 : 由主进程执行调用完成
if __name__ == "__main__":
p = ProcessPoolExecutor(5)
for i in range(1,11):
res = p.submit(func1,i)
# 进程对象.add_done_callback(回调函数)
‘‘‘
add_done_callback 可以把res本对象和回调函数自动传递到函数里来
‘‘‘
res.add_done_callback(call_back1)
p.shutdown()
print("主进程执行结束 ... " , os.getpid())
(2) 线程池的回调函数: 由当前子线程执行调用完成
if __name__ == "__main__":
tp = ThreadPoolExecutor(5)
for i in range(1,11):
res = tp.submit(func2,i)
# 进程对象.add_done_callback(回调函数)
‘‘‘
add_done_callback 可以把res本对象和回调函数自动传递到函数里来
‘‘‘
res.add_done_callback(call_back2)
tp.shutdown()
print("主线程执行结束 ... " , cthread().ident)
(3) 回调函数原型
# add_done_callback 原型
class Ceshi():
def add_done_callback(self,func):
print("执行操作1 ... ")
print("执行操作2 ... ")
func(self)
def result(self):
return 123456
def call_back3(obj):
print(obj)
print(obj.result())
obj = Ceshi()
obj.add_done_callback(call_back3)
先安装 gevent模块
#协程也叫纤程: 协程是线程的一种实现.
指的是一条线程能够在多任务之间来回切换的一种实现.
对于CPU、操作系统来说,协程并不存在.
任务之间的切换会花费时间.
目前电脑配置一般线程开到200会阻塞卡顿.
#协程的实现
协程帮助你记住哪个任务执行到哪个位置上了,并且实现安全的切换
一个任务一旦阻塞卡顿,立刻切换到另一个任务继续执行,保证线程总是忙碌的,更加充分的利用CPU,抢占更多的时间片
# 一个线程可以由多个协程来实现,协程之间不会产生数据安全问题
#协程模块
# greenlet gevent的底层,协程,切换的模块
# gevent 直接用的,gevent能提供更全面的功能
(1) 协程的具体实现
switch 遇到阻塞时,只能手动调用该函数进行函数切换,不能自动实现切换,来规避io阻塞;
from greenlet import greenlet
import time
def eat():
print("eat 1")
g2.switch()
time.sleep(3)
print("eat 2")
def play():
print("play one")
time.sleep(3)
print("play two")
g1.switch()
g1 = greenlet(eat)
g2 = greenlet(play)
g1.switch()
(3) gevent
gevent 可以自动切换,但是不能够自动识别time.sleep这样的阻塞
import gevent
def eat():
print("eat 1")
time.sleep(3)
print("eat 2")
def play():
print("play one")
time.sleep(3)
print("play two")
# 利用gevent.spawn创建协程对象g1
g1 = gevent.spawn(eat)
# 利用gevent.spawn创建协程对象g2
g2 = gevent.spawn(play)
# 阻塞,必须g1协程执行完毕为止
g1.join()
# 阻塞,必须gg协程执行完毕为止
g2.join()
print("主线程执行完毕 ... ")
(4) gevent,time 添加阻塞,让他实现自动切换
def eat():
print("eat 1")
gevent.sleep(3)
print("eat 2")
def play():
print("play one")
gevent.sleep(3)
print("play two")
# 利用gevent.spawn创建协程对象g1
g1 = gevent.spawn(eat)
# 利用gevent.spawn创建协程对象g2
g2 = gevent.spawn(play)
# 阻塞,必须g1协程执行完毕为止
g1.join()
# 阻塞,必须gg协程执行完毕为止
g2.join()
print("主线程执行完毕 ... ")
(5) 终极大招 彻底解决不识别阻塞的问题
from gevent import monkey
monkey.patch_all() # 把下面所有引入的模块中的阻塞识别一下
import time
import gevent
def eat():
print("eat 1")
time.sleep(3)
print("eat 2")
def play():
print("play one")
time.sleep(3)
print("play two")
# 利用gevent.spawn创建协程对象g1
g1 = gevent.spawn(eat)
# 利用gevent.spawn创建协程对象g2
g2 = gevent.spawn(play)
# 阻塞,必须g1协程执行完毕为止
g1.join()
# 阻塞,必须gg协程执行完毕为止
g2.join()
print("主线程执行完毕 ... ")
(1) spawn(函数,参数1,参数2,参数3 .... ) 启动协程
(2) join 阻塞,直到某个协程任务执行完毕之后,再放行
(3) joinall 等待所有协程任务都执行完毕之后,在放行
g1.join() g2.join() <=> gevent.joinall( [g1,g2] )(推荐:比较简洁)
(4) value 获取协程任务中的返回值 g1.value g2.value 获取对应协程中的返回值
(1) 利用协程爬取数据
requests 抓取页面数据模块
HTTP 状态码
200 ok
404 not found
400 bad request
(2) 基本语法
from gevent import monkey ; monkey.patch_all()
import time
import gevent
import requests
"""
response = requests.get("http://www.baidu.com")
print(response)
# 获取状态码
print( response.status_code )
# 获取网页中的字符编码
res = response.apparent_encoding
print( res )
# 设置编码集,防止乱码
response.encoding = res
# 获取网页当中的内容
res = response.text
print(res)
def get_url(url):
response = requests.get(url)
if response.status_code == 200:
# print(response.text)
time.sleep(0.1)
(3) 正常爬取
starttime = time.time()
for i in url_list:
get_url(i)
endtime = time.time()
print("执行时间:" ,endtime - starttime )
import re
strvar = ‘<img lz_src="http://i5.7k7kimg.cn/cms/cms10/20200609/113159_2868.jpg"‘
obj = re.search(r‘<img lz_src="(.*?)"‘,strvar)
print(obj.groups()[0])
(4) 用协程的方式爬取数据
lst = []
starttime = time.time()
for i in url_list:
g = gevent.spawn(get_url,i)
lst.append(g)
gevent.joinall(lst)
endtime = time.time()
print("执行时间:" ,endtime - starttime ) # 执行时间: 2.3307271003723145
利用多进程,多线程,多携程可以让服务器运行速度更快
并且也可以抗住更多用户的访问
标签:原型 jpg rtti str art eve ceshi queue ==
原文地址:https://www.cnblogs.com/yunchao-520/p/13121928.html