tornado框架——进阶
自定义Session组件
Session
面向对象基础
面向对象中通过索引的方式访问对象,需要内部实现 __getitem__ 、__delitem__、__setitem__方法
class Foo(object):
    """Demonstrate the hooks behind subscript-style access on an object.

    Each hook only prints its own name and the arguments it received, so
    ``obj[key]`` evaluates to ``None``.
    """

    def __getitem__(self, key):
        # Invoked by ``obj[key]``.
        print('__getitem__', key)

    def __setitem__(self, key, value):
        # Invoked by ``obj[key] = value``.
        print('__setitem__', key, value)

    def __delitem__(self, key):
        # Invoked by ``del obj[key]``.
        print('__delitem__', key)
# Subscript access on the instance is routed to Foo.__getitem__.
obj = Foo()
result = obj['k1']
# obj['k2'] = 'wupeiqi'   # would be routed to Foo.__setitem__
# del obj['k1']           # would be routed to Foo.__delitem__
Tornado扩展
Tornado框架中,默认执行Handler的get/post等方法之前默认会执行 initialize方法,所以可以通过自定义的方式使得所有请求在处理前执行操作...
class BaseHandler(tornado.web.RequestHandler):
    """Common base class that prepares shared state for its subclasses."""

    def initialize(self):
        # Tornado runs initialize() before the handler's get/post/... method,
        # so every subclass can rely on ``self.xxoo`` being set.
        self.xxoo = "wupeiqi"
class MainHandler(BaseHandler):
    """Handler showing that state set in BaseHandler.initialize is visible."""

    def get(self):
        # ``self.xxoo`` was assigned by BaseHandler.initialize().
        print(self.xxoo)
        self.write('index')
class IndexHandler(BaseHandler):
    """Second handler sharing the state prepared by BaseHandler."""

    def get(self):
        # ``self.xxoo`` was assigned by BaseHandler.initialize().
        print(self.xxoo)
        self.write('index')
session
session其实就是定义在服务器端用于保存用户会话的容器,其必须依赖cookie才能实现。
import config
from hashlib import sha1
import os
import time
def create_session_id():
    """Return a fresh session id: a 40-character SHA-1 hex digest.

    The digest is computed over the text form of 16 bytes from
    ``os.urandom`` followed by the current ``time.time()`` value.
    """
    seed = '%s%s' % (os.urandom(16), time.time())
    return sha1(bytes(seed, encoding='utf-8')).hexdigest()
class SessionFactory:
    """Pick the session backend named by ``config.SESSION_TYPE``."""

    @staticmethod
    def get_session_obj(handler):
        """Return a session object bound to ``handler``.

        Recognised values of ``config.SESSION_TYPE`` are ``"cache"``,
        ``"memcached"`` and ``"redis"``; any other value yields ``None``.
        """
        obj = None
        if config.SESSION_TYPE == "cache":
            obj = CacheSession(handler)
        elif config.SESSION_TYPE == "memcached":
            obj = MemcachedSession(handler)
        elif config.SESSION_TYPE == "redis":
            obj = RedisSession(handler)
        return obj
class CacheSession:
    """Session backend that keeps all session data in a class-level dict.

    Supports ``session[key]``, ``session[key] = value`` and
    ``del session[key]``; reading or deleting a missing key is not an error.
    """

    # Maps session id -> dict of values; shared by every instance.
    session_container = {}
    # Name of the cookie that carries the session id.
    session_id = "__sessionId__"

    def __init__(self, handler):
        """Attach to the session named by the request cookie, or start one.

        handler: request handler providing ``get_cookie`` and ``set_cookie``.
        """
        self.handler = handler
        client_random_str = handler.get_cookie(CacheSession.session_id, None)
        if client_random_str and client_random_str in CacheSession.session_container:
            # Known session id: reuse the stored data.
            self.random_str = client_random_str
        else:
            # Missing or unknown id: start a new, empty session.
            self.random_str = create_session_id()
            CacheSession.session_container[self.random_str] = {}

        # (Re)issue the cookie each time a session object is created so its
        # expiry is pushed config.SESSION_EXPIRES seconds into the future.
        expires_time = time.time() + config.SESSION_EXPIRES
        handler.set_cookie(CacheSession.session_id, self.random_str, expires=expires_time)

    def __getitem__(self, key):
        """Return the stored value for ``key``, or ``None`` when absent."""
        return CacheSession.session_container[self.random_str].get(key, None)

    def __setitem__(self, key, value):
        """Store ``value`` under ``key`` in the current session."""
        CacheSession.session_container[self.random_str][key] = value

    def __delitem__(self, key):
        """Remove ``key`` from the current session; absent keys are ignored."""
        CacheSession.session_container[self.random_str].pop(key, None)
