码迷,mamicode.com
首页 > 编程语言 > 详细

Python 线程

时间:2017-09-08 11:57:24      阅读:184      评论:0      收藏:0      [点我收藏+]

标签:用法   exe   个人   syn   down   ssi   .com   线程池   rand   

一、定义:

  线程顾名思义,就是一条流水线工作的过程,一条流水线必须属于一个车间,一个车间的工作过程是一个进程,进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是执行单位

二、线程定义方式:

  1、使用替换threading模块提供的Thread

from threading import Thread
from multiprocessing import Process

def task():
    print(is running)

if __name__ == __main__:
    t=Thread(target=task,)
    # t=Process(target=task,)
    t.start()
    print()

  2、自定义类,继承Thread

from threading import Thread
from multiprocessing import Process
class MyThread(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print(%s is running %self.name)

if __name__ == __main__:
    t=MyThread(egon)
    # t=Process(target=task,)
    t.start()
    print()

三、多线程共享同一个进程内的资源

  因为线程间的数据是共享的所以都会用同一个资源

from threading import Thread
from multiprocessing import Process
n=100
def work():
    global n
    n=0

if __name__ == __main__:

    # p=Process(target=work,)
    # p.start()
    # p.join()
    # print(‘主‘,n)

    t=Thread(target=work,)
    t.start()
    t.join()
    print(,n)

四、其它相关函数

Thread实例对象的方法
  # isAlive(): 返回线程是否活动的。
  # getName(): 返回线程名。
  # setName(): 设置线程名。

threading模块提供的一些方法:
  # threading.currentThread(): 返回当前的线程变量。
  # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
from threading import Thread,activeCount,enumerate,current_thread
import time
def task():
    print(%s is running %current_thread().getName())
    time.sleep(2)

if __name__ == __main__:
    t=Thread(target=task,)
    t.start()
    t.join()
    print(t.is_alive())
    print(t.getName())
    print(enumerate())
    print()
    print(activeCount())
current_thread的用法
from threading import Thread,activeCount,enumerate,current_thread
from multiprocessing import Process
import time

def task():
    print(%s is running %current_thread().getName())
    time.sleep(2)

if __name__ == __main__:
    p=Process(target=task)
    p.start()
    print(current_thread())
from threading import Thread,activeCount,enumerate,current_thread
from multiprocessing import Process
import time

def task():
    print(%s is running %current_thread().getName())
    time.sleep(2)

if __name__ == __main__:
    t1=Thread(target=task)
    t2=Thread(target=task)
    t3=Thread(target=task)
    t1.start()
    t2.start()
    t3.start()
    print(current_thread())

五、守护线程

      守护线程则是主线程等待其它非守护线程结束,主线程结束则守护线程结束

#再看:守护线程

from threading import Thread
import time

def task1():
    print(123)
    time.sleep(10)
    print(123done)

def task2():
    print(456)
    time.sleep(1)
    print(456done)

if __name__ == __main__:
    t1=Thread(target=task1)
    t2=Thread(target=task2)
    t1.daemon=True
    t1.start()
    t2.start()
    print()

六、线程互斥锁

  即:线程中谁抢到了锁谁去执行,没有抢到的则在等待

from threading import Thread,Lock
import time
n=100
def work():
    global n
    mutex.acquire()#抢到锁加锁
    temp=n
    time.sleep(0.1)
    n=temp-1
    mutex.release()#解锁

if __name__ == __main__:
    mutex=Lock()
    l=[]
    start=time.time()
    for i in range(100):
        t=Thread(target=work)
        l.append(t)
        t.start()

    for t in l:
        t.join()
    print(run time:%s value:%s %(time.time()-start,n))

七:互斥锁与join的区别

互斥锁只是在重要的代码阶段加上谁抢到谁处理,而join则是一个一个的全部把所有的代码都执行,大大加大执行代码的时间

join实例:

from threading import Thread,Lock
import time
n=100
def work():
    time.sleep(0.05)
    global n
    temp=n
    time.sleep(0.1)
    n=temp-1


if __name__ == __main__:
    start=time.time()
    for i in range(100):
        t=Thread(target=work)
        t.start()
        t.join()

    print(run time:%s value:%s %(time.time()-start,n))

互斥锁实例:

#互斥锁
from threading import Thread,Lock
import time
n=100
def work():
    time.sleep(0.05)
    global n
    mutex.acquire()
    temp=n
    time.sleep(0.1)
    n=temp-1
    mutex.release()

if __name__ == __main__:
    mutex=Lock()
    l=[]
    start=time.time()
    for i in range(100):
        t=Thread(target=work)
        l.append(t)
        t.start()

    for t in l:
        t.join()
    print(run time:%s value:%s %(time.time()-start,n))

八:线程死锁与递规锁

  死锁:则是几个人在抢几把锁,但是一个人抢一把锁,在没有解这把锁,则是去抢另一把,则永远无法抢到,也没法解除当前的锁,由为死锁

from threading import Thread,Lock,RLock
import time
mutexA=Lock()
mutexB=Lock()
class Mythread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print(\033[45m%s 抢到A锁\033[0m %self.name)
        mutexB.acquire()
        print(\033[44m%s 抢到B锁\033[0m %self.name)
        mutexB.release()
        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print(\033[44m%s 抢到B锁\033[0m %self.name)
        time.sleep(1)
        mutexA.acquire()
        print(\033[45m%s 抢到A锁\033[0m %self.name)
        mutexA.release()
        mutexB.release()

递归锁:

  则需要threading导入RLock,用这个每一个人拿到的都是这把锁,解除这把锁之后才能拿到下把锁,这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:

#递归锁
from threading import Thread,Lock,RLock
import time
mutex=RLock()
class Mythread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutex.acquire()
        print(\033[45m%s 抢到A锁\033[0m %self.name)
        mutex.acquire()
        print(\033[44m%s 抢到B锁\033[0m %self.name)
        mutex.release()
        mutex.release()

    def f2(self):
        mutex.acquire()
        print(\033[44m%s 抢到B锁\033[0m %self.name)
        time.sleep(1)
        mutex.acquire()
        print(\033[45m%s 抢到A锁\033[0m %self.name)
        mutex.release()
        mutex.release()

九:信号量

同进程的一样

Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()

from threading import Thread,current_thread,Semaphore
import time,random

sm=Semaphore(5)
def work():
    sm.acquire()
    print(%s 上厕所 %current_thread().getName())
    time.sleep(random.randint(1,3))
    sm.release()

if __name__ == __main__:
    for i in range(20):
        t=Thread(target=work)
        t.start()

十:Event

同进程的一样

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

技术分享
event.isSet():返回event的状态值;

event.wait():如果 event.isSet()==False将阻塞线程;

event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;

event.clear():恢复event的状态值为False。
from threading import Thread,current_thread,Event
import time
event=Event()

def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise ConnectionError(链接失败)
        print(%s 等待第%s次链接mysql %(current_thread().getName(),count))
        event.wait(0.5)
        count+=1

    print(%s 链接ok % current_thread().getName())


def check_mysql():
    print(%s 正在检查mysql状态 %current_thread().getName())
    time.sleep(1)
    event.set()


if __name__ == __main__:
    t1=Thread(target=conn_mysql)
    t2=Thread(target=conn_mysql)
    check=Thread(target=check_mysql)

    t1.start()
    t2.start()
    check.start()

十一:定时器

  定义:指定n秒后执行某操作

from threading import Timer

def hello(n):
    print("hello, world",n)

t = Timer(3, hello,args=(11,))#3秒后执行
t.start()  # after 1 seconds, "hello, world" will be printed

十二:线程queue

  定义:线程的队列,使用import queue,用法与进程Queue一样

import queue

q=queue.Queue(3) #队列:先进先出
q.put(1)
q.put(2)
q.put(3)

print(q.get())
print(q.get())
print(q.get())


q=queue.LifoQueue(3) #堆栈:后进先出
q.put(1)
q.put(2)
q.put(3)

print(q.get())
print(q.get())
print(q.get())


q=queue.PriorityQueue(3) #数字越小优先级越高
q.put((10,data1))
q.put((11,data2))
q.put((9,data3))

print(q.get())
print(q.get())
print(q.get())

十三、线程池

定义:则是同时开启多少线程,如果并发则用的线程名则还是已开启的

#线程池
import requests #pip3 install requests
import os,time,threading
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
def get_page(url):
    print(<%s> get :%s %(threading.current_thread().getName(),url))
    respone = requests.get(url)
    if respone.status_code == 200:
        return {url:url,text:respone.text}

def parse_page(obj):
    dic=obj.result()
    print(<%s> parse :%s %(threading.current_thread().getName(),dic[url]))
    time.sleep(0.5)
    res=url:%s size:%s\n %(dic[url],len(dic[text])) #模拟解析网页内容
    with open(db.txt,a) as f:
        f.write(res)


if __name__ == __main__:

    # p=Pool(4)
    p=ThreadPoolExecutor(3) #同时开始3个线程
    urls = [
        http://www.baidu.com,
        http://www.baidu.com,
        http://www.baidu.com,
        http://www.baidu.com,
        http://www.baidu.com,
        http://www.baidu.com,
        http://www.baidu.com,
    ]


    for url in urls:
        # p.apply_async(get_page,args=(url,),callback=parse_page)
        p.submit(get_page,url).add_done_callback(parse_page)

    p.shutdown()
    print(主进程pid:,os.getpid())

 

Python 线程

标签:用法   exe   个人   syn   down   ssi   .com   线程池   rand   

原文地址:http://www.cnblogs.com/liuxiaowei/p/7493386.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!