码迷,mamicode.com
首页 > 其他好文 > 详细

使用gevent包实现concurrent.futures.executor 相同的公有方法。组成鸭子类

时间:2019-07-09 00:19:52      阅读:99      评论:0      收藏:0      [点我收藏+]

标签:cut   运行   sock   tle   star   exit   防止   register   util   

类名不同,但公有方法的名字和提供的基本功能大致相同,但两个类没有共同继承的祖先或者抽象类 接口来规定他,叫鸭子类。

 

使并发核心池能够在 threadpoolexetor和geventpoolexecutor自由选一种切换。

 

 

实现方式。

# -*- coding: utf-8 -*-
# @Author  : ydf
# @Time    : 2019/7/2 14:11
import atexit
import time
import warnings
from collections import Callable

import gevent
from gevent import pool as gevent_pool
from gevent import monkey

from gevent.queue import JoinableQueue

from app.utils_ydf import LoggerMixin, nb_print, LogManager


def check_gevent_monkey_patch(raise_exc=True):
    if not monkey.is_module_patched(socket):
        if raise_exc:
            warnings.warn(f检测到 你还没有打gevent包的猴子补丁,请在所运行的脚本第一行写上  【import gevent.monkey;gevent.monkey.patch_all()】  这句话。)
            raise Exception(f检测到 你还没有打gevent包的猴子补丁,请在所运行的脚本第一行写上  【import gevent.monkey;gevent.monkey.patch_all()】  这句话。)
    else:
        return 1


logger_gevent_timeout_deco = LogManager(logger_gevent_timeout_deco).get_logger_and_add_handlers()


def gevent_timeout_deco(timeout_t):
    def _gevent_timeout_deco(f):
        def __gevent_timeout_deceo(*args, **kwargs):
            timeout = gevent.Timeout(timeout_t, )
            timeout.start()
            try:
                f(*args, **kwargs)
            except gevent.Timeout as t:
                logger_gevent_timeout_deco.error(f函数 {f} 运行超过了 {timeout_t} 秒)
                if t is not timeout:
                    nb_print(t)
                    # raise  # not my timeout
            finally:
                timeout.close()

        return __gevent_timeout_deceo

    return _gevent_timeout_deco


class GeventPoolExecutor(gevent_pool.Pool):
    def __init__(self, size=None, ):
        check_gevent_monkey_patch()
        super().__init__(size, )

    def submit(self, *args, **kwargs):
        self.spawn(*args, **kwargs)

    def shutdown(self):
        self.join()


if __name__ == __main__:
    monkey.patch_all()


    def f2(x):

        time.sleep(1)
        nb_print(x)


    pool = GeventPoolExecutor(4)
  
    for i in range(15):
        nb_print(f放入{i})
        pool.submit(gevent_timeout_deco(8)(f2), i)
    nb_print(66666666)

 

对于收尾任务,threadpoolexecutor和这个还有少量不同,这个geventpool在脚本退出前不去主动join(shutdown)他,最后四个任务就会丢失 。 

threadpoolexecutor起的是守护线程,按道理也会出现这样的结果,但是concurrent包里面做了atexit处理。这里也可以使用atexit.register注册shutdown达到同样的目的,不需要手动调用join防止脚本提前退出。

 

 

实现eventlet的核心池,同理。

 

使用gevent包实现concurrent.futures.executor 相同的公有方法。组成鸭子类

标签:cut   运行   sock   tle   star   exit   防止   register   util   

原文地址:https://www.cnblogs.com/ydf0509/p/11154542.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!