FastAPI数据库系列(一) MySQL数据库操作

一、简介

FastAPI中你可以使用任何关系型数据库,可以通过SQLAlchemy将其轻松的适应于任何的数据库,比如:

  • PostgreSQL
  • MySQL
  • SQLite
  • Oracle
  • Microsoft SQL Server
  • ...

  SQLAlchemy是一个ORM(object-relational mapping)的框架。在ORM中,你创建一个类就会通过SQLAlchemy将其自动转成一张表,在类中的每一个属性就会将其转成表中的字段。

这里有一些实例,假如有一个大的项目,里面包含一个子包叫做sql_app:

.
└── sql_app
    ├── __init__.py
    ├── crud.py
    ├── database.py
    ├── main.py
    ├── models.py
    └── schemas.py
  • __init__.py 是一个空文件,但是说明sql_app是一个package
  • database.py  数据库配置相关
  • models.py 数据库模型表
  • schemas.py 模型验证
  • crud.py  数据库操作相关
  • main.py 主文件

二、简单实例

 该实例以MySQL为例,SQLAlchemy需要借助于pymysql连接数据库,所以需要进行安装这两个工具包:

pip install sqlalchemy
pip install pymysql

1、database.py

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "mysql+pymysql://root:123456@127.0.0.1:3306/test"

# echo=True表示引擎将用repr()函数记录所有语句及其参数列表到日志
engine = create_engine(
    SQLALCHEMY_DATABASE_URL, encoding='utf8', echo=True
)

# SQLAlchemy中,CRUD是通过会话进行管理的,所以需要先创建会话,
# 每一个SessionLocal实例就是一个数据库session
# flush指发送到数据库语句到数据库,但数据库不一定执行写入磁盘
# commit是指提交事务,将变更保存到数据库文件中
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# 创建基本映射类
Base = declarative_base()

在数据库相关的配置文件中,首先创建一个SQLAlchemy的"engine",然后创建SessionLocal实例进行会话,最后创建模型类的基类。

2、models.py

from sqlalchemy import Boolean, Column, Integer, String
from database import Base


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String(32), unique=True, index=True)
    hashed_password = Column(String(32))
    is_active = Column(Boolean, default=True)

通过数据库配置文件中的基类来创建模型类。

3、schemas.py

from pydantic import BaseModel


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    """
    请求模型验证:
    email:
    password:
    """
    password: str


class User(UserBase):
    """
    响应模型:
    id:
    email:
    is_active
    并且设置orm_mode与之兼容
    """
    id: int
    is_active: bool

    class Config:
        orm_mode = True

  定义请求参数模型验证与响应模型验证的Pydantic模型,其中响应模型中设置orm_mode=True参数,表示与ORM模型兼容,因为后续中返回的数据库查询是orm模型,通过设置这个参数可以将orm模型通过pydantic模型进行验证。

4、crud.py

from sqlalchemy.orm import Session
import models, schemas


# 通过id查询用户
def get_user(db: Session, user_id: int):
    return db.query(models.User).filter(models.User.id == user_id).first()


# 新建用户
def db_create_user(db: Session, user: schemas.UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)
    db.commit()  # 提交保存到数据库中
    db.refresh(db_user)  # 刷新
    return db_user

通过传入数据库连接以及参数等进行数据库操作,包括创建用户、查询用户等,返回的是orm模型对象。

5、main.py

from fastapi import FastAPI, Depends, HTTPException
import crud, schemas
from database import SessionLocal, engine, Base
from sqlalchemy.orm import Session
import uvicorn

Base.metadata.create_all(bind=engine) #数据库初始化,如果没有库或者表,会自动创建

app = FastAPI()


# Dependency
def get_db():
    """
    每一个请求处理完毕后会关闭当前连接,不同的请求使用不同的连接
    :return:
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


# 新建用户
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    return crud.db_create_user(db=db, user=user)


# 通过id查询用户
@app.get("/user/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if not db_user:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


if __name__ == '__main__':
    uvicorn.run(app=app, host="127.0.0.1", port=8000)

主文件进行数据库初始化、FastAPI实例创建以及处理各种请求。

进入到交互文档查看:

  • http://127.0.0.1:8000/users/
# 请求
{
  "email": "hhh@example113.com",
  "password": "ss123456"
}

# 响应
{
  "email": "hhh@example113.com",
  "id": 7,
  "is_active": true
}
  • http://127.0.0.1:8000/user/7
# 响应
{
  "email": "hhh@example113.com",
  "id": 7,
  "is_active": true
}

三、复杂实例 

在之前的基础上再加一个模型类Item,User与之是一对多的关系。

 1、models.py

from sqlalchemy import Boolean, Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship
from database import Base


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String(32), unique=True, index=True)
    hashed_password = Column(String(32))
    is_active = Column(Boolean, default=True)

    items = relationship("Item", back_populates="owner")


class Item(Base):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(32), index=True)
    description = Column(String(32), index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))

    owner = relationship("User", back_populates="items")

 2、schemas.py

from typing import Optional,List
from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Optional[str] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    """
    请求模型验证:
    email:
    password:
    """
    password: str


class User(UserBase):
    """
    响应模型:
    id:
    email:
    is_active
    并且设置orm_mode与之兼容
    """
    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        orm_mode = True

3、crud.py

from sqlalchemy.orm import Session
import models, schemas


# 通过id查询用户
def get_user(db: Session, user_id: int):
    return db.query(models.User).filter(models.User.id == user_id).first()


# 新建用户
def db_create_user(db: Session, user: schemas.UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)
    db.commit()  # 提交保存到数据库中
    db.refresh(db_user)  # 刷新
    return db_user


# 获取用户拥有的item
def get_item(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.Item).offset(skip).limit(limit).all()


# 新建用户的item
def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
    db_item = models.Item(**item.dict(), owner_id=user_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item

4、main.py

from typing import List
from fastapi import FastAPI, Depends, HTTPException
import crud, schemas
from database import SessionLocal, engine, Base
from sqlalchemy.orm import Session
import uvicorn

Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    """
    每一个请求处理完毕后会关闭当前连接,不同的请求使用不同的连接
    :return:
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


# 新建用户
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    return crud.db_create_user(db=db, user=user)


# 通过id查询用户
@app.get("/user/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if not db_user:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


# 读取用户拥有的item
@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 0, db: Session = Depends(get_db)):
    items = crud.get_item(db=db, skip=skip, limit=limit)
    return items


# 创建用户的item
@app.post("/users/{user_id}/items", response_model=schemas.Item)
def create_item_user(user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


if __name__ == '__main__':
    uvicorn.run(app=app, host="127.0.0.1", port=8000)

当启动项目后,会生成新的Item数据表,以及与User表之间建立关系:

# User表
create table users
(
    id              int auto_increment
        primary key,
    email           varchar(32) null,
    hashed_password varchar(32) null,
    is_active       tinyint(1)  null,
    constraint ix_users_email
        unique (email)
);

create index ix_users_id
    on users (id);

# Item表
create table items
(
    id          int auto_increment
        primary key,
    title       varchar(32) null,
    description varchar(32) null,
    owner_id    int         null,
    constraint items_ibfk_1
        foreign key (owner_id) references users (id)
);

create index ix_items_description
    on items (description);

create index ix_items_id
    on items (id);

create index ix_items_title
    on items (title);

create index owner_id
    on items (owner_id);

最后进入交互文档进行测试。

 

上一篇:c# – 解决服务在Startup.cs中使用DI


下一篇:git clone操作到开发机的错误记录