码迷,mamicode.com
首页 > 编程语言 > 详细

python---tornado补充(异步非阻塞)

时间:2018-06-30 14:50:58      阅读:199      评论:0      收藏:0      [点我收藏+]

标签:若是   self   change   control   for   move   main   lock   头信息   

一:正常访问(同一线程中多个请求是同步阻塞状态)

import tornado.ioloop
import tornado.web
import tornado.websocket
import datetime,time

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("main")class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        time.sleep(10)
        self.write("index")

st ={
    "template_path": "template",#模板路径配置
    "static_path":static,
}

#路由映射   匹配执行,否则404
application = tornado.web.Application([
    ("/main",MainHandler),
    ("/index",IndexHandler),
],**st)

if __name__=="__main__":
    application.listen(8080)

    #io多路复用
    tornado.ioloop.IOLoop.instance().start()

我们先访问index,再去访问main,查看情况

技术分享图片

二:使用future模块,实现异步非阻塞

import tornado.ioloop
import tornado.web
import tornado.websocket
import time
from tornado.concurrent import Future

class MainHandler(tornado.web.RequestHandler):
    def get(self):

        self.write("main")class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        future = Future()
        tornado.ioloop.IOLoop.current().add_timeout(time.time()+5,self.done)    #会在结束后为future中result赋值
        yield future

    def done(self,*args,**kwargs):
        self.write("index")
        self.finish()  #关闭请求连接,必须在回调中完成

st ={
    "template_path": "template",#模板路径配置
    "static_path":static,
}

#路由映射   匹配执行,否则404
application = tornado.web.Application([
    ("/main",MainHandler),
    ("/index",IndexHandler),
],**st)

if __name__=="__main__":
    application.listen(8080)

    #io多路复用
    tornado.ioloop.IOLoop.instance().start()

技术分享图片

技术分享图片

三:在tornado中使用异步IO请求模块

import tornado.ioloop
import tornado.web
import tornado.websocket
import time
from tornado.concurrent import Future
from tornado import httpclient
from tornado import gen

class MainHandler(tornado.web.RequestHandler):
    def get(self):

        self.write("main")

    def post(self, *args, **kwargs):
        pass

class IndexHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        http = httpclient.AsyncHTTPClient()
        yield http.fetch("http://www.google.com",self.done)

    def done(self):
        self.write("index")
        self.finish()

    def post(self, *args, **kwargs):
        pass

st ={
    "template_path": "template",#模板路径配置
    "static_path":static,
}

#路由映射   匹配执行,否则404
application = tornado.web.Application([
    ("/main",MainHandler),
    ("/index",IndexHandler),
],**st)

if __name__=="__main__":
    application.listen(8080)

    #io多路复用
    tornado.ioloop.IOLoop.instance().start()

四:请求间交互,使用future

import tornado.ioloop
import tornado.web
import tornado.websocket
from tornado.concurrent import Future
from tornado import gen

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("main")


class IndexHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        future = Future()
        future.add_done_callback(self.done)
        yield future    #由于future中的result中值一直未被赋值,所有客户端一直等待

    def done(self,*args,**kwargs):
        self.write("index")
        self.finish()

st ={
    "template_path": "template",#模板路径配置
    "static_path":static,
}

#路由映射   匹配执行,否则404
application = tornado.web.Application([
    ("/main",MainHandler),
    ("/index",IndexHandler),
],**st)

if __name__=="__main__":
    application.listen(8080)

    #io多路复用
    tornado.ioloop.IOLoop.instance().start()

技术分享图片

 

 我们可以在另一个请求中去为这个future中result赋值,使当前请求返回

技术分享图片

 

import tornado.ioloop
import tornado.web
import tornado.websocket
from tornado.concurrent import Future
from tornado import gen

future = None

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        global future
        future.set_result(None)  #为Future中result赋值
        self.write("main")


class IndexHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        global future
        future = Future()
        future.add_done_callback(self.done)
        yield future    #由于future中的result中值一直未被赋值,所有客户端一直等待

    def done(self,*args,**kwargs):
        self.write("index")
        self.finish()

st ={
    "template_path": "template",#模板路径配置
    "static_path":static,
}

#路由映射   匹配执行,否则404
application = tornado.web.Application([
    ("/main",MainHandler),
    ("/index",IndexHandler),
],**st)

if __name__=="__main__":
    application.listen(8080)

    #io多路复用
    tornado.ioloop.IOLoop.instance().start()

技术分享图片

 五:自定义web框架(同步)

# coding:utf8
# __author:  Administrator
# date:      2018/6/30 0030
# /usr/bin/env python
import socket
from select import select
import re

