码迷,mamicode.com
首页 > 编程语言 > 详细

队列、线程、进程、协程

时间:2016-07-22 22:55:22      阅读:208      评论:0      收藏:0      [点我收藏+]

标签:



一、线程

  1、基本使用

创建线程的两种方式:
import threading
def f1(arg):
   print(arg)

t = threading.Thread(target=f1, args=(123,))
t.start()

#以下为执行结果
123

import threading
class MyThread(threading.Thread):
   def __init__(self, func,args):
      self.func = func
      self.args = args
      super(MyThread, self).__init__()

   def run(self):
      self.func(self.args)

def f2(arg):
   print(arg)

obj = MyThread(f2,123)
obj.start()

#以下为执行结果
123

  2、队列

#put get 存取数据
import queue
q = queue.Queue() #创建队列对象,该对象为先进先出对象
q.put(123)          
q.put(234)
q.put(345)
print(q.get())
print(q.get())
print(q.get())
#输出:
123
234
345

 

#判断 阻塞

import  queue
q=queue.Queue()
print(q.empty())  #队列是否为空
print(q.full())  #队列是否已满
#q=queue.Queue(10)   #队列最大长度10
q.put(11)
q.put(22)
print(q.qsize()) #队列现在有几个元素
# q.put(33,block=False)    #不阻塞,直接抛出异常
# q.put(33,timeout=2)  #存数据阻塞 ,超时时间 2秒后抛出异常
print(q.get())
print(q.get())
print(q.get(timeout=2))           #取数据  阻塞,两秒后抛出异常

#以下为执行结果
True
False
2
11
22

 

# task_done()    join() 
import  queue
q = queue.Queue(5)
q.put(123)
q.put(123)
q.get()
q.task_done() #表示某个任务完成.
q.get()
q.task_done()
q.join()  #如果队列中有没有处理的元素,等待 阻塞,任务执行完成,取消阻塞。



  3、消费者模型

import queue
import threading
import time
q = queue.Queue()

def productor(arg):                                   #put 像队列添加购买请求
    """
    买票
    :param arg:
    :return:
    """
    q.put(str(arg) + - 包子)


for i in range(300):
    t = threading.Thread(target=productor,args=(i,))
    t.start()

def consumer(arg):
    """
    服务器后台
    :param arg:
    :return:
    """
    while True:
        print(arg, q.get())                                         #get 处理请求
        time.sleep(2)

for j in range(3):
    t = threading.Thread(target=consumer,args=(j,))
    t.start()
#以下为执行结果:
0 0- 包子
1 1- 包子
2 2- 包子
1 3- 包子
2 4- 包子
0 5- 包子
2 6- 包子
...

 

  4、线程锁
    1)线程锁 ,每次放行一个
import threading
import time
NUM=10
def func(l):
    global NUM
    #上锁
    l.acquire()
    NUM -= 1
    time.sleep(2)
    print(NUM)
    #开锁
    l.release()
lock = threading.Lock()    #锁每次放一个
#lock= threading.RLock()   #多层锁


for i in range(10):
    t=threading.Thread(target=func,args=(lock,))
    t.start()

#以下为执行结果:
9
8
7
6
5
4
...

    

    2)信号量锁,可以设置每次放行的个数(5个)
import threading
import time
NUM=10
def func(i,l):
    global NUM
    #上锁
    l.acquire()
    NUM -= 1
    time.sleep(2)
    print(NUM,i)
    #开锁
    l.release()

lock=threading.BoundedSemaphore(5)   #每次放多个

for i in range(30):
    t=threading.Thread(target=func,args=(i,lock,))
    t.start()

#以下为执行结果:
5 1
5 0
3 2
2 4
1 3
0 6
...

    3)事件event锁, 放行所有
import threading
def func(i,e):
    print(i)
    e.wait()  #检查是什么灯,如果红灯,停,绿灯,行
    print(i+100)
event= threading.Event()                  #锁住,要放一起放
for i in range(10):
    t=threading.Thread(target=func,args=(i,event,))
    t.start()
