flask
flask介绍安装快速入门
# django 大
# flask 小
# tornado 异步(2.x用的多,3.5以后用得少)
# Sanic
# FastAPi
# 安装:pip3 install flask
# https://www.cnblogs.com/liuqingzheng/p/11012099.html
# Flask是一个基于Python开发并且依赖jinja2模板和Werkzeug WSGI服务的一个微型框架
# 快速使用
from flask import Flask
app=Flask(__name__)
@app.route(‘/‘,methods=[‘GET‘,])
def index():
return ‘hello world lqz‘
if __name__ == ‘__main__‘:
app.run(port=8080)
# 一旦有请求过来,执行app(),对象()---->触发类的__call__()
# 请求一来,执行 app()--->Flask类的__call__方法执行
配置文件
from flask import Flask,request,render_template,redirect,session,url_for
app = Flask(__name__)
#配置信息
## 方式一:直接通过app对象设置,只能设置这两个,其他不支持
# app.debug = False # debug模式,开启了,就会热更新
# app.secret_key = ‘sdfsdfsdfsdf‘ # 秘钥,django配置文件中的秘钥
## 方式二:直接通过app对象的config(字典)属性设置
# app.config[‘DEBUG‘]=True
# print(app.config)
## 方式三:直接使用py文件
# app.config.from_pyfile("settings.py")
### 重点方式:后期用这种方式,使用类方式
# app.config.from_object("python类或类的路径")
app.config.from_object(‘settings.ProductionConfig‘)
print(app.config[‘DATABASE_URI‘])
### 其他方式:(了解)
#通过环境变量配置
# app.config.from_envvar("环境变量名称")
# app.config.from_json("json文件名称")
# app.config.from_mapping({‘DEBUG‘: True})
# print(app.config)
if __name__ == ‘__main__‘:
app.run()
‘‘‘
有很多内置配置参数,不需要掌握
{
‘DEBUG‘: get_debug_flag(default=False), 是否开启Debug模式
‘TESTING‘: False, 是否开启测试模式
‘PROPAGATE_EXCEPTIONS‘: None,
‘PRESERVE_CONTEXT_ON_EXCEPTION‘: None,
‘SECRET_KEY‘: None,
‘PERMANENT_SESSION_LIFETIME‘: timedelta(days=31),
‘USE_X_SENDFILE‘: False,
‘LOGGER_NAME‘: None,
‘LOGGER_HANDLER_POLICY‘: ‘always‘,
‘SERVER_NAME‘: None,
‘APPLICATION_ROOT‘: None,
‘SESSION_COOKIE_NAME‘: ‘session‘,
‘SESSION_COOKIE_DOMAIN‘: None,
‘SESSION_COOKIE_PATH‘: None,
‘SESSION_COOKIE_HTTPONLY‘: True,
‘SESSION_COOKIE_SECURE‘: False,
‘SESSION_REFRESH_EACH_REQUEST‘: True,
‘MAX_CONTENT_LENGTH‘: None,
‘SEND_FILE_MAX_AGE_DEFAULT‘: timedelta(hours=12),
‘TRAP_BAD_REQUEST_ERRORS‘: False,
‘TRAP_HTTP_EXCEPTIONS‘: False,
‘EXPLAIN_TEMPLATE_LOADING‘: False,
‘PREFERRED_URL_SCHEME‘: ‘http‘,
‘JSON_AS_ASCII‘: True,
‘JSON_SORT_KEYS‘: True,
‘JSONIFY_PRETTYPRINT_REGULAR‘: True,
‘JSONIFY_MIMETYPE‘: ‘application/json‘,
‘TEMPLATES_AUTO_RELOAD‘: None,
}
‘‘‘
路由系统
from flask import Flask,request,render_template,redirect,session,url_for
app = Flask(__name__)
app.debug = True # debug模式,开启了,就会热更新
app.secret_key = ‘sdfsdfsdfsdf‘ # 秘钥,django配置文件中的秘钥
# @app.route(‘/index/<name>‘,methods=[‘GET‘],endpoint=‘index‘,defaults={‘name‘:‘lqz‘},strict_slashes=True,redirect_to=‘http://www.baidu.com‘)
@app.route(‘/index/<string:name>/<int:pk>‘,methods=[‘GET‘],endpoint=‘index‘)
def index(name,pk):
print(name)
return ‘hello‘
# app.add_url_rule(‘/index‘,endpoint=‘index‘,view_func=index,defaults={‘name‘:‘lqz‘,‘age‘:19})
if __name__ == ‘__main__‘:
app.run()
## 路由本质app.add_url_rule
‘‘‘
路由系统的本质,就是 app.add_url_rule(路径, 别名, 函数内存地址, **options)
endpoint:如果不填,就是函数名(加装饰器时要注意)
与django路由类似
django与flask路由:flask路由基于装饰器,本质是基于:add_url_rule
add_url_rule 源码中,endpoint如果为空,endpoint = _endpoint_from_view_func(view_func),最终取view_func.__name__(函数名)
‘‘‘
#### add_url_rule的参数
‘‘‘
##### 记住
rule, URL规则
view_func, 视图函数名称
defaults = None, 默认值, 当URL中无参数,函数需要参数时,使用defaults = {‘k‘: ‘v‘} 为函数提供参数
endpoint = None, 名称,用于反向生成URL,即: url_for(‘名称‘)
methods = None, 允许的请求方式,如:["GET", "POST"]
###了解
#对URL最后的 / 符号是否严格要求
strict_slashes = None
@app.route(‘/index‘, strict_slashes=False)
#访问http://www.xx.com/index/ 或http://www.xx.com/index均可
@app.route(‘/index‘, strict_slashes=True)
#仅访问http://www.xx.com/index
#重定向到指定地址
redirect_to = None,
@app.route(‘/index/<int:nid>‘, redirect_to=‘/home/<nid>‘)
‘‘‘
### 路由转换器
‘‘‘
‘default‘: UnicodeConverter,
‘string‘: UnicodeConverter,
‘any‘: AnyConverter,
‘path‘: PathConverter,
‘int‘: IntegerConverter,
‘float‘: FloatConverter,
‘uuid‘: UUIDConverter,
‘‘‘
## 自定义转换器(不用)
CBV
from flask import Flask,request,render_template,redirect,session,url_for
from flask.views import View,MethodView
app = Flask(__name__)
app.debug = True # debug模式,开启了,就会热更新
app.secret_key = ‘sdfsdfsdfsdf‘ # 秘钥,django配置文件中的秘钥
class IndexView(MethodView): # 继承MethodView
def get(self):
url=url_for(‘aaa‘)
print(url)
return ‘我是get‘
def post(self):
return ‘我是post‘
app.add_url_rule(‘/index‘,view_func=IndexView.as_view(name=‘aaa‘))
‘‘‘
endpoint:如果传了,优先使用endpoint,如果不传使用as_view(name=‘aaa‘),但是name=‘aaa‘必须传
cbv要继承MethodView,只需要写get,post...
cbv要继承View,必须重写dispatch
‘‘‘
if __name__ == ‘__main__‘:
app.run(port=8888)
模板(不重要)
from flask import Flask,request,render_template,redirect,session,url_for,Markup
from flask.views import View,MethodView
app = Flask(__name__)
app.debug = True # debug模式,开启了,就会热更新
app.secret_key = ‘sdfsdfsdfsdf‘ # 秘钥,django配置文件中的秘钥
def test(a,b):
return a+b
class IndexView(MethodView): # 继承MethodView
def get(self):
url=url_for(‘aaa‘)
print(url)
# a=Markup(‘<a href="http://www.baidu.com">点我看美女</a>‘)
a=‘<a href="http://www.baidu.com">点我看美女</a>‘
return render_template(‘test.html‘,name=‘lqz‘,test=test,a=a)
def post(self):
return ‘我是post‘
app.add_url_rule(‘/index‘,view_func=IndexView.as_view(name=‘aaa‘))
if __name__ == ‘__main__‘:
app.run(port=8888)
‘‘‘
0 跟dtl完全一样,但是它可以执行函数
1.Markup等价django的mark_safe ,
2.extends,include一模一样
‘‘‘
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>{{name}}</h1>
<hr>
{{test(4,5)}}
<hr>
{{a|safe}}
{{a}}
</body>
</html>
请求响应
from flask import Flask,jsonify
from flask import views
from flask import Flask
from flask import request
from flask import render_template
from flask import redirect
from flask import make_response
app = Flask(__name__)
app.debug=True
@app.route(‘/login.html‘, methods=[‘GET‘, "POST"])
def login():
# 请求相关信息
# request.method 提交的方法
# print(request.args.get(‘name‘)) #get请求提交的数据---GET
# print(request.form) #post请求提交数据----POST
# print(request.values) # get和post的汇总
# print(request.query_string) #b‘name=lqz&age=19‘
# print(request.cookies)
# print(request.path)
# print(request.full_path)
# request.args get请求提及的数据
# request.form post请求提交的数据
# request.values post和get提交的数据总和
# request.cookies 客户端所带的cookie
# request.headers 请求头
# request.path 不带域名,请求路径
# request.full_path 不带域名,带参数的请求路径
# request.url 带域名带参数的请求路径
# request.base_url 带域名请求路径
# request.url_root 域名
# request.host_url 域名
# request.host 127.0.0.1:500
# request.files
# obj = request.files[‘the_file_name‘]
# obj.save(‘/var/www/uploads/‘ + secure_filename(f.filename))
# 响应相关信息
# return "字符串"
# return render_template(‘html模板路径‘,**{})
# return redirect(‘/index.html‘)
# return jsonify({‘k1‘:‘v1‘})
# response = make_response(render_template(‘index.html‘))
response = make_response(‘hello‘)
# response是flask.wrappers.Response类型
response.delete_cookie(‘session‘)
response.set_cookie(‘name‘, ‘lqz‘)
response.headers[‘X-Something‘] = ‘A value‘
return response
# return "内容"
if __name__ == ‘__main__‘:
app.run(port=8888)
session
# 使用
-增:session[‘name‘]=lqz
-查:session.get(‘name‘)
-删:session.pop(‘name‘)
## 源码分析SecureCookieSessionInterface
## open_session
‘‘‘
val = request.cookies.get(app.session_cookie_name)
data = s.loads(val, max_age=max_age)
return self.session_class(data)
‘‘‘
## save_session
‘‘‘
val = self.get_signing_serializer(app).dumps(dict(session))
response.set_cookie(
app.session_cookie_name,
val,
expires=expires,
)
‘‘‘‘name‘)
# set_cookie其他参数
key, 键
value=‘‘, 值
max_age=None, 超时时间 cookie需要延续的时间(以秒为单位)如果参数是\ None`` ,这个cookie会延续到浏览器关闭为止
expires=None, 超时时间(IE requires expires, so set it if hasn‘t been already.)
path=‘/‘, Cookie生效的路径,/ 表示根路径,特殊的:根路径的cookie可以被任何url的页面访问,浏览器只会把cookie回传给带有该路径的页面,这样可以避免将cookie传给站点中的其他的应用。
domain=None, Cookie生效的域名 你可用这个参数来构造一个跨站cookie。如, domain=".example.com"所构造的cookie对下面这些站点都是可读的:www.example.com 、 www2.example.com 和an.other.sub.domain.example.com 。如果该参数设置为 None ,cookie只能由设置它的站点读取
secure=False, 浏览器将通过HTTPS来回传cookie
httponly=False 只能http协议传输,无法被JavaScript获取(不是绝对,底层抓包可以获取到也可以被覆盖)
from flask import Flask,jsonify
from flask import views
from flask import Flask,session
from flask import request
from flask import render_template
from flask import redirect
from flask import make_response
app = Flask(__name__)
app.debug=True
app.secret_key=‘sdfsdfsadfasdf‘
app.session_interface
@app.route(‘/login.html‘, methods=[‘GET‘, "POST"])
def login():
session[‘name‘]=‘lqz‘
‘‘‘
#在django中发什么三件事,1,生成一个随机的字符串 2 往数据库存 3 写入cookie返回浏览器
#在flask中没有数据库,但session是怎样实现的?
# 生成一个密钥写入这个cookie,然后下次请求的时候,通过这个cookie解密,然后赋值给session
‘‘‘
response=make_response(‘hello‘)
# response.set_cookie(‘name‘, ‘lqz‘)
return response
@app.route(‘/index‘, methods=[‘GET‘, "POST"])
def index():
print(session.get(‘name‘))
return ‘我是首页‘
if __name__ == ‘__main__‘:
app.run(port=8080)
## 源码分析SecureCookieSessionInterface
## open_session
‘‘‘
val = request.cookies.get(app.session_cookie_name)
data = s.loads(val, max_age=max_age)
return self.session_class(data)
‘‘‘
## save_session
‘‘‘
val = self.get_signing_serializer(app).dumps(dict(session))
response.set_cookie(
app.session_cookie_name,
val,
expires=expires,
)
‘‘‘
闪现
from flask import Flask,jsonify,flash,get_flashed_messages
from flask import Flask
from flask import render_template
from flask import redirect
from flask import make_response
app = Flask(__name__)
app.debug=True
app.secret_key=‘sdfsdfsadfasdf‘
@app.route(‘/user‘, methods=[‘GET‘, "POST"])
def login():
try:
a=[1,2,3]
print(a[9])
except Exception as e:
print(e)
## 放到某个位置
# flash(str(e))
# 高级使用
flash(‘超时错误‘, category="x1")
flash(‘xx错误‘, category="x3")
# return redirect(‘/error?errors=%s‘%str(e))
return redirect(‘/error‘)
response=make_response(‘hello‘)
return response
@app.route(‘/error‘, methods=[‘GET‘, "POST"])
def error():
#从那个位置取出来
# errors=get_flashed_messages()
errors=get_flashed_messages(category_filter=[‘x1‘])
return render_template(‘error.html‘,errors=errors)
if __name__ == ‘__main__‘:
app.run(port=8080)
请求扩展
from flask import Flask,jsonify,flash,get_flashed_messages
from flask import Flask,request
from flask import render_template
from flask import redirect
from flask import make_response
app = Flask(__name__)
app.debug=False
app.secret_key=‘sdfsdfsadfasdf‘
# @app.before_request #类比django中间件中的process_request,写多个执行顺序是从上往下
# def before():
# print(‘我来了‘)
#
# request.xx=‘xxxxxx‘
# # return ‘回去吧‘ #四件套之一
# return None # 继续进入下一个before_request
# @app.before_request #类比django中间件中的process_request
# def before2():
# print(‘我来了22‘)
#
# return None
# @app.after_request # 从下往上
# def after(response):
# print(‘我走了‘)
# print(response)
# response.headers[‘xxxx‘]=‘xxx‘
# return response #要return response
# @app.after_request
# def after2(response):
# print(‘我走了222‘)
# return response #要return response
# @app.before_first_request # 只会执行一次,程序启动以后,第一个访问的会触发,以后再也不会了
# def first():
# # 程序初始化的一些操作
# print(‘我的第一次‘)
# @app.teardown_request
# def ter(e):
# # 日志记录,不管当次请求是否出异常,都会执行,出了异常,e就是异常对象,debug=False模式下
# print(e)
# print(‘我执行了‘)
# @app.errorhandler(404) #只要是404错误,都会走它
# def error_404(arg):
# return "404错误了"
# # return render_template(‘404.html‘)
# @app.errorhandler(500) #只要是500错误,都会走它,debug模式要关掉
# def error_500(arg):
# return "出问题了"
@app.template_global()
def sb(a1, a2):
return a1 + a2
#{{sb(1,2)}}
@app.template_filter()
def db(a1, a2, a3):
return a1 + a2 + a3
#{{ 1|db(2,3)}}
@app.route(‘/user‘, methods=[‘GET‘, "POST"])
def login():
# print(request.xx)
# response=make_response(‘hello‘)
# return response
return render_template(‘login.html‘)
if __name__ == ‘__main__‘:
app.run(port=8080)
蓝图
# 对程序进行目录结构划分
# 使用步骤
-实例化得到一个蓝图对象(可以指定直接的静态文件和模板路径)
-在app中注册蓝图(可以指定前缀)
-以后再写路由装饰器,使用蓝图对象的.route
flask 项目演示
# 1 创建一个库movie
# 2 手动把表同步进去
-modes.py,解开注释,右键执行
# 3 安装项目依赖
-flask-sqlalchemy
-flask_script
-flask_redis
-flask_wtf
# 4 命令行中运行
python3 manage.py runserver
# 5 后台管理中rbac控制
# https://gitee.com/openspug/spug/tree/1.x/
g对象
from flask import Flask,g,request,session
# g对象在当次请求中一直有效
# g和session有什么区别
#session对象是可以跨request的,只要session还未失效,不同的request的请求会获取到同一个session,
# 但是g对象不是,g对象不需要管过期时间,请求一次就g对象就改变了一次,或者重新赋值了一次
app = Flask(__name__)
@app.before_request
def first():
session[‘name‘]=‘dlrb‘
request.form=‘egon‘
g.name=‘lqz‘
@app.after_request
def after(response):
print(‘11111‘,g.name)
return response
@app.route(‘/‘)
def hello_world():
print(‘00000‘,g.name)
# test()
# test1()
return ‘Hello World!‘
if __name__ == ‘__main__‘:
app.run()
flask-session
# pip install flask-session
from flask import Flask,g,request,session
from flask_session import RedisSessionInterface
app = Flask(__name__)
app.debug=True
app.secret_key=‘asdfasdfasdf‘
# 方式一
# from redis import Redis
# conn=Redis(host=‘127.0.0.1‘,port=6379)
# app.session_interface=RedisSessionInterface(redis=conn,key_prefix=‘flask_session‘)
## 方式二(flask使用第三方插件的通用方案)
from flask_session import Session
from redis import Redis
app.config[‘SESSION_TYPE‘] = ‘redis‘
app.config[‘SESSION_KEY_PREFIX‘]=‘flask_session‘
app.config[‘SESSION_REDIS‘] = Redis(host=‘127.0.0.1‘,port=‘6379‘)
Session(app)
@app.route(‘/‘)
def hello_world():
session[‘name‘]=‘lqz‘
return ‘Hello World!‘
if __name__ == ‘__main__‘:
app.run()
# 如何设置session的过期时间?
配置文件一个参数
#设置cookie时,如何设定关闭浏览器则cookie失效
app.session_interface=RedisSessionInterface(conn,key_prefix=‘lqz‘,permanent=False)
数据库连接池
# from flask import Flask,g,request,session
# import time
# import pymysql
# app = Flask(__name__)
# app.debug=True
#
# app.secret_key=‘asdfasdfasdf‘
#
#
#
#
#
# @app.route(‘/‘)
# def hello_world():
# conn = pymysql.connect(host=‘127.0.0.1‘, port=3306, user=‘root‘, password=‘1‘, database=‘luffy‘)
# cursor = conn.cursor()
# cursor.execute(‘select * from luffy_order‘)
# time.sleep(1)
# print(cursor.fetchall())
# return ‘Hello World!‘
#
#
# @app.route(‘/hello‘)
# def hello_world1():
# conn = pymysql.connect(host=‘127.0.0.1‘, port=3306, user=‘root‘, password=‘1‘, database=‘luffy‘)
# cursor = conn.cursor()
# cursor.execute(‘select * from luffy_banner‘)
# time.sleep(1)
#
# print(cursor.fetchall())
# return ‘Hello World!‘
# if __name__ == ‘__main__‘:
# app.run()
## 问题:如果使用全局连接对象,会导致数据错乱
## 问题二:如果在视图函数中创建数据库连接对象,会导致连接数过多
## 解决:使用数据库连接池 DBUtils
from dbutils.pooled_db import PooledDB
import pymysql
POOL=PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3,
# 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host=‘127.0.0.1‘,
port=3306,
user=‘root‘,
password=‘1‘,
database=‘luffy‘,
charset=‘utf8‘)
# 去池中获取连接
from threading import Thread
# mysql可以看到当前有多少个连接数
def task():
conn = POOL.connection()
cursor = conn.cursor()
cursor.execute(‘select * from luffy_order‘)
print(cursor.fetchall())
for i in range(100):
t=Thread(target=task)
t.start()
# 类似forms组件
信号
# Flask框架中的信号基于blinker,其主要就是让开发者可是在flask执行过程中定制一些用户行为
# pip3 install blinker
## flask中有内置信号
# 内置信号什么时候触发的
‘‘‘
request_started = _signals.signal(‘request-started‘) # 请求到来前执行
request_finished = _signals.signal(‘request-finished‘) # 请求结束后执行
before_render_template = _signals.signal(‘before-render-template‘) # 模板渲染前执行
template_rendered = _signals.signal(‘template-rendered‘) # 模板渲染后执行
got_request_exception = _signals.signal(‘got-request-exception‘) # 请求执行出现异常时执行
request_tearing_down = _signals.signal(‘request-tearing-down‘) # 请求执行完毕后自动执行(无论成功与否)
appcontext_tearing_down = _signals.signal(‘appcontext-tearing-down‘)# 应用上下文执行完毕后自动执行(无论成功与否)
appcontext_pushed = _signals.signal(‘appcontext-pushed‘) # 应用上下文push时执行
appcontext_popped = _signals.signal(‘appcontext-popped‘) # 应用上下文pop时执行
message_flashed = _signals.signal(‘message-flashed‘) # 调用flask在其中添加数据时,自动触发
‘‘‘
## 自定义信号
‘‘‘
1 写一个信号
2 写一个函数
3 信号绑定函数
4 触发信号
‘‘‘
#### 内置信号的使用
‘‘‘
内置信号使用步骤:
-写一个函数
-跟内置信号绑定
-以后只要触发内置信号,函数就会执行
‘‘‘
from flask import Flask,signals,render_template
from flask.signals import _signals
app = Flask(__name__)
# 往信号中注册函数
def func(*args,**kwargs):
print(‘触发型号‘,args,kwargs)
# 信号一般用来记录日志
# signals.request_started.connect(func)
# 给模板渲染前编写信号
def template_before(*args,**kwargs):
print(args)
print(kwargs)
print(‘模板开始渲染了‘)
# signals.before_render_template.connect(template_before)
‘‘‘
1 写一个信号
2 写一个函数
3 信号绑定函数
4 触发信号
‘‘‘
# 自定义信号
before_view = _signals.signal(‘before_view‘)
# 写函数
def test(*args,**kwargs):
print(‘我执行了‘)
print(args)
print(kwargs)
# 绑定给信号
before_view.connect(test)
# 触发信号
@app.route(‘/‘,methods=[‘GET‘,"POST"])
def index():
print(‘视图‘)
return ‘hello world‘
@app.route(‘/index‘,methods=[‘GET‘,"POST"])
def index1():
# 触发信号
before_view.send(name=‘lqz‘,age=19)
print(‘视图‘)
return render_template(‘index.html‘,a=‘lqz‘)
if __name__ == ‘__main__‘:
app.run(port=8080)
app.__call__
flask-script
from flask_script import Manager
from flask import Flask
app = Flask(__name__)
manager=Manager(app)
@app.route(‘/‘)
def index():
return ‘你好‘
if __name__ == ‘__main__‘:
manager.run()
#以后在执行,直接:python3 manage.py runserver
#python3 manage.py runserver --help
SQLAlchemy(orm框架)
# orm框架SQLAlchemy,第三方,独立使用,集成到web框架中
# django的orm框架
# pip install SQLAlchemy
sqlalchemy 一对多关系
# 操作mysql
-pymysql---》写原生sql
-django的orm
-sqlalchemy:orm框架,独立于其他框架,可以单独使用,也可以集成到框架中使用
-(了解)同步框架:sanic,fastapi,只用来对表进行管理,不用来查询插入,aiomysql
-Gino:https://python-gino.org/docs/zh/master/tutorials/tutorial.html
-国人写的异步的orm框架
sqlalchemy 多对多关系,其他操作
import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
Base = declarative_base()
#单表
class Users(Base):
__tablename__ = ‘users‘ # 数据库表名称
id = Column(Integer, primary_key=True) # id 主键
name = Column(String(32), index=True, nullable=False) # name列,索引,不可为空
# email = Column(String(32), unique=True)
#datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
# ctime = Column(DateTime, default=datetime.datetime.now)
# extra = Column(Text, nullable=True)
__table_args__ = (
# UniqueConstraint(‘id‘, ‘name‘, name=‘uix_id_name‘), #联合唯一
# Index(‘ix_id_name‘, ‘name‘, ‘email‘), #索引
)
def __repr__(self):
return self.name+‘===‘+str(self.id)
### 一对多
class Hobby(Base):
__tablename__ = ‘hobby‘
id = Column(Integer, primary_key=True)
caption = Column(String(50), default=‘篮球‘)
def __str__(self):
return self.caption
class Person(Base):
__tablename__ = ‘person‘
nid = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=True)
# hobby指的是tablename而不是类名,uselist=False
hobby_id = Column(Integer, ForeignKey("hobby.id"))
# 跟数据库无关,不会新增字段,只用于快速链表操作
# 类名,backref用于反向查询
hobby = relationship(‘Hobby‘, backref=‘pers‘)
def __str__(self):
return self.name
def __repr__(self): # 解释器环境下打印对象显示的样子
return self.name
###多对多
class Boy2Girl(Base):
__tablename__ = ‘boy2girl‘
id = Column(Integer, primary_key=True, autoincrement=True)
girl_id = Column(Integer, ForeignKey(‘girl.id‘))
boy_id = Column(Integer, ForeignKey(‘boy.id‘))
class Girl(Base):
__tablename__ = ‘girl‘
id = Column(Integer, primary_key=True)
name = Column(String(64), unique=True, nullable=False)
class Boy(Base):
__tablename__ = ‘boy‘
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(64), unique=True, nullable=False)
# 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以
girls = relationship(‘Girl‘, secondary=‘boy2girl‘, backref=‘boys‘)
def __str__(self):
return self.name
def init_db():
"""
根据类创建数据库表
:return:
"""
engine = create_engine(
"mysql+pymysql://root:1@127.0.0.1:3306/aaa?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Base.metadata.create_all(engine)
def drop_db():
"""
根据类删除数据库表
:return:
"""
engine = create_engine(
"mysql+pymysql://root:1@127.0.0.1:3306/aaa?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Base.metadata.drop_all(engine)
if __name__ == ‘__main__‘:
# drop_db()
init_db()
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
import models
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from models import Users
from sqlalchemy.sql import text
#"mysql+pymysql://root@127.0.0.1:3306/aaa"
engine = create_engine("mysql+pymysql://root:1@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)
Connection = sessionmaker(bind=engine)
session=Connection()
## 一对多操作
# hobby=models.Hobby(caption=‘橄榄球‘)
# session.add(hobby)
# hobby=models.Hobby(caption=‘足球‘)
# person=models.Person(name=‘张三‘,hobby_id=1)
# session.add_all([hobby,person])
# 使用relationship
# hobby=models.Hobby(caption=‘水球‘)
# person=models.Person(name=‘张三‘,hobby=hobby)
# session.add_all([hobby,person])
## 基于对象的跨表查询
# 正向
# person=session.query(models.Person).filter_by(nid=1).first()
# print(person)
# print(person.hobby)
# 反向
# hobby=session.query(models.Hobby).filter_by(id=1).first()
# print(hobby)
# print(hobby.pers)
# 多对多
# girl=models.Girl(name=‘刘亦菲‘)
# boy=models.Boy(name=‘吴亦凡‘)
# session.add_all([girl,boy])
# b2g=models.Boy2Girl(boy_id=1,girl_id=1)
# session.add(b2g)
## 基于对象的跨表查询
# 正向
# boy=session.query(models.Boy).filter_by(name=‘吴亦凡‘).first()
# print(boy)
# print(boy.girls)
# 反向
# girl=session.query(models.Girl).filter_by(name=‘刘亦菲‘).first()
# print(girl)
# print(girl.boys)
# b2g=session.query(models.Boy2Girl).filter_by(id=1).first()
# print(b2g.boy)
## filter 和 filter_by区别
# filter内写条件,filter_by内传参数
# ################ 修改 ################
#传字典
# res=session.query(models.Boy).filter_by(id = 1).all()
# res=session.query(models.Boy).filter(models.Boy.id >= 1).all()
# print(res)
# session.query(models.Boy).filter(models.Boy.id > 1).update({"name" : "lqz"})
#类似于django的F查询
# session.query(models.Boy).filter(models.Boy.id > 1).update({models.Boy.name: models.Boy.name + "099"}, synchronize_session=False)
# session.query(models.Boy).filter(models.Boy.id == 3).update({"id": models.Boy.id + 1}, synchronize_session="evaluate")
# session.commit()
# ################ 查询 ################
# r1 = session.query(models.Boy).all()
#只取age列,把name重命名为xx
# select name as xx,hobby_id from person
# r1 = session.query(models.Person.name.label(‘xx‘), models.Person.hobby_id).all()
# #filter传的是表达式,filter_by传的是参数
# r1 = session.query(Users).filter(Users.name == "lqz").all()
# r4 = session.query(Users).filter_by(name=‘lqz‘).all()
# r1 = session.query(Users).filter_by(name=‘lqz‘).first()
# #:value 和:name 相当于占位符,用params传参数
# r1 = session.query(Users).filter(text("id<:value and name=:name")).params(value=3, name=‘lqz‘).order_by(Users.id).all()
# #自定义查询sql
# r1 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name=‘lqz‘).all()
###更多操作
# 条件
# ret = session.query(Users).filter_by(name=‘lqz‘).all()
#表达式,and条件连接
# ret = session.query(Users).filter(Users.id > 1, Users.name == ‘lqz‘).all()
# ret = session.query(Users).filter(Users.id.between(1, 2), Users.name == ‘lqz1‘).all()
# #注意下划线
# ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
# #~非,除。。外
# ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
# #二次筛选
# ret = session.query(Users).filter(Users.id.in_(session.query(models.Person.nid).filter_by(name=‘张三‘))).all()
from sqlalchemy import and_, or_
# #or_包裹的都是or条件,and_包裹的都是and条件
# ret = session.query(Users).filter(and_(Users.id > 1, Users.name == ‘lqz‘)).all()
# ret = session.query(Users).filter(or_(Users.id < 2, Users.name == ‘lqz‘)).all()
# ret = session.query(Users).filter(
# or_(
# Users.id < 2,
# and_(Users.name == ‘李易峰‘, Users.id > 2),
# )).all()
#
#
# # 通配符,以e开头,不以e开头
# ret = session.query(Users).filter(Users.name.like(‘l%‘)).all()
# ret = session.query(Users).filter(~Users.name.like(‘l%‘)).all()
#
# # 限制,用于分页,区间
# ret = session.query(Users)[0:2] #前闭后开
#
# # 排序,根据name降序排列(从大到小)
# ret = session.query(Users).order_by(Users.name.desc()).all()
# #第一个条件重复后,再按第二个条件升序排
# ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()
#
# # 分组
from sqlalchemy.sql import func
#
# ret = session.query(Users).group_by(Users.name).all()
# #分组之后取最大id,id之和,最小id
# select min(id) as mid_id ,max(id) as max_id,sum(id) as sum_id from user where id >10 groupby name having sum(id)>10
# valuse 在annotate前表示groupby
# filter 在annotate前表示where
# filter 在annotate后表示having
# valuse 在annotate后表示取字段
# User.objects.all().filter(id__gt=10).values(‘name‘).annotate(max_id=max(id),sum_id=sum(id),min_id=min(id)).filter(sum_id>10)
# 分组以后,只能拿分组字段和聚合函数字段
# ret = session.query(
# func.max(Users.id),
# func.sum(Users.id),
# func.min(Users.id),Users.name).group_by(Users.name).all()
# #haviing筛选
# ret = session.query(
# func.max(Users.id),
# func.sum(Users.id),
# func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()
#
# # 连表(默认用forinkey关联)
# select * from person,hobby where person.hobby_id=hobby.id;
# ret = session.query(models.Person,models.Hobby).filter(models.Person.hobby_id == models.Hobby.id).all()
# #join表,默认是inner join
# select * from person inner join hobby on person.hobby_id=hobby.id
#select * from person inner join hobby on person.hobby_id=hobby.id where hobby.caption=篮球
# Person.objects.all().filter(hobby__caption=‘篮球‘)
# ret = session.query(models.Person).join(models.Hobby).filter(models.Hobby.caption==‘篮球‘).all()
# #isouter=True 外连,表示Person left join Favor,没有右连接,反过来即可
#select * from person left join hobby on person.hobby_id=hobby.id
# ret = session.query(models.Person).join(models.Hobby, isouter=True).all()
# ret = session.query(models.Hobby).join(models.Person, isouter=True).all()
# #打印原生sql
# aa=session.query(models.Person).join(models.Hobby)
# print(aa)
# # 自己指定on条件(连表条件),第二个参数,支持on多个条件,用and_,同上
# ret = session.query(models.Person).join(models.Hobby,models.Person.hobby_id==models.Hobby.id, isouter=True)
# print(ret)
# print(ret)
session.commit() # 提交
session.close()
flask-sqlalchemy 操作
# flask-sqlalchemy:让flask更好的集成sqlalchemy
# flask_migrate:类似于django的makemigrations和migrate,因为sqlalchemy不支持表修改(删除,增加字段)
flask-migrate
python3 manage.py db init 初始化:只执行一次
python3 manage.py db migrate 等同于 makemigartions
python3 manage.py db upgrade 等同于migrate
## flask-sqlalchemy
-先导入,实例化得到一个对象
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
-所有表模型都继承 db.Model
-在视图函数中查询那个session对象
db.session
## 导出项目依赖
# 安装:pip3 install pipreqs
#导出:pipreqs . --encoding=utf8
flask