标签:
Rocket是一个轻量级,多线程,符合WSGI规范的web框架。
Rocket使用一个线程监听连接,接收到连接之后放到Queue中,有worker线程进行处理。
Rocket含有以下属性:
min_threads 必须是一个大于0的数字,Rocket在同一时刻一定有大于min_threads在执行,默认这个值是10
max_threads 是指最多有多少个线程同时在执行,默认是没有限制的,对于cPython来说因为存在GIL,所以线程数量较多不存在问题;对于Jpython,由于是真正的多线程,如果web的瓶颈在于CPU,建议max_threads为CPU核数的1.5倍,避免多线程切换降低效率。
queue_size 是指默认可以接收的连接个数,通常这个值由系统决定。
Web.py中使用到了Rocket框架,初始代码如下:
class Rocket(object): """The Rocket class is responsible for handling threads and accepting and dispatching connections.""" def __init__(self, interfaces=(‘127.0.0.1‘, 8000), method=‘wsgi‘, app_info=None, min_threads=None, max_threads=None, queue_size=None, timeout=600, handle_signals=True): self.handle_signals = handle_signals self.startstop_lock = Lock() self.timeout = timeout if not isinstance(interfaces, list): self.interfaces = [interfaces] else: self.interfaces = interfaces if min_threads is None: min_threads = DEFAULTS[‘MIN_THREADS‘] if max_threads is None: max_threads = DEFAULTS[‘MAX_THREADS‘] if not queue_size: if hasattr(socket, ‘SOMAXCONN‘): queue_size = socket.SOMAXCONN else: queue_size = DEFAULTS[‘LISTEN_QUEUE_SIZE‘] if max_threads and queue_size > max_threads: queue_size = max_threads if isinstance(app_info, dict): app_info[‘server_software‘] = SERVER_SOFTWARE self.monitor_queue = Queue() self.active_queue = Queue() self._threadpool = ThreadPool(get_method(method), app_info=app_info, active_queue=self.active_queue, monitor_queue=self.monitor_queue, min_threads=min_threads, max_threads=max_threads) # Build our socket listeners self.listeners = [Listener( i, queue_size, self.active_queue) for i in self.interfaces] for ndx in range(len(self.listeners) - 1, 0, -1): if not self.listeners[ndx].ready: del self.listeners[ndx] if not self.listeners: log.critical("No interfaces to listen on...closing.") sys.exit(1) def _sigterm(self, signum, frame): log.info(‘Received SIGTERM‘) self.stop() def _sighup(self, signum, frame): log.info(‘Received SIGHUP‘) self.restart() def start(self, background=False): log.info(‘Starting %s‘ % SERVER_SOFTWARE) self.startstop_lock.acquire() try: # Set up our shutdown signals if self.handle_signals: try: import signal signal.signal(signal.SIGTERM, self._sigterm) signal.signal(signal.SIGUSR1, self._sighup) except: log.debug(‘This platform does not support signals.‘) # Start our worker threads self._threadpool.start() # Start our monitor thread self._monitor = Monitor(self.monitor_queue, self.active_queue, self.timeout, self._threadpool) self._monitor.setDaemon(True) self._monitor.start() # I know that EXPR and A or B is bad but I‘m keeping it for Py2.4 # compatibility. str_extract = lambda l: (l.addr, l.port, l.secure and ‘*‘ or ‘‘) msg = ‘Listening on sockets: ‘ msg += ‘, ‘.join( [‘%s:%i%s‘ % str_extract(l) for l in self.listeners]) log.info(msg) for l in self.listeners: l.start() finally: self.startstop_lock.release() if background: return while self._monitor.isAlive(): try: time.sleep(THREAD_STOP_CHECK_INTERVAL) except KeyboardInterrupt: # Capture a keyboard interrupt when running from a console break except: if self._monitor.isAlive(): log.error(traceback.format_exc()) continue return self.stop() def stop(self, stoplogging=False): log.info(‘Stopping %s‘ % SERVER_SOFTWARE) self.startstop_lock.acquire() try: # Stop listeners for l in self.listeners: l.ready = False # Encourage a context switch time.sleep(0.01) for l in self.listeners: if l.isAlive(): l.join() # Stop Monitor self._monitor.stop() if self._monitor.isAlive(): self._monitor.join() # Stop Worker threads self._threadpool.stop() if stoplogging: logging.shutdown() msg = "Calling logging.shutdown() is now the responsibility of the application developer. Please update your applications to no longer call rocket.stop(True)" try: import warnings raise warnings.DeprecationWarning(msg) except ImportError: raise RuntimeError(msg) finally: self.startstop_lock.release() def restart(self): self.stop() self.start()
start方法中执行下列步骤:
1、启动关闭锁加锁
2、注册终止和重启的信号
3、启动worker线程池
4、启动监控线程,添加监控线程到线程池中
5、启动所有的监听线程
6、释放启动关闭锁
7、进入while(监控线程存活)循环,每次休眠1s
stop方法:
1、启动关闭锁加锁
2、所有监听线程设置ready标记为为False
3、等待所有监听线程结束
4、等待监控线程结束
5、关闭worker线程池
6、输出关闭log
7、释放启动关闭锁
标签:
原文地址:http://www.cnblogs.com/doudouyoutang/p/4567382.html