Flask是一个基于Python开发并且依赖jinja2模板和Werkzeug WSGI服务的一个微型框架,对于Werkzeug本质是Socket服务端,其用于接收http请求并对请求进行预处理,然后触发Flask框架,开发人员基于Flask框架提供的功能对请求进行相应的处理,并返回给用户,如果要返回给用户复杂的内容时,需要借助jinja2模板来实现对模板的处理,即:将模板和数据进行渲染,将渲染后的字符串返回给用户浏览器。
“微”(micro) 并不表示你需要把整个 Web 应用塞进单个 Python 文件(虽然确实可以 ),也不意味着 Flask 在功能上有所欠缺。微框架中的“微”意味着 Flask 旨在保持核心简单而易于扩展。Flask 不会替你做出太多决策——比如使用何种数据库。而那些 Flask 所选择的——比如使用何种模板引擎——则很容易替换。除此之外的一切都由可由你掌握。如此,Flask 可以与您珠联璧合。
默认情况下,Flask 不包含数据库抽象层、表单验证,或是其它任何已有多种库可以胜任的功能。然而,Flask 支持用扩展来给应用添加这些功能,如同是 Flask 本身实现的一样。众多的扩展提供了数据库集成、表单验证、上传处理、各种各样的开放认证技术等功能。Flask 也许是“微小”的,但它已准备好在需求繁杂的生产环境中投入使用。
pip3 install flask
from werkzeug.wrappers import Request, Response @Request.application def hello(request): return Response(‘Hello World!‘) if __name__ == ‘__main__‘: from werkzeug.serving import run_simple run_simple(‘localhost‘, 4000, hello) werkzeug
一. 基本使用
from flask import Flask app = Flask(__name__) @app.route(‘/‘) def hello_world(): return ‘Hello World!‘ if __name__ == ‘__main__‘: app.run()
二、配置文件
flask中的配置文件是一个flask.config.Config对象(继承字典),默认配置为: { ‘DEBUG‘: get_debug_flag(default=False), 是否开启Debug模式 ‘TESTING‘: False, 是否开启测试模式 ‘PROPAGATE_EXCEPTIONS‘: None, ‘PRESERVE_CONTEXT_ON_EXCEPTION‘: None, ‘SECRET_KEY‘: None, ‘PERMANENT_SESSION_LIFETIME‘: timedelta(days=31), ‘USE_X_SENDFILE‘: False, ‘LOGGER_NAME‘: None, ‘LOGGER_HANDLER_POLICY‘: ‘always‘, ‘SERVER_NAME‘: None, ‘APPLICATION_ROOT‘: None, ‘SESSION_COOKIE_NAME‘: ‘session‘, ‘SESSION_COOKIE_DOMAIN‘: None, ‘SESSION_COOKIE_PATH‘: None, ‘SESSION_COOKIE_HTTPONLY‘: True, ‘SESSION_COOKIE_SECURE‘: False, ‘SESSION_REFRESH_EACH_REQUEST‘: True, ‘MAX_CONTENT_LENGTH‘: None, ‘SEND_FILE_MAX_AGE_DEFAULT‘: timedelta(hours=12), ‘TRAP_BAD_REQUEST_ERRORS‘: False, ‘TRAP_HTTP_EXCEPTIONS‘: False, ‘EXPLAIN_TEMPLATE_LOADING‘: False, ‘PREFERRED_URL_SCHEME‘: ‘http‘, ‘JSON_AS_ASCII‘: True, ‘JSON_SORT_KEYS‘: True, ‘JSONIFY_PRETTYPRINT_REGULAR‘: True, ‘JSONIFY_MIMETYPE‘: ‘application/json‘, ‘TEMPLATES_AUTO_RELOAD‘: None, } 方式一: app.config[‘DEBUG‘] = True PS: 由于Config对象本质上是字典,所以还可以使用app.config.update(...) 方式二: app.config.from_pyfile("python文件名称") 如: settings.py DEBUG = True app.config.from_pyfile("settings.py") app.config.from_envvar("环境变量名称") 环境变量的值为python文件名称名称,内部调用from_pyfile方法 app.config.from_json("json文件名称") JSON文件名称,必须是json格式,因为内部会执行json.loads app.config.from_mapping({‘DEBUG‘:True}) 字典格式 app.config.from_object("python类或类的路径") app.config.from_object(‘pro_flask.settings.TestingConfig‘) settings.py class Config(object): DEBUG = False TESTING = False DATABASE_URI = ‘sqlite://:memory:‘ class ProductionConfig(Config): DATABASE_URI = ‘mysql://user@localhost/foo‘ class DevelopmentConfig(Config): DEBUG = True class TestingConfig(Config): TESTING = True PS: 从sys.path中已经存在路径开始写 PS: settings.py文件默认路径要放在程序root_path目录,如果instance_relative_config为True,则就是instance_path目录
外部文件中:k=v k要大写
# NAME=‘ctz‘ # PWD=‘123‘ class BaseConfig(object): AA=123456 class TestConfig(BaseConfig): DB=‘127.0.0.1‘ class DevConfig(BaseConfig): DB=‘127.0.0.2‘ class ProConfig(BaseConfig): DB=‘1270.0.0.3‘
from flask import Flask import settings app=Flask(__name__) #方式一 #app.config[‘COOK‘]=‘ctz‘ ‘‘‘ settings.py NAME=‘ctz‘ PWD=‘123‘ ‘‘‘ #方式二 #app.config.from_pyfile(‘settings.py‘) #方式三 # import os # os.environ[‘FLASK_SETTING‘]=‘settings.py‘ # app.config.from_envvar(‘FLASK_SETTING‘) #方式四(推荐使用) app.config.from_object(‘settings.DevConfig‘) @app.route(‘/index‘,methods=[‘POST‘,‘GET‘],strict_slashes=False) def index(): print(app.config) return ‘helloworld‘ if __name__ == ‘__main__‘: app.run(debug=True)
三、路由系统
@app.route(‘/user/<username>‘) @app.route(‘/post/<int:post_id>‘) @app.route(‘/post/<float:post_id>‘) @app.route(‘/post/<path:path>‘) @app.route(‘/login‘, methods=[‘GET‘, ‘POST‘])
常用路由系统有以上五种,所有的路由系统都是基于以下对应关系来处理:
DEFAULT_CONVERTERS = { ‘default‘: UnicodeConverter, ‘string‘: UnicodeConverter, ‘any‘: AnyConverter, ‘path‘: PathConverter, ‘int‘: IntegerConverter, ‘float‘: FloatConverter, ‘uuid‘: UUIDConverter, }
四、视图函数
1.fbv
from flask import Flask # 实例化Flask对象 app = Flask(__name__) # 生成路由关系,并把关系保存到某个地方,app对象的 url_map字段中 @app.route(‘/xxxx‘) # @decorator def index(): return "Index" if __name__ == ‘__main__‘: # 启动程序,监听用户请求 # 一旦请求到来,执行 app.__call__方法 # 封装用户请求 # 进行路由匹配 app.run()
from flask import Flask # 实例化Flask对象 app = Flask(__name__) # def index(): # return "Index" # app.add_url_rule(‘/xxx‘, "n1", index) if __name__ == ‘__main__‘: # 启动程序,监听用户请求 # 一旦请求到来,执行 app.__call__方法 # 封装用户请求 # 进行路由匹配 app.run()
简单使用
from flask import Flask,render_template,request,redirect,session app = Flask(__name__,template_folder=‘templates‘,static_url_path=‘/xxxxxx‘) app.secret_key = "sdfasdfasdf3fsdf" @app.route(‘/login‘,methods=[‘GET‘,"POST"]) def login(): if request.method == ‘GET‘: return render_template(‘login.html‘) else: user = request.form.get(‘user‘) pwd = request.form.get(‘pwd‘) if user == ‘alex‘ and pwd == ‘123‘: session[‘user_info‘] = user return redirect(‘/index‘) else: return render_template(‘login.html‘,msg=‘用户名或密码错误‘) # return render_template(‘login.html‘,**{‘msg‘:‘用户名或密码错误‘}) @app.route(‘/index‘,methods=[‘GET‘]) def index(): if not session.get(‘user_info‘): return redirect(‘/login‘) return "欢迎登录" if __name__ == ‘__main__‘: app.run()
2.cbv
class IndexView(views.View): methods = [‘GET‘] decorators = [auth, ] def dispatch_request(self): print(‘Index‘) return ‘Index!‘ app.add_url_rule(‘/index‘, view_func=IndexView.as_view(name=‘index‘)) # name=endpoint
3.@app.route和app.add_url_rule参数:
rule, URL规则 view_func, 视图函数名称 defaults=None, 默认值,当URL中无参数,函数需要参数时,使用defaults={‘k‘:‘v‘}为函数提供参数 endpoint=None, 名称,用于反向生成URL,即: url_for(‘名称‘) methods=None, 允许的请求方式,如:["GET","POST"] strict_slashes=None, 对URL最后的 / 符号是否严格要求, 如: @app.route(‘/index‘,strict_slashes=False), 访问 http://www.xx.com/index/ 或 http://www.xx.com/index均可 @app.route(‘/index‘,strict_slashes=True) 仅访问 http://www.xx.com/index redirect_to=None, 重定向到指定地址 如: @app.route(‘/index/<int:nid>‘, redirect_to=‘/home/<nid>‘) 或 def func(adapter, nid): return "/home/888" @app.route(‘/index/<int:nid>‘, redirect_to=func) subdomain=None, 子域名访问 from flask import Flask, views, url_for app = Flask(import_name=__name__) app.config[‘SERVER_NAME‘] = ‘wupeiqi.com:5000‘ @app.route("/", subdomain="admin") def static_index(): """Flask supports static subdomains This is available at static.your-domain.tld""" return "static.your-domain.tld" @app.route("/dynamic", subdomain="<username>") def username_index(username): """Dynamic subdomains are also supported Try going to user1.your-domain.tld/dynamic""" return username + ".your-domain.tld" if __name__ == ‘__main__‘: app.run()
4.自定义正则路由匹配
from flask import Flask, views, url_for from werkzeug.routing import BaseConverter app = Flask(import_name=__name__) class RegexConverter(BaseConverter): """ 自定义URL匹配正则表达式 """ def __init__(self, map, regex): super(RegexConverter, self).__init__(map) self.regex = regex def to_python(self, value): """ 路由匹配时,匹配成功后传递给视图函数中参数的值 :param value: :return: """ return int(value) def to_url(self, value): """ 使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数 :param value: :return: """ val = super(RegexConverter, self).to_url(value) return val # 添加到flask中 app.url_map.converters[‘regex‘] = RegexConverter @app.route(‘/index/<regex("\d+"):nid>‘) def index(nid): print(url_for(‘index‘, nid=‘888‘)) return ‘Index‘ if __name__ == ‘__main__‘: app.run() b. 自定制正则路由匹配
5.和Django比较
Django: /index/ func /index/ IndexClass.as_view() Flask: FBV: @app.route(‘/index‘,endpoint=‘xx‘) def index(nid): url_for(‘xx‘,nid=123) return "Index" def index(nid): url_for(‘xx‘,nid=123) return "Index" app.add_url_rule(‘/index‘,index) CBV: def auth(func): def inner(*args, **kwargs): result = func(*args, **kwargs) return result return inner class IndexView(views.MethodView): # methods = [‘POST‘] decorators = [auth,] def get(self): v = url_for(‘index‘) print(v) return "GET" def post(self): return "GET" app.add_url_rule(‘/index‘, view_func=IndexView.as_view(name=‘index‘)) if __name__ == ‘__main__‘: app.run()
五 模板
1、模板的使用
Flask使用的是Jinja2模板,所以其语法和Django无差别
2、自定义模板方法
Flask中自定义模板方法的方式和Bottle相似,创建一个函数并通过参数的形式传入render_template,如:
from flask import Flask,url_for,request,redirect,render_template,jsonify,make_response,Markup from urllib.parse import urlencode,quote,unquote app = Flask(__name__) def test(a1,a2): return a1+a2 @app.template_global() def sb(a1,a2): return a1 + a2 + 100 @app.template_filter() def db(a1, a2, a3): return a1 + a2 + a3 @app.route(‘/index‘,endpoint=‘xx‘) def index(): v1 = "字符串" v2 = [11,22,33] v3 = {‘k1‘:‘v1‘,‘k2‘:‘v2‘} v4 = Markup("<input type=‘text‘ />") return render_template(‘index.html‘,v1=v1,v2=v2,v3=v3,v4=v4,test=test) if __name__ == ‘__main__‘: # app.__call__ app.run()
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>模板</h1> {%block body %} {%endblock%} </body> </html>
{% extends ‘layout.html‘%} {%block body %} {{v1}} <ul> {% for item in v2 %} <li>{{item}}</li> {% endfor %} </ul> {{v2.1}} <ul> {% for k,v in v3.items() %} <li>{{k}} {{v}}</li> {% endfor %} </ul> {{v3.k1}} {{v3.get(‘k1‘)}} {{v4}} <!--{{v4|safe}}--> <h1>{{test(1,19)}}</h1> {{sb(1,2)}} {{ 1|db(2,3)}} {% macro xxxx(name, type=‘text‘, value=‘‘) %} <input type="{{ type }}" name="{{ name }}" value="{{ value }}"> <input type="{{ type }}" name="{{ name }}" value="{{ value }}"> <input type="{{ type }}" name="{{ name }}" value="{{ value }}"> <input type="{{ type }}" name="{{ name }}" value="{{ value }}"> {% endmacro %} {{ xxxx(‘n1‘) }} {%endblock%}
注意:Markup等价django的mark_safe,函数得带()
六、请求和响应
from flask import Flask
from flask import request
from flask import render_template
from flask import redirect
from flask import make_response
app = Flask(__name__)
@app.route(‘/login.html‘, methods=[‘GET‘, "POST"])
def login():
# 请求相关信息
# request.method
# request.args
# request.form
# request.values
# request.cookies
# request.headers
# request.path
# request.full_path
# request.script_root
# request.url
# request.base_url
# request.url_root
# request.host_url
# request.host
# request.files
# obj = request.files[‘the_file_name‘]
# obj.save(‘/var/www/uploads/‘ + secure_filename(f.filename))
# 响应相关信息
# return "字符串"
# return render_template(‘html模板路径‘,**{})
# return redirect(‘/index.html‘)
# response = make_response(render_template(‘index.html‘))
# response是flask.wrappers.Response类型
# response.delete_cookie(‘key‘)
# response.set_cookie(‘key‘, ‘value‘)
# response.headers[‘X-Something‘] = ‘A value‘
# return response
return "内容"
if __name__ == ‘__main__‘:
app.run()
模仿django的request.GET.urlencode
from flask import Flask,url_for,request,redirect,render_template,jsonify,make_response from urllib.parse import urlencode,quote,unquote app = Flask(__name__) @app.route(‘/index‘,endpoint=‘xx‘) def index(): from werkzeug.datastructures import ImmutableMultiDict #模仿django的request.urlencode # get_data = request.args # get_dict = get_data.to_dict() # get_dict[‘xx‘] = ‘18‘ # url = urlencode(get_dict) # print(url) # print(request.query_string) # print(request.args) # val = "%E6%8A%8A%E5%87%A0%E4%B8%AA" # print(unquote(val)) # # return "Index" # return "Index" # return redirect() # return render_template() # return jsonify(name=‘alex‘,age=‘18‘) response = make_response(‘xxxxx‘) response.headers[‘xxx‘] = ‘123123‘ return response if __name__ == ‘__main__‘: # app.__call__ app.run()
七、Session
除请求对象之外,还有一个 session 对象。它允许你在不同请求间存储特定用户的信息。它是在 Cookies 的基础上实现的,并且对 Cookies 进行密钥签名要使用会话,你需要设置一个密钥。
-
设置:session[‘username‘] = ‘xxx‘
- 删除:session.pop(‘username‘, None)
方法 和字典一样
from flask import Flask, session, redirect, url_for, escape, request app = Flask(__name__) @app.route(‘/‘) def index(): if ‘username‘ in session: return ‘Logged in as %s‘ % escape(session[‘username‘]) return ‘You are not logged in‘ @app.route(‘/login‘, methods=[‘GET‘, ‘POST‘]) def login(): if request.method == ‘POST‘: session[‘username‘] = request.form[‘username‘] return redirect(url_for(‘index‘)) return ‘‘‘ <form action="" method="post"> <p><input type=text name=username> <p><input type=submit value=Login> </form> ‘‘‘ @app.route(‘/logout‘) def logout(): # remove the username from the session if it‘s there session.pop(‘username‘, None) return redirect(url_for(‘index‘)) # set the secret key. keep this really secret: app.secret_key = ‘A0Zr98j/3yX R~XHH!jmN]LWX/,?RT‘
from flask import Flask,session,Session from urllib.parse import urlencode,quote,unquote from werkzeug.local import LocalProxy app = Flask(__name__) app.secret_key =‘sdfsdfsdf‘ app.config[‘SESSION_COOKIE_NAME‘] = ‘session_lvning‘ """ ‘SESSION_COOKIE_NAME‘: ‘session‘, ‘SESSION_COOKIE_DOMAIN‘: None, ‘SESSION_COOKIE_PATH‘: None, ‘SESSION_COOKIE_HTTPONLY‘: True, ‘SESSION_COOKIE_SECURE‘: False, ‘SESSION_REFRESH_EACH_REQUEST‘: True, ‘PERMANENT_SESSION_LIFETIME‘: timedelta(days=31) """ @app.route(‘/index‘,endpoint=‘xx‘) def index(): # session本质上操作的是字典,假设session保存在数据库 # session[‘xxx‘] = 123 # session[‘xx1‘] = 123 # session[‘xx2‘] = 123 # session[‘xx3‘] = 123 # del session[‘xx2‘] session[‘xx3‘] = 123 return "xxx" if __name__ == ‘__main__‘: # app.__call__ app.run()
八 闪现
闪现是基于session实现的,当然用session也可以实现闪现的方法,闪现和session的区别就是 闪现用了一次后值就没有了
from flask import Flask,session,Session,flash,get_flashed_messages,redirect,render_template,request app = Flask(__name__) app.secret_key =‘sdfsdfsdf‘ @app.route(‘/users‘) def users(): # msg = request.args.get(‘msg‘,‘‘) # msg = session.get(‘msg‘) # if msg: # del session[‘msg‘] v = get_flashed_messages() print(v) msg = ‘‘ return render_template(‘users.html‘,msg=msg) @app.route(‘/useradd‘) def user_add(): # 在数据库中添加一条数据 # 假设添加成功,在跳转到列表页面时,显示添加成功 # return redirect(‘/users?msg=添加成功‘) # session[‘msg‘] = ‘添加成功‘ flash(‘添加成功‘) return redirect(‘/users‘) if __name__ == ‘__main__‘: app.run()
九 扩展(类似与django的中间件)
from flask import Flask,session,Session,flash,get_flashed_messages,redirect,render_template,request app = Flask(__name__) app.secret_key =‘sdfsdfsdf‘ @app.before_request def process_request1(): print(‘process_request1‘) @app.after_request def process_response1(response): print(‘process_response1‘) return response @app.before_request def process_request2(): print(‘process_request2‘) @app.after_request def process_response2(response): print(‘process_response2‘) return response @app.route(‘/index‘) def index(): print(‘index‘) return ‘Index‘ @app.route(‘/order‘) def order(): print(‘order‘) return ‘order‘ @app.route(‘/test‘) def test(): print(‘test‘) return ‘test‘ if __name__ == ‘__main__‘: app.run()
import pymysql from urllib.parse import urlencode from flask import Flask, render_template, request, session, redirect from utils.pager import Pagination app = Flask(__name__) app.secret_key = ‘ctz123‘ app.config class MysqlCon(object): @classmethod def getCon(cls): con = pymysql.connect(host="localhost", user="root", password="root", database="pro1", charset="utf8") return con VALID_URL = [‘/login‘, ] @app.route(‘/‘) def hello_world(): return ‘Hello World!‘ @app.before_request def peocess_request(): user = session.get(‘user‘) if request.path in VALID_URL: return None if not user: return redirect(‘/login‘) @app.route(‘/login‘, methods=[‘POST‘, ‘GET‘], strict_slashes=False) def login(): if request.method == ‘GET‘: return render_template(‘login.html‘) else: username = request.form.get(‘username‘) pwd = request.form.get(‘pwd‘) con = MysqlCon.getCon() cursor = con.cursor(cursor=pymysql.cursors.DictCursor) sql_user = ‘select * from userinfo where username=%s and pwd=%s‘ cursor.execute(sql_user, [username, pwd]) cursor.close() con.close() user = cursor.fetchone() if user: session[‘user‘] = {‘username‘: username, ‘pwd‘: pwd} return redirect(‘/userlist‘) else: return render_template(‘login.html‘, msg=‘用户名或密码错误‘) @app.route(‘/userlist‘, methods=[‘POST‘, ‘GET‘], strict_slashes=False) def userlist(): con = MysqlCon.getCon() cursor = con.cursor(cursor=pymysql.cursors.DictCursor) sql_list = ‘select * from userinfo‘ cursor.execute(sql_list) userlist = cursor.fetchall() current_page = request.args.get(‘page‘, 1) total_count = len(userlist) cursor.close() con.close() base_url = request.path parmas = request.args.to_dict() pageObj = Pagination(current_page, total_count, base_url, parmas) per_page_list = userlist[pageObj.start:pageObj.end] return render_template(‘list.html‘, userlist=per_page_list, pageObj=pageObj) @app.route(‘/adduser‘, methods=[‘GET‘, ‘POST‘], strict_slashes=False) def addUser(): if request.method == ‘GET‘: return render_template(‘addUser.html‘) else: username = request.form.get(‘username‘) pwd = request.form.get(‘pwd‘) age = request.form.get(‘age‘) email = request.form.get(‘email‘) print(username, pwd, age, email) con = MysqlCon.getCon() cursor = con.cursor(cursor=pymysql.cursors.DictCursor) sql = ‘insert into userinfo(username,pwd,age,email)values(%s,%s,%s,%s)‘ cursor.execute(sql, [username, pwd, age, email]) con.commit() cursor.close() con.close() return redirect(‘/userlist‘) @app.route(‘/edituser/<int:sid>‘, methods=[‘GET‘, ‘POST‘]) def editUser(sid): if request.method == ‘GET‘: con = MysqlCon.getCon() cursor = con.cursor(cursor=pymysql.cursors.DictCursor) sql = ‘select * from userinfo where id=%s‘ cursor.execute(sql, [sid, ]) user = cursor.fetchone() cursor.close() con.close() return render_template(‘editUser.html‘, user=user) else: parmars = urlencode(request.args.to_dict()) con = MysqlCon.getCon() cursor = con.cursor(cursor=pymysql.cursors.DictCursor) username = request.form.get(‘username‘) pwd = request.form.get(‘pwd‘) age = request.form.get(‘age‘) email = request.form.get(‘email‘) sql = ‘update userinfo set username=%s,pwd=%s,age=%s,email=%s where id=%s‘ cursor.execute(sql, [username, pwd, age, email, sid]) con.commit() cursor.close() con.close() return redirect(‘%s?%s‘ % (‘/userlist‘, parmars)) @app.route(‘/deluser/<int:sid>‘, methods=[‘POST‘, ‘GET‘]) def delUser(sid): con = MysqlCon.getCon() cursor = con.cursor(cursor=pymysql.cursors.DictCursor) sql = ‘delete from userinfo where id=%s‘ cursor.execute(sql, [sid, ]) con.commit() cursor.close() con.close() parmars = urlencode(request.args.to_dict()) return redirect(‘%s?%s‘ % (‘/userlist‘, parmars)) if __name__ == ‘__main__‘: app.run(debug=True)