标签:进程锁 消费者模式 上下文 pre pytho producer tin ack 影响
参考别人的博客:https://www.cnblogs.com/whatisfantasy/p/6440585.html
线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务
# 方法1 直接调用
import threading
import time
def sayhi(num):
print("running on number:%s"%num)
time.sleep(2)
if __name__ == "__main__":
t1 = threading.Thread(target=sayhi, args=(1,))
t2 = threading.Thread(target=sayhi, args=(2,))
t1.start()
t2.start()
print(t1.getName()) #获取线程名
print(t2.getName())
# 方法2 继承调用
import threading
import time
class MyThread(threading.Thread):
def __init__(self, num):
super(MyThread, self).__init__()
self.num = num
def run(self):
print("running on number:%s" %self.num)
time.sleep(2)
if __name__=="__main__":
t1 = MyThread(1)
t2 = MyThread(2)
t1.start()
t2.start()
join,等待至线程中止。可以等待子进程结束后再继续往下运行,通常用于进程间的同步。
#例子1
import threading
import time
def run(num):
print("thread%s start" % num)
time.sleep(1)
print("thread%s end" % num)
if __name__ == '__main__':
thread_list = []
for i in range(5):
t = threading.Thread(target=run, args=(i,))
t.start()
# t.join()
thread_list.append(t)
for i in thread_list:
i.join()
# 输出结果
thread0 start
thread1 start
thread2 start
thread3 start
thread4 start
thread0 endthread1 end
thread4 endthread3 end
thread2 end
#例子2
import threading
import time
def run(num):
print("thread%s start" % num)
time.sleep(1)
print("thread%s end" % num)
if __name__ == '__main__':
thread_list = []
for i in range(5):
t = threading.Thread(target=run, args=(i,))
t.start()
t.join()
thread_list.append(t)
#输出结果
thread0 start
thread0 end
thread1 start
thread1 end
thread2 start
thread2 end
thread3 start
thread3 end
thread4 start
thread4 end
setDaemon,将一个线程可以被标记成一个 "守护线程"。这个标志的意义是,只有守护线程都终结,整个Python程序才会退出。这里使用setDaemon(True)把所有的子线程都变成了主线程的守护线程,因此当主进程结束后,子线程也会随之结束。所以当主线程结束后,整个程序就退出了。
==注解==
守护线程在程序关闭时会突然关闭。他们的资源(例如已经打开的文档,数据库事务等等)可能没有被正确释放。如果你想你的线程正常停止,设置他们成为非守护模式并且使用合适的信号机制,例如: Event
。
import threading
import time
def sayhi(num):
print("running on number:%s" % num)
print(threading.current_thread().name)
time.sleep(2)
def main():
for i in range(5):
t = threading.Thread(target=sayhi, args=(i,))
t.start()
t.join()
print('starting thread', t.getName())
if __name__ == "__main__":
m = threading.Thread(target=main, args=[])
m.setDaemon(True) #设置为守护进程
m.start()
m.join(timeout=2)
print("---main thread done----")
常用函数
threading.currentThread(): 返回当前的线程变量。
threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
除了使用方法外,线程模块同样提供了Thread类来处理线程,Thread类提供了以下方法:
run(): 用以表示线程活动的方法。
start():启动线程活动。
join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。
isAlive(): 返回线程是否活动的。
getName(): 返回线程名。
setName(): 设置线程名。
setDaemon():设置为守护进程
(Global Interpreter Lock)
无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行
为了实现线程间的数据保护和状态同步
在非python环境中,单核情况下,同时只能有一个任务执行。多核时可以支持多个线程同时执行。但是在python中,无论有多少核,同时只能执行一个线程。究其原因,这就是由于GIL的存在导致的。
GIL的全称是Global Interpreter Lock(全局解释器锁),来源是python设计之初的考虑,为了数据安全所做的决定。某个线程想要执行,必须先拿到GIL,我们可以把GIL看作是“通行证”,并且在一个python进程中,GIL只有一个。拿不到通行证的线程,就不允许进入CPU执行。GIL只在cpython中才有,因为cpython调用的是c语言的原生线程,所以他不能直接操作cpu,只能利用GIL保证同一时间只能有一个线程拿到数据。而在pypy和jpython中是没有GIL的。
Python多线程的工作过程:
python在使用多线程的时候,调用的是c语言的原生线程。
python针对不同类型的代码执行效率也是不同的:
1、CPU密集型代码(各种循环处理、计算等等),在这种情况下,由于计算工作多,ticks计数很快就会达到阈值,然后触发GIL的释放与再竞争(多个线程来回切换当然是需要消耗资源的),所以python下的多线程对CPU密集型代码并不友好。
2、IO密集型代码(文件处理、网络爬虫等涉及文件读写的操作),多线程能够有效提升效率(单线程下有IO操作会进行IO等待,造成不必要的时间浪费,而开启多线程能在线程A等待时,自动切换到线程B,可以不浪费CPU的资源,从而能提升程序执行效率)。所以python的多线程对IO密集型代码比较友好。
使用建议?
python下想要充分利用多核CPU,就用多进程。因为每个进程有各自独立的GIL,互不干扰,这样就可以真正意义上的并行执行,在python中,多进程的执行效率优于多线程(仅仅针对多核CPU而言)。
GIL在python中的版本差异:
1、在python2.x里,GIL的释放逻辑是当前线程遇见
IO操作
或者ticks计数达到100
时进行释放。(ticks可以看作是python自身的一个计数器,专门做用于GIL,每次释放后归零,这个计数可以通过sys.setcheckinterval 来调整)。而每次释放GIL锁,线程进行锁竞争、切换线程,会消耗资源。并且由于GIL锁存在,python里一个进程永远只能同时执行一个线程(拿到GIL的线程才能执行),这就是为什么在多核CPU上,python的多线程效率并不高。
2、在python3.x中,GIL不使用ticks计数,改为使用计时器(执行时间达到阈值后,当前线程释放GIL),这样对CPU密集型程序更加友好,但依然没有解决GIL导致的同一时间只能执行一个线程的问题,所以效率依然不尽如人意。
多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。
==未加锁==
import threading
num = 0
def run(n):
global num
for i in range(1000000):
num += n
num -= n
if __name__ == '__main__':
t1 = threading.Thread(target=run, args=(1,))
t2 = threading.Thread(target=run, args=(2,))
t1.start()
t2.start()
t1.join()
t2.join()
print("num:", num)
==加锁版==
import threading
num = 0
def run(n):
global num
lock.acquire()
for i in range(1000000):
num += n
num -= n
lock.release()
if __name__ == '__main__':
lock = threading.Lock()
t1 = threading.Thread(target=run, args=(1,))
t2 = threading.Thread(target=run, args=(2,))
t1.start()
t2.start()
t1.join()
t2.join()
print("num:", num)
使用普通的锁里还要锁的时候,会出现卡死的情况,程序无法进行
import threading
num = 0
money = 0
def earn():
global money
lock.acquire()
money += 1
lock.release()
def run(n):
global num
lock.acquire()
earn()
num += n
lock.release()
if __name__ == '__main__':
lock = threading.RLock() #使用递归锁
t1 = threading.Thread(target=run, args=(1,))
t2 = threading.Thread(target=run, args=(2,))
t1.start()
t2.start()
t1.join()
t2.join()
print("num:", num)
互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据
import threading
import time
num = 0
def run(n):
global num
semaphore.acquire() #心好累锁
print("thread%s" %n)
time.sleep(n*0.5)
num += 1
semaphore.release()
if __name__ == '__main__':
lock = threading.Lock()
semaphore = threading.BoundedSemaphore(3) #最多可以有三个线程进行操作
t1 = threading.Thread(target=run, args=(1,))
t2 = threading.Thread(target=run, args=(2,))
t3 = threading.Thread(target=run, args=(3,))
t4 = threading.Thread(target=run, args=(4,))
t5 = threading.Thread(target=run, args=(5,))
t6 = threading.Thread(target=run, args=(6,))
t1.start()
t2.start()
t3.start()
t4.start()
t5.start()
t6.start()
print("num:", num)
经过一定时间后,运行该线程
import threading
def hello():
print("hello, world")
t = threading.Timer(5.0, hello)
t.start()
线程同步对象
它作为一个内部的标志,标志被设置,线程进行操作,标志被清空,除非再次被设置,线程才会阻塞。
import threading
import time
def light():
count = 0
if not event.isSet():
event.set()
while True:
time.sleep(1)
if count <= 10:
print("绿灯啦啦啦啦啦")
elif count < 20:
print("红灯啦,不能走啦")
event.clear()
elif count >= 20:
count = 0
if event.isSet():
event.set()
count += 1
def car(num):
while 1:
time.sleep(2)
if event.isSet():
print("car%s moving!!!" % num)
else:
print("car%s stop...." % num)
if __name__ == '__main__':
event = threading.Event()
light = threading.Thread(target=light)
light.start()
for i in range(2):
c = threading.Thread(target=car, args=(i,))
c.start()
Python的Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步。
#例子1
import threading
import queue
import time
def producer():
count = 1
while True:
time.sleep(1)
print("生产者生产面包%s" % count)
print("面包数量%s" % q.qsize())
q.put("面包%s" % count)
count += 1
def consumer():
while True:
time.sleep(2)
print("消费者吃%s" % q.get())
if __name__ == '__main__':
q = queue.Queue(2) #设置队列的最大数量是2
pro = threading.Thread(target=producer,)
con = threading.Thread(target=consumer,)
pro.st`art()
con.start()
#例子2
import threading
import queue
import time
def producer():
count = 1
for i in range(10):
time.sleep(1)
print("生产者生产面包%s" % count)
print("面包数量%s" % q.qsize())
q.put("面包%s" % count)
count += 1
q.join() #队列会阻塞除非所有任务完成
def consumer():
while True:
time.sleep(1)
print("消费者吃%s" % q.get())
q.task_done() #告诉队列任务已经完成
if __name__ == '__main__':
q = queue.Queue()
pro = threading.Thread(target=producer,)
con = threading.Thread(target=consumer,)
pro.start()
con.start()
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
线程本地数据是值特定于线程的数据。要管理线程本地数据,只需创建一个本地(或子类)实例,并在其上存储属性
原因:
import threading
local_school = threading.local()
def printName():
std = local_school.student
print(std, "\t hello ", threading.current_thread().name)
def student(name):
local_school.student = name
printName()
print(threading.current_thread().name)
if __name__ == '__main__':
t1 = threading.Thread(target=student, args=("Tom",))
t2 = threading.Thread(target=student, args=("shagua",))
t1.start()
t2.start()
#输出结果
Tom hello Thread-1
Thread-1
shagua hello Thread-2
Thread-2
程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程。程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。
#例子1 python的父进程是pycharm
import multiprocessing
import os
def run():
print("当前进程的父进程号%s" % os.getppid())
print("当前进程的进程号%s" % os.getpid())
if __name__ == '__main__':
r = multiprocessing.Process(target=run,)
r.start()
r.join()
print("父进程号%s" % os.getppid())
print("进程号%s" % os.getpid())
#输出结果
当前进程的父进程号17784
当前进程的进程号1056
父进程号8148
进程号17784
#例子2 子进程复制一份新的dict,内存不共享
from multiprocessing import Process,Manager
def test(d):
print("before test.d:%s" % d)
d[10] = True
print("after test.d:%s" % d)
if __name__ == '__main__':
d = {}
t = Process(target=test, args=(d,))
t.start()
t.join()
print("主线程的d:%s" % d)
#输出结果
before test.d:{}
after test.d:{10: True}
主线程的d:{}
#例子1
#进程的队列需要传参数
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get()) # prints "[42, None, 'hello']"
p.join()
管道函数会返回两个连接对象,默认是全双工
import multiprocessing
import time
def f(conn):
conn.send("hello")
conn.close()
if __name__ == '__main__':
parent_pipe, child_pipe = multiprocessing.Pipe()
p = multiprocessing.Process(target=f, args=(child_pipe,))
p.start()
print(parent_pipe.recv())
p.join()
返回一个manager的控制对象,允许其他进程使用代理操作它们。
支持的数据类型list
, dict
, Namespace
, Lock
, RLock
, Semaphore
, BoundedSemaphore
, Condition
, Event
, Barrier
, Queue
, Value
and Array
#例子1 有manager允许其他进程修改数据
from multiprocessing import Process,Manager
def test(d):
print("before test.d:%s" % d)
d[10] = True
print("after test.d:%s" % d)
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
t = Process(target=test, args=(d,))
t.start()
t.join()
print("主线程的d:%s" % d)
#输出结果
before test.d:{}
after test.d:{10: True}
主线程的d:{10: True}
为什么需要进程同步?
每一个进程需要数据不共享,但是输出结果时候会共享屏幕,输出的结果也会出现混乱
# 不使用进程锁
from multiprocessing import Process, Lock
def f(i):
print("hello world %s" % i)
if __name__ == '__main__':
for i in range(10):
p = Process(target=f, args=(i,))
p.start()
# 使用进程锁
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print("hello world %s" % i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for i in range(10):
p = Process(target=f, args=(lock, i))
p.start()
一个进程池对象,它控制可以向其提交作业的工作进程池。它支持带timeouts和callback的异步结果,并具有并行映射实现。
进程池调度方法
terminate()
立刻关闭进程池join()
主进程等待所有子进程执行完毕。必须在close或terminate()之后。close()
等待所有进程结束后,才关闭进程池。(必须在join之前,不然会报错)from multiprocessing import Process,Pool
import time
import os
def run(n):
time.sleep(1)
print("hello world %s" % n)
def Bar(arg):
# time.sleep(1)
print("-->>exec done", arg)
if __name__ == '__main__':
pool = Pool(3)
for i in range(5):
pool.apply_async(func=run, args=(i,), callback=Bar(run.__name__)) #异步(并发)
# pool.apply(func=run, args=(i,)) #同步
pool.close()
pool.join()
print("end")
常用函数
Return the result when it arrives. If timeout is not None and the result does not arrive within timeout seconds then multiprocessing.TimeoutError is raised. If the remote call raised an exception then that exception will be reraised by get().
#example
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
线程和进程的操作是由程序触发系统接口,最后的执行者是系统,它本质上是操作系统提供的功能。而协程的操作则是程序员指定的,在python中通过yield,人为的实现并发处理。
协程的优点:
最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。
协程的适用场景:
当程序中存在大量不需要CPU的操作时(IO)。
常用第三方模块gevent和greenlet。(本质上,gevent是对greenlet的高级封装,因此一般用它就行,这是一个相当高效的模块。)
# 例子1,greenlet实现协程来回调度
from greenlet import greenlet
def task1():
print("1")
t2.switch()
print("2")
t2.switch()
def task2():
print("3")
t1.switch()
print("4")
if __name__ == '__main__':
t1 = greenlet(task1)
t2 = greenlet(task2)
t1.switch()
# 例子2,gevent高级封装
from urllib import request
from gevent import monkey
import gevent
monkey.patch_all() #gevent无法识别线程的IO进行调用,必须调用它才可以
def get(url):
print('GET: %s' % url)
resp = request.urlopen(url)
data = resp.read()
print('%d bytes received from %s.' % (len(data), url))
#通过joinall将任务f和它的参数进行统一调度,实现单线程中的协程。
gevent.joinall([
gevent.spawn(get, 'https://www.python.org/'),
gevent.spawn(get, 'https://www.baidu.com/'),
gevent.spawn(get, 'https://github.com/'),
])
标签:进程锁 消费者模式 上下文 pre pytho producer tin ack 影响
原文地址:https://www.cnblogs.com/akiz/p/11144321.html