码迷,mamicode.com
首页 > 编程语言 > 详细

线程池

时间:2018-07-18 23:30:16      阅读:205      评论:0      收藏:0      [点我收藏+]

标签:sleep   bin   注意   last   imp   服务   分享图片   cpu核数   recent   

之前用的multiprocessing.Process和threading.Thread都是一个线程只能执行一个任务,如果想用一个线程执行多个任务,该怎么办呢?

from threading import Thread

def func():
    print(执行一个线程)

if __name__ == __main__:
    t = Thread(target=func)
    t.start()
    t.start()

>>
执行一个线程
Traceback (most recent call last):
  File "H:/exercise/并发/进程池与线程池/demo.py", line 9, in <module>
    t.start()
  File "C:\python36\lib\threading.py", line 842, in start
    raise RuntimeError("threads can only be started once")
RuntimeError: threads can only be started once

可重复利用的线程

from threading import Thread
import queue
import time

class MyThread(Thread):
    def __init__(self):
        super().__init__()
        self.queue = queue.Queue() #实例一个队列
        self.daemon = True # 主线程结束,子线程也当结束
        self.start() # 实例的时候就启动线程

    def run(self):  # 不断获取并执行任务
        while True:
            func,args,kwargs = self.queue.get()  # 等待获取任务,没有任务就阻塞
            try:
                func(*args,**kwargs)  # 执行任务
            finally:
                self.queue.task_done()  # 告诉queue 这次任务执行完毕 队列计数器会减1

    def apply_async(self,func,args=(),kwargs={}):
        self.queue.put((func,args,kwargs)) # 把任务添加到队列中 队列计数器会加1

    def join(self):
        self.queue.join() # 若队列还有任务,则会阻塞,若队列没有任务了,不会阻塞 队列根据计数器判断是否还有任务

def func1():
    time.sleep(2)
    print(任务1)
def func2():
    time.sleep(2)
    print(任务2)
def func3():
    time.sleep(2)
    print(任务3)

if __name__ == __main__:
    thread = MyThread()
    thread.apply_async(func1) # 添加任务到线程队列
    thread.apply_async(func2)
    thread.apply_async(func3)
    thread.join()  # 如果没有这个 会因为主线程结束,子线程不执行,有了这个,当队列任务没执行完之前,将阻塞再这里
    print(执行完毕!)

>>
任务1
任务2
任务3
执行完毕!

线程池

技术分享图片

技术分享图片

技术分享图片

 

 

线程池的简单实现

方法一:

import threading
import queue
import time

class MyThreadPool:
    def __init__(self,n):
        self.queue = queue.Queue()
        for i in range(n):
            threading.Thread(target=self.worker,daemon=True).start()

    def worker(self): #不断获取并执行任务
        while True:
            func,args,kwargs = self.queue.get() # 获取任务 没有任务就阻塞
            func(*args,**kwargs)  #执行任务
            self.queue.task_done() #每次队列加元素,计数器+1,每次task_done() 计数器-1,如果计数器为0,认为队列任务结束

    def apply_async(self,func,args=(),kwargs={}): # 把任务放到队列
        self.queue.put((func,args,kwargs))

    def join(self):
        self.queue.join() #queue.join() 队列里有元素,会阻塞,直到队列没有元素,才会不阻塞

def func1():
    time.sleep(2)
    print(任务1)
def func2():
    time.sleep(2)
    print(任务2)
def func3():
    time.sleep(2)
    print(任务3)

def func4(name):
    time.sleep(2)
    print(name)

if __name__ == __main__:
    pool = MyThreadPool(3)
    pool.apply_async(func1)
    pool.apply_async(func2)
    pool.apply_async(func3)
    pool.apply_async(func4,args=(Jack,))
    pool.join() # 阻塞直至所有任务执行完,如果没有,会因为主线程结束而不执行子线程

>>
两秒后打印:
任务3
任务1
任务2
再过两秒打印:
Jack

方法二:面向对象的方式

import threading
import queue
import time


class MyThread(threading.Thread):
    def __init__(self,queue):
        super().__init__()
        self.queue = queue
        self.daemon = True  # 主线程结束,子线程也应该结束
        self.start()

    def run(self):  # 不断获取并执行任务
        while True:
            func,args,kwargs = self.queue.get() # 获取任务 没有任务就阻塞
            func(*args,**kwargs)   # 执行任务
            self.queue.task_done()  # 每次队列加元素,计数器+1,每次task_done() 计数器-1,如果计数器为0,认为队列任务结束


class Threadpool():
    def __init__(self,n):
        self.queue = queue.Queue()
        for i in range(n):
            MyThread(self.queue)  # 开启n个线程,队列等待接受任务

    def apply_async(self,func,args=(),kwargs={}): # 把任务放到队列
        self.queue.put((func,args,kwargs))

    def join(self):
        self.queue.join() #queue.join() 队列里有元素,会阻塞,直到队列没有元素,才会不阻塞

def func1():
    time.sleep(2)
    print(任务1)
def func2():
    time.sleep(2)
    print(任务2)
def func3():
    time.sleep(2)
    print(任务3)

if __name__ == __main__:
    pool = Threadpool(2)
    pool.apply_async(func1)
    pool.apply_async(func2)
    pool.apply_async(func3)
    pool.join() # 阻塞直至所有任务执行完,如果没有join,会因为主线程结束而不执行子线程
    print(任务执行完毕)

>>
2s后打印
任务2
任务1
再过2s打印
任务3
任务执行完毕

注意:线程是由解释器调度的,我们无法控制线程的执行顺序。

python自带的线程池

使用进程池

from multiprocessing import Pool #进程池
import time
def func(n):
    time.sleep(2)
    print(n)

if __name__ == __main__:
    pool = Pool(4) # 实例进程池,不传参数默认是CPU核数
    for i in range(10):
        pool.apply_async(func,args=(i,)) #把任务提交到队列
    pool.close() #关闭进程池,不让再提交任务
    pool.join() #等待队列任务都完成,规定在join()之前先要close

使用线程池

from multiprocessing.pool import ThreadPool 
import time
def func(n):
    time.sleep(2)
    print(n)

if __name__ == __main__:
    pool = ThreadPool(4) # 实例线程池,不传参数默认是CPU核数
    for i in range(10):
        pool.apply_async(func,args=(i,)) #把任务提交到队列
    pool.close() #关闭线程池,不让再提交任务
    pool.join() #等待队列任务都完成,规定在join()之前先要close

两个库的api基本一致,不过执行起来不一样,进程对计算密集型的操作比较拿手,可以调度多个CPU执行。线程是轻量级的进程,但是只是一个CPU执行,更适用于IO密集型操作。

使用线程池来实现并发服务器

import socket
from multiprocessing.pool import ThreadPool

def worker(conn,addr):
    while True:
        data = conn.recv(1024)
        if data:
            print(data.decode())
            conn.send(data)
        else:
            conn.close()
            print({}已关闭.format(addr))
            break

if __name__ == __main__:
    pool = ThreadPool()
    sock = socket.socket()
    sock.bind((‘‘,9999))
    sock.listen(5)
    print(开始监听!)
    while True:
        conn,addr = sock.accept()
        print({}已连接.format(addr))
        pool.apply_async(worker,args=(conn,addr))

 

线程池

标签:sleep   bin   注意   last   imp   服务   分享图片   cpu核数   recent   

原文地址:https://www.cnblogs.com/woaixuexi9999/p/9332880.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!