码迷,mamicode.com
首页 > 编程语言 > 详细

进程池与线程池、协程、协程实现TCP服务端并发、IO模型

时间:2019-08-20 01:22:33      阅读:116      评论:0      收藏:0      [点我收藏+]

标签:cep   ext   sock   一个   回调函数   工具   join   +=   手动   

进程池与线程池、协程、协程实现TCP服务端并发、IO模型

一、进程池与线程池

1、线程池

'''
开进程开线程都需要消耗资源,只不过两者比较的情况下线程消耗的资源比较少
在计算机能够承受范围内最大限度的利用计算机
什么是池?
    在保证计算机硬件安全的情况下最大限度的利用计算机
    池其实是降低了程序的运行效率,但是保证了计算机硬件的安全
    (硬件的发展跟不上软件的速度)
'''
from concurrent.futures import ThreadPoolExecutor
import time

pool = ThreadPoolExecutor(5)  # 括号中可以传参数,指定线程池内的线程个数,也可以不传,不传默认是当前所在计算机的CPU个数乘5


def task(n):
    print(n)
    time.sleep(2)


# pool.submit(task, 1)  # 朝线程池中提交任务,异步提交

'''
任务的提交方式:
    同步:原地等待任务的返回结果
    异步:不等待任务的返回结果,直接执行下一行代码
异步的结果怎么拿?
'''
for i in range(20):
    res = pool.submit(task, i)  # 任务(task)的返回结果,是Future类的一个对象
    print(res)  # <Future at 0x31aeeb0 state=pending>,这个对象是及时生成的,所以可以立马返回,不改变异步执行
    # 但是res的值是在任务执行完以后才会有
    # print(res.result())  # 通过result取值,并且是原地等待结果的返回,这一行代码直接将异步执行改为了同步执行
# 如果还是想要程序异步执行,同时还能拿到任务的返回结果,就要用一个列表将res全部放进去,待任务全部提交完以后,再for循环拿出res的值

# 异步提交任务,待任务全部执行完毕后,拿到任务的返回值
from concurrent.futures import ThreadPoolExecutor
import time


def task(n):
    print(n)
    time.sleep(1)
    return n ** 2


pool = ThreadPoolExecutor(5)
t_list = []
for i in range(20):
    res = pool.submit(task, i)
    t_list.append(res)
pool.shutdown()  # 关闭池子,等待池子中所有的任务执行完毕后,才会往下运行代码
for t in t_list:
    print('>>>>:', t.result())

2、进程池+异步回调机制

from concurrent.futures import ProcessPoolExecutor
import time
import os

pool = ProcessPoolExecutor(5)


def task(n):
    print(n,os.getpid())  #获取当前进程号
    time.sleep(1)
    return n ** 2


def call_back(n):
    print('拿到了异步提交任务的返回结果', n.result())


if __name__ == '__main__':
    t_list = []
    for i in range(20):
        res = pool.submit(task, i).add_done_callback(call_back)  # 提交任务的时候,绑定一个回调函数,一旦该任务有结果,立刻执行对应的回调函数
        t_list.append(res)

pool.shutdown()
for t in t_list:
    print('>>>:', t.result())
'''
异步回调机制:当异步提交的任务有返回结果之后,会自动触发回调函数的执行
根据打印出的进程号,可以发现:
    池子中创建的进程创建一次就不会再创建了
    至始至终用的都是最初的那几个
    这样的话节省开辟进程的资源
上述结论对线程同样适用
'''

二、协程

进程:资源单位
线程:执行单位
协程:单线程下实现并发
并发:切换+保存状态
    ps:看起来像是同时运行的,就可以称之为并发
协程:完全是程序员自己意淫出来的名词
    单线程下实现并发
并发的条件:多道技术
    空间上的复用:共用同一套操作系统
    时间上的复用:切换+保存状态
程序员自己通过代码自己检测程序中的IO
一旦遇到IO自己通过代码切换
给操作系统的感觉就是你这个线程没有任何的IO
ps:欺骗操作系统,让他误以为你这个程序一直没有IO
    从而保证程序在运行态和就绪态来回切换
    提升代码的运行效率
切换+保存状态就一定能够提升效率吗?
    当你的任务是IO密集型的情况下 提升效率
    如果你的任务是计算密集型的   降低效率
极限提升CPU工作效率的方式:
    多进程下开多线程
    多线程下再开协程
# 串行执行  1.5458002090454102
import time


def func1():
    for i in range(10000000):
        i + 1


def func2():
    for i in range(10000000):
        i + 1


start = time.time()
func1()
func2()
stop = time.time()
print(stop - start)

# 基于yield并发执行  2.3516733646392822
# yield可以保存上一次的结果
import time


def func1():
    while True:
        10000000 + 1
        yield


def func2():
    g = func1()
    for i in range(10000000):
        # time.sleep(100)  # 模拟IO,yield并不会捕捉到并自动切换
        i + 1
        next(g)


start = time.time()
func2()
stop = time.time()
print(stop - start)
'''
需要找到一个能够识别IO的一个工具————gevent模块,这是一个第三方模块,需要我们手动下载
'''
from gevent import monkey;monkey.patch_all()
from gevent import spawn
import time
'''
注意gevent模块没办法自动识别time.sleep()等IO情况
需要你手动再配置一个参数
from gevent import monkey;monkey.patch_all(),使spawn能够监测time.sleep()等IO
由于该模块经常使用,所以建议写成一行
'''


def heng():
    print('哼')
    time.sleep(2)
    print('哼')


def ha():
    print('哈')
    time.sleep(3)
    print('哈')


def heiheihei():
    print('嘿嘿嘿')
    time.sleep(4)
    print('嘿嘿嘿')


start = time.time()
g1 = spawn(heng)  # 对传入的函数名,加括号自动调用,并且监测其状态
g2 = spawn(ha)
g3 = spawn(heiheihei)
g1.join()  # 等待任务运行完毕
g2.join()
g3.join()
print(time.time() - start)  # 4.0027806758880615

三、通过协程实现TCP服务端并发

# 服务端
from gevent import monkey;monkey.patch_all()
from gevent import spawn
import socket

server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen(5)


def talk(conn):
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0: break
            print(data.decode('utf-8'))
            conn.send(data.upper())
        except ConnectionResetError as e:
            print(e)
            break
    conn.close()


def server1():
    while True:
        conn, addr = server.accept()
        spawn(talk, conn)


if __name__ == '__main__':
    g1 = spawn(server1)
    g1.join()
    
    
# 客户端
import socket
from threading import Thread, current_thread


def client1():
    client = socket.socket()
    client.connect(('127.0.0.1', 8080))
    n = 0
    while True:
        data = f'{current_thread().name} {n}'
        client.send(data.encode('utf-8'))
        res = client.recv(1024)
        print(res.decode('utf-8'))
        n += 1


for i in range(400):
    t = Thread(target=client1)
    t.start()

进程池与线程池、协程、协程实现TCP服务端并发、IO模型

标签:cep   ext   sock   一个   回调函数   工具   join   +=   手动   

原文地址:https://www.cnblogs.com/DcentMan/p/11380557.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!