Flask day4 flask-session、dbutils数据库连接池,wtforms基础操作
内容回顾
1.jdango和flask框架的认识?
-对于请求的处理方式不一样:django是直接以参数的形式传入,flask则是上下文管理的机制
2.flask中上下文管理机制?
-当一个请求过来之后
先通过wsgi处理
然后会执行app.__call__方法
该方法又会执行wsgi_app方法
会创建一个ctx而ctx又是一个RequestContext的对象里面含有request和session
之后会通过localstack将ctx放入local中以{线程id号:{'stactk':[ctx]}形式存储
取值时是基于localproxy类再调用一个函数让localstack从local中取出ctx中的request或者session
3.为什么把请求放到RequestContext中?
-ctx = RequestContext()——> request, session
对于请求数据request和session会经常用放一起便于导入
4.Local对象的作用?
-看过Local源码,和threading.local相似,但是又有不同
-Local中可以基于协程greenlet获取唯一标识,粒度更细。
5.LocalStack对象的作用?
-对Local对象中的数据进行操作。
-将Local对象中的数据维护成了一个栈(先进后出)
-local = {
1231:{stack:[ctx,ctx...]}
}
6.上下文管理?
-请求上下文:request/session
-APP和g:app/g
7.什么是g?
-一次请求周期内的全局变量在reques_before时定义,后面可以直接调用
8.获取session/g
-LocalProxy
9.技术:
-反射
-面向对象,封装
-__dict__
-线程(threading.local)
-笔试:自己写一个类+列表实现栈。(LocalStack实例)
10.实例化的类可被for循环
-该类的__iter__方法返回一个可迭代对象
class Foo:
def __iter__(self):
# 返回迭代器
# return iter([11, 22, 33, 'xx'])
# 返回生成器(特殊的迭代器)
yield 11
yield 22
yield 33
yield 'xx'
obj = Foo()
for i in obj:
print(i)
>>>>
11
22
33
xx
今日内容
1.flask-session
将session改为redis session
from flask import Flask, request, session
from flask_session import RedisSessionInterface
from redis import Redis
# 默认方式将session存入cookie返回到浏览器保存
# from flask.sessions import SecureCookieSessionInterface
# app.session_interface = SecureCookieSessionInterface()
# 方式一:redis保存session可以数据保存在后台
app.session_interface = RedisSessionInterface(
redis=Redis(host='localhost', port=6379),
key_prefix='flask_xx' # session中随即字符串的 flask_xx+uuid4
)
# 方式二:redis保存session
from flask_session import Session
from redis import Redis
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_REDIS'] = Redis(host='localhost', port=6379)
Session(app)
修改session内层数据的两种方式
home.py
from flask import Blueprint, session
home = Blueprint('home', __name__)
@home.route('/index')
def index():
# session['user_info'] = {'k1': 1, 'k2': 2} # 会调用__setitem__方法里面会执行on_update将modified改为True
user_info = session.get('user_info')
print("原来的值", user_info)
session['user_info']['k1'] = 777 # 只执行了__getitem__方法此时modified还是为False,所有不会修改cookie中的session
user_info = session.get('user_info')
print("修改后的值", user_info)
# 方式一:手动将modified改为True
session['modified'] = True
# 方式二:将SESSION_REFRESH_EACH_REQUEST在配置中设置为True(这个还能刷新session的超时时间)、同时还要在登录成功之后还要让session.permanent = True
return 'index'
@home.route('/test')
def test():
user_info = session.get('user_info')
print(user_info)
return 'test'
account.py
from flask import Blueprint, redirect, render_template,request,session
from uuid import uuid4
account = Blueprint('account', __name__)
@account.route('/login', methods=['GET', 'POST'])
def login():
error=''
if request.method == 'POST':
user = request.form.get('user')
pwd = request.form.get('pwd')
if user=='lem' and pwd == '123':
uid = uuid4()
session.permanent = True
session['user_info'] = {'id':uid, 'name':user}
return redirect('/index')
else:
error = '用户名或者密码错误'
return render_template('login.html', error=error)
settings.py
from datetime import timedelta
class Config:
DEBUG = True
SECRET_KEY = 'lem' # session加密的盐
PERMANENT_SESSION_LIFETIME = timedelta(minutes=30) # 超时时间30分钟失效
SESSION_REFRESH_EACH_REQUEST = True # 每次更新session重复保存以保证session超时时间改变
class DevelopmentConfig(Config):
pass
class ProductionConfig(Config):
pass
class TestingConfig(Config):
pass
总结flask_session:
作用:将默认保存的签名cookie中的值保存到 redis/memcached
应用:
-方式一:
-配置
-app.config['SESSION_TYPE'] = 'redis'
-app.config['SESSION_REDIS'] = Redis(host='localhost', port=6379)
-替换
-from flask_session import Session
-Session(app)
-方式二:
-from flask_session import RedisSessionInterface
-app.session_interface = RedisSessionInterface(redis=Redis(host='localhost',port=6379), key_prefix='flask_xx' )
注意:session中存储的是字典,修改字典内部元素时,会造成数据不更新。
-1.modified=True
-2.SESSION_REFRESH_EACH_REQUEST = True and session.permanent = True
2.数据库连接池:DBUtils(pymysql)
了解DBUtils
模式:
-每个线程创建一个链接,关闭(默认不关闭),线程终止时,才关闭链接
-创建共享连接池
应用:
-只要写原生SQL,就要用数据库链接池
import pymysql
from dbutils.pooled_db import PooledDB
POOL = PooledDB(
creator=pymysql, # 使用连接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,连接池中至少创建的空闲的连接,0表示不创建
maxcached=5, # 连接池中最多闲置的连接,0和None不限制
# 连接池中最多共享的连接数量,0和None表示全部共享.PS:无用,因为pymysql和MYSQLdb等模块的threadsafety = 1,而threadsafety=1时_maxshared默认为0,所有永远都是所有连接都共享
maxshared=3,
blocking=True, # 连接池中如果没有可用的连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个连接池最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to...", "set time zone..."]
# ping Mysql服务端,检查是否服务可用。如:0=None=never,1=default=whenever it is requested,2=when a cursor is created, 4=when a query is executed,7=always
ping=0, # 工作中一般都是0/4/7
host='localhost',
port=3306,
user='root',
password='123456',
database='flask',
charset='utf8'
)
def func():
"""
检测当前正在运行连接数是否小于最大连接数,如果不小于则:等待或者抛出raise TooManyConnections异常
否则优先去初始化时创建的连接中获取连接,SteadyDBConnection
然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回
如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回
一旦关闭链接后,连接就返回到连接池让后续线程继续使用
:return:
"""
conn = POOL.connection()
# print(th, "连接被拿走了", conn1._con)
# print(th, "池子里目前有", pool.idle_cache,'\r\n')
cursor = conn.cursor(pymysql.cursors.DictCursor)
cursor.execute("select * from user")
res = cursor.fetchall()
conn.close()
return res
用于flask中
配置数据库的参数
class Config:
DEBUG = True
SECRET_KEY = 'lem' # session加密的盐
PERMANENT_SESSION_LIFETIME = timedelta(minutes=30) # 超时时间30分钟失效
SESSION_REFRESH_EACH_REQUEST = True # 每次更新session重复保存以保证session超时时间改变
SESSION_TYPE = 'redis'
class DevelopmentConfig(Config):
ENV = 'development'
SESSION_REDIS = Redis(host='192.168.11.213', port=6379)
SESSION_KEY_PREFIX='lem'
PYMYSQL_HOST = "localhost"
PYMYSQL_PORT = 3306
PYMYSQL_USER = 'root'
PYMYSQL_PASSWORD = '123456'
PYMYSQL_DATABASE = 'flask'
PYMYSQL_CHARSET = 'utf8'
将数据里连接池放到app.config
类似于redis(注意app创建时间和连接池创建时调用app的配置信息的顺序)
pool.py
import pymysql
from dbutils.pooled_db import PooledDB
def init_pool(app):
POOL = PooledDB(
creator=pymysql, # 使用连接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,连接池中至少创建的空闲的连接,0表示不创建
maxcached=5, # 连接池中最多闲置的连接,0和None不限制
# 连接池中最多共享的连接数量,0和None表示全部共享.PS:无用,因为pymysql和MYSQLdb等模块的threadsafety = 1,而threadsafety=1时_maxshared默认为0,所有永远都是所有连接都共享
maxshared=3,
blocking=True, # 连接池中如果没有可用的连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个连接池最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to...", "set time zone..."]
# ping Mysql服务端,检查是否服务可用。如:0=None=never,1=default=whenever it is requested,2=when a cursor is created, 4=when a query is executed,7=always
ping=0, # 工作中一般都是0/4/7
# 读取app的配置文件中的参数
host=app.config['PYMYSQL_HOST'],
port=app.config['PYMYSQL_PORT'],
user=app.config['PYMYSQL_USER'],
password=app.config['PYMYSQL_PASSWORD'],
database=app.config['PYMYSQL_DATABASE'],
charset=app.config['PYMYSQL_CHARSET']
)
# 给app的上面的POOL添加到app的config中
app.config['PYMYSQL_POOL'] = POOL
__init__.py
from flask import Flask
from .views import account, home
import settings
from flask_session import Session
from apps.utils.mysql_pool import init_pool
def create_user():
app = Flask(__name__)
app.config.from_object(settings.DevelopmentConfig)
app.register_blueprint(account.account)
app.register_blueprint(home.home)
# 将session替换成redis session
# Session(app)
init_pool(app) # 将app以参数的形式传到上面的init_pool中,该方法与上面Session(app)类似
return app
helper.py
import pymysql
from flask import current_app
# 创建一个操作sql的类,让需要执行sql的地方导入即可
class MysqlHelper:
@staticmethod
def open():
POOL = current_app.config['PYMYSQL_POOL']
conn = POOL.connection()
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
return conn, cursor
@staticmethod
def close(conn, cursor):
conn.commit()
cursor.close()
conn.close()
@classmethod
def fetch_one(cls, sql, args):
conn, cursor = cls.open()
cursor.execute(sql, args)
obj = cursor.fetchone()
cls.close(conn, cursor)
return obj
@classmethod
def fetch_all(cls, sql, args):
conn, cursor = cls.open()
cursor.execute(sql, args)
obj = cursor.fetchall()
cls.close(conn, cursor)
return obj
3.wtforms
作用:用于对python web框架做表单验证
使用:
class MyForm(Form):
user = 类(正则,插件)
字段 = 类(正则,插件)
字段 = 类(正则,插件)
字段 = 类(正则,插件)
字段 = 类(正则,插件)
字段 = 类(正则,插件)
obj = MyForm()
# 生成HTML标签
form.obj # 会调用类.__str__方法==> 插件.xx方法
# 验证
obj = MyForm(formdata=request.form)
if form.validate():
# 内部找到所有的字段:user+用户发过来的数据=>正则校验
wtforms登录校验
from wtforms import Form
from wtforms.fields import simple, core, html5
from wtforms import validators
from wtforms import widgets
# 前端提交post请求时不做校验 form表单加novalidate
class LoginForm(Form):
user = simple.StringField(
label='用户名',
validators=[
validators.DataRequired(message='用户名不能为空.'),
# validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
],
widget=widgets.TextInput(),
render_kw={'class': 'form-control'}
)
pwd = simple.PasswordField(
label='密码',
validators=[
validators.DataRequired(message='密码不能为空.'),
# validators.Length(min=8, message='用户名长度必须大于%(min)d'),
# validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
# message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
@account.route('/login', methods=['GET', 'POST'])
def login():
form = LoginForm()
if request.method == 'POST':
form = LoginForm(formdata=request.form)
if form.validate():
print("用户提交数据通过校验,提交的数据为:", form.data) # {'user': 'hina', 'pwd': '123'}
obj = MysqlHelper.fetch_one(
"select id, username from user where username=%(username)s and password=%(password)s", form.data)
if obj:
session.permanent = True
session['user_info'] = {'id': obj['id'], 'name': obj['username']}
return redirect('/index')
return render_template('login.html', form=form)
wtforms注册校验
from wtforms import Form, StringField, PasswordField, RadioField, SelectField, SelectMultipleField
from wtforms.fields import simple, core, html5
from wtforms import validators
from wtforms import widgets
import email_validator
class RegisterForm(Form):
name = simple.StringField(
label='用户名',
validators=[
validators.DataRequired()
],
widget=widgets.TextInput(),
render_kw={'class': 'form-control'},
default='hina'
)
pwd = simple.PasswordField(
label='密码',
validators=[
validators.DataRequired(message='密码不能为空.')
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
pwd_confirm = simple.PasswordField(
label='重复密码',
validators=[
validators.DataRequired(message='重复密码不能为空.'),
validators.EqualTo('pwd', message="两次密码输入不一致")
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
email = html5.EmailField(
label='邮箱',
validators=[
validators.DataRequired(message='邮箱不能为空.'),
validators.Email(message='邮箱格式错误'),
],
widget=widgets.TextInput(input_type='email'),
render_kw={'class': 'form-control'}
)
gender = core.RadioField(
label='性别',
choices=(
(1, '男'),
(2, '女'),
),
coerce=int
)
city = core.SelectField(
label='城市',
choices=MysqlHelper.fetch_all("select id, name from city", {}, None),
#
# ('bj', '北京'),
# ('sh', '上海'),
# )
coerce = int
)
hobby = core.SelectMultipleField(
label='爱好',
choices=(
(1, '篮球'),
(2, '足球'),
),
coerce=int
)
favor = core.SelectMultipleField(
label='喜好',
choices=(
(1, '篮球'),
(2, '足球'),
),
widget=widgets.ListWidget(prefix_label=False),
option_widget=widgets.CheckboxInput(),
coerce=int,
default=[1, 2]
)
def __init__(self, *args, **kwargs):
super(RegisterForm, self).__init__(*args, **kwargs)
self.city.choices = MysqlHelper.fetch_all("select id, name from city", {}, None)
def validate_pwd_confirm(self, field):
"""
自定义pwd_confirm字段规则,例:与pwd字段是否一致
:param field:
:return:
"""
# 最开始初始化时,self.data中已经有所有的值
if field.data != self.data['pwd']:
# raise validators.ValidationError("密码不一致") # 继续后续验证
raise validators.StopValidation("密码不一致") # 不再继续后续验证
def validate_name(self, field):
print(field.data) # 当前name传来的值
print(self.data) # 当前传来的所有值 name, gender ....
obj = MysqlHelper.fetch_one("select id from user where username=%s", (field.data,))
if obj:
# raise validators.ValidationError("用户名已存在") # 继续后续验证
raise validators.StopValidation("用户名已存在") # 不再继续后续验证
@account.route('/register', methods=['GET', 'POST'])
def register():
form = RegisterForm()
if request.method == 'POST':
form = RegisterForm(formdata=request.form)
if form.validate():
print(form.data)
else:
print(form.errors)
return render_template('register.html', form=form)
# 由于在请求到来之前先要去数据库获取city的id和name所以又出现之前的获取不到current_app注册的配置PYMYSQL_POOL需要直接写在配置文件中导入
class DevelopmentConfig(Config):
ENV = 'development'
SESSION_REDIS = Redis(host='192.168.11.213', port=6379)
SESSION_KEY_PREFIX = 'lem'
PYMYSQL_HOST = "localhost"
PYMYSQL_PORT = 3306
PYMYSQL_USER = 'root'
PYMYSQL_PASSWORD = '123456'
PYMYSQL_DATABASE = 'flask'
PYMYSQL_CHARSET = 'utf8'
PYMYSQL_CONFIG = {"host": 'localhost', "user": 'root', "password": '123456', 'port': 3306, 'db': 'flask',
'charset': 'utf8'}
PYMYSQL_POOL = PooledDB(
creator=pymysql, # 使用连接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,连接池中至少创建的空闲的连接,0表示不创建
maxcached=5, # 连接池中最多闲置的连接,0和None不限制
# 连接池中最多共享的连接数量,0和None表示全部共享.PS:无用,因为pymysql和MYSQLdb等模块的threadsafety = 1,而threadsafety=1时_maxshared默认为0,所有永远都是所有连接都共享
maxshared=0,
blocking=True, # 连接池中如果没有可用的连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个连接池最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to...", "set time zone..."]
# ping Mysql服务端,检查是否服务可用。如:0=None=never,1=default=whenever it is requested,2=when a cursor is created, 4=when a query is executed,7=always
ping=0, # 工作中一般都是0/4/7
host="localhost",
port=3306,
user="root",
password="123456",
database="flask",
charset="utf8",
)
总结:
-上下文管理
-falsk-session
-wtforms
-name = simple.StringField()
UnboundField(StringField, 计数器)
-FormMeta.__call__