码迷,mamicode.com
首页 > 其他好文 > 详细

gevent动态随时添加任务

时间:2017-07-20 15:30:30      阅读:164      评论:0      收藏:0      [点我收藏+]

标签:rom   cep   exce   join()   mon   multi   字符   from   需要   

关于爬虫,有scrapy框架,也有requests加协程 协程 进程的方法。

相关的包很多,比如threading 、threadpool、multiprocessing,还有threadpoolexecutor、processpoolexecutor这两个。

协程gevent pool的用法和threadpool 很像,但关于使用协程我需要一个像threadpoolexecutor一样的强大方法,可以随时追加任务的。

 

# -*- coding: utf-8 -*-
import gevent,time,redis
from gevent.pool import Pool
from gevent.queue import JoinableQueue
from gevent import monkey
monkey.patch_all(time=True)
class GeventPoolExecutor(object):
    def __init__ (self,max_works):
        self._q = JoinableQueue()

        for i in range(max_works):
            gevent.spawn(self.worker)
        self._q.join()

    def worker(self):
        while True:
            fn= self._q.get()
            try:
                exec(fn)
            except Exception,e:
                print e
            finally:
                pass
                self._q.task_done()

    def submit(self,fn):
        self._q.put(fn)

def func(a,b):
    print a*10,b+100
    time.sleep(2)

if __name__=="__main__":                   ###测试

    gevent_pool_executor=GeventPoolExecutor(max_works=3)
    while(1):

        #print gevent_pool_executor.q.qsize()
        if gevent_pool_executor._q.qsize()<5:
            functionx=redis.Redis(host=127.0.0.1, port=6379, db=0).blpop(ceshi)[1]
       #functionx是一个字符串,内容例如为func(1,2),但是是个字符串,在worker中用exec来执行他 gevent_pool_executor.submit(functionx)
else: time.sleep(0.001)

 

另一个文件

#coding=utf-8

r=redis.Redis(host=‘127.0.0.1‘, port=6379, db=0)

for i in range(100,2000):

   #r.rpush(‘seeds‘, json.dumps(dictx))
r.rpush(‘ceshi‘,‘func(%s,%s)‘%(i,i))
print ‘func(%s,%s)‘%(i,i)

 

gevent动态随时添加任务

标签:rom   cep   exce   join()   mon   multi   字符   from   需要   

原文地址:http://www.cnblogs.com/ydf0509/p/7210950.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!