event.clear() #设置成红灯
inp=input(>>>)
if inp==1:
    event.set() #设置成绿灯

#以下为运行结果:
0
1
2
3
4
5
6
7
8
9
>>>1
100
101
102
104
105
107
108
109
103
106

    

    4)条件Condition锁  
      (1).wait() 没一次放行,可以自定义个数
import  threading
def func(i,con):
    print (i)
    con.acquire()
    con.wait()
    print (i+100)
    con.release()
c=threading.Condition()
for i in range(10):
    t= threading.Thread(target=func,args=(i,c))
    t.start()
while True:
    inp=input(>>>)
    if inp==q:
        break
    c.acquire()
    c.notify(int(inp))   #锁传几个处理几个
    c.release()

#以下为执行结果
0
1
2
3
4
5
6
7
8
9
>>>1
>>>100
2
>>>101
102

      (2) con.wait_for(condition) 传递参数,条件成立放行
import  threading

def condition():
    ret=False
    r=input(>>>)
    if  r==true:
        ret=True
    return  ret
def func(i,con):
    print (i)
    con.acquire()
    con.wait_for(condition)
    print (i+100)
    con.release()
c=threading.Condition()

for i in range(10):
    t= threading.Thread(target=func,args=(i,c,))
    t.start()


#以下为执行结果

>>>1
2
3
4
5
6
7
8
9


>>>true
101

 

   5、定时器 timer(监控、客户端的时候可能用到)
  
一秒后执行代码
from threading import Timer

def hello():
    print("hello, world")

t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed

 




  6、自定义线程池
简易的线程池


import  queue
import  threading
import time
class ThreadPool:
    def __init__(self,maxsize=5):
        self.maxsize= maxsize
        self._q=queue.Queue(maxsize)
        for i in range(maxsize):
            self._q.put(threading.Thread)
    def get_thread(self):
        return  self._q.get()

    def add_thread(self):
        self._q.put(threading.Thread)

pool =ThreadPool(5)
def task(arg,p):
    print(arg)
    time.sleep(1)
    p.add_thread()

for i in range(100):
    t=pool.get_thread()
    obj= t(target=task,args=(i,pool,))
    obj.start()

 

   完美的线程池



二、进程

  1.基本用法
    
from multiprocessing import Process

def foo(i):
    print(say hi,i)

if __name__ == "__main__":      #windows下才需要,linux不需要main

    for i in range(10):
        p = Process(target=foo,args=(i,))
        p.start()

#以下为执行结果:
say hi 0
say hi 3
say hi 1
say hi 2
say hi 5
say hi 4
say hi 6
say hi 7
say hi 8
say hi 9

  

  2.进程共享

     (1)默认无法数据共享
from multiprocessing import Process
from multiprocessing import  queues
#import multiprocessing
def foo(i,arg):
     arg.append(i)
     print(say hi,i,arg)

if __name__ == __main__:
    li=[]
    for i in range(10):
        p=Process(target=foo,args=(i,li,))
#        p.daemon= True
        p.start()

#以下为执行结果:
say hi 0 [0]
say hi 1 [1]
say hi 2 [2]
say hi 3 [3]
say hi 6 [6]
say hi 4 [4]
say hi 5 [5]
say hi 9 [9]
say hi 8 [8]
say hi 7 [7]

 

    (2)queues实现共享   数字递增
from multiprocessing import Process
from multiprocessing import  queues
import multiprocessing
def foo(i,arg):
     arg.put(i)
     print(say hi,i,arg.qsize())

if __name__ == __main__:
#    li=[]
    li= queues.Queue(20,ctx=multiprocessing)
    for i in range(10):
        p=Process(target=foo,args=(i,li,))
#        p.daemon= True
        p.start()

#以下为执行结果
say hi 0
say hi 2
say hi 3
say hi 4
say hi 1
say hi 5
say hi 6
say hi 7
say hi 8
say hi 9

 

  (3)Array    共享空间设置限制,超过限制会报错

from multiprocessing import Process
from multiprocessing import  queues
import multiprocessing
from multiprocessing import  Array


def foo(i, arg):
    arg[i]=i+100
    for item in arg:
        print(item)
    print("========")

