码迷,mamicode.com
首页 > 编程语言 > 详细

Python_进程、线程及协程

时间:2016-07-23 12:10:17      阅读:234      评论:0      收藏:0      [点我收藏+]

标签:

一、Python进程

  IO密集型----多线程

  计算密集型----多进程

  1、单进程  

from multiprocessing import Process
def foo(i):
    print(你好哈,i)
if __name__ == __main__:   #if __name__ == ‘__main__‘:只可做测试调用,不能用于生产,windows不支持,linux中可不用添加if __name__ == ‘__main__‘
    for i in range(10):
        t = Process(target=foo,args=(i,))  #创建进程
        t.start()  #执行进程

  注意:由于进程之间的数据需要各自持有一份,所以创建进程需要的非常大的开销。

  进程数据共享

  由于进程在内存中具有独立的地址空间,且每个地址空间各持有一份数据,默认情况下无法共享数据  

#方法一:进程共享数据:Array
from multiprocessing import Process
from multiprocessing import Array
def foo(i,arg):
    arg[i] = i +100 #每一个进程加100
    for item in arg:  #
        print(item)  #输出迭代数据
    print("----------")
if __name__ == "__main__":
    li = Array(i,10) #创建数组,i 等于数据格式为整型,并设置10个元素给数据
    for i in range(10):  #设置10个进程
        p = Process(target=foo,args=(i,li,))#创建进程
        p.start() #执行进程
#方法二:常用进程共享
from multiprocessing import  Process
from multiprocessing import Manager
def foo(i,arg):
    arg[i] = i +100
    print(arg.values())
if __name__ == "__main__":
    obj = Manager()  #创建对象
    li = obj.dict()  #创建字典
    for i in range(10):
        p = Process(target=foo,args=(i,li,))
        p.start()
        # time.sleep(0.01)  #方式一:
        p.join()   #方式二:join进程之间串联进行
技术分享
    c: ctypes.c_char,  u: ctypes.c_wchar,
    b: ctypes.c_byte,  B: ctypes.c_ubyte,
    h: ctypes.c_short, H: ctypes.c_ushort,
    i: ctypes.c_int,   I: ctypes.c_uint,
    l: ctypes.c_long,  L: ctypes.c_ulong,
    f: ctypes.c_float, d: ctypes.c_double
类型对应表
#创建进程池
from
multiprocessing import Process,Queue def f(i,q): print(i,q.get()) #回去队列中元素 if __name__ == "__main__": q=Queue() #创建队列 q.put(123) #将数据put进队列 q.put(456) q.put(789) for i in range(10): #共设置10个进程池 p = Process(target=f,args=(i,q,)) #创建进程 p.start() #执行进程

   进程池

  进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

  进程池常用方法:  

  • apply  :串行执行每一个进程,即只有当一个进程执行完之后,第二个进程才会开始执行
  • apply_async  :并行执行所有进程,同时执行所有进程
from  multiprocessing import Pool
import time
def f1(arg):
    time.sleep(1)
    print(arg)
if __name__ == "__main__":
    pool = Pool(5)  #创建进程池
    for i in range(10):   #创建10个进程
        pool.apply(func=f1,args=(i,)) #apply:让进程去进程池中获取数据,只有等待一个执行完一个元素后,第二个进程接着执行
        # pool.apply_async(func=f1,args=(i,))
    print("end")  #当0个进程执行完之后,输出end
  • apply_async  :并行执行所有进程,同时执行所有进程
  • terminate()#程序执行此时,立即终止所有进程,不管进程池中是否含有任务
from  multiprocessing import Pool
import time
def f1(arg):
    time.sleep(1)
    print(arg)
if __name__ == "__main__":
    pool = Pool(5)  #创建进程池,每一次可同时执行5个进程
    for i in range(10):   #创建10个进程
        #pool.apply(func=f1,args=(i,)) #apply:让进程去进程池中获取数据,只有等待一个执行完一个元素后,第二个进程接着执行
        pool.apply_async(func=f1,args=(i,)) #apply_async:方法:并行运行所有进程
    # print("end")  #当0个进程执行完之后,输出end
    pool.close()#所有任务(即)执行完毕
    time.sleep(2)
    pool.terminate()#当程序运行此时,立即终止所有进程,不管进程池中任务是否执行完
    # pool.join()

  进程中常用方法由:Lock,Rloack,Event,Condition,Smaphore等于线程中方法一样

  1、进程锁:  

from multiprocessing import RLock,Lock,Event,Condition,Semaphore
from multiprocessing import Process
from multiprocessing import queues
from multiprocessing import Array
import multiprocessing
import time

def foo(i,list,lc):
    lc.acquire() #给进程上锁
    list[0]=list[0]-1   #每一次进程调用子进程时减一
    time.sleep(1)
    print("say hi",list[0])  #输出每一次进程调用
    lc.release()  #给进程解锁