class RedisSession:
    """Placeholder for a Redis-backed session; not implemented yet."""

    def __init__(self, handler):
        pass
class MemcachedSession:
    """Placeholder for a memcached-backed session; not implemented yet."""

    def __init__(self, handler):
        pass
分布式Session
import sys
import math
from bisect import bisect
# Use hashlib on Python 2.5+ and fall back to the legacy ``md5`` module on
# older interpreters; either way ``md5_constructor()`` yields an MD5 object.
if sys.version_info >= (2, 5):
    import hashlib
    md5_constructor = hashlib.md5
else:
    import md5
    md5_constructor = md5.new
class HashRing(object):
    """Consistent-hash ring that maps string keys onto weighted nodes."""

    def __init__(self, nodes):
        """Build the ring from the initial nodes.

        nodes: list of dicts, each with the node address under ``'host'`` and
            an optional ``'weight'`` (default 1), for example::

                nodes = [
                    {'host': '127.0.0.1:8000', 'weight': 1},
                    {'host': '127.0.0.1:8001', 'weight': 2},
                    {'host': '127.0.0.1:8002', 'weight': 1},
                ]

        Nodes get 32 virtual nodes each on average; a weight is honoured by
        giving the node proportionally more virtual nodes.
        """
        self.ring = dict()        # virtual-node hash -> node address
        self._sorted_keys = []    # virtual-node hashes, kept in ascending order
        self.total_weight = 0
        self.__generate_circle(nodes)

    def __generate_circle(self, nodes):
        """Place the virtual nodes of every initial node on the ring."""
        for node_info in nodes:
            self.total_weight += node_info.get('weight', 1)
        for node_info in nodes:
            weight = node_info.get('weight', 1)
            node = node_info.get('host', None)
            virtual_node_count = int((32 * len(nodes) * weight) // self.total_weight)
            self.__commit(node, self.__virtual_keys(node, virtual_node_count))
        # bisect() in get_node_pos only works on a sorted list.
        self._sorted_keys.sort()

    def __virtual_keys(self, node, count):
        """Return the hashes of ``count`` virtual nodes for ``node``.

        Raises Exception when any of them is already on the ring.
        """
        keys = []
        for i in range(count):
            key = self.gen_key_thirty_two('%s-%s' % (node, i))
            if key in self.ring or key in keys:
                raise Exception('该节点已经存在.')
            keys.append(key)
        return keys

    def __commit(self, node, keys):
        """Record ``keys`` as virtual nodes of ``node`` (caller re-sorts)."""
        for key in keys:
            self.ring[key] = node
        self._sorted_keys.extend(keys)

    def add_node(self, node):
        """Add a node to the ring.

        node: dict such as ``{'host': '127.0.0.1:8002', 'weight': 1}``;
            ``'host'`` is the node address, ``'weight'`` defaults to 1.

        Raises Exception when the address is empty or the node is already
        present; in both cases the ring is left unchanged.
        """
        host = node.get('host', None)
        if not host:
            raise Exception('节点的地址不能为空.')
        weight = node.get('weight', 1)
        new_total = self.total_weight + weight
        # Count real nodes (distinct addresses), not virtual keys.
        nodes_count = len(set(self.ring.values())) + 1
        virtual_node_count = int((32 * nodes_count * weight) // new_total)
        # Compute and validate every key first so a duplicate cannot leave
        # the ring half-updated.
        keys = self.__virtual_keys(host, virtual_node_count)
        self.total_weight = new_total
        self.__commit(host, keys)
        self._sorted_keys.sort()

    def remove_node(self, node):
        """Remove every virtual node of ``node`` (an address string such as
        ``'127.0.0.1:8000'``). Unknown addresses are ignored.
        """
        # Collect first: deleting from a dict while iterating it is an error.
        stale = [key for key, value in self.ring.items() if value == node]
        if not stale:
            return
        for key in stale:
            del self.ring[key]
        # Filtering keeps the remaining keys in sorted order.
        self._sorted_keys = [key for key in self._sorted_keys if key in self.ring]
        # NOTE(review): total_weight is not reduced here because per-node
        # weights are not stored; confirm whether later add_node calls should
        # see the reduced total.

    def get_node(self, string_key):
        """Return the node owning ``string_key`` as the address split on
        ``':'`` (e.g. ``['127.0.0.1', '8000']``), or ``None`` for an empty ring.
        """
        pos = self.get_node_pos(string_key)
        if pos is None:
            return None
        return self.ring[self._sorted_keys[pos]].split(':')

    def get_node_pos(self, string_key):
        """Return the index in the sorted key list of the virtual node that
        owns ``string_key``, or ``None`` for an empty ring.
        """
        if not self.ring:
            return None
        key = self.gen_key_thirty_two(string_key)
        nodes = self._sorted_keys
        pos = bisect(nodes, key)
        # A hash past the last virtual node wraps around to the first one.
        return pos % len(nodes)

    def gen_key_thirty_two(self, key):
        """Return the MD5 digest of ``key`` (32 hex digits) as an integer."""
        return int(self.__md5(key).hexdigest(), 16)

    def gen_key_sixteen(self, key):
        """Return an integer built from the first four MD5 digest bytes of
        ``key``, with byte 0 as the least significant byte.
        """
        b_key = self.__hash_digest(key)
        return self.__hash_val(b_key, lambda x: x)

    def __hash_val(self, b_key, entry_fn):
        """Combine four digest bytes, selected through ``entry_fn``, into one
        integer; ``entry_fn(0)`` indexes the least significant byte.
        """
        return ((b_key[entry_fn(3)] << 24) | (b_key[entry_fn(2)] << 16)
                | (b_key[entry_fn(1)] << 8) | b_key[entry_fn(0)])

    def __hash_digest(self, key):
        """Return the MD5 digest of ``key`` as a list of byte values."""
        return list(bytearray(self.__md5(key).digest()))

    def __md5(self, key):
        """Return an MD5 object fed with ``key``; text is encoded as UTF-8
        because the hash functions only accept bytes.
        """
        if not isinstance(key, bytes):
            key = key.encode('utf-8')
        m = md5_constructor()
        m.update(key)
        return m
"""
nodes = [
    {'host':'127.0.0.1:8000','weight':1},
    {'host':'127.0.0.1:8001','weight':2},
    {'host':'127.0.0.1:8002','weight':1},
]
ring = HashRing(nodes)
result = ring.get_node('98708798709870987098709879087')
print(result)
"""
异步非阻塞
基本使用
装饰器 + Future 从而实现Tornado的异步非阻塞
class AsyncHandler(tornado.web.RequestHandler):
    """Handler whose response is produced only once a Future completes."""

    @gen.coroutine
    def get(self):
        # Yielding the Future suspends this coroutine until someone puts a
        # result into it; ``doing`` is registered to run at that point.
        future = Future()
        future.add_done_callback(self.doing)
        yield future
        # Alternatively:
        # tornado.ioloop.IOLoop.current().add_future(future, self.doing)
        # yield future

    def doing(self, *args, **kwargs):
        # Completion callback: write the response body and end the request.
        self.write('async')
        self.finish()
当发送GET请求时,由于方法被@gen.coroutine装饰且yield 一个 Future对象,那么Tornado会等待,等待用户向future对象中放置数据或者发送信号,如果获取到数据或信号之后,就开始执行doing方法。异步非阻塞体现在当Tornado等待用户向future对象中放置数据时,还可以处理其他请求。
注意:在等待用户向future对象中放置数据或信号时,此连接是不断开的。
同步阻塞和异步非阻塞对比
同步阻塞
class SyncHandler(tornado.web.RequestHandler):
    """Blocking counterpart used for comparison with the async handler."""

    def get(self):
        # doing() sleeps synchronously, so nothing is written until it returns.
        self.doing()
        self.write('sync')

    def doing(self):
        # Deliberately blocking call that stands in for slow work.
        time.sleep(10)
异步非阻塞
class AsyncHandler(tornado.web.RequestHandler):
    """Non-blocking counterpart: the reply is sent from a timer callback."""

    @gen.coroutine
    def get(self):
        future = Future()
        # Schedule doing() five seconds from now on the current IOLoop.
        tornado.ioloop.IOLoop.current().add_timeout(time.time() + 5, self.doing)
        # The Future is never given a result in this snippet; yielding it only
        # keeps the coroutine suspended while doing() completes the response.
        yield future

    def doing(self, *args, **kwargs):
        # Timer callback: write the response body and end the request.
        self.write('async')
        self.finish()
httpclient类库
Tornado提供了httpclient类库用于发送Http请求,其配合Tornado的异步非阻塞使用。
import tornado.web
from tornado import gen
from tornado import httpclient
# 方式一:
class AsyncHandler(tornado.web.RequestHandler):
    """Fetch a remote page with the asynchronous HTTP client."""

    @gen.coroutine
    def get(self, *args, **kwargs):
        print('进入')
        http = httpclient.AsyncHTTPClient()
        # Yield the fetch; execution resumes here with the response.
        data = yield http.fetch("http://www.google.com")
        print('完事', data)
        self.finish('6666')
# 方式二:
# class AsyncHandler(tornado.web.RequestHandler):
#     @gen.coroutine
#     def get(self):
#         print('进入')
#         http = httpclient.AsyncHTTPClient()
#         yield http.fetch("http://www.google.com", self.done)
#
#     def done(self, response):
#         print('完事')
#         self.finish('666')
# Route table: /async is served by AsyncHandler.
application = tornado.web.Application([
    (r"/async", AsyncHandler),
])

if __name__ == "__main__":
    # Listen on port 8888 and run the IOLoop.
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()
基于异步非阻塞和Tornado-MySQL实现用户登录示例
"""
需要先安装支持异步操作Mysql的类库:
Tornado-MySQL: https://github.com/PyMySQL/Tornado-MySQL#installation
pip3 install Tornado-MySQL
"""
import tornado.web
from tornado import gen
import tornado_mysql
from tornado_mysql import pools
# Shared connection pool used by get_user_by_conn_pool().
# NOTE(review): the database credentials are hard-coded in this example.
POOL = pools.Pool(
    dict(host='127.0.0.1', port=3306, user='root', passwd='123', db='cmdb'),
    max_idle_connections=1,
    max_recycle_sec=3)
@gen.coroutine
def get_user_by_conn_pool(user):
    """Run the demo query through the shared pool and return its first row.

    ``user`` is passed as the bound parameter of ``SELECT SLEEP(%s)``.
    """
    cur = yield POOL.execute("SELECT SLEEP(%s)", (user,))
    row = cur.fetchone()
    raise gen.Return(row)
@gen.coroutine
def get_user(user):
    """Open a dedicated connection, run the demo query, return its first row.

    ``user`` is currently unused: the real lookup is commented out and a
    ten-second ``sleep`` query stands in for it.
    """
    conn = yield tornado_mysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='cmdb',
                                       charset='utf8')
    try:
        cur = conn.cursor()
        try:
            # yield cur.execute("SELECT name,email FROM web_models_userprofile where name=%s", (user,))
            yield cur.execute("select sleep(10)")
            row = cur.fetchone()
        finally:
            # Release the cursor even when the query raises.
            cur.close()
    finally:
        # Release the connection even when the query raises.
        conn.close()
    raise gen.Return(row)
class LoginHandler(tornado.web.RequestHandler):
    """Login page: GET shows the form, POST looks the user up asynchronously."""

    def get(self, *args, **kwargs):
        self.render('login.html')

    @gen.coroutine
    def post(self, *args, **kwargs):
        user = self.get_argument('user')
        # get_user is itself a coroutine, so yield its result directly;
        # gen.Task is meant for callback-style functions.
        data = yield get_user(user)
        if data:
            print(data)
            self.redirect('http://www.oldboyedu.com')
        else:
            self.render('login.html')
# Route table: /login is served by LoginHandler.
application = tornado.web.Application([
    (r"/login", LoginHandler),
])

if __name__ == "__main__":
    # Listen on port 8888 and run the IOLoop.
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()
加入Form组件
1.开源组件——Tyrion
Tyrion是一个基于Python实现的支持多个WEB框架的Form表单验证组件,其完美的支持Tornado、Django、Flask、Bottle Web框架。Tyrion主要有三大重要功能:
- 表单验证
- 生成HTML标签
- 保留上次提交内容
对于表单验证,告别书写重复的正则表达式对用户提交的数据进行验证的工作,从此解放双手,跟着我左手右手一个慢动作...
对于生成HTML标签,不再人工书写html标签,让Tyrion帮你自动创建...
对于保留上次提交内容,由于默认表单提交后页面刷新,原来输入的内容会清空,Tyrion可以保留上次提交内容。
github:https://github.com/WuPeiqi/Tyrion
2.自定义表单验证组件
在Web程序中往往包含大量的表单验证的工作,如:判断输入是否为空,是否符合规则。
html部分
<!DOCTYPE html>
<html>
<head lang="en">
<meta charset="UTF-8">
<title></title>
<link href="{{static_url("commons.css")}}" rel="stylesheet" />
</head>
<body>
<h1>hello</h1>
<form action="/index" method="post">
<p>hostname: <input type="text" name="host" /> </p>
<p>ip: <input type="text" name="ip" /> </p>
<p>port: <input type="text" name="port" /> </p>
<p>phone: <input type="text" name="phone" /> </p>
<input type="submit" />
</form>
</body>
</html>
HTML
python部分
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import tornado.ioloop
import tornado.web
from hashlib import sha1
import os, time
import re
class MainForm(object):
    """Form whose attributes map a field name to its validation regex."""

    def __init__(self):
        # Every instance attribute is treated as "field name -> pattern" by
        # check_valid(), so only patterns may be stored on the instance.
        self.host = r"(.*)"
        self.ip = r"^(25[0-5]|2[0-4]\d|[0-1]?\d?\d)(\.(25[0-5]|2[0-4]\d|[0-1]?\d?\d)){3}$"
        self.port = r'(\d+)'
        # Character class without '|': inside [...] a pipe is a literal
        # character, not an alternation.
        self.phone = r'^1[3458][0-9]\d{8}$'

    def check_valid(self, request):
        """Match every submitted field against its pattern.

        request: object offering ``get_argument(name)`` (the request handler).
        Returns True when every field matched, False otherwise. Each field's
        name, match object and submitted value are printed along the way.
        """
        all_valid = True
        for key, regular in self.__dict__.items():
            post_value = request.get_argument(key)
            # Match the submitted value against the field's pattern.
            ret = re.match(regular, post_value)
            print(key, ret, post_value)
            if not ret:
                all_valid = False
        return all_valid
class MainHandler(tornado.web.RequestHandler):
    """Serve the form page and run MainForm over the submitted values."""

    def get(self):
        self.render('index.html')

    def post(self, *args, **kwargs):
        obj = MainForm()
        # The handler itself is passed because check_valid reads the
        # submitted values through get_argument().
        result = obj.check_valid(self)
        self.write('ok')
# Application settings passed to tornado.web.Application.
settings = {
    'template_path': 'template',
    'static_path': 'static',
    'static_url_prefix': '/static/',
    # NOTE(review): hard-coded secret; load it from configuration outside
    # of a tutorial.
    'cookie_secret': 'aiuasdhflashjdfoiuashdfiuh',
    'login_url': '/login'
}

# Route table: /index is served by MainHandler.
application = tornado.web.Application([
    (r"/index", MainHandler),
], **settings)

if __name__ == "__main__":
    # Listen on port 8888 and run the IOLoop.
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()
由于验证规则可以代码重用,所以可以如此定义:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import tornado.ioloop
import tornado.web
import re
class Field(object):
    """Base class for one validated form field.

    Subclasses supply the pattern as a class attribute named ``REGULAR``.
    After ``match`` has run, ``id_valid`` says whether validation passed,
    ``value`` holds the accepted value and ``error`` the failure message.
    """

    def __init__(self, error_msg_dict, required):
        """
        error_msg_dict: optional custom messages under the keys
            ``'required'`` and ``'valid'``.
        required: when false, any submitted value is accepted unchecked.
        """
        self.id_valid = False
        self.value = None
        self.error = None
        self.name = None
        self.error_msg = error_msg_dict
        self.required = required

    def match(self, name, value):
        """Validate ``value`` submitted under ``name`` and record the outcome
        in ``id_valid`` / ``value`` / ``error``.
        """
        self.name = name
        # Start from a clean slate so a previous match() cannot leak its
        # result into this one.
        self.id_valid = False
        self.value = None
        self.error = None

        if not self.required:
            self.id_valid = True
            self.value = value
            return
        if not value:
            self.error = self.error_msg.get('required', None) or "%s is required" % name
            return

        ret = re.match(self.REGULAR, value)
        if ret:
            self.id_valid = True
            self.value = ret.group()
        else:
            self.error = self.error_msg.get('valid', None) or "%s is invalid" % name
class IPField(Field):
    """Field accepting a dotted-quad IPv4 address."""

    # Raw string so the regex escapes are not read as string escapes.
    REGULAR = r"^(25[0-5]|2[0-4]\d|[0-1]?\d?\d)(\.(25[0-5]|2[0-4]\d|[0-1]?\d?\d)){3}$"

    def __init__(self, error_msg_dict=None, required=True):
        """
        error_msg_dict: optional custom messages, e.g.
            ``{'required': ..., 'valid': ...}``; no defaults are predefined.
        required: whether an empty submission is an error.
        """
        error_msg = {}
        if error_msg_dict:
            error_msg.update(error_msg_dict)
        super(IPField, self).__init__(error_msg_dict=error_msg, required=required)
class IntegerField(Field):
    """Field accepting a non-empty string of digits."""

    # Raw string so the regex escape is not read as a string escape.
    REGULAR = r"^\d+$"

    def __init__(self, error_msg_dict=None, required=True):
        """
        error_msg_dict: optional messages overriding the defaults below.
        required: whether an empty submission is an error.
        """
        error_msg = {'required': '数字不能为空', 'valid': '数字格式错误'}
        if error_msg_dict:
            error_msg.update(error_msg_dict)
        super(IntegerField, self).__init__(error_msg_dict=error_msg, required=required)
class CheckBoxField(Field):
    """Field for multi-valued inputs; the submitted value must be a list."""

    def __init__(self, error_msg_dict=None, required=True):
        """
        error_msg_dict: optional custom messages, e.g.
            ``{'required': ..., 'valid': ...}``; no defaults are predefined.
        required: whether an empty submission is an error.
        """
        error_msg = {}
        if error_msg_dict:
            error_msg.update(error_msg_dict)
        super(CheckBoxField, self).__init__(error_msg_dict=error_msg, required=required)

    def match(self, name, value):
        """Accept ``value`` when it is a non-empty list; record the outcome
        in ``id_valid`` / ``value`` / ``error``.
        """
        self.name = name
        # Start from a clean slate so a previous match() cannot leak through.
        self.id_valid = False
        self.value = None
        self.error = None

        if not self.required:
            self.id_valid = True
            self.value = value
            return
        if not value:
            self.error = self.error_msg.get('required', None) or "%s is required" % name
            return

        # The submitted value, not the field name, must be the list.
        if isinstance(value, list):
            self.id_valid = True
            self.value = value
        else:
            self.error = self.error_msg.get('valid', None) or "%s is invalid" % name
class FileField(Field):
    """Field for uploaded files; only .pdf, .mp3 and .py names are accepted."""

    # The alternatives are grouped so that ^ and $ apply to all of them;
    # ungrouped, ^ binds only to the first and $ only to the last.
    REGULAR = r"^\w+\.(pdf|mp3|py)$"

    def __init__(self, error_msg_dict=None, required=True):
        """
        error_msg_dict: optional custom messages, e.g.
            ``{'required': ..., 'valid': ...}``; no defaults are predefined.
        required: whether an empty submission is an error.
        """
        error_msg = {}
        if error_msg_dict:
            error_msg.update(error_msg_dict)
        super(FileField, self).__init__(error_msg_dict=error_msg, required=required)

    def match(self, name, value):
        """Validate a list of uploaded file names.

        name: field name.
        value: list of file names; anything that is not a list is invalid.
        Validation stops at the first name that does not match; names
        accepted before it stay in ``self.value``.
        """
        self.name = name
        # Start from a clean slate so a previous match() cannot leak through.
        self.id_valid = False
        self.error = None
        self.value = []

        if not self.required:
            self.id_valid = True
            self.value = value
            return
        if not value:
            self.error = self.error_msg.get('required', None) or "%s is required" % name
            return
        if not isinstance(value, list):
            self.error = self.error_msg.get('valid', None) or "%s is invalid" % name
            return

        pattern = re.compile(self.REGULAR)
        for file_name in value:
            found = pattern.match(file_name)
            if not found:
                self.id_valid = False
                self.error = self.error_msg.get('valid', None) or "%s is invalid" % name
                break
            self.value.append(found.group())
            self.id_valid = True

    def save(self, request, upload_path=""):
        """Write every file uploaded under this field's name to disk.

        request: object whose ``files`` mapping holds, per field name, a list
            of dicts with ``'filename'`` and ``'body'`` entries.
        upload_path: target directory; the default writes to the current
            working directory.
        """
        import os

        file_metas = request.files[self.name]
        for meta in file_metas:
            # Keep only the final path component: the name comes from the
            # client and must not be able to point outside upload_path.
            file_name = os.path.basename(meta['filename'])
            with open(os.path.join(upload_path, file_name), 'wb') as up:
                up.write(meta['body'])
class Form(object):
    """Base class for forms: every Field (or nested Form) stored as an
    instance attribute is validated against the submitted request.
    """

    def __init__(self):
        self.value_dict = {}      # field key -> accepted value
        self.error_dict = {}      # field key -> error message
        self.valid_status = True  # False once any field fails

    def validate(self, request, depth=10, pre_key=""):
        """Validate the submitted data and return ``valid_status``.

        request: the request handler (provides ``get_argument``,
            ``get_arguments`` and ``request.files``).
        depth: how many levels of nested forms are followed.
        pre_key: prefix put in front of each field name, joined with ``'.'``.
        """
        # Drop results from any earlier validate() call on this instance.
        self.value_dict = {}
        self.error_dict = {}
        self.valid_status = True
        self.initialize()
        self.__valid(self, request, depth, pre_key)
        return self.valid_status

    def initialize(self):
        """Hook called at the start of validate(); does nothing by default."""
        pass

    def __valid(self, form_obj, request, depth, pre_key):
        """Validate the fields of ``form_obj`` against the request.

        :param form_obj: Form instance (an instance of a Form subclass)
        :param request: request handler used to read the submitted values
        :param depth: remaining nesting depth for nested forms
        :param pre_key: prefix of the HTML ``name`` attribute (set by the
            recursion for nested forms)
        :return: None; results are stored in ``value_dict``, ``error_dict``
            and ``valid_status`` of this (outer) form
        """
        depth -= 1
        if depth < 0:
            return None
        for key, field_obj in form_obj.__dict__.items():
            print(key, field_obj)
            if isinstance(field_obj, Form):
                # Nested form: its attribute name becomes the prefix.
                self.__valid(field_obj, request, depth, key)
                continue
            if not isinstance(field_obj, Field):
                continue
            if pre_key:
                key = "%s.%s" % (pre_key, key)

            if isinstance(field_obj, CheckBoxField):
                # Second positional parameter of get_arguments is ``strip``,
                # not a default value, so only the name is passed.
                post_value = request.get_arguments(key)
            elif isinstance(field_obj, FileField):
                # No upload under this name yields an empty list, not None.
                file_list = request.request.files.get(key, [])
                post_value = [file_item['filename'] for file_item in file_list]
            else:
                post_value = request.get_argument(key, None)
            print(post_value)

            # Match the submitted data against the field's rule.
            field_obj.match(key, post_value)
            if field_obj.id_valid:
                self.value_dict[key] = field_obj.value
            else:
                self.error_dict[key] = field_obj.error
                self.valid_status = False
class ListForm(object):
    """Validate a repeated form whose field names start with ``[0]``,
    ``[1]``, ... by running one ``form_type`` instance per index.
    """

    def __init__(self, form_type):
        """form_type: zero-argument callable returning a form object."""
        self.form_type = form_type
        self.valid_status = True
        self.value_dict = {}   # index -> value_dict of that row's form
        self.error_dict = {}   # index -> error_dict of that row's form

    def validate(self, request):
        """Validate consecutive rows, starting at index 0, until an index has
        no submitted name with its ``[index]`` prefix; return ``valid_status``.
        """
        # list() is needed because dict key views cannot be concatenated.
        name_list = list(request.request.arguments.keys()) + list(request.request.files.keys())
        index = 0
        while True:
            pre_key = "[%d]" % index
            if not any(name.startswith(pre_key) for name in name_list):
                break
            form_obj = self.form_type()
            form_obj.validate(request, depth=10, pre_key=pre_key)
            if form_obj.valid_status:
                self.value_dict[index] = form_obj.value_dict
            else:
                self.error_dict[index] = form_obj.error_dict
                self.valid_status = False
            index += 1
        return self.valid_status
class MainForm(Form):
    """Example form; only the file-upload field is currently enabled."""

    def __init__(self):
        # self.ip = IPField(required=True)
        # self.port = IntegerField(required=True)
        # self.new_ip = IPField(required=True)
        # self.second = SecondForm()
        self.fff = FileField(required=True)
        super(MainForm, self).__init__()
#
# class SecondForm(Form):
#
# def __init__(self):
# self.ip = IPField(required=True)
# self.new_ip = IPField(required=True)
#
# super(SecondForm, self).__init__()
class MainHandler(tornado.web.RequestHandler):
    """Serve the form page; POST currently just answers 'ok'."""

    def get(self):
        self.render('index.html')

    def post(self, *args, **kwargs):
        # Example usage of the form classes, left disabled as in the
        # original tutorial (its repeated debug variants were merged):
        #
        # obj = MainForm()
        # obj.validate(self)
        # print(obj.valid_status)
        # print(obj.value_dict)
        # print(obj.error_dict)
        # obj.fff.save(self.request)
        #
        # list_form = ListForm(MainForm)
        # list_form.validate(self)
        # print(list_form.valid_status)
        # print(list_form.value_dict)
        # print(list_form.error_dict)
        self.write('ok')
# Application settings passed to tornado.web.Application.
settings = {
    'template_path': 'template',
    'static_path': 'static',
    'static_url_prefix': '/static/',
    # NOTE(review): hard-coded secret; load it from configuration outside
    # of a tutorial.
    'cookie_secret': 'aiuasdhflashjdfoiuashdfiuh',
    'login_url': '/login'
}

# Route table: /index is served by MainHandler.
application = tornado.web.Application([
    (r"/index", MainHandler),
], **settings)

if __name__ == "__main__":
    # Listen on port 8888 and run the IOLoop.
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()
以上内容均转载及整理自武沛齐老师博客