if __name__ == __main__:
    li=Array(i,10)
    for i in range(10):
        p = Process(target=foo, args=(i, li,))
        #        p.daemon= True
        p.start()


#以下为执行结果:
0
0
102
0
0
0
0
0
0
0
========
0
0
102
103
0
0
0
0
0
0
========
100
0
102
103
0
0
0
0
0
0
========
100
0
102
103
104
0
0
0
0
0
========
100
0
102
103
104
0
0
0
108
0
========
100
0
102
103
104
105
0
0
108
0
========
100
0
102
103
104
105
106
0
108
0
========
100
101
102
103
104
105
106
0
108
0
========
100
101
102
103
104
105
106
107
108
0
========
100
101
102
103
104
105
106
107
108
109
========      
    
  (3) Manger 实现数据共享
from multiprocessing import Process
from multiprocessing import  queues
import multiprocessing
from multiprocessing import  Manager


def foo(i, arg):
    arg[i]=i+100
    print(arg.values())

if __name__ == __main__:
    obj=Manager()
    li=obj.dict()
    for i in range(10):
        p = Process(target=foo, args=(i, li,))
        #        p.daemon= True
        p.start()
 #       p.join()
    import  time
    time.sleep(2)
# p.join()

#以下为执行结果

[100]
[100, 101]
[100, 101, 104]
[100, 101, 102, 104]
[100, 101, 102, 104, 105]
[100, 101, 102, 103, 104, 105]
[100, 101, 102, 103, 104, 105, 107]
[100, 101, 102, 103, 104, 105, 106, 107]
[100, 101, 102, 103, 104, 105, 106, 107, 108]
[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]

 

    3、进程池  
from multiprocessing import Pool
import  time
def f1(arg):
    time.sleep(1)
    print(arg)
if __name__ == __main__:

    pool= Pool(5)
    for i in range(30):
#        pool.apply(func=f1,args=(i,))   #串行
        pool.apply_async(func=f1,args=(i,))
#    pool.close() #所有任务执行完毕
    time.sleep(2)
    pool.terminate()  #立即终止
    pool.join()
    print(end)

#以下为执行结果

0
1
2
3
4
end

 

  

  3、锁,与线程锁使用方法一样
from multiprocessing import Process
from multiprocessing import queues
from multiprocessing import Array
from multiprocessing import RLock, Lock, Event, Condition, Semaphore
import multiprocessing
import time

def foo(i,lis,lc):
    lc.acquire()
    lis[0] = lis[0] - 1
    time.sleep(1)
    print(say hi,lis[0])
    lc.release()

if __name__ == "__main__":
    # li = []
    li = Array(i, 1)
    li[0] = 10
    lock = RLock()
    for i in range(10):
        p = Process(target=foo,args=(i,li,lock))
        p.start()
#以下为执行结果
say hi 9
say hi 8
say hi 7
say hi 6
say hi 5
say hi 4
say hi 3
say hi 2
say hi 1
say hi 0

 


三、携程

from greenlet import greenlet
import time

def test1():

    print(12)
    gr2.switch()
    time.sleep(1)
    print(34)
    gr2.switch()


def test2():
    print(56)
    gr1.switch()
    print(78)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

#以下为执行结果:
12
56
34
78

 

from gevent import monkey; monkey.patch_all()
import gevent
import requests

def f(url):
    print(GET: %s % url)
    resp = requests.get(url)
    data = resp.text
    print(%d bytes received from %s. % (len(data), url))

gevent.joinall([
        gevent.spawn(f, https://www.python.org/),
        gevent.spawn(f, https://www.yahoo.com/),
        gevent.spawn(f, https://github.com/),
])

#以下为执行结果:
GET: https://www.python.org/
GET: https://www.yahoo.com/
GET: https://github.com/
47394 bytes received from https://www.python.org/.
25533 bytes received from https://github.com/.
425622 bytes received from https://www.yahoo.com/.

 

队列、线程、进程、协程

标签:

原文地址:http://www.cnblogs.com/wudalang/p/5681401.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!