class HttpResponse(object):
    """
    封装响应信息
    """
    def __init__(self, content=‘‘):
        self.content = content
        ‘‘‘

        ‘‘‘
        self.status = "HTTP/1.1 200 OK"
        self.headers = {}
        self.cookies = {}

        self.initResponseHeader()

    def changeStatus(self,status_code,status_desc):
        self.status = "HTTP/1.1 %s %s"%(status_code,status_desc)

    def initResponseHeader(self):
        self.headers[Content-Type]=text/html; charset=utf-8
        self.headers[X-Frame-Options]=SAMEORIGIN
        self.headers[X-UA-Compatible]=IE=10
        self.headers[Cache-Control]=private, max-age=10
        self.headers[Vary]=Accept-Encoding
        self.headers[Connection]=keep-alive

    def response(self):
        resp_content = None
        header_list = [self.status,]
        for item in self.headers.items():
            header_list.append("%s: %s"%(item[0],item[1]))

        header_str = "\r\n".join(header_list)
        resp_content = "\r\n\r\n".join([header_str,self.content])
        return bytes(resp_content, encoding=utf-8)

class HttpRequest:
    def __init__(self,content):
        """content:用户传递的请求头信息,字节型"""
        self.content = content
        self.header_bytes = bytes()
        self.body_bytes = bytes()

        self.header_str = ""
        self.body_str = ""

        self.header_dict = {}

        self.method = ""
        self.url = ""
        self.protocol = ""

        self.initialize()
        self.initialize_headers()

    def initialize(self):
        data = self.content.split(b"\r\n\r\n",1)
        if len(data) == 1:  #全是请求头
            self.header_bytes = self.content
        else:   #含有请求头和请求体
            self.header_bytes,self.body_bytes = data
        self.header_str = str(self.header_bytes,encoding="utf-8")
        self.body_str = str(self.body_bytes,encoding="utf-8")

    def initialize_headers(self):
        headers = self.header_str.split("\r\n")
        first_line = headers[0].split(" ")
        if len(first_line) == 3:
            self.method,self.url,self.protocol = first_line
        for line in headers[1:]:
            k_v = line.split(":",1)
            if len(k_v) == 2:
                k,v = k_v
                self.header_dict[k] = v


def main(request):
    return "main"

def index(request):
    return "index"

routers = [
    ("/main/",main),
    (/index/,index),
]

def run():
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((127.0.0.1, 8080))
    sock.listen(128)
    sock.setblocking(False)

    inputs = []
    inputs.append(sock)
    while True:
        rlist, wlist, elist = select(inputs, [], [], 0.05)  # http是单向的,我们只获取请求即可
        for r in rlist:
            if r == sock:  # 有新的请求到来
                conn, addr = sock.accept()
                conn.setblocking(False)
                inputs.append(conn)
            else:  # 客户端请求数据
                data = b""
                # 开始获取请求头
                while True:
                    try:
                        chunk = r.recv(1024)
                        data += chunk
                    except BlockingIOError as e:
                        chunk = None
                    if not chunk:
                        break

                # 处理请求头,请求体
                request = HttpRequest(data)
                #1.获取url
                #2.路由匹配
                #3.执行函数,获取返回值
                #4.将返回值发送
                flag = False
                func = None
                for route in routers:
                    if re.match(route[0],request.url):
                        flag = True
                        func = route[1]
                        break
                if flag:
                    result = func(request)
                    response = HttpResponse(result)
                    r.sendall(response.response())
                else:
                    response = HttpResponse("Not Found")
                    response.changeStatus(404,"Not Page")
                    r.sendall(response.response())
                inputs.remove(r)
                r.close()


if __name__ == "__main__":
    run()

技术分享图片

技术分享图片

 

 未实现异步非阻塞

 六:完善自定义web框架(异步)

import socket
from select import select
import re,time

class HttpResponse(object):
    """
    封装响应信息
    """
    def __init__(self, content=‘‘):
        self.content = content
        ‘‘‘

        ‘‘‘
        self.status = "HTTP/1.1 200 OK"
        self.headers = {}
        self.cookies = {}

        self.initResponseHeader()

    def changeStatus(self,status_code,status_desc):
        self.status = "HTTP/1.1 %s %s"%(status_code,status_desc)

    def initResponseHeader(self):
        self.headers[Content-Type]=text/html; charset=utf-8
        self.headers[X-Frame-Options]=SAMEORIGIN
        self.headers[X-UA-Compatible]=IE=10
        self.headers[Cache-Control]=private, max-age=10
        self.headers[Vary]=Accept-Encoding
        self.headers[Connection]=keep-alive

    def response(self):
        resp_content = None
        header_list = [self.status,]
        for item in self.headers.items():
            header_list.append("%s: %s"%(item[0],item[1]))

        header_str = "\r\n".join(header_list)
        resp_content = "\r\n\r\n".join([header_str,self.content])
        return bytes(resp_content, encoding=utf-8)

