码迷,mamicode.com
首页 > 其他好文 > 详细

Day28:Event对象、队列、multiprocessing模块

时间:2017-07-20 23:41:10      阅读:188      评论:0      收藏:0      [点我收藏+]

标签:exchange   容器   closed   子进程   class   cal   dna   pre   sse   

一、Event对象

  线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就 会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象,而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件,继续执行。

event.isSet():   返回event的状态值True或者False;

event.wait():    如果 event.isSet()==False将阻塞线程;

event.set():     设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;

event.clear():   恢复event的状态值为False。

技术分享

可以考虑一种应用场景(仅仅作为说明),例如,我们有多个线程从Redis队列中读取数据来处理,这些线程都要尝试去连接Redis的服务,一般情况下,如果Redis连接不成功,在各个线程的代码中,都会去尝试重新连接。如果我们想要在启动时确保Redis服务正常,才让那些工作线程去连接Redis服务器,那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作:主线程中会去尝试连接Redis服务,如果正常的话,触发事件,各工作线程会尝试连接Redis服务。

import threading,time

event = threading.Event()

def foo():
    while not event.is_set():
        print(wait....)
        event.wait()

    print(Connect to redis server)


print(attempt to start redis server)


for i in range(5):
    t = threading.Thread(target=foo)
    t.start()

time.sleep(10)
event.set()

‘‘‘
运行结果:
attempt to start redis server
wait....
wait....
wait....
wait....
wait....
Connect to redis server
Connect to redis server
Connect to redis server
Connect to redis server
Connect to redis server

‘‘‘
技术分享
import threading,time,logging

logging.basicConfig(level=logging.DEBUG,
                    format=%(threadName)-10s %(message)s)

def worker(event):
    logging.debug(Waiting for redis ready...)
    event.wait()
    logging.debug(redis ready,and connect to redis server and do some work [%s],time.ctime())
    time.sleep(1)

def main():
    readis_ready=threading.Event()
    t1=threading.Thread(target=worker,args=(readis_ready,),name=t1)
    t1.start()

    t2=threading.Thread(target=worker,args=(readis_ready,),name=t2)
    t2.start()

    logging.debug(first of all,check redis server,make sure it is OK,and then trigger the redis ready event)
    time.sleep(3)
    readis_ready.set()

if __name__==__main__:
    main()
View Code

threading.Event的wait方法还接受一个超时参数,默认情况下如果事件一致没有发生,wait方法会一直阻塞下去,而加入这个超时参数之后,如果阻塞时间超过这个参数设定的值之后,wait方法会返回。对应于上面的应用场景,如果Redis服务器一致没有启动,我们希望子线程能够打印一些日志来不断地提醒我们当前没有一个可以连接的Redis服务,我们就可以通过设置这个超时参数来达成这样的目的:

import threading,time

event = threading.Event()

def foo():
    while not event.is_set():
        print(wait....)
        event.wait(2)

    print(Connect to redis server)


print(attempt to start redis server)


for i in range(2):
    t = threading.Thread(target=foo)
    t.start()

time.sleep(5)
event.set()
‘‘‘
运行结果:
attempt to start redis server
wait....
wait....
wait....
wait....
wait....
wait....
Connect to redis server
Connect to redis server
‘‘‘
def worker(event):
    while not event.is_set():
        logging.debug(Waiting for redis ready...)
        event.wait(2)
    logging.debug(redis ready, and connect to redis server and do some work [%s], time.ctime())
    time.sleep(1)

这样,我们就可以在等待Redis服务启动的同时,看到工作线程里正在等待的情况。

二、队列(queue)

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

1、get与put方法

技术分享
‘‘‘
创建一个“队列”对象

import queue
q = queue.Queue(maxsize = 10)
queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数
maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。

将一个值放入队列中
q.put(10)
调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。

将一个值从队列中取出
q.get()
调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。

‘‘‘
View Code
import  queue
q = queue.Queue(3)

q.put(11)
q.put(hello)
q.put(3.123)

print(q.get())
print(q.get())
print(q.get())
‘‘‘
运行结果:
11
hello
3.123
‘‘‘

2、join与task_done方法

‘‘‘
join() 阻塞进程,直到所有任务完成,需要配合另一个方法task_done。

    def join(self):
     with self.all_tasks_done:
      while self.unfinished_tasks:
       self.all_tasks_done.wait()

