标签:判断 odi alt 生产者 async 定时器 imp index 关闭
线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。
一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务
(1)创建线程的两种方式
直接调用(常用)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading import time def f1(arg): # 定义每个线程要执行的函数 time.sleep( 0.1 ) print (arg,threading.current_thread()) # threading.current_thread()详细的线程信息 for i in range ( 10 ): # 创建10个线程并发执行函数 t = threading.Thread(target = f1,args = ( ‘python‘ ,)) # args是函数的参数,元组最后一个必须要逗号, t.start() # 启动线程 print (t.getName()) # 可以获取主线程的名字 |
继承调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading import time class MyThread(threading.Thread): # 继承threading.Thread类 def __init__( self ,func,args): self .func = func self .args = args super (MyThread, self ).__init__() # 执行父类的构造方法 def run( self ): # run()方法,是cpu调度线程会使用的方法,必须是run()方法 self .func( self .args) def f2(arg): time.sleep( 0.1 ) print (arg,threading.current_thread()) for i in range ( 10 ): # 创建10个线程 obj = MyThread(f2, 123 ) obj.start() |
(2)更多方法
自己还可以为线程自定义名字,通过 t = threading.Thread(target=f1, args=(i,), name=‘mythread{}‘.format(i)) 中的name参数,除此之外,Thread还有一下一些方法
t.join(n) 表示主线程等待子线程多少时间,n表示主线程等待子线程的超时时间,如果在n时间内子线程未完成,主线程不在等待,执行后面的代码 t.run() 线程被cpu调度后自动执行线程对象的run方法(一般我们无需设置,除非自己定义类调用) t.start() 线程准备就绪,等待CPU调度 t.getName() 获取线程的名称 t.setName() 设置线程的名称 t.name 获取或设置线程的名称 t.is_alive() 判断线程是否为激活状态 t.isAlive() 判断线程是否为激活状态 t.isDaemon() 判断是否为守护线程 t.setDaemon 设置True或False(默认) True表示主线程不等待子线程全部完成就执行后面的代码 False默认值,标识主线程等待子线程全部执行完后继续执行后面的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading import time def f1(arg): time.sleep( 5 ) print (arg) t = threading.Thread(target = f1,args = ( ‘python‘ ,)) t.setDaemon( True ) # 默认是False,设置为true表示主线程不等子线程 t.start() t.join( 2 ) # 表示主线程到此,等待子线程执行完毕,2表示主线程最多等待2秒 print ( ‘end‘ ) # 默认主线程在等待子线程结束 print ( ‘end‘ ) print ( ‘end‘ ) |
3、线程锁(Lock、RLock)
由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以,出现了线程锁 - 同一时刻允许一个线程执行操作。这里使用Rlock,而不使用Lock,因为Lock如果多次获取锁的时候会出错,而RLock允许在同一线程中被多次acquire,但是需要用n次的release才能真正释放所占用的琐,一个线程获取了锁在释放之前,其他线程只有等待。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading import time NUM = 10 def func(l): global NUM # 上锁 l.acquire() NUM - = 1 time.sleep( 0.1 ) print (NUM,threading.current_thread()) # 开锁 l.release() # lock = threading.Lock() lock = threading.RLock() # 递归锁 for j in range ( 10 ): t = threading.Thread(target = func,args = (lock,)) t.start() |
互斥锁同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading import time NUM = 30 def func(i,l): global NUM # 上锁 l.acquire() NUM - = 1 time.sleep( 1 ) print (NUM,i,threading.current_thread()) # 开锁 l.release() lock = threading.BoundedSemaphore( 5 ) # 设置信号量5,表示同时5个线程同时执行 for i in range ( 30 ): t = threading.Thread(target = func,args = (i,lock,)) t.start() |
python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
事件处理的机制:全局定义了一个"Flag",如果"Flag"值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果"Flag"值为True,那么event.wait 方法时便不再阻塞。
下面是一个红绿灯的例子,主线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading def func(i,e): print (i) e.wait() # 检测是什么灯,如果是True红灯,停;绿灯False行,默认是红灯 print (i + 100 ) event = threading.Event() for i in range ( 10 ): t = threading.Thread(target = func,args = (i,event,)) t.start() event.clear() # 主动设置成红灯,默认是红灯,此句可以不写 inp = input ( ‘>>>‘ ) if inp = = ‘1‘ : event. set () # 设置成绿灯,就会执行func()函数中print(i+100)语句 |
使得线程等待,只有满足某条件时,才释放n个线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading def func(i,con): print (i) con.acquire() # 固定写法acquire,wait con.wait() print (i + 100 ) con.release() c = threading.Condition() # 设置条件 for i in range ( 10 ): t = threading.Thread(target = func,args = (i,c,)) t.start() while True : inp = input ( ‘>>>‘ ) if inp = = ‘q‘ : break c.acquire() # 这里是固定写法,acquire,notify,release c.notify( int (inp)) c.release() |
第二种
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading def condition(): ret = False r = input ( ‘>>>‘ ) if r = = ‘true‘ : ret = True else : ret = False return ret def func(i,con): print (i) con.acquire() con.wait_for(condition) # 和上一个例子的差别在这里 print (i + 100 ) con.release() c = threading.Condition() for i in range ( 10 ): t = threading.Thread(target = func,args = (i,c,)) t.start() |
定时器,指定n秒后执行某操作
1
2
3
4
5
6
7
8
9
10
|
#!/usr/bin/env python # -*- coding:utf-8 -*- from threading import Timer def hello(): print ( "hello python" ) t = Timer( 1 ,hello) t.start() |
线程的上一级就是进程,进程可包含很多线程,进程和线程的区别是进程间的数据不共享,多进程也可以用来处理多任务,不过多进程很消耗资源,计算型的任务最好交给多进程来处理,IO密集型最好交给多线程来处理,此外进程的数量应该和cpu的核心数保持一致。
1、线程共享创建它的进程的地址空间,进程有自己的地址空间。 2、线程是直接可以访问线程之间的数据;进程需要复制父进程的数据才能访问。 3、线程可以直接与其他线程的通信过程,进程必须使用进程间通信和同胞交流过程。 4、新创建一个线程很容易;新创建一个进程需要复制父进程。 5、主线程可以控制相当大的线程在同一进程中;进程只能控制子进程。 6、主线程变更(取消、优先级变化等)可能会影响进程的其他线程的行为;父进程的变化不会影响子进程。
1
2
3
4
5
6
7
8
9
10
11
12
|
#!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import Process def foo(i): print ( ‘say hi‘ ,i) for i in range ( 10 ): p = Process(target = foo,args = (i,)) #p.daemon = True # 和线程t.setdaemon是一样的 p.start() #p.join() |
注意:由于进程之间的数据需要各自持有一份,所以创建进程需要的非常大的开销。其他使用方法和线程threading.Thread是一样的
进程各自持有一份数据,默认无法共享数据;queues,Array,Manager.dict,pipe这些方法都能实现数据共享
(1)特殊队列queues()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
#!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import Process from multiprocessing import queues import multiprocessing def foo(i,arg): arg.put(i) print ( ‘say hi‘ ,i,arg.qsize()) li = queues.Queue( 20 ,ctx = multiprocessing) for i in range ( 10 ): p = Process(target = foo,args = (i,li,)) p.start() |
(2)数组Array()
数组和列表很像,但是数组中的元素在内存中的地址是一段连续的空间地址,而列表中的元素则不是一段连续的的地址,是通过链表的形式找到下一个元素
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
#!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import Process from multiprocessing import Array def foo(i,arg): arg[i] = i + 100 for item in arg: print (item) li = Array( ‘i‘ , 10 ) # 指定数组时需要指定类型 for i in range ( 10 ): p = Process(target = foo,args = (i,li,)) p.start() |
(3)Manager.dict()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
#!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import Process from multiprocessing import Manager def foo(i,arg): arg[i] = i + 100 print (arg.values()) obj = Manager() li = obj. dict () for i in range ( 10 ): p = Process(target = foo,args = (i,li,)) p.start() p.join() |
(4)pipe()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
#!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import Process, Pipe def f(conn): conn.send([ 42 , None , ‘hello‘ ]) conn.close() parent_conn, child_conn = Pipe() p = Process(target = f, args = (child_conn,)) p.start() print (parent_conn.recv()) # 父进程可以收到子进程的共享信息prints "[42, None, ‘hello‘]" p.join() |
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池中有两个方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
#!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import Pool import time def f1(arg): time.sleep( 1 ) print (arg) pool = Pool( 5 ) for i in range ( 30 ): # 定义30个任务 #pool.apply(func=f1,args=(i,)) # 所有进程串行执行没有多大意义 pool.apply_async(func = f1,args = (i,)) # 异步并行执行 pool.close() #等待所有的任务执行完毕 #time.sleep(1) #pool.terminate() # 立即终止子进程的任务,主进程继续执行 pool.join() # 执行pool.join时必须先执行pool.close或者pool.terminate # 进程池中进程执行完毕后在关闭,如果注释,那么程序直接关闭close,terminate也无效 print ( ‘end‘ ) |
线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。
协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。
协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;由greenlet,gevent实现,gevent是调用greenlet进行封装;需要安装pip install greenlet;pip install gevent;
greenlet
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
#!/usr/bin/env python # -*- coding:utf-8 -*- from greenlet import greenlet def test1(): print 12 gr2.switch() print 34 gr2.switch() def test2(): print 56 gr1.switch() print 78 gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch() |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import gevent def foo(): print ( ‘Running in foo‘ ) gevent.sleep( 0 ) print ( ‘Explicit context switch to foo again‘ ) def bar(): print ( ‘Explicit context to bar‘ ) gevent.sleep( 0 ) print ( ‘Implicit context switch back to bar‘ ) gevent.joinall([ gevent.spawn(foo), gevent.spawn(bar), ]) |
遇到IO操作自动切换:此操作在python2.x中执行的,urllib2不支持python3.x
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
from gevent import monkey; monkey.patch_all() import gevent import urllib2 def f(url): print ( ‘GET: %s‘ % url) resp = urllib2.urlopen(url) data = resp.read() print ( ‘%d bytes received from %s.‘ % ( len (data), url)) gevent.joinall([ gevent.spawn(f, ‘https://www.python.org/‘ ), gevent.spawn(f, ‘https://www.yahoo.com/‘ ), gevent.spawn(f, ‘https://github.com/‘ ), ]) |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
#!/usr/bin/env python # -*- coding:utf-8 -*- import queue q = queue.Queue( 5 ) # 默认maxsize=0无限接收,最大支持的个数 print (q.empty()) # 查看队列是否为空 q.put( 11 ) # put防数据,是否阻塞默认是阻塞block=True,timeout超时时间 q.put( 22 ) q.put( 33 ,block = False ,timeout = 2 ) print (q.full()) # 查看队列是否已经放满 print (q.qsize()) # 队列中多少元素 print (q.maxsize) # 队列最大支持的个数 print (q.get(block = False ,timeout = 2 )) # get取数据,是否阻塞默认是阻塞block=True,timeout超时时间 print ( "*" * 10 ) print (q.get()) q.task_done() # join配合task_done,队列中有任务就会阻塞进程,当队列中的任务执行完毕之后,不在阻塞 print (q.get()) q.task_done() q.join() # 队列中还有元素的话,程序就不会结束程序,只有元素被取完配合task_done执行,程序才会结束 |
1
2
3
4
5
6
7
8
9
|
#!/usr/bin/env python # -*- coding:utf-8 -*- import queue q = queue.LifoQueue() q.put( 123 ) q.put( 456 ) print (q.get()) |
1
2
3
4
5
6
7
8
9
10
|
#!/usr/bin/env python # -*- coding:utf-8 -*- import queue q = queue.PriorityQueue() q.put(( 1 , ‘python1‘ )) q.put(( 5 , ‘python‘ )) q.put(( 3 , ‘python3‘ )) print (q.get()) |
1
2
3
4
5
6
7
8
9
10
11
|
#!/usr/bin/env python # -*- coding:utf-8 -*- import queue q = queue.deque() q.append( 123 ) q.append( 333 ) q.appendleft( 456 ) print (q.pop()) print (q.popleft()) |
更多请查看官方文档
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
#!/usr/bin/env python # -*- coding:utf-8 -*- import queue import threading import time q = queue.Queue() def productor(arg): while True : q.put( str (arg)) print ( ‘%s 号窗口有票‘ % str (arg)) time.sleep( 1 ) for i in range ( 3 ): t = threading.Thread(target = productor,args = (i,)) t.start() def consumer(arg): while True : print ( ‘第 %s 人取 %s 号窗口票‘ % ( str (arg),q.get())) time.sleep( 1 ) for j in range ( 300 ): t = threading.Thread(target = consumer,args = (j,)) t.start() |
标签:判断 odi alt 生产者 async 定时器 imp index 关闭
原文地址:https://www.cnblogs.com/stssts/p/10118090.html