class HttpRequest:
    def __init__(self,content):
        """content:用户传递的请求头信息,字节型"""
        self.content = content
        self.header_bytes = bytes()
        self.body_bytes = bytes()

        self.header_str = ""
        self.body_str = ""

        self.header_dict = {}

        self.method = ""
        self.url = ""
        self.protocol = ""

        self.initialize()
        self.initialize_headers()

    def initialize(self):
        data = self.content.split(b"\r\n\r\n",1)
        if len(data) == 1:  #全是请求头
            self.header_bytes = self.content
        else:   #含有请求头和请求体
            self.header_bytes,self.body_bytes = data
        self.header_str = str(self.header_bytes,encoding="utf-8")
        self.body_str = str(self.body_bytes,encoding="utf-8")

    def initialize_headers(self):
        headers = self.header_str.split("\r\n")
        first_line = headers[0].split(" ")
        if len(first_line) == 3:
            self.method,self.url,self.protocol = first_line
        for line in headers[1:]:
            k_v = line.split(":",1)
            if len(k_v) == 2:
                k,v = k_v
                self.header_dict[k] = v

class Future:
    def __init__(self,timeout):
        self.result = None
        self.timeout = timeout
        self.start = time.time()

    def add_callback_done(self,callback,request):
        self.callback = callback
        self.request = request

    def call(self):
        if self.result == "timeout":  #超时就不要去获取页面数据,直接返回超时
            return "timeout"
        if self.result:  #若是没有超时,去获取回调数据
            return self.callback(self.request)

def callback(request):
    print(request)
    return "async main"

f = None

def main(request):
    global f
    f = Future(10)
    f.add_callback_done(callback,request)  #设置回调
    return f

def index(request):
    return "index"

def stop(request):
    if f:
        f.result = True
    return "stop"

routers = [
    ("/main/",main),
    (/index/,index),
    (/stop/, stop),  #用于向future的result赋值
]

def run():
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((127.0.0.1, 8080))
    sock.listen(128)
    sock.setblocking(False)

    inputs = []
    async_request_dict = {}
    inputs.append(sock)
    while True:
        rlist, wlist, elist = select(inputs, [], [], 0.05)  # http是单向的,我们只获取请求即可
        for r in rlist:
            if r == sock:  # 有新的请求到来
                conn, addr = sock.accept()
                conn.setblocking(False)
                inputs.append(conn)
            else:  # 客户端请求数据
                data = b""
                # 开始获取请求头
                while True:
                    try:
                        chunk = r.recv(1024)
                        data += chunk
                    except BlockingIOError as e:
                        chunk = None
                    if not chunk:
                        break

                # 处理请求头,请求体
                request = HttpRequest(data)
                #1.获取url
                #2.路由匹配
                #3.执行函数,获取返回值
                #4.将返回值发送
                flag = False
                func = None
                for route in routers:
                    if re.match(route[0],request.url):
                        flag = True
                        func = route[1]
                        break
                if flag:
                    result = func(request)
                    if isinstance(result,Future):  #对于future对象,我们另外做异步处理,不阻塞当前操作
                        async_request_dict[r] = result
                        continue
                    response = HttpResponse(result)
                    r.sendall(response.response())
                else:
                    response = HttpResponse("Not Found")
                    response.changeStatus(404,"Not Page")
                    r.sendall(response.response())
                inputs.remove(r)
                r.close()

        for conn in list(async_request_dict.keys()):  #另外对future对象处理
            future = async_request_dict[conn]
            start = future.start
            timeout = future.timeout
            if (start+timeout) <= time.time():  #超时检测
                future.result = "timeout"
            if future.result:
                response = HttpResponse(future.call())  #获取回调数据
                conn.sendall(response.response())
                conn.close()
                del async_request_dict[conn]  #删除字典中这个链接,和下面inputs列表中链接
                inputs.remove(conn)

if __name__ == "__main__":
    run()

 技术分享图片

技术分享图片

 

 技术分享图片

 

python---tornado补充(异步非阻塞)

标签:若是   self   change   control   for   move   main   lock   头信息   

原文地址:https://www.cnblogs.com/ssyfj/p/9246153.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!