task_done() 表示某个任务完成。每一条get语句后需要一条task_done。


import queue
q = queue.Queue(5)
q.put(10)
q.put(20)
print(q.get())
q.task_done()
print(q.get())
q.task_done()

q.join()

print("ending!")
‘‘‘
import  queue,threading
q = queue.Queue(3)
def foo():
    q.put(11)
    q.put(hello)
    q.put(3.123)
    q.join()
def bar():

    print(q.get())
    q.task_done()    #注释掉本行,程序将不会结束。

t1 = threading.Thread(target=foo)
t1.start()

for i in range(3):

    t = threading.Thread(target=bar)
    t.start()
‘‘‘
运行结果:
11
hello
3.123
‘‘‘

3、其他常用方法

‘‘‘

此包中的常用方法(q = queue.Queue()):

q.qsize() 返回队列的大小
q.empty() 如果队列为空,返回True,反之False
q.full() 如果队列满了,返回True,反之False
q.full 与 maxsize 大小对应
q.get([block[, timeout]]) 获取队列,timeout等待时间
q.get_nowait() 相当q.get(False)非阻塞 
q.put(item) 写入队列,timeout等待时间
q.put_nowait(item) 相当q.put(item, False)
q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
q.join() 实际上意味着等到队列为空,再执行别的操作

‘‘‘

4、其他模式

Python queue模块有三种队列及构造函数: 

1、Python queue模块的FIFO队列先进先出。  class queue.Queue(maxsize) 
2、LIFO类似于堆栈,即先进后出。           class queue.LifoQueue(maxsize) 
3、还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize) 


import queue
#先进后出

q=queue.LifoQueue()

q.put(34)
q.put(56)
q.put(12)

print(q.get())
print(q.get())
print(q.get())
‘‘‘
运行结果:
12
56
34
‘‘‘

#优先级
q=queue.PriorityQueue()
q.put([5,100])
q.put([7,200])
q.put([3,"hello"])
q.put([4,{"name":"alex"}])

while 1:
  data=q.get()
  print(data)

‘‘‘
运行结果:
[3, hello]
[4, {name: alex}]
[5, 100]
[7, 200]
‘‘‘

5、生产者消费者模型

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这就像,在餐厅,厨师做好菜,不需要直接和客户交流,而是交给前台,而客户去饭菜也不需要不找厨师,直接去前台领取即可,这也是一个解耦的过程。

 

import time,random
import queue,threading

q = queue.Queue()

def Producer(name):
  count = 0
  while count <10:
    print("making........")
    time.sleep(random.randrange(3))
    q.put(count)
    print(Producer %s has produced %s baozi.. %(name, count))
    count +=1

    print("ok......")