if __name__ == "__main__":
    li = Array(i,1)  #python引用数组,i:表示整型数据,1:带包进程数组中含有一个元素
 li[0]=10 #设置数组长度为10 lock = RLock() #创建进程锁 for i in range(10): #创建10个进程 p = Process(target=foo,args=(i,li,lock)) #创建进程 p.start() #执行进程

  2、多进程

二、Python线程

  定义:线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程自己基本上不拥有系统资源,只拥有一点在运行中必  不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源.

  threading模块:提供线程相关操作,线程是应用程序中工作的最小单位

  1、单线程

#!/usr/bin/env python
# -*- coding: utf-8 -*- 
# lcj
import  threading
import time
#创建一个class类
def show(arg):  #arg:传参
    time.sleep(2)
    print(‘thread‘+str(arg))  #将传参转换至字符串形式
for i in range(10):  #创建10个线程
    t = threading.Thread(target=show,args=(i,))  #创建线程
    t.start()  #线程就绪,等到CPU调度,
# print(‘main thread stop‘)

  上述代码创建了10个“前台”线程,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令

 更多方法:
 •start            线程准备就绪,等待CPU调度
 •setName      为线程设置名称
 •getName      获取线程名称
 •setDaemon   设置为后台线程或前台线程(默认)
                   如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止
                    如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
 •join              逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
 •run              线程被cpu调度后自动执行线程对象的run方法  
技术分享
#自定义线程
import  threading
import time
class MyThread(threading.Thread):  #继承父类threading.Thread
    def __init__(self, num): #MyThread类__ini__方法
        threading.Thread.__init__(self)
        self.num = num
    def run(self):  #定义每个线程执行的函数,此时:self == 调用对象(t1或者t2对象)
        print(running on number:%s % self.num)
        time.sleep(3)  #每一个线程睡眠3秒
if __name__ == "__main__":
    t1 = MyThread(1)  #创建t1对象,并将参数1复制给t1对象
    t2 = MyThread(2)  #创建t2对象,并将参数1复制给t2对象
    t1.start()  #执行线程
    t2.start()
#输出
# running on number:1
# running on number:2
自定义线程 
技术分享
import threading
def f1(arg):
    print(arg)
for i in range(10): #设置10个线程
    t = threading.Thread(target=f1,args=(i,)) #创建线程
    t.start()  #执行线程
    #输出:0-9数据
自定义线程二

 

  生产者和消费者

技术分享
import queue,time
import threading
#先进先出
q = queue.Queue(20)
#生产者
def productor(arg):
    """
    买票
    :param arg:
    :return:
    """
    q.put(str(arg)+"-买票")
for i in range(30):
    t = threading.Thread(target=productor,args=(i,))  #创建买票线程生产者,并将动态参数i传递给productor(arg)
    t.start()  #执行买票线程
    # if i == 30:
    #     break
#消费者
def consumer(arg):
    """
    服务器后台,
    :param arg:
    :return:
    """
    while True:
        print(arg,q.get())  #获取队列中存在元素
        time.sleep(1)
sk = threading.BoundedSemaphore(5)
for j in range(5):
    t = threading.Thread(target=consumer,args=(j,))#创建买票线程生产者,并将动态参数j传递给consumer(arg)
    t.start()  #执行c线程
生产者和消费者模式

    队列

  Python中常用队列方法:

  put放数据,是否阻塞,阻塞时的超时事件

  get取数据(默认阻塞),是否阻塞,阻塞时的超时事件

  qsize():获取队列中有多少个元素

  maxsize:最大支持的个数

  1、先进先出

   queue.Queue(2)方法:创建先进先出队列,参数2表示队列中最多可含有两个元素,否则超过队列中设置的最大队列则报队列满(queue.Full)错误, 

import queue
import time
#先进先出
# put放数据,是否阻塞,阻塞时的超时事件
# get取数据(默认阻塞),是否阻塞,阻塞时的超时事件
# qsize():获取队列中有多少个元素
# maxsize 最大支持的个数
q = queue.Queue(2)  #创建队列,带参数,则表示队列最大可放多少个数据
q.put(12)
q.put(13)
print(q.qsize())  #获取队列中有多少个数据
# q.put(15,timeout=2)   #当队列中设置长度为2时,此时在往队列中增加元素时,超过2秒,报错queue.Full
q.put(15,block=False,timeout=2)
print(q.qsize())  #获取队列中有多少个数据
print("------------")
print(q.get())  #获取队列中元素
print(q.get(block=False))  #get取数据(默认阻塞),当block=False表示不堵塞
print(q.get(timeout=2))  #设置获取队列中元素超时时间2秒,超过则直接执行下面代码
print(q.get(block=False,timeout=2))   #

  2、后进先出

  queue.LifoQueue()方法:创建后进先出队列

