from flask import Flaskapp=Flask(__name__)#给对象起个名字print(__name__)
app.secret_key="用session之前需要加盐,这里的session其实就是存放在cookies 中的"
[ 关于__name__ 1、__name__
如果不是,则为这个模块的名称。 2、__main__
一般作为函数的入口,类似于C语言,尤其在大型工程中,常常有if __name__ == "__main__":
@app.route('/home',methods=["GET","POST"])#默认开启get,post 没有开启def home(): return '你妹' if __name__ == '__main__': app.run()
def __init__( self, import_name, static_url_path=None, #静态文件夹前缀 别名 <img src="/static/mr.jpg"> 修改static_url_path='ccc' <img src="/cccc/.jpg" > static_folder='static'不变 static_folder='static', #静态文件默认地址 static_host=None, host_matching=False, subdomain_matching=False, template_folder='templates',#模板文件 默认存放地址 instance_path=None, instance_relative_config=False, root_path=None):
配置文件:路由系统:视 图:请求相关:响 应:模板渲染:session:闪 现:中 间件:蓝 图:(blueprint)目录结构划分特殊的装饰器:类似于django的中间件
dir() 函数不带参数时,返回当前范围内的变量、方法和定义的类型列表; 带参数时,返回参数的属性、方法列表。如果参数包含方法__dir__(),该方法将被调用。 如果参数不包含__dir__(),该方法将最大限度地收集参数信息。getattr() 函数用于返回一个对象属性值。 import importlibpath="settings.Foo" p,c = path.rsplit('.',maxsplit=1)m = importlib.import_module(p)cls = getattr(m,c) https://www.cnblogs.com/yanxiatingyu/p/9851249.html #如何找到这个类? #for key in dir(cls): if key.isupper(): print(key,getattr(cls,key)) from datetime import timedelta app.config
'DEBUG': False,
'ENV': 'production',
'JSONIFY_MIMETYPE': 'application/json',
'PERMANENT_SESSION_LIFETIME': datetime.timedelta(31), #session 最大的保留时间 print(datetime.timedelta(31))>>31 days, 0:00:00
'SEND_FILE_MAX_AGE_DEFAULT': datetime.timedelta(0, 43200),
'SESSION_COOKIE_NAME': 'session', #session cookies中session的名称
'TESTING': False,
class Config(object):
DEBUG = False
DATABASE_URI = 'sqlite://memory:' ????????????
class ProductionConfig(Config):
DEBUG = True
class TestingConfig(Config):
app.config.from_object("settings.ProductionConfig")#正式版 每个类配置各有不同, 这样可以来回切换
app.config.from_object('settings.class_name')
def from_object(self, obj): if isinstance(obj, string_types): obj = import_string(obj) for key in dir(obj): if key.isupper(): self[key] = getattr(obj, key)
isinstance() 函数来判断一个对象是否是一个已知的类型,类似 type()。
isinstance() 与 type() 区别:
type() 不会认为子类是一种父类类型,不考虑继承关系。
isinstance() 会认为子类是一种父类类型,考虑继承关系。
如果要判断两个类型是否相同推荐使用 isinstance()。
路由系统-endpoint,反向生成url,默认函数名[类似与django里面的name,根据名字反向解析] -url_for('endpoint') 可以直接在另一个视图函数中直接url_for(endpoint_name) 进行反向解析
from flask import url_for@app.route('/index',methods=['GET','POST'],endpoint='n1')def index(): print(url_for('n1')) return 'index' 动态路由
@app.route('/index/<int:nid>',methods=['GET','POST'])def index(nid): url_for return '那你妹'
- @app.route('/user/<username>')
- @app.route('/post/<int:post_id>')
- @app.route('/post/<float:post_id>')
- @app.route('/post/<path:path>')
- @app.route('/login', methods=['GET', 'POST'])
: UnicodeConverter,
: UnicodeConverter,
: AnyConverter,
: PathConverter,
: IntegerConverter,
: FloatConverter,
: UUIDConverter,
定义endpoint=name 直接url_for(name)不定义 默认endpoint=函数名 直接url_for(function_name) 视图: FBV CBV 请求相关数据:
# 请求相关信息
# request.method #当前请求方式
# request.args #GET
# request.form #POST
# request.values
# request.cookies
# request.headers
# request.path #当前访问路径 ???
# request.full_path
# request.script_root
# request.url
# request.base_url
# request.url_root
# request.host_url
# request.host
# request.files #ImmutableMultiDict([('upload_file', <FileStorage: 'QQ图片20180911115431.png' ('image/png')>)])
# obj = request.files['the_file_name'] #<FileStorage: 'QQ图片20180911115431.png' ('image/png')>
# obj.save('/var/www/uploads/' + secure_filename(f.filename))
//判断 请求 类型if request.method == 'GET': print('GET')else: print('POST') 响应相关 form flask import jsonify #内部帮你序列化(类似与django里面的jsonResponse)import json返回值:返回的都是响应体: return "你妹" return json.dumps(dic) return render_template('index.html') return redirect('') 如何 在定制响应头呢?设置cookies from flask import make_responseobj= make_response(jsonify({"test":'消息'}))obj.headers['响应头内容']='哈哈哈'obj.set_cookie('key','value')reurn obj
# coding:utf- from flask import Flask,render_template,request,redirect,url_for from werkzeug.utils import secure_filename import os app = Flask(__name__) @app.route('/upload', methods=['POST', 'GET']) def upload(): if request.method == 'POST': f = request.files['file'] basepath = os.path.dirname(__file__) # 当前文件所在路径 upload_path = os.path.join(basepath, 'static\uploads',secure_filename(f.filename)) #注意:没有的文件夹一定要先创建,不然会提示没有该路径 f.save(upload_path) return redirect(url_for('upload')) return render_template('upload.html') if __name__ == '__main__': app.run(debug=True) <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>文件上传示例</h1> <form action="" enctype='multipart/form-data' method='POST'> <input type="file" name="file"> <input type="submit" value="上传"> </form> </body> </html>
File Upload Sample
@app.route('/uploader',method=['POST']) def uploader(): file= request.files.get('filename') file.save(os.path.join('uploader_folder'),f'{file.txt}') return jsonify({"msg":'ok'}) @app.route('/get_audio/<filename>') def get_audio(filename): file=os.path.join("audio",filename) return send_file(file)
from flask import Flask, request, render_template from geventwebsocket.websocket import WebSocket from geventwebsocket.handler import WebSocketHandler from gevent.pywsgi import WSGIServer import time import json app = Flask(__name__) @app.route('/chat') def chat(): # user_socket = request.environ.get("wsgi.websocket") # type:WebSocket # print(nickname, ":", request.environ.get("REMOTE_ADDR")) # # msg = user_socket.receive() # 接收数据 # print(f"user_name:{nickname}Say: {msg}") # # print(f"是否发送信息成功:{user_socket.send(json.dumps(str(time.time())))}") return render_template('chat.html') @app.route('/ws/<nickname>') def ws(nickname): print('访问') user_socket = request.environ.get("wsgi.websocket") # type: WebSocket print(nickname, ":", request.environ.get("REMOTE_ADDR")) while 1: msg = user_socket.receive() # 接收数据 print(f"user_name:{nickname}Say: {msg or '无消息'}") print(msg) if msg: print(f"是否发送信息成功:{user_socket.send(json.dumps(str(time.time())))}") msg = None if __name__ == '__main__': # app.run(debug=True) http_server = WSGIServer(('localhost', 5005), application=app, handler_class=WebSocketHandler) http_server.serve_forever()
Web socket
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>聊天室</h1> <button type="button" onclick="init_login_chat()">登录</button> <button type="button" onclick="send_msg()">发送</button> <script type="text/javascript"> let nickname = '我是web 网页'; let ws = null; ws = new WebSocket("ws://localhost:5005/ws/" + nickname); ws.onopen = function () { console.log('连接成功'); ws.send('nlsdjflasdj'); console.log('send') }; function init_login_chat() { // ws = new WebSocket("ws://localhost:5005/ws/" + nickname); // ws.onopen = function () { // console.log('连接成功'); // ws.send('nlsdjflasdj'); // console.log('send') // } ws.send('a'); } // console.log(ws); function receive_msg() { ws.onmessage = function (res) { let msg = JSON.parse(res.data); console.log(msg); }; } function send_msg() { // alert('df'); ws.send(JSON.stringify('你妹看看')); } // window.onbeforeunload= function(event) { return confirm("确定离开此页面吗?"); } window.onbeforeunload = function (e) { // ws.onclick = function () { // alert('退出'); // }; return alert('sdfsd'); } </script> </body> </html>
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <p><input type="text" id="nickname"> <button onclick="create_conn()">登录聊天室</button> </p> <p><input type="text" id="content"> <button onclick="send_msg()">发送消息</button> </p> <div id="message" style="width: 300px; border:2px solid red;"> </div> </body> <script type="text/javascript"> var ws = null; var nickname = null; function create_conn() { nickname = document.getElementById("nickname").value; ws = new WebSocket("ws://" + nickname); ws.onopen = function () { alert("欢迎登陆九聊"); }; ws.onmessage = function (data) { var ws_msg = JSON.parse(data.data); create_chat(ws_msg); }; } function create_chat(text, self) { var ptag = document.createElement("p"); if (self == "w") { ptag.style.cssText = "text-align: right"; ptag.innerText = text.msg + ":" + text.sender; } else { ptag.style.cssText = "text-align: left"; ptag.innerText = text.sender + ":" + text.msg; } document.getElementById("message").appendChild(ptag); } function send_msg() { var msg = document.getElementById("content").value; // msg_json = JSON.stringify({"to_user":"","msg":msg}); ws.send(msg_json); create_chat({sender:nickname,msg:msg}, "w"); }; </script> </html>
@app.route('/delete/<int:nid>')def delete(nid): return redirect(url_for('index')) 1from funtools improt wraps装饰器def auth(func): @functools.wraps(func) /@wraps(func) #装饰器修复技术,修复的是文档信息: 不加上这个 原函数内原信息就不会被复制
def inner(*args,**kwargs): if not session.get('user'): return redirect(url_for('login')) return func(*args,kwargs) return inner 2endpoint 默认是函数名,目的是反向生成url 3 装饰器先后顺序 @app.route('login',method=["GET","POST"]) @auth def login(): if request.method == "": pass return render_template('login.html',error="错误信息") 4 before_request@app.before_request#特殊的装饰器def name(): print('每个请求过来都需要先经过它,')#适合给大量 视图函数 添加规则等 if request.path == '/login':#当前路径 return None; if session.get('user'): return None return redirect('/login') from flask import Markup方式一 input=Markup('<input type="text" />')方式二{{ input|safe }} 特殊的装饰器@app.template_global()#注册为全局作用def glo(a,b): return a+b @app.template_filter()def fil(a,b,c): return a+b+c {{ 1|fil(2,3) }} {% extend '.html' %} 继承{% block content-name %} {% endblock %} [宏的使用 {% macro hong_name(name,type='text',value='') %} <h1>宏</h1> <input type="{{ type }}" name="{{ name }} value="{{ value }}"> <input type='submit' value="submit"> {% endmacro %} 宏的调用 {{ hong_name('n1') }} ]模板渲染 -基本数据类型,可以使用python语法 -传入函数: - django:自动执行 - flask:不自动执行 -全局定义函数 @app.template_global() def name(a,b): return a+b @app.template_filter() def name(a,b,c): return a+b+c {{ a|name(b,c) }} session 当请求刚到来,flask 读取cookie中session对应的值,将取出来的值解密并反序列化成字典, 当请求结束时,flask 会读取内存中字典的值,进行序列化+加密,写入到用户的cookies中 闪现from flask import get_flashed_messagesflask 和 特殊装饰器
flask('临时存储数据') #存入临时保存的值print(get_flashed_messages())#取出上面的值,貌似源码 就是pop了一下
flask('临时存储数据','error')flask('sdfsdsdfsd','info')#根据分类来取值,get_flashed_messages(category_filter=['error'])# 取出error type的信息 info就不取了 闪现:在session中存储一个数据,读取时通过pop将数据移除。from flask import get_flashed_messages 中间件 -- call 方法什么时候触发? 用户发起请求是,才执行-- 任务:在执行call方法之前,做一个操作,call 方法执行之后做一个操作 class Middleware(object): def __init__(self,old): self.old=old def __call__(self,*args,**kwargs): ret = self.old(*args,**kwargs) return ret #django 和 flask 的中间件 是有的不同的 有什么不同? # 什么是flask 的中间件? 作用是什么 ? if __name__== "__main__": app.wsgi_app = Middleware(app.wsgi_app) == app.__call__ = Middleware(app.__call__) app.run() 特殊的装饰器 before_request after_request template_global template_filter before_first_request errorhandler @app.errorhandler(404)def not_found(arg): print(arg) return '没找到' @app.before_requestdef x1(): print('before') @app.after_requestdef x2(response): print('after') return reponse before :没有参数、没有返回值after :有参数,有返回值 执行顺序 before ---> view_function ----> after before 先定义 先执行after 先reverse 了 一下,你懂得,所以后定义先执行 @app.before_first_requestdef one(): print('我只执行一次!')
from flask import Flask,requestrequest : request.args get[url地址当中的参数] request.form post[post 请求体中的FormData中的数据] request.values 获取所有参数(url,formdata) request.data request.json request.files {{ }} 非逻辑代码{% %} 逻辑代码 from flask import Flask,render_template,redirect,send_fileapp=Flask(__name__) app.run(debug=True) 等同于app.config['DEBUG']app.config['DEBUG']=True #环境运行 重启代码app.config['TESTING']=True#日志记录 app=Flask(__name__,
) Flask __init__()
def __init__( self, import_name, static_url_path=None, static_folder='static', static_host=None, host_matching=False, subdomain_matching=False, template_folder='templates', instance_path=None, instance_relative_config=False, root_path=None):
第二天 1 路由+视图 2 session 实现原理 3 蓝图 (flask 目录结构的划分) 4 threading.local [上下文管理] 5 上下文管理(第一次) django 和 flask 的区别 ? 相同点:都依赖于wsgi 不同点: django请求相关数据是通过参数一个一个 传递参数传递过来的 flask: 先放在某个地方以后在去值 什么是wsgi? web服务网管接口,就是一个协议,实现该协议的模块: - wsgiref - werkzeug 实现其协议的模块本质上就是socket服务端 用于接收用户请求,并处理。 一般web框架基于wsgi实现,这样实现关注点分离 from wsgiref.simple_server import make_server def run_server(environ,start_response): start_response('200 OK',[('Content-Type','text/html')]) return [bytes('<h1>hello,web!</h1>',encoding='utf-8'),] if __name__=="__main__": httpd = make_server('',8000,RunServer) print("Serving HTTP on port 8000 。。。") httpd.serve_forever() 02 python fullstack s9day116 内容回顾.mp4from werkzeug.wrappers import Responsefrom werkzeug.serving import run_simpleclass Flask(object): def __call__(self,environ,start_response): response = Response('hello') return response(environ,start_response) def run(self): run_simple('',8000,self) app = Flask() if __name__ == '__main__': app.run()
"""A decorator that is used to register a view function for a given URL rule. This does the same thing as :meth:`add_url_rule` but is intended for decorator usage:: @app.route('/') def index(): return 'Hello World' For more information refer to :ref:`url-route-registrations`. :param rule: the URL rule as string :param endpoint: the endpoint for the registered URL rule. Flask itself assumes the name of the view function as endpoint :param options: the options to be forwarded to the underlying :class:`~werkzeug.routing.Rule` object. A change to Werkzeug is handling of method options. methods is a list of methods this rule should be limited to (``GET``, ``POST`` etc.). By default a rule just listens for ``GET`` (and implicitly ``HEAD``). Starting with Flask 0.6, ``OPTIONS`` is implicitly added and handled by the standard request handling. """
def route(self, rule, **options):
def decorator(f): endpoint = options.pop('endpoint', None) self.add_url_rule(rule, endpoint, f, **options) return f return decorator
def tiankong(): print('tiankong !') def priName(arg): print("priName({})".format(arg.__name__)) print("print:{}".format(arg.__name__)) priName(tiankong)
路由设置的两种方式: @app.route('/index')#'/index' 这里是url 和下面的函数名是两个概念 def index(): return "index" def index(): return "index" app.add_url_rule("/index",None, index) url endpoint view_function_name 子域名from flask import Flask,Views,url_for app =Flask(import_name = __name__ )app.config['SERVER_NAME'] = 'wupeiqi.com:5000'#一定要把这里配置加上 #当用户访问admin.wupeiqi.com 的时候以下视图函数被触发 相当于二级域名@app.route('/',subdomain="admin")def static_index(): pass #只要是以http://xxx.wupeiqi.com:5000/ 就能被这里访问@app.route('/dynamic',subdomain="<username>")def username_index(username): return username+'.your~domain.tld' from flask import Flask,viewsapp = Flask(__name__) @app.route('/index',redirect_to='/new')def index():#老系统 pass CBV from flask import views,from funtools import wrapper#装饰器修复技术 class UserView(views.MethodView): methods = ['GET'] # 这里设置 GET / POST 是否允许访问限制 decorators = [wrapper,] #自动执行里面的装饰器 def get(self,*args,**kwargs): return "GET" deg post(self,*args,**kwargs): return 'POST' app.add_url_rule('/user',None,UserView.as_view('')) 自定义正则 from flask import Flask app = Flask(__name__) from werkzeug.routing import BaseConverter class RegexConverter(BaseConverter): '''' 自定义url 匹配正则表达式 ''' def __init__(self,map,regex): super(RegexConverter,slef).__init__(map) self.regex = regex ..... pass app.url_map.converters['reg']=RegexConverter #是注册 还是 添加呢? day30 month9 am10from flask import Flaskapp = Flask(__name__) #先执行 decorator= app.route('/index')[route 返回一个函数 decorator(view_function)]# @decorator @app.route('/index')def index(): return "index" [ 源码 def route(self,rule,**options): def decorator(f): endpoint = options.pop('endpoint',None) self.add_url_rule(rule,endpoint,f,**options) # rule path # endpoint 别名 用于逆向解析 # f function——view # options # self app= Flask(__name__) return f return decorator]# 路由系统的本质就是self.add_url_rule(rule,endpoint,f,**options)# #注册路由还可以这样写 def index(): return '注册路由还可以这样写' app.add_url_rule('/index',None,index) #django 和 flaskdef index(): return '其实就是for循环' routes = [ ('/xxx',index,), ('/...',...,), ....]for route in routes:#不一定对 if len(route)==2: app.add_url_rule(route[0],None,route[1]) app.app_url_rule(**route) [ 源码 @setupmethod def add_url_urle(self,rule,endpoint=None,view_func=None,provide_automatic_options=None,**options): if endpoint is None: endpoint = _endpoint_from_func(view_func) options['endpoint'] = endpoint methods = options.pop('method',None) if methods is None: methods = getattr(view_func,'methods',None) or ('GET',) if isinstance(methods,string_types): raise TypeError('这个错误有点长') methods = set(item.upper() for item in methods ) required_methods = set(getattr(view_func,'required_methods',())) ... 底下还有很长 def _endpoint_from_view_func(view_func): assert view_func is not None,'expected view func if endpoint is not provided.' return view_func.__name__#获取函数名,view_func传进来的是function-view]
route 参数from flask import Flaskapp = Flask(__name__)@app.route(rule='/index',endpoint='',methods=['GET','POST'],default={'k':'v'},)def index(k): return 'sample'
endpoint=None, 名称,用于反向生成url,即url_for('名称') methods=None, 允许请求方式,如:['GET','POST'] 默认只开启GET,所以POST 需要手动设置 rule, URL规则 view_func, 视图函数名称 defaults=None, 默认值,当URL中无参数,函数需要参数时,使用defaults={'k':'value'}为函数提供参数 strict_slashes=None, 对URL最后的 / 符号是否严格要求, 如:www.baidu.com/yanyi 和 www.baidu.com/yanyi/ @app.route('/index',strict_slashes=True) 仅能访问 www.baidu.com/index redirect_to=None 重定向指定地址 如:@app.route('/index/<int:nid>',redirect_to='/home/<nid>') 或 def func(adapter,nid): return '/home/8989' @app.route('/index/<int:nid>',redirct_to=func)subdomain=None 子域名访问 from flask import Flask,views,url_for app =Flask(__name__) #必须先要配置SERVER_NAME app.config['SERVER_NAME'] = 'xiarenwang.com:8888' #本机 #app.config['SERVER_NAME']='localhost:8080' @app.route('/',subdomain='home') def index(): return 'home.xiarenwang.com' #浏览器输入:http://index.localhost:8080/ @app.route('/dynamic',subdomain="<username>") def username_index(username): return username+'your-domain.xx' ip 与 域名的对应关系 修改 windows 下: path:C:\Windows\System32\drivers\etc\hosts 找到host 文件,并且添加以下映射关系 xiarenwang.com index.xiarenwang.com admin.xiarenwang.com
自定义正则from flask import Flaskfrom werkzeug.routing import BaseConverter app = Flask(__name__) # 定制自定义转换器类class RegexConverter(BaseConverter): """ 自定义URL 匹配正则表达式 """ def __init__(self,map,regex):#map 是自动传值的 super(RegexConverter,self).__init__(map) self.regex = regex #在flask 源码里面会自动读取(加载妥当还是读取妥当?)self.regex,会根据你写的正则,来进行匹配 def to_python(self,value): """ 路由匹配时,匹配成功后传递给视图函数中参数的值 """ return int(value) def to_url(self,vlaue):#反向生成url 触发 """ 使用url_for 反向生成URL时,传递的参数经过该方法出来,返回的值用于生成URL中的参数 """ app.url_map.converters['reg'] = RegexConverter #这里给RegexConverter 进行注册#在原有转换器的基础上添加 reg [#添加到DEFAUTL_CONVERTERS字典中#:the deafult converter mapping for the map.DEFAULT_CONVERTERS = { '':'' ... 'reg':RegexConverter}] @app.route('/index/<reg("\d+"):nid>')def index(nid): return "index" #reg 代指的就是RegexConverter这个类。RegexConverter() 实例化##流程 1. 用户发送请求 2. falsk 内部进行正则匹配 3.调用to_python(正则匹配的结果)方法 4.to_python方法的返回值会交给视图函数的参数 @app.route('/index/<reg("\d+"):nid>')def index(nid): print(nid,type(nid)) return "index" from flask import url_for@app.route('/index/<reg("\d+"):nid>')def index(nid): print(url_for('index',nid=8888)) #假如nid是8888,进行反向解析的时候,会先将nid传给to_url, #to_url(value) 这个value 就是nid ,return 的val 就是反向解析的结果 return "index" if __name__ == "__main__": app.run()
app.url_map.converters['xx']= XxConverterconverters = default_converters.copy()default_converters = ImmutableDict(DEFAULT_CONVERTERS)DEFAUTL_CONVERS= { #内置转换器}
Session 实现原理(源码) from flask import Flask#实例化对象app = Flask(__name__) #设置路由app.route('/index')def index(): return "index" if __name__ == "__main__": app.run()#本质就是启动socket服务端 app.__call__ #用户请求到达,会执行__call__ app.wsgi_app app.request_class app.session_interface #设置路由本质: app.url_map() url_map() = Map()#是个Map对象 #保存所有路由关系对象 app.url_map= [ ('/index',index) ] #environ 是请求相关的所有数据[由wsgi做了初步的封装]#start_response 用于设置响应相关数据[ def __call__(self,environ,start_response): return self.wsgi_app(environ,start_response)] __call__ 什么时候触发?为什么触发?(貌似对象执行了()会触发)[ def wsgi_app(self,environ,start_response): ctx = self.request_context(environ) #1.获取environ后,再次将environ 进行封装 #2.从environ中获取名称为session的cookie,解密,反序列化 #3.在 error = None try: try: ctx.push() # 4. 执行视图函数 response = self.full_dispatch_request() except Exception as e: error = e response = self.handle_exception(e) except: error = sys.exc_info()[1] raise return response(environ,start_response) finally: if self.should_ignore_error(error): error = None """ 5. 获取session,加密,序列化 写入cookie 6. 清空数据 """ ctx.auto_pop(error) ]sys.exc_info() ???????? [
def request_context(self, environ): return RequestContext(self, environ)
class RequestContext(object): def __init__(self, app, environ, request=None): self.app = app if request is None: request = app.request_class(environ) self.request = request self.url_adapter = app.create_url_adapter(self.request) self.flashes = None self.session = None .....
if self.session is None: session_interface = self.app.session_interface self.session = session_interface.open_session( self.app, self.request ) if self.session is None: self.session = session_interface.make_null_session(self.app)
#code more from .wrapper import Requestclass Flask(_PackageBoundObject):
request_class = Request ....
session_interface = SecureCookieSessionInterface()
# code more
class SecureCookieSessionInterface(SessionInterface): pass
def open_session(self, app, request): s = self.get_signing_serializer(app) if s is None: return None val = request.cookies.get(app.session_cookie_name) if not val: return self.session_class() max_age = total_seconds(app.permanent_session_lifetime) try: data = s.loads(val, max_age=max_age) return self.session_class(data) except BadSignature: return self.session_class()
class SessionInterface(object): pass
]作业 流程图????
class SessionInterface(object): """The basic interface you have to implement in order to replace the default session interface which uses werkzeug's securecookie implementation. The only methods you have to implement are :meth:`open_session` and :meth:`save_session`, the others have useful defaults which you don't need to change. The session object returned by the :meth:`open_session` method has to provide a dictionary like interface plus the properties and methods from the :class:`SessionMixin`. We recommend just subclassing a dict and adding that mixin:: class Session(dict, SessionMixin): pass If :meth:`open_session` returns ``None`` Flask will call into :meth:`make_null_session` to create a session that acts as replacement if the session support cannot work because some requirement is not fulfilled. The default :class:`NullSession` class that is created will complain that the secret key was not set. To replace the session interface on an application all you have to do is to assign :attr:`flask.Flask.session_interface`:: app = Flask(__name__) app.session_interface = MySessionInterface() .. versionadded:: 0.8 """ #: :meth:`make_null_session` will look here for the class that should #: be created when a null session is requested. Likewise the #: :meth:`is_null_session` method will perform a typecheck against #: this type. null_session_class = NullSession #: A flag that indicates if the session interface is pickle based. #: This can be used by Flask extensions to make a decision in regards #: to how to deal with the session object. #: #: .. versionadded:: 0.10 pickle_based = False def make_null_session(self, app): """Creates a null session which acts as a replacement object if the real session support could not be loaded due to a configuration error. This mainly aids the user experience because the job of the null session is to still support lookup without complaining but modifications are answered with a helpful error message of what failed. This creates an instance of :attr:`null_session_class` by default. """ return self.null_session_class() def is_null_session(self, obj): """Checks if a given object is a null session. Null sessions are not asked to be saved. This checks if the object is an instance of :attr:`null_session_class` by default. """ return isinstance(obj, self.null_session_class) def get_cookie_domain(self, app): """Returns the domain that should be set for the session cookie. Uses ``SESSION_COOKIE_DOMAIN`` if it is configured, otherwise falls back to detecting the domain based on ``SERVER_NAME``. Once detected (or if not set at all), ``SESSION_COOKIE_DOMAIN`` is updated to avoid re-running the logic. """ rv = app.config['SESSION_COOKIE_DOMAIN'] # set explicitly, or cached from SERVER_NAME detection # if False, return None if rv is not None: return rv if rv else None rv = app.config['SERVER_NAME'] # server name not set, cache False to return none next time if not rv: app.config['SESSION_COOKIE_DOMAIN'] = False return None # chop off the port which is usually not supported by browsers # remove any leading '.' since we'll add that later rv = rv.rsplit()[].lstrip('.') if '.' not in rv: # Chrome doesn't allow names without a '.' # this should only come up with localhost # hack around this by not setting the name, and show a warning warnings.warn( '"{rv}" is not a valid cookie domain, it must contain a ".".' ' Add an entry to your hosts file, for example' ' "{rv}.localdomain", and use that instead.'.format(rv=rv) ) app.config['SESSION_COOKIE_DOMAIN'] = False return None ip = is_ip(rv) if ip: warnings.warn( 'The session cookie domain is an IP address. This may not work' ' as intended in some browsers. Add an entry to your hosts' ' file, for example "localhost.localdomain", and use that' ' instead.' ) # if this is not an ip and app is mounted at the root, allow subdomain # matching by adding a '.' prefix if self.get_cookie_path(app) == '/' and not ip: rv = '.' + rv app.config['SESSION_COOKIE_DOMAIN'] = rv return rv def get_cookie_path(self, app): """Returns the path for which the cookie should be valid. The default implementation uses the value from the ``SESSION_COOKIE_PATH`` config var if it's set, and falls back to ``APPLICATION_ROOT`` or uses ``/`` if it's ``None``. """ return app.config['SESSION_COOKIE_PATH'] \ or app.config['APPLICATION_ROOT'] def get_cookie_httponly(self, app): """Returns True if the session cookie should be httponly. This currently just returns the value of the ``SESSION_COOKIE_HTTPONLY`` config var. """ return app.config['SESSION_COOKIE_HTTPONLY'] def get_cookie_secure(self, app): """Returns True if the cookie should be secure. This currently just returns the value of the ``SESSION_COOKIE_SECURE`` setting. """ return app.config['SESSION_COOKIE_SECURE'] def get_cookie_samesite(self, app): """Return ``'Strict'`` or ``'Lax'`` if the cookie should use the ``SameSite`` attribute. This currently just returns the value of the :data:`SESSION_COOKIE_SAMESITE` setting. """ return app.config['SESSION_COOKIE_SAMESITE'] def get_expiration_time(self, app, session): """A helper method that returns an expiration date for the session or ``None`` if the session is linked to the browser session. The default implementation returns now + the permanent session lifetime configured on the application. """ if session.permanent: return datetime.utcnow() + app.permanent_session_lifetime def should_set_cookie(self, app, session): """Used by session backends to determine if a ``Set-Cookie`` header should be set for this session cookie for this response. If the session has been modified, the cookie is set. If the session is permanent and the ``SESSION_REFRESH_EACH_REQUEST`` config is true, the cookie is always set. This check is usually skipped if the session was deleted. .. versionadded:: 0.11 """ return session.modified or ( session.permanent and app.config['SESSION_REFRESH_EACH_REQUEST'] ) def open_session(self, app, request): """This method has to be implemented and must either return ``None`` in case the loading failed because of a configuration error or an instance of a session object which implements a dictionary like interface + the methods and attributes on :class:`SessionMixin`. """ raise NotImplementedError() def save_session(self, app, session, response): """This is called for actual sessions returned by :meth:`open_session` at the end of the request. This is still called during a request context so if you absolutely need access to the request you can do that. """ raise NotImplementedError() session_json_serializer = TaggedJSONSerializer() class SecureCookieSessionInterface(SessionInterface): """The default session interface that stores sessions in signed cookies through the :mod:`itsdangerous` module. """ #: the salt that should be applied on top of the secret key for the #: signing of cookie based sessions. salt = 'cookie-session' #: the hash function to use for the signature. The default is sha1 digest_method = staticmethod(hashlib.sha1) #: the name of the itsdangerous supported key derivation. The default #: is hmac. key_derivation = 'hmac' #: A python serializer for the payload. The default is a compact #: JSON derived serializer with support for some extra Python types #: such as datetime objects or tuples. serializer = session_json_serializer session_class = SecureCookieSession def get_signing_serializer(self, app): if not app.secret_key: return None signer_kwargs = dict( key_derivation=self.key_derivation, digest_method=self.digest_method ) return URLSafeTimedSerializer(app.secret_key, salt=self.salt, serializer=self.serializer, signer_kwargs=signer_kwargs) def open_session(self, app, request): s = self.get_signing_serializer(app) if s is None: return None val = request.cookies.get(app.session_cookie_name) if not val: return self.session_class() max_age = total_seconds(app.permanent_session_lifetime) try: data = s.loads(val, max_age=max_age) return self.session_class(data) except BadSignature: return self.session_class() def save_session(self, app, session, response): domain = self.get_cookie_domain(app) path = self.get_cookie_path(app) # If the session is modified to be empty, remove the cookie. # If the session is empty, return without setting the cookie. if not session: if session.modified: response.delete_cookie( app.session_cookie_name, domain=domain, path=path ) return # Add a "Vary: Cookie" header if the session was accessed at all. if session.accessed: response.vary.add('Cookie') if not self.should_set_cookie(app, session): return httponly = self.get_cookie_httponly(app) secure = self.get_cookie_secure(app) samesite = self.get_cookie_samesite(app) expires = self.get_expiration_time(app, session) val = self.get_signing_serializer(app).dumps(dict(session)) response.set_cookie( app.session_cookie_name, val, expires=expires, httponly=httponly, domain=domain, path=path, secure=secure, samesite=samesite )
蓝图(blueprint):给目录结构 from flask import Flaskapp = Flask(__name__) app # 报错 ValueError: urls must start with a leading slash# 路由注册时 少写了/
@ac.route('login')def login(): return 'login'
以下是小蓝图 例子
from crm import create_app app = create_app() if __name__ == '__main__': app.run()
crm folder
from flask import Flask from .views.account import ac from .views.user import uc def create_app(): app = Flask(__name__) @app.before_request def filter(): print('过滤全局!全局方法') app.register_blueprint(ac,url_prefix='/api')#加前缀,url_prefix='/api' #浏览器输入http:// app.register_blueprint(uc) return app # 过滤全局!全局方法 # /Oct/ ::] - # 过滤局部!uc方法 # 过滤全局!全局方法 # /Oct/ ::] - # 在蓝图里面可以全部加特殊装饰器,也可以是局部的 #例子:before_request #这个 项目目录结构 是小蓝图的例子[小型应用程序] #大蓝图 就和django 基本上一样了[大型应用程序] #
from flask import Blueprint uc = Blueprint('uc',__name__) @uc.before_request def filter(): print('过滤局部!uc方法') @uc.route('/list') def list(): return 'List' @uc.route('/detail') def detail(): return 'detail'
#账户相关 from flask import Blueprint,render_template ac = Blueprint('ac',__name__,template_folder='log')#设置模板文件夹,优先在templates里面找模板文件,找不到再到template_folder 设置的文件夹里面去找 # ac = Blueprint('ac',__name__,template_folder='log',static_url_path='') #还可以设置静态文件 什么鬼? 如何用的? @ac.route('/login') def login(): # return render_template('/log/login.html')#这样可以访问的 return render_template('login.html') @ac.route('/logout') def logout(): return 'logout'
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> 用户登录 </body> </html>
thread.local 数据隔离 防止数据混乱
import threadingfrom threading import localimport time
obj = local() def task(i): obj.xxxx = i time.sleep(1.5) print(obj.xxxx, i) class o(local): x = 0 def walk(i): o.x = i time.sleep(2) print(o.x, i) for i in range(10): t = threading.Thread(target=task, args=(i,)) # t=threading.Thread(target=walk,args=(i,)) t.start() # 另外一种线程隔离的方式 dic = {}def task(i): # 给每个线程开辟独立空间 ident = threading.get_ident() # 获取线程id if ident in dic: dic[ident]['xxx'] = i else: dic[ident] = {'xxx': i} import greenlet # 为每个协程开辟空间,做数据隔离def walk(i): ident = greenlet.getcurrent() # 获取协程的id if ident in dic: dic[ident]['xxx'] = i else: dic[ident] = {'xxx': i}
class Local(object): DIC = { } def __getattr__(self, item):#获取属性值的时候被触发 print('__getattr__',item) ident = threading.get_ident() if ident in self.DIC: return self.DIC return None def __setattr__(self, key, value):#设置属性值的时候被触发 ident =threading.get_ident() if ident in self.DIC: self.DIC[ident][key]=value else: self.DIC[ident]={key:value}
通过这个自定义 Local 可以实现相同的功能 在进行升级 改造一下
import threadingtry: import greenlet get_ident = greenlet.getcurrentexcept Exception as e: get_ident =threading.get_ident class Loca(object): DIC ={} def __getattr__(self, item): ident = get_ident() if ident in self.DIC: return self.DIC[ident].get(item) return None def __setattr__(self, key, value): ident = get_ident() if ident in self.DIC: self.DIC[ident][key] = value else: self.DIC[ident] ={key:value}
内容回顾day 117 回顾 day 1161.django 和 flask 的区别? 不同点:request 操作的机制 共同点:都有视图、路由、都是基于wsgi 协议 flask:扩建性高 2. flask 提供的组件: - 配置文件: - 使用 - 原理 - 路由系统: [过装饰器实现] [可以不通过装饰器实现] - 书写方式 - 参数 - 重定向 - 子域名 - 路由本质 装饰器 + 闭包 实现的 - 视图 - CBV class _ view - 蓝图 - 作用: - 目录结构的划分 - 前缀 - 应用特殊装饰器 - 特殊装饰器 before_request after_request [内部做了一个反转] 貌似有六个 ??? 3. session 实现原理 ???
s9day117前戏: -偏函数:自动帮你传参数 import functools def index(a1,a2): return a1+a2 #new_func = functions.partial(index,“自动帮你传递的参数”) ret = index(1,2) print(ret)
new_func = functions.partial(index,“自动帮你传递的参数”)
ret = new_func(‘自己需要传递的参数’) print(ret) - 面向对象 - 执行父类的方法 class Base(object): def func(self): print('Base.func') class Foo(Base): def func(self): #方式一:根据mro的顺序执行方法 super(Foo,self).func() #方式二:主动执行Base类的方法 Base.func(self) print('Foo.func') #注意super 和 Base 两者之间的区别 obj = Foo() obj.func() class Base(object): def func(self): super(Base,self).func() print('Base.func') class Bar(object): def func(self): print('Bar.func') class Foo(Base,Bar): pass #sample - 1obj = Foo()obj.func()print(Foo.__mro__) #smaple - 2 #报错obj = Base()obj.func() 根据smaple -1 和 sample -2 super 不是找父类的而是根据mro顺序找 class Foo(object): def __init__(self): self.storage = {} def __setattr__(self,key,value): print('self.storage') #这样写直接会报错 #注意执行顺序 和 调用 顺序 #以及属性的存在周期 class Foo(object): def __init__(self): object.__setattr__(self,'storage',{}) def __setattr__(): print(self.storage) #这样调用没毛病
栈:先进先出 [类似弹夹]class Stack(object): def __init__(self): self.data = [] def push(self,val): self.data.append(val) def pop(self):#删除 return self.data.pop() def top(self):#每次都取最后一个 return self.data[-1] 或 if len(data)== 0: return None return self.data[len(data)-1]
_statck = Stack()_stack.push('a')_stack.push('b')
Local 对象 """ { ident:{} } """ from threading import get_ident class Local(object): def __init__(self): object.__setattr__(self,'storage',{}) def __setattr__(self,key,value): ident = get_ident()#获取唯一标记 if ident not in self.storage: self.storage[ident] = { key : value } else: self.storage[ident][key] = value def __getattr__(self,item): ident = get_ident() if ident in self.storage: return returnn self.storage[ident].get(item) #return None try: from greenlet import getcurrent as get_identexcept: from threading import get_ident
from threading import get_ident class Local(object): def __init__(self): object.__setattr__(self,'storage',{}) def __setattr__(self,key,value): ident = get_ident()#获取唯一标记 if ident not in self.storage: self.storage[ident] = { key : value } else: self.storage[ident][key] = value def __getattr__(self,item): ident = get_ident() if ident in self.storage: return returnn self.storage[ident].get(item) #return None
Local 对象
from flask import globals
# context locals_request_ctx_stack = LocalStack() #在LocalStack() 类 文件上面就是Local()
class Local(object): __slots__ = ('__storage__', '__ident_func__') def __init__(self): object.__setattr__(self, '__storage__', {}) object.__setattr__(self, '__ident_func__', get_ident) def __iter__(self): return iter(self.__storage__.items()) def __call__(self, proxy): """Create a proxy for a name.""" return LocalProxy(self, proxy) def __release_local__(self): self.__storage__.pop(self.__ident_func__(), None) def __getattr__(self, name): try: return self.__storage__[self.__ident_func__()][name] except KeyError: raise AttributeError(name) def __setattr__(self, name, value): ident = self.__ident_func__() storage = self.__storage__ try: storage[ident][name] = value except KeyError: storage[ident] = {name: value} def __delattr__(self, name): try: del self.__storage__[self.__ident_func__()][name] except KeyError: raise AttributeError(name)
s9day118 代码统计 pymysql 数据库连接池 DBUtils 初步认识:SQLAlchemy [flask 最有价值的就是上下文管理 ]内容回归: falsk : 路由 视图 蓝图 上下文管理:request __call__ 请求进来执行__call__方法为什么执行__call__方法,因为wsgi
代码统计 [shutil ???????
class FileStorage(object):
def save(self, dst, buffer_size=16384): from shutil import copyfileobj close_dst = False if isinstance(dst, string_types): dst = open(dst, 'wb') close_dst = True try: copyfileobj(self.stream, dst, buffer_size) finally: if close_dst: dst.close() shutil.py
def copyfileobj(fsrc, fdst, length=16*1024): """copy data from file-like object fsrc to file-like object fdst""" while 1: buf = fsrc.read(length) if not buf: break fdst.write(buf) ] from flask import Flaskapp = Flask(__name__)app.config['MAX_CONTENT_LENGTH'] = 1024*1024*7 #控制文件上传大小为7m #假如用户上传文件大小超过7m 会报413的错误 @index.route('/upload',methods=["GET","POST"])def upload(): if request.method == "GET": return render_template('upload.html') #from werkzeug.datastructures import FileStorage file_obj = request.fiels.get('code') #print(type(file_obj)) #print(file_obj.filename)#文件名称 #print(file_obj.stream)#文件内容 name_ext = file_obj.filename.rsplite('.',maxsplit=1) if len(name_exit) != 2:#说明没有后缀名 return '请上传zip压缩文件' if name_ext[1] != "zip": return "请上传zip压缩文件" file_path = os.path.join('files',file_obj.filename)#处理保存路径 #从file_obj.stream 中读取内容,在写入文件 file_obj.save(file_path) #保存文件 return “上传成功”
解压zip文件import shutil#通过open打开压缩文件,读取内容在进行解压shutil._unpack_zipfile("需要解压的文件路径","解压之后需要存放的路径")
文件上传与文件解压3接收用户上传文件,并解压到指定目录import shutilshutil._unpack_zipfile(file_obj.stream,'解压目录')#名称需要做唯一性处理import uuidtarget_path = os.path.join('files',uuid.uuid4())#或着每个文件都有唯一性的目录
目录的遍历#listdir 当前目录所有文件夹for item in os.listdir(target_path): print(item) total_num = 0 #总行数for base_path ,folder_list,file_list in os.walk(target_path): for file_name in file_list: file_path = os.path.join(base_path,file_name) #单个文件的完整路径 file_ext = file_path.rsplit('.',maxsplit=1) if len(file_exit) != 2: continue if file_ext[1] != 'py': continue file_num = 0 with open(file_path,'rb') as f: for line in f: #空格 换行 line = line.strip()# if not line: continue if line.startswith(b'#'):#注释 continue #第三中情况 """ 三引号 file_num += 1 print(file_num,file_path) total_num += file_num strip() ??????????
pymysql 操作数据库helper.py
import pymysqlfrom ..settings import Configdef connect(): """ 创建连接 :return: """ conn = Config.POOL.connection() cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) return conn,cursor def connect_close(conn,cursor): """ 释放 连接 :param conn: :param cursor: :return: """ cursor.close() conn.close() def fetch_all(sql,args): """ 获取所有select * :param sql: :param args: :return: """ conn,cursor = connect() cursor.execute(sql,args) record_list = cursor.fetchall() connect_close(conn,cursor) return record_list def fetch_one(sql,args): """ 返回符合sql的第一条数据 :param sql: :param args: :return: """ conn,cursor = connect() cursor.execute(sql,args) result = cursor.fetchone() connect_close(conn,cursor) return result def insert(sql,args): """ 返回受影响的行数,插入数据 :param sql: :param args: :return: """ conn,cursor = connect() row = cursor.execute(sql,args) conn.commit() connect_close(conn,cursor) return row
from DBUtils.PooledDB import PooledDB,SharedDBConnectionimport pymysql# 加盐CONFIG_KEYS = { "SALE": bytes('小鸡顿蘑菇', encoding='utf-8')} class Config(object): SALT = bytes('小鸡顿蘑菇', encoding='utf-8') #设置password keys SECREY_KEY = "passlslk。、dflsadk fsaldk" #设置session keys MAX_CONTENT_LENGTH = 1024*1024*10 #设置上传最大值 POOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='', port=3306, user='root', password='', database='flask', charset='utf8mb4' )
s9day119 内容回归-. django 和 falsk 的认识-. Flask 基础: 配置文件 app.config.form_object()#两种方式 反射+ importlib 路由系统 装饰器 @app.route() 参数 url endpoint url_for 默认情况下 等于视图函数名 methods GET/POST [] list 等 自定义加装饰器: functools.wraps endpoint 默认是函数名 写路由的两种方式 装饰器:@app.route() add_url_rule() 自定义支持正则的url session: 实现原理:以加密的形式存到了浏览器的cookies里面 蓝图: 目录结构的划分 前缀 特殊装饰器: - 上下文管理 threading.local 数 据隔离[给每个线程开辟内存空间,使得线程之间进行数据隔离] 应用:DBUtils 中为每个线程创建一个数据库连接时使用 面向对象的特殊方法: getattr setattr delattr 偏函数 单例模式 请求上下文的流程: 源码流程: 请求到来执行 __call__ wsgi_app() ctx = RequestContext(): 封装= 请求数据 + 空session ctx.push():将ctx 传给LocalStack对象,LocalStack再将数据传给Local 存储起来 问题:Lcoal 中是如何存储? __storage__={ ident:{} } 问题:LocalStack 作用? ident:{stack:[ctx]} -. 数据库 前端 bootstramp 什么是响应式布局? 根据窗口的不同,显示不同的效果 python 里面那些比较重要: 生成器、 ?????? 迭代器、 ?????? 装饰器、?????? 反射 、 ????? 上下文管理:LocalProxy上下文管理: 请求上下文:request App 上下文:app/g第三方组件:wtforms 使用 原理