Sanic是异步库,想要发挥其强大的性能,当需要使用第三方库的时候,就需要使用异步的库,在python中,异步orm较为常见的就两个可,一个SQLAlchemy,一个Tortoise-ORM
SQLAlchemy 在1.4版本之后,已经支持异步了,既然要用异步,那同步库的PyMYSQL肯就就不能满足了,所以需要用异步库aiomysql
SQLAlchemy官网:https://www.sqlalchemy.org/
SQLAlchemy中文网站:https://www.osgeo.cn/sqlalchemy/index.html
aiomysql官网:https://aiomysql.readthedocs.io/en/latest/
安装SQLAlchemy: pip install SQLAlchemy
安装aiomysql: pip install aiomysql
使用示例
先确保有数据库:库名随便取,比如test
创建模型
由于没有sqlalchemy1.4以后的教程太少,没有找到用aiomysql作为驱动的映射教程,所以这里将对象映射到数据库的操作还是用mysql,系统运行中用aiomysql
orm与app绑定,这里就用异步的aiomysql了
在中间件中定义获取数据库session和释放资源
路由
插入数据
查询
models
from sqlalchemy import Column, String, Integer
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class BaseModel(Base):
__abstract__ = True
id = Column(Integer, primary_key=True, comment='id,主键')
class User(BaseModel):
""" 用户表 """
__tablename__ = "user"
name = Column(String(64), comment='名字')
age = Column(Integer, comment='年龄')
def to_dict(self):
return {"id": self.id, "name": self.name, "age": self.age}
模型映射到数据库
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Base
# 导入相应的模块
engine = create_engine("mysql+pymysql://root:123456@localhost/test")
# 创建session对象
session = sessionmaker(engine)()
# 创建表,执行所有BaseModel类的子类
Base.metadata.create_all(engine)
# 提交,必须
session.commit()
业务逻辑
from contextvars import ContextVar
from sanic import Sanic, response
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
from models import User
app = Sanic("SanicAndSqlalchemy")
# 创建异步数据库引擎
bind = create_async_engine("mysql+aiomysql://root:123456@localhost/test", echo=True)
_base_model_session_ctx = ContextVar("session")
@app.middleware("request")
async def inject_session(request):
"""
请求中间件
创建一个可用的 AsyncSession 对象并且将其绑定至 request.ctx 中,
而 _base_model_session_ctx 也会在这是被赋予可用的值,
如果需要在其他地方使用 session 对象(而非从 request.ctx 中取值),该全局变量或许能帮助您(它是线程安全的)。
"""
request.ctx.session = sessionmaker(bind, AsyncSession, expire_on_commit=False)()
request.ctx.session_ctx_token = _base_model_session_ctx.set(request.ctx.session)
@app.middleware("response")
async def close_session(request, response):
""" 响应中间件,将创建的 AsyncSession 关闭,并重置 _base_model_session_ctx 的值,进而释放资源 """
if hasattr(request.ctx, "session_ctx_token"):
_base_model_session_ctx.reset(request.ctx.session_ctx_token)
await request.ctx.session.close()
@app.post("/user")
async def create_user(request):
session = request.ctx.session
async with session.begin():
user = User(**request.json)
session.add(user)
return response.json(user.to_dict())
@app.get("/user/<user_id:int>")
async def get_user(request, user_id):
session = request.ctx.session
async with session.begin():
result = await session.execute(select(User).where(User.id == user_id))
user = result.scalar()
return response.json({'code': 200, 'message': '查询成功', 'data': user.to_dict() if user else {}})
if __name__ == '__main__':
import uvicorn
uvicorn.run('main:app', host='0.0.0.0', port=8000, debug=True)