码迷,mamicode.com
首页 > 系统相关 > 详细

网络编程基础---并发编程----Manager(共享字典,列表)---joinabelqueue----进程池---回调函数

时间:2017-10-12 17:01:19      阅读:268      评论:0      收藏:0      [点我收藏+]

标签:lock   time   ssi   syn   amp   llb   for   角色   while   

生产者消费者模型:

1 程序中有两类角色
     一类负责生产数据(生产者) 一类负责处理数据(消费者)

2 引入生产者消费者模型的目的:
    平衡生产者 与 消费者之间的 速度差

3 如何实现:
    生产者---队列----消费者  (解耦合)

 

1  joinablequeue:

from multiprocessing import Process,JoinableQueue
import time,random

def producer(name,q,food):
    for i in range(1,10):
        time.sleep(0.2)
        res=%s 制作的第%s %s%(name,i,food)
        q.put(res)
    q.join()   # 等到队列里面没有内容
def consumer(name,q):
    while True:
        res=q.get()
        if not res:break
        print(%s 吃了 %s%(name,res))
        q.task_done()  #  取一次 次数减一

if __name__==__main__:
    q=JoinableQueue()
    p1=Process(target=producer,args=(egon,q,baozi))
    p2=Process(target=producer,args=(alex,q,baozi))
    p3=Process(target=producer,args=(elen,q,baozi))
    c1=Process(target=consumer,args=(a,q))  #共享的q
    c2=Process(target=consumer,args=(b,q))  #共享的q
    p1.start()
    p2.start()
    p3.start()

    c1.daemon=True
    c2.daemon=True  # 消费者随着生产者执行结束   守护进程---

    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()

 

2 .Manager:

 

from multiprocessing import Manager,Process,Lock

def work(d,lock):
    with lock:
        d[count]-=1

if __name__==__main__:
m
=Manager() d=m.dict({count:50}) # Manager 创建共享字典 # d=m.list() lock=Lock() p_l=[] for i in range(20): p=Process(target=work,args=(d,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(d)

 

 

 

3.同步异步 --- 阻塞非阻塞:

同步调用---指的是提交任务的方式 ==== apply 提交晚任务 原地等待任务结束
阻塞(进程的一种状态)
---遇到IO就阻塞--剥夺cpu执行权限 异步调用---提交完任务后 不会在原地等待 会继续提交下一个任务
非阻塞(进程处于 运行状态 或者 就绪状态)

 

4.进程池:

              ===================== 进程池 ==================

                        并发量不高-----大并发下不能使用

from multiprocessing import Pool       # 开进程 控制进程的数量
import time,os,random

def work(n):
    print(%s i working %os.getpid())
    time.sleep(random.randint(1,2))  #  阻塞时间
    return n

if __name__==__main__:

    p=Pool(4)   # 四个进程  进程池   一共只有四个进程--- 一个完成任务 接着 去做另一个任务
    obj_ls=[]

    for i in range(10):
        # res=p.apply(work,args=(i,))        # 同步调用 等待任务结束 拿到结果---- 提交启动进程任务 p= Process(target=work) -- p.start()
        # print(res)
        obj=p.apply_async(work,args=(i,))    # 异步调用---只不断的提交任务到  进程池 开进程 ----并不拿结果
        obj_ls.append(obj)
        # print(obj.get()) # 等待结果

    p.close()  # 关闭apply_async请求
    p.join()  # 等待进程池结束

    for obj in obj_ls:
        print(obj.get())

 

5  回调函数:

 

import requests
import os
from multiprocessing import Pool,Process
def get(url): print(%s get %s%(os.getpid(),url)) response = requests.get(url) if response.status_code==200: return {url:url,text:response.text} def parse(data): print(os.getpid(),data) res=%s :%s\n %(data[url],len(data[text])) with open(demo.txt,a) as f: f.write(res) if __name__==__main__: urls=[https://www.baidu.com, https://www.hao123.com, http://cn.bing.com/?mkt=zh-CN&mkt=zh-CN&mkt=zh-CN&mkt=zh-CN&mkt=zh-CN&mkt=zh-CN, ] p=Pool() url_ls=[] for url in urls: url_ls.append(p.apply_async(get,args=(url,),callback=parse)) # 主进程 负责回调函数 # get函数的返回值-->> 作为parse函数的参数 p.close() p.join()

 

网络编程基础---并发编程----Manager(共享字典,列表)---joinabelqueue----进程池---回调函数

标签:lock   time   ssi   syn   amp   llb   for   角色   while   

原文地址:http://www.cnblogs.com/big-handsome-guy/p/7656591.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!