def Consumer(name):
  count = 0
  while count <10:
    time.sleep(random.randrange(3))
    if not q.empty():
        data = q.get()

        print(\033[32;1mConsumer %s has eat %s baozi...\033[0m %(name, data))
    else:
        print("-----no baozi anymore----")
    count +=1

p1 = threading.Thread(target=Producer, args=(A,))
c1 = threading.Thread(target=Consumer, args=(B,))

p1.start()
c1.start()
技术分享
‘‘‘
运行结果:

making........
Producer A has produced 0 baozi..
ok......
making........
Consumer B has eat 0 baozi...
Producer A has produced 1 baozi..
ok......
making........
Producer A has produced 2 baozi..
ok......
making........
Consumer B has eat 1 baozi...
Producer A has produced 3 baozi..
ok......
making........
Consumer B has eat 2 baozi...
Consumer B has eat 3 baozi...
Producer A has produced 4 baozi..
ok......
making........
Producer A has produced 5 baozi..
ok......
making........
Consumer B has eat 4 baozi...
Consumer B has eat 5 baozi...
Producer A has produced 6 baozi..
ok......
making........
Producer A has produced 7 baozi..
ok......
making........
Producer A has produced 8 baozi..
ok......
making........
Consumer B has eat 6 baozi...
Consumer B has eat 7 baozi...
Producer A has produced 9 baozi..
ok......
Consumer B has eat 8 baozi...
Consumer B has eat 9 baozi...
‘‘‘
运行结果

三、multiprocessing模块

Multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency,effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

由于GIL的存在,python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。

multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。

1、Python的进程调用

# Process类调用
from multiprocessing import Process
import time
def f(name):

    print(hello, name,time.ctime())
    time.sleep(1)

if __name__ == __main__:
    p_list=[]
    for i in range(3):
        p = Process(target=f, args=(alvin:%s%i,))
        p_list.append(p)
        p.start()
    for i in p_list:
        p.join()
    print(end)
‘‘‘
运行结果:
hello alvin:0 Wed Jul 19 16:06:40 2017
hello alvin:2 Wed Jul 19 16:06:40 2017
hello alvin:1 Wed Jul 19 16:06:40 2017
end
‘‘‘
#继承Process类调用
from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self):
        super(MyProcess, self).__init__()
        # self.name = name

    def run(self):

        print (hello, self.name,time.ctime())
        time.sleep(1)

if __name__ == __main__:
    p_list=[]
    for i in range(3):
        p = MyProcess()
        p.start()
        p_list.append(p)

    for p in p_list:
        p.join()

    print(end)


‘‘‘
运行结果:
hello MyProcess-3 Wed Jul 19 16:09:39 2017
hello MyProcess-1 Wed Jul 19 16:09:39 2017
hello MyProcess-2 Wed Jul 19 16:09:39 2017
end
‘‘‘

2、process类

构造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 线程组,目前还没有实现,库引用中提示必须是None; 
  target: 要执行的方法; 
  name: 进程名; 
  args/kwargs: 要传入方法的参数。

实例方法:

  is_alive():返回进程是否在运行。

  join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

  start():进程准备就绪,等待CPU调度

  run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。

  terminate():不管任务是否完成,立即停止工作进程

属性:

  daemon:和线程的setDeamon功能一样

  name:进程名字。

  pid:进程号。

from multiprocessing import Process
import os
import time
def info(name):


    print("name:",name)
    print(parent process:, os.getppid())
    print(process id:, os.getpid())
    print("------------------")
    time.sleep(1)


if __name__ == __main__:

    info(main process line)


    p1 = Process(target=info, args=(alvin,))
    p2 = Process(target=info, args=(egon,))
    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print("ending")
‘‘‘
运行结果:
name: main process line
parent process: 3400
process id: 1712
------------------
name: alvin
parent process: 1712
process id: 8428
------------------
name: egon
parent process: 1712
process id: 8212
------------------
ending

‘‘‘

3、进程间通信

3.1 进程队列Queue

from multiprocessing import Process, Queue

def f(q,n):
    q.put(n*n+1)
    print("son process",id(q))

if __name__ == __main__:
    q = Queue()  #如果使用线程间的队列queue.Queue则无法运行
    print("main process",id(q))

    for i in range(3):
        p = Process(target=f, args=(q,i))
        p.start()

    print(q.get())
    print(q.get())
    print(q.get())
‘‘‘
运行结果:
main process 41655376
son process 45073408
1
son process 44942336
2
son process 44942392
5

‘‘‘

3.2 管道(pipe)

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way).

pipe()函数返回由管道连接的一对连接对象,该管道默认是双向的(双向的)。

For example:

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([12, {"name": "yuan"}, hello])
    response = conn.recv()
    print("response", response)
    conn.close()

if __name__ == __main__:

    parent_conn, child_conn = Pipe()    #管道两个对象

    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  
    parent_conn.send("儿子你好!")
    p.join()
‘‘‘
运行结果:
[12, {name: yuan}, hello]
response 儿子你好!
‘‘‘

Pipe()返回的两个连接对象代表管道的两端。 每个连接对象都有send()和recv()方法(等等)。 请注意,如果两个进程(或线程)尝试同时读取或写入管道的同一端,管道中的数据可能会损坏

3.3 manager

Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

manager()返回的manager对象控制一个保存Python对象的服务器进程,并允许其他进程使用代理来操作它们。

from multiprocessing import Process, Manager

def f(d, l, n):

    d[n] = n
    d["name"] ="alvin"
    l.append(n)

    #print("l",l)

if __name__ == __main__:

    with Manager() as manager:

        d = manager.dict()         #字典

        l = manager.list(range(5))  #列表

        print(d,\n,l)
        p_list = []

        for i in range(10):
            p = Process(target=f, args=(d,l,i))
            p.start()
            p_list.append(p)

        for res in p_list:
            res.join()

        print(d)
        print(l)