import queue
# 队列后进先出
q= queue.LifoQueue()
q.put(123)  #将数据put队列中
q.put(234)
print(q.get()) #获取队列中的元素
#输出:234

  3、队列优先级

  queue.PriorityQueue()方法:创建队列优先级队列

import  queue
#队列优先级,按照下标进行优先级别排序
q = queue.PriorityQueue()  #创建队列优先级别q值
q.put((1,"lcj"))  #将数据put队列中
q.put((0,"alex"))
q.put((4,"xiaoluo"))
print(q.get())   #获取队列中的元素
#输出(0, ‘alex‘)

  4、双向队列

  queue.deque()方法:创建双向队列

import queue
#双向队列
q = queue.deque() #创建双向队列
q.append(33)  #将元素33加入队列中
q.append(44)
q.append(55)
q.appendleft(99)
print(q.pop())  #pop:在队列中提取一个元素
print(q.popleft())  #99

 

  线程锁(Lock,RLock)

  一个进程下有多个线程,且线程是共享进程资源,每一个线程肯能执行N条语句后,当多个线程同时修改一天数据时可能会出现脏数据,所以,出现线程锁,同一时间只允许一个线程操作

  Lock,RLock区别?

  Lock是阻塞其他线程对共享资源的访问,且同一线程只能acquire一次,如多于一次就出现了死锁,程序无法继续执行。为了保证线程对共享资源的独占,又避免死锁的出现,就有了RLock。RLock允许在同一线程中被多次acquire,线程对共享资源的释放需要把所有锁都release。即n次acquire,需要n次release

  线程在未加锁情况下,主线程回一次性执行所有的线程且会产生多个线程同用一个数据,产生脏数据  

技术分享
#未加锁
import threading
import time
num = 0  #定义一个全局变量
def show(arg):
    global num   #对全局变量num进行重新赋值
    time.sleep(1)
    num +=1
    print(num)
for i in range(10):  #定义十个线程
    t = threading.Thread(target=show,args=(i,))#
    t.start()  # 执行线程
线程未加锁

线程加锁特定:每一个线程执行一条数据,防止多个线程重用一条数据产生脏数据

技术分享
#加锁
import threading
import time
num = 0  #定义一个全局变量
lock = threading.RLock()  #定义一个线程锁
def show(arg):
    lock.acquire()  #枷锁操作
    global num   #对全局变量num进行重新赋值
    num +=1
    time.sleep(1)  #每一个线程执行时睡眠一秒钟
    print(num)
    lock.release()  #解锁操作
for i in range(10):  #定义十个线程
    t = threading.Thread(target=show,args=(i,))#创建主线程
    t.start()  # 执行线程
线程加锁

   信号量(Semaphore)

  互拆锁:同时只允许一个线程更改数据,而Semapphore是同时允许一定数量的线程更改数据,比如:一个萝卜一个坑,只有空余的坑才能进入占座

  BoundedSemaphore(5)方法:表示5个线程同时运行

import threading,time
def run(n):
    k1.acquire()  #加锁
    time.sleep(1)
    print("run the threading:%s" %n)
    k1.release() #解锁
if __name__ == __main__:
    num = 0
    k1 = threading.BoundedSemaphore(5) #设置线程最多同时运行5个线程
    for i in range(30):  #设置线程数为30
        t = threading.Thread(target=run,args=(i,))  #主线程
        t.start() #执行线程

  事件(event)

  python线程的事件用于主线程控制其他子线程的执行,事件主要提供是三个方法:set、wait、clear

  事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。  

  • clear:将“Flag”设置为False
  • set:将“Flag”设置为True 
  • wait:检测主线程状态
import threading,time
#代码从上往下执行,由t.start调用子线程执行lcj方法,第一次输出i值,
#当线程执行值wait时,判断线程状态,红灯(clear):停,绿灯(set):行
#主线程继续运行至inp,当用户输如true时,主线程执行set方法,将线程转态设置绿灯,继续运行,则子线程继续执行i+100
def lcj(i,event): print(i) time.sleep(1) event.wait() #检测是什么等,如果是红灯,则停,绿灯:则行 print(i+100) event_obj = threading.Event() #创建一个线程事件 for i in range(10): #设置30个线程 t = threading.Thread(target=lcj,args=(i,event_obj,)) t.start() #调用线程 event_obj.clear() #设置成红灯 inp = input(">>:").strip() if inp ==true: #如果用户输入:true,系统则经过wait检查之后执行其下面打印语句 event_obj.set() #设置成绿灯

   条件(Contion)

  当线程处于等待时,只有满足某条件时,才释放N个线程继续执行子线程下代码

  1、条件wait方法用法

