码迷,mamicode.com
首页 > Web开发 > 详细

web.py 学习(二)Worker

时间:2015-06-19 01:31:27      阅读:309      评论:0      收藏:0      [点我收藏+]

标签:

Rocket Server 启动一个线程监听客户端的连接,收到连接将连接放置到队列中。线程池中的Worker会以这个连接进行初始化。Rocket中Worker的基类是:

class Worker(Thread):
    """Base class for threads that pull connections off a queue and serve them.

    A subclass must override ``run_app`` to read the request from the
    connection and send a response.  This base class supplies the main
    loop, error classification, request logging and request-line/header
    parsing helpers.
    """

    def __init__(self,
                 app_info,
                 active_queue,
                 monitor_queue,
                 *args,
                 **kwargs):
        """Store the queues and set up per-thread loggers.

        app_info      -- application configuration, kept for subclasses.
        active_queue  -- queue from which ``run`` takes connections.
        monitor_queue -- queue that receives connections whose socket
                         timed out (see ``_handleError``).
        Remaining arguments are passed through to ``Thread.__init__``.
        """
        Thread.__init__(self, *args, **kwargs)

        # Instance Variables
        self.app_info = app_info
        self.active_queue = active_queue
        self.monitor_queue = monitor_queue

        self.size = 0
        self.status = "200 OK"
        self.closeConnection = True
        self.request_line = ""
        self.protocol = 'HTTP/1.1'

        # Request Log
        self.req_log = logging.getLogger('Rocket.Requests')
        self.req_log.addHandler(NullHandler())

        # Error Log
        self.err_log = logging.getLogger('Rocket.Errors.' + self.getName())
        self.err_log.addHandler(NullHandler())

    def _handleError(self, typ, val, tb):
        """Classify an exception raised while serving a connection.

        Returns True when ``run`` should leave the serve loop immediately
        (socket timeout, bad request) and False when it should fall
        through to its normal close-connection check.
        """
        if typ == SSLError:
            # An SSL error whose message mentions a timeout is handled
            # exactly like a plain socket timeout.
            if 'timed out' in str(val.args[0]):
                typ = SocketTimeout
        if typ == SocketTimeout:
            if __debug__:
                self.err_log.debug('Socket timed out')
            # Hand the connection over to the monitor queue.
            self.monitor_queue.put(self.conn)
            return True
        if typ == SocketClosed:
            self.closeConnection = True
            if __debug__:
                self.err_log.debug('Client closed socket')
            return False
        if typ == BadRequest:
            self.closeConnection = True
            if __debug__:
                self.err_log.debug('Client sent a bad request')
            return True
        if typ == socket.error:
            self.closeConnection = True
            if val.args[0] in IGNORE_ERRORS_ON_CLOSE:
                if __debug__:
                    self.err_log.debug('Ignorable socket Error received...'
                                       'closing connection.')
                return False
            else:
                self.status = "999 Utter Server Failure"
                tb_fmt = traceback.format_exception(typ, val, tb)
                self.err_log.error('Unhandled Error when serving '
                                   'connection:\n' + '\n'.join(tb_fmt))
                return False

        # Anything else: log the traceback and answer with a 500.
        self.closeConnection = True
        tb_fmt = traceback.format_exception(typ, val, tb)
        self.err_log.error('\n'.join(tb_fmt))
        self.send_response('500 Server Error')
        return False

    def run(self):
        """Thread main loop: take connections from the active queue and
        serve requests on each one until it is closed or handed off.

        A falsy item on the queue makes the method return.
        """
        if __debug__:
            self.err_log.debug('Entering main loop.')

        # Enter thread main loop
        while True:
            conn = self.active_queue.get()

            if not conn:
                # A non-client is a signal to die
                if __debug__:
                    self.err_log.debug('Received a death threat.')
                return conn

            if isinstance(conn, tuple):
                conn = Connection(*conn)

            self.conn = conn

            if conn.ssl != conn.secure:
                self.err_log.info('Received HTTP connection on HTTPS port.')
                self.send_response('400 Bad Request')
                self.closeConnection = True
                conn.close()
                continue
            else:
                if __debug__:
                    self.err_log.debug('Received a connection.')
                self.closeConnection = False

            # Enter connection serve loop
            while True:
                if __debug__:
                    self.err_log.debug('Serving a request')
                try:
                    self.run_app(conn)
                except:
                    # Deliberately broad: _handleError classifies every
                    # exception type and decides how to proceed.
                    exc = sys.exc_info()
                    handled = self._handleError(*exc)
                    if handled:
                        break
                finally:
                    # Write a request-log entry whenever a request line
                    # was read, whether or not serving succeeded.
                    if self.request_line:
                        log_info = dict(client_ip=conn.client_addr,
                                        time=datetime.now().strftime('%c'),
                                        status=self.status.split(' ')[0],
                                        size=self.size,
                                        request_line=self.request_line)
                        self.req_log.info(LOG_LINE % log_info)

                if self.closeConnection:
                    try:
                        conn.close()
                    except:
                        self.err_log.error(str(traceback.format_exc()))

                    break

    def run_app(self, conn):
        """Serve one request on ``conn``.

        Must be overridden with a method that reads the request from the
        socket and sends a response.  The base implementation marks the
        connection for closing and raises NotImplementedError.
        """
        self.closeConnection = True
        raise NotImplementedError('Overload this method!')

    def send_response(self, status):
        """Send a minimal text/plain response for ``status``.

        ``status`` is a full status string such as ``'400 Bad Request'``;
        the text after the first space is used as the response body.
        Socket timeouts and socket errors are logged and mark the
        connection for closing instead of propagating.
        """
        stat_msg = status.split(' ', 1)[1]
        msg = RESPONSE % (self.protocol,
                          status,
                          len(stat_msg),
                          'text/plain',
                          stat_msg)
        try:
            self.conn.sendall(b(msg))
        except socket.timeout:
            self.closeConnection = True
            msg = 'Tried to send "%s" to client but received timeout error'
            self.err_log.error(msg % status)
        except socket.error:
            self.closeConnection = True
            msg = 'Tried to send "%s" to client but received socket error'
            self.err_log.error(msg % status)

    def read_request_line(self, sock_file):
        """Read and parse the HTTP request line from ``sock_file``.

        Returns a dict of the request-line components and stores the
        protocol in ``self.protocol``.  Raises SocketTimeout on a socket
        timeout, SocketClosed when nothing usable was received, and
        BadRequest (after sending a 400) when the line does not parse.
        """
        self.request_line = ''
        try:
            # Grab the request line
            d = sock_file.readline()
            if PY3K:
                d = d.decode('ISO-8859-1')

            if d == '\r\n':
                # Allow an extra NEWLINE at the beginning per HTTP 1.1 spec
                if __debug__:
                    self.err_log.debug('Client sent newline')

                d = sock_file.readline()
                if PY3K:
                    d = d.decode('ISO-8859-1')
        except socket.timeout:
            raise SocketTimeout('Socket timed out before request.')
        except TypeError:
            raise SocketClosed(
                'SSL bug caused closure of socket.  See '
                '"https://groups.google.com/d/topic/web2py/P_Gw0JxWzCs".')

        d = d.strip()

        if not d:
            if __debug__:
                self.err_log.debug(
                    'Client did not send a recognizable request.')
            raise SocketClosed('Client closed socket.')

        self.request_line = d

        # NOTE: I've replaced the traditional method of procedurally breaking
        # apart the request line with a (rather unsightly) regular expression.
        # However, Java's regexp support sucks so bad that it actually takes
        # longer in Jython to process the regexp than procedurally. So I've
        # left the old code here for Jython's sake...for now.
        if IS_JYTHON:
            return self._read_request_line_jython(d)

        match = re_REQUEST_LINE.match(d)

        if not match:
            self.send_response('400 Bad Request')
            raise BadRequest

        req = match.groupdict()
        # items() works on both Python 2 and 3; only existing keys are
        # reassigned, so the dict does not change size while iterating.
        for k, v in req.items():
            if not v:
                req[k] = ""
            if k == 'path':
                # Unquote each segment between slash separators and rejoin
                # the segments with a literal '%2F'.
                req['path'] = r'%2F'.join(
                    [unquote(x) for x in re_SLASH.split(v)])

        self.protocol = req['protocol']
        return req

    def _read_request_line_jython(self, d):
        """Parse request line ``d`` procedurally (used when IS_JYTHON).

        Returns a dict with method, protocol, path, query_string, scheme
        and host.  Sends a 400 and raises BadRequest on malformed input.
        """
        d = d.strip()
        try:
            method, uri, proto = d.split(' ')
            if (not proto.startswith('HTTP') or
                    proto[-3:] not in ('1.0', '1.1') or
                    method not in HTTP_METHODS):
                self.send_response('400 Bad Request')
                raise BadRequest
        except ValueError:
            # The line did not split into exactly three parts.
            self.send_response('400 Bad Request')
            raise BadRequest

        req = dict(method=method, protocol=proto)
        scheme = ''
        host = ''
        if uri == '*' or uri.startswith('/'):
            path = uri
        elif '://' in uri:
            scheme, rest = uri.split('://')
            host, path = rest.split('/', 1)
            path = '/' + path
        else:
            self.send_response('400 Bad Request')
            raise BadRequest

        query_string = ''
        if '?' in path:
            path, query_string = path.split('?', 1)

        path = r'%2F'.join([unquote(x) for x in re_SLASH.split(path)])

        req.update(path=path,
                   query_string=query_string,
                   scheme=scheme.lower(),
                   host=host)
        return req

    def read_headers(self, sock_file):
        """Read header lines from ``sock_file`` until a blank line.

        Returns a dict mapping upper-cased header names (with '-'
        replaced by '_') to their values.  Continuation lines are
        appended to the previous header's value.  Raises SocketTimeout
        on a socket timeout.
        """
        try:
            headers = dict()
            lname = None
            lval = None
            while True:
                l = sock_file.readline()

                if PY3K:
                    try:
                        l = str(l, 'ISO-8859-1')
                    except UnicodeDecodeError:
                        self.err_log.warning(
                            'Client sent invalid header: ' + repr(l))

                if l.strip().replace('\0', '') == '':
                    break

                if l[0] in ' \t' and lname:
                    # Some headers take more than one line
                    lval += ' ' + l.strip()
                else:
                    # HTTP header values are latin-1 encoded
                    l = l.split(':', 1)
                    # HTTP header names are us-ascii encoded

                    lname = l[0].strip().upper().replace('-', '_')
                    lval = l[-1].strip()

                headers[str(lname)] = str(lval)

        except socket.timeout:
            raise SocketTimeout("Socket timed out before request.")

        return headers

Worker继承自Thread,线程启动会执行run方法,在这个方法中做了以下的事情:

1、从连接队列(active_queue)中取得conn对象

2、执行方法run_app(conn),这个方法由子类实现

Rocket中Worker子类是:

class WSGIWorker(Worker):
    """Worker that serves each request by calling a WSGI application."""

    def __init__(self, *args, **kwargs):
        """Builds some instance variables that will last the life of the
        thread.

        Raises TypeError when the configured ``wsgi_app`` is not callable.
        """
        Worker.__init__(self, *args, **kwargs)

        if isinstance(self.app_info, dict):
            multithreaded = self.app_info.get('max_threads') != 1
        else:
            multithreaded = False
        self.base_environ = {
            'SERVER_SOFTWARE': self.app_info['server_software'],
            'wsgi.multithread': multithreaded,
        }
        self.base_environ.update(BASE_ENV)

        # Grab our application
        self.app = self.app_info.get('wsgi_app')

        if not hasattr(self.app, "__call__"):
            raise TypeError("The wsgi_app specified (%s) is not a valid WSGI application." % repr(self.app))

        # Enable futures
        if has_futures and self.app_info.get('futures'):
            executor = self.app_info['executor']
            self.base_environ.update({"wsgiorg.executor": executor,
                                      "wsgiorg.futures": executor.futures})

    def build_environ(self, sock_file, conn):
        """ Build the execution environment.

        Reads the request line and headers from ``sock_file`` and returns
        the WSGI environ dict for this request.  Also stores the request
        method in ``self.request_method``.
        """
        # Grab the request line
        request = self.read_request_line(sock_file)

        # Copy the Base Environment
        environ = self.base_environ.copy()

        # Grab the headers (items() works on both Python 2 and 3)
        for k, v in self.read_headers(sock_file).items():
            environ[str('HTTP_' + k)] = v

        # Add CGI Variables
        environ['REQUEST_METHOD'] = request['method']
        environ['PATH_INFO'] = request['path']
        environ['SERVER_PROTOCOL'] = request['protocol']
        environ['SERVER_PORT'] = str(conn.server_port)
        environ['REMOTE_PORT'] = str(conn.client_port)
        environ['REMOTE_ADDR'] = str(conn.client_addr)
        environ['QUERY_STRING'] = request['query_string']
        if 'HTTP_CONTENT_LENGTH' in environ:
            environ['CONTENT_LENGTH'] = environ['HTTP_CONTENT_LENGTH']
        if 'HTTP_CONTENT_TYPE' in environ:
            environ['CONTENT_TYPE'] = environ['HTTP_CONTENT_TYPE']

        # Save the request method for later
        self.request_method = environ['REQUEST_METHOD']

        # Add Dynamic WSGI Variables
        if conn.ssl:
            environ['wsgi.url_scheme'] = 'https'
            environ['HTTPS'] = 'on'
            try:
                peercert = conn.socket.getpeercert(binary_form=True)
                environ['SSL_CLIENT_RAW_CERT'] = (
                    peercert and ssl.DER_cert_to_PEM_cert(peercert))
            except Exception:
                # Best effort: a failure to fetch the client certificate
                # is reported through the error log and otherwise ignored.
                self.err_log.error(str(sys.exc_info()[1]))
        else:
            environ['wsgi.url_scheme'] = 'http'

        if environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked':
            environ['wsgi.input'] = ChunkedReader(sock_file)
        else:
            environ['wsgi.input'] = sock_file

        return environ

    def send_headers(self, data, sections):
        """Complete the response header set and send it to the client.

        Fills in Date, Server, Content-Length or Transfer-Encoding, and
        Connection when the application did not set them, then updates
        ``self.closeConnection`` and ``self.headers_sent``.
        """
        h_set = self.header_set

        # Does the app want us to send output chunked?
        self.chunked = h_set.get('Transfer-Encoding', '').lower() == 'chunked'

        # Add a Date header if it's not there already
        if 'Date' not in h_set:
            h_set['Date'] = formatdate(usegmt=True)

        # Add a Server header if it's not there already
        if 'Server' not in h_set:
            h_set['Server'] = HTTP_SERVER_SOFTWARE

        if 'Content-Length' in h_set:
            self.size = int(h_set['Content-Length'])
        else:
            s = int(self.status.split(' ')[0])
            if (s < 200 or s not in (204, 205, 304)) and not self.chunked:
                if sections == 1 or self.protocol != 'HTTP/1.1':
                    # Add a Content-Length header because it's not there
                    self.size = len(data)
                    h_set['Content-Length'] = str(self.size)
                else:
                    # If they sent us more than one section, we blow chunks
                    h_set['Transfer-Encoding'] = 'Chunked'
                    self.chunked = True
                    if __debug__:
                        self.err_log.debug('Adding header...'
                                           'Transfer-Encoding: Chunked')

        if 'Connection' not in h_set:
            # If the application did not provide a connection header,
            # fill it in
            client_conn = self.environ.get('HTTP_CONNECTION', '').lower()
            if self.environ['SERVER_PROTOCOL'] == 'HTTP/1.1':
                # HTTP = 1.1 defaults to keep-alive connections
                if client_conn:
                    h_set['Connection'] = client_conn
                else:
                    h_set['Connection'] = 'keep-alive'
            else:
                # HTTP < 1.1 supports keep-alive but it's quirky
                # so we don't support it
                h_set['Connection'] = 'close'

        # Close our connection if we need to.
        self.closeConnection = h_set.get('Connection', '').lower() == 'close'

        # Build our output headers
        header_data = HEADER_RESPONSE % (self.status, str(h_set))

        # Send the headers
        if __debug__:
            self.err_log.debug('Sending Headers: %s' % repr(header_data))
        self.conn.sendall(b(header_data))
        self.headers_sent = True

    def write_warning(self, data, sections=None):
        """Log a deprecation warning, then delegate to ``write``.

        This is the callable that ``start_response`` returns to the
        application.
        """
        self.err_log.warning('WSGI app called write method directly.  This is '
                             'deprecated behavior.  Please update your app.')
        return self.write(data, sections)

    def write(self, data, sections=None):
        """ Write the data to the output socket.

        Sends the headers first if they have not been sent yet.  When an
        error was recorded in ``self.error`` its status and message
        replace the application's.  Nothing is written for HEAD
        requests; socket timeouts and errors mark the connection for
        closing.
        """

        if self.error[0]:
            self.status = self.error[0]
            data = b(self.error[1])

        if not self.headers_sent:
            self.send_headers(data, sections)

        if self.request_method != 'HEAD':
            try:
                if self.chunked:
                    self.conn.sendall(b('%x\r\n%s\r\n' % (len(data), data)))
                else:
                    self.conn.sendall(data)
            except socket.timeout:
                self.closeConnection = True
            except socket.error:
                # But some clients will close the connection before that
                # resulting in a socket error.
                self.closeConnection = True

    def start_response(self, status, response_headers, exc_info=None):
        """ Store the HTTP status and headers to be sent when self.write is
        called.

        Returns ``self.write_warning``.  Raises AssertionError when
        headers were already set and no ``exc_info`` was supplied.
        """
        if exc_info:
            try:
                if self.headers_sent:
                    # Re-raise original exception if headers sent
                    # because this violates WSGI specification.
                    # NOTE(review): a bare ``raise`` relies on this being
                    # called while an exception is being handled.
                    raise
            finally:
                exc_info = None
        elif self.header_set:
            raise AssertionError("Headers already set!")

        if PY3K and not isinstance(status, str):
            self.status = str(status, 'ISO-8859-1')
        else:
            self.status = status
        # Make sure headers are bytes objects
        try:
            self.header_set = Headers(response_headers)
        except UnicodeDecodeError:
            self.error = ('500 Internal Server Error',
                          'HTTP Headers should be bytes')
            self.err_log.error('Received HTTP Headers from client that contain'
                               ' invalid characters for Latin-1 encoding.')

        return self.write_warning

    def run_app(self, conn):
        """Serve one request on ``conn`` through the WSGI application.

        Resets the per-request state, builds the environ, calls the
        application and writes its output.  Exceptions are not caught
        here; the output iterable and the socket file are always closed.
        """
        self.size = 0
        self.header_set = Headers([])
        self.headers_sent = False
        self.error = (None, None)
        self.chunked = False
        sections = None
        output = None

        if __debug__:
            self.err_log.debug('Getting sock_file')

        # Build our file-like object
        if PY3K:
            sock_file = conn.makefile(mode='rb', buffering=BUF_SIZE)
        else:
            sock_file = conn.makefile(BUF_SIZE)

        try:
            # Read the headers and build our WSGI environment
            self.environ = environ = self.build_environ(sock_file, conn)

            # Handle 100 Continue
            if environ.get('HTTP_EXPECT', '') == '100-continue':
                res = environ['SERVER_PROTOCOL'] + ' 100 Continue\r\n\r\n'
                conn.sendall(b(res))

            # Send it to our WSGI application
            output = self.app(environ, self.start_response)

            if not hasattr(output, '__len__') and not hasattr(output, '__iter__'):
                self.error = ('500 Internal Server Error',
                              'WSGI applications must return a list or '
                              'generator type.')

            if hasattr(output, '__len__'):
                sections = len(output)

            for data in output:
                # Don't send headers until body appears
                if data:
                    self.write(data, sections)

            if not self.headers_sent:
                # Send headers if the body was empty
                self.send_headers('', sections)

            if self.chunked and self.request_method != 'HEAD':
                # If chunked, send our final chunk length
                self.conn.sendall(b('0\r\n\r\n'))

        # Don't capture exceptions here.  The Worker class handles
        # them appropriately.
        finally:
            if __debug__:
                self.err_log.debug('Finally closing output and sock_file')

            if hasattr(output, 'close'):
                output.close()

            sock_file.close()

# Monolithic build...end of module: rocket/methods/wsgi.py

在run_app中主要执行了下面的代码:

self.environ = environ = self.build_environ(sock_file, conn)
output = self.app(environ, self.start_response)

 

第一行是读取客户端的请求,并取得所有的请求头。

第二行是调用WSGI应用(self.app)来处理请求,并取得输出。

 

app是worker初始化的时候传入的。

web.py 学习(二)Worker

标签:

原文地址:http://www.cnblogs.com/doudouyoutang/p/4587374.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!