‘‘‘
运行结果:
{}   初始化的字典
[0, 1, 2, 3, 4]  初始化的列表
{3: 3, name: alvin, 0: 0, 2: 2, 7: 7, 5: 5, 4: 4, 1: 1, 6: 6, 8: 8, 9: 9}
[0, 1, 2, 3, 4, 3, 0, 2, 7, 5, 4, 1, 6, 8, 9]

3.4 进程池

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

from multiprocessing import Pool
import time

def foo(args):
 time.sleep(5)
 print(args)

if __name__ == __main__:
     p = Pool(5)
     for i in range(30):
         p.apply_async(func=foo, args= (i,))

     p.close()   # 等子进程执行完毕后关闭进程池
     # time.sleep(2)
     # p.terminate()  # 立刻关闭进程池
     p.join()         # 没有join会立即结束

进程池中有以下几个主要方法:

  1. apply:从进程池里取一个进程并执行
  2. apply_async:apply的异步版本
  3. terminate:立刻关闭线程池
  4. join:主进程等待所有子进程执行完毕,必须在close或terminate之后
  5. close:等待所有进程结束后,才关闭线程池

四、课后作业

1、设计五个线程,2个生产者3消费者:一个生产者每秒钟生产1一个产品放入队列,一个生产者每秒钟生产2个产品放入队列。
每个消费者每秒钟从队列中消费1-5之间的一个随机数个产品。

对于生产者:
队列多于10个时,生产者等待,否则生产者继续生产;
对于消费者:
队列空时,消费者等待,队列有产品时,消费者继续消费。
每个产品有自己独特的标记。

技术分享
import threading,time,queue,random

class Producer(threading.Thread):
    def __init__(self,name,i):
        super().__init__()
        self.name=name
        self.i=i

    def run(self):
        while True:
            time.sleep(self.i)
            if q.qsize()<10:
                a=random.choice([baozi,jianbing,doujiang])+str(random.randint(1,10))
                q.put(a)
                print(%s produce %s  current menu %s%(self.name,a,q.queue))


class Consumer(threading.Thread):
    def __init__(self,name,q):
        super().__init__()
        self.name=name

    def run(self):
        while True:
            time.sleep(1)
            if not q.empty():
                for i in range(random.randint(1,5)):
                    a=q.get()
                    print(%s eat %s%(self.name,a))

if __name__ == __main__:

    q = queue.Queue()

    p=Producer(egon0,1)
    p.start()
    p = Producer(egon1, 0.5)
    p.start()

    for i in range(3):
        c=Consumer(yuan%s%i,q)
        c.start()

参考答案
参考答案

2、设计一个关于红绿灯的线程,5个关于车的线程;

对于车线程,每隔一个随机秒数,判断红绿灯的状态,是红灯或者黄灯,打印waiting;是绿灯打印running。

对于红绿灯线程: 首先默认是绿灯,做一个计数器,十秒前,每隔一秒打印“light green”;第十秒到第十三秒,每隔一秒打印“light yellow”,13秒到20秒, ‘light red’,20秒以后计数器清零。重新循环。

知识点:event对象(提示:event对象即红绿灯,为true是即绿灯,false时为黄灯或者红灯)

技术分享
import threading,random,time

event=threading.Event()
def traffic_lights():
    count=0
    lights=[green light,yellow light,red light]
    current_light=lights[0]
    while True:
        while count<10:
            print(current_light,9-count)
            count+=1
            time.sleep(1)
        else:
            current_light=lights[1]
            event.set()

        while count<13:
            print(current_light,12-count)
            count+=1
            time.sleep(1)
        else:
            current_light=lights[2]

        while count<20:
            print(current_light,19-count)
            count += 1
            time.sleep(1)
            if count == 20:
                count=0
                current_light=lights[0]
                event.clear()
                break


def car(name):
    print(name,starting...)
    while True:
        time.sleep(random.randint(1,4))
        if not event.is_set():
            print(%s is running%name)
        else:
            print(%s is waiting%name)

if __name__ == __main__:
    t=threading.Thread(target=traffic_lights)
    t.start()
    for i in range(5):
        c=threading.Thread(target=car,args=(car%s%(i+1),))
        c.start()

参考答案
参考答案

 

 

Day28:Event对象、队列、multiprocessing模块

标签:exchange   容器   closed   子进程   class   cal   dna   pre   sse   

原文地址:http://www.cnblogs.com/Vee-Wang/p/7214340.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!