import threading
def run(i,con):
    print(i)
    con.acquire()  #加锁
    con.wait()  #检测主线程状态,当主线程客户端输入3,则子线程输出队列中前三个元素每一个各加100输出
    print(i+100)
    con.release()  #解锁
# if __name__ == "__main__":
con = threading.Condition()
for i in range(10):
    t = threading.Thread(target=run,args=(i,con,))  #创建主线程
    t.start() #执行子线程
while True:
    inp = input(">>>:").strip()
    if inp == q:
        break
    # 此三个必须放在一起、acquire、notify、release
    con.acquire()  #加锁
    con.notify(int(inp))  #notify()方法不会释放所占用的锁,输入几就释放几个元素,比如:客户输入2,则子线程输出100,101
    con.release()  #解锁

  2、条件wait_for方法:  

import  threading
def contion():
    ret = False
    i = input(">>>>:")
    if i =="true":
        ret = True
    else:
        ret = False
    return ret
def run(i,con):
    print(i)  #输出所有线程
    con.acquire()#加锁
    con.wait_for(contion)  #wait_for:等待条件=true成立,则执行contion函数中语句
    print(i+100)  #当主线程条件成立,则执行i+100
    con.release() #解锁
c = threading.Condition()  #创建线程条件
for i in range(10): #设置10个线程
    t = threading.Thread(target=run,args=(i,c,)) #创建线程
    t.start() #执行线程

  定时器(time)

  python中定时器:指定N秒之后执行子线程操作  

from threading import Timer
def lcj():
    print("hello,world")
t = Timer(1,lcj)  #定时器:1秒后执行lcj函数,并输出hello,world
t.start() #执行线程

  2、多线程 

三、线程池

  线程池原理:可理解为是一个容器(容器中可指定大小),在容器中取一个线程则少一个线程,再无没有线程时等待,线程执行完毕,将线程归还给线程池

  1、基本线程池 

#自定义线程池
import queue
import threading
import time
class ThreadPool:   #进程池类
    def __init__(self,maxsize=5):  #创建构造方法,设置最大线程为5个线程
        self.maxsize = maxsize
        self._q = queue.Queue(maxsize)#_q:下划线 为了区别,创建队列
        for i in range(maxsize):  #每次循环最大线程
            self._q.put(threading.Thread) #将类名put队列中
    def get_thread(self):
        return self._q.get()  #获取线程
    def add_thread(self):
        self._q.put(threading.Thread)#队列含有:threading.Thread
pool = ThreadPool(5)  #线程池最大个数为5,即每次最多只能获取5个线程
def task(arg,p):
    print(arg)
    time.sleep(1)  #
    p.add_thread()#从线程池取完一次数据之后 再次想线程池中获取数据,直到执行完100个线程
for i in range(100):  #循环100次
    #t==threading.Thread类
    t = pool.get_thread()
    obj = t(target=task,args=(i,pool,))  #创建主线程
    obj.start()   #执行子线程池

   2、高级线程池 

四、Python协程

  协程原理:利用一个线程,分解一个线程成为多个“微线程”

  线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。

  协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。

  协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;

  协程常用方法

  greenlet:底层  

from greenlet import greenlet
def f1():
    print(12)
    lcj2.switch()
    print(34)
    lcj2.switch()
def f2():
    print(56)
    lcj1.switch()
    print(78)
lcj1 = greenlet(f1)
lcj2 = greenlet(f2)
lcj1.switch()
#第一步:lcj1.switch():执行f1函数并输出12
#第二步:在函数f1中lcj2.switch(),则执行f2函数并输出56
#第三步:在f2函数执行lcj1.switch(),则执行f1函数,并输出34
#第四步:在f1函数执行lcj2.switch(),则执行f2函数,并输出78
#最后输出顺序为:12,56,34,78

  gevent:高级

import gevent
def foo():
    print("running in foo")
    gevent.sleep(1)
    print(Explicit context switch to foo agein)

def bar():
    print(explicit context to bar)
    gevent.sleep(1)
    print(implicit context switch back to bar)
gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])
# running in foo
# explicit context to bar
# Explicit context switch to foo agein
# implicit context switch back to bar

  遇到IO操作自动切换:  

from gevent import monkey;monkey.patch_all()
#monkey.patch_all()  修改原来socket,   对IO请求进行封装
import gevent
import requests
def f(url):
    print(GET: %s% url)  #获取所有URL
    resp = requests.get(url)  #获取URL
    data = resp.text  #获取URL内容
    print("%d bytes received from %s." % (len(data),url))
gevent.joinall([
    gevent.spawn(f, "https://www.python.org/"),
    gevent.spawn(f, "https://www.yahoo.com/"),
    gevent.spawn(f, "https://github.com/"),
])

 

 

Python_进程、线程及协程

标签:

原文地址:http://www.cnblogs.com/lcj0703/p/5682035.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!