SQLAlchemy数据库

ORM框架- SQLAlchemy – 潘登同学的flask学习笔记

文章目录

SQLAlchemy介绍

SQLAlchemy是一个ORM框架,

对象关系映射(英语:Object Relational Mapping,简称ORM,或O/RM,或O/R mapping),是一种程序设计技术,用于实现面向对象编程语言里不同类型系统的数据之间的转换。

随着项目的越来越大,采用写原生SQL的方式在代码中会出现大量重复的SQL语句,那么,问题就出现了:

  • SQL语句重复利用率不高,越复杂的SQL语句条件越多,代码越长,会出现很多相近的SQL语句
  • 很多SQL语句 是在业务逻辑中拼接出来的,如果数据库需要更改,就要去修改这些逻辑,这会容易
    漏掉对某些SQL语句的修改
  • 写SQL时容易忽略web安全问题,造成隐患

底层逻辑

而ORM可以通过类的方式去操作数据库而不用再写原生的SQL语句,通过把表映射成类,把行作为实例(一条数据),把字段作为属性,ORM在执行对象操作的时候最终还是会把对象的操作转换为数据库的原生语句,但使用ORM有许多优点:

  • 易用性:使用ORM做数据库开发可以有效减少重复SQL语句的概率,写出来的模型也更加直观、清晰
  • 性能损耗小:ORM转换成底层数据库操作指令确实会有一些开销。但是从实际情况来看,这种性能损耗很少(不足5%),只要不是针对性能有严苛的要求,综合考虑开发效率、代码阅读性,带来的好处远大于性能损耗,而且项目越大作用越明显。
  • 设计灵活:可以轻松的写出复杂的查询。
  • 可移植性:SQLAlchemy封装了底层的数据库实现,支持多个关系数据库引擎,包括流行的Mysql、PostgreSQL和SQLite,可以非常轻松的切换数据库。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AloSodks-1642174962208)(img/ORM框架.png)]

SQLAlchemy使用

先安装包

pip install pymysql
pip install sqlalchemy

在vscode中安装mysql组件

SQLAlchemy数据库

SLQAlchemy链接数据库

from sqlalchemy import create_engine

# 数据库的变量
HOST = '127.0.0.1'  # 自己本机的就是127.0.0.1 或 localhost
PORT = 3306
DATA_BASE = 'flask_db'
USER = 'root'
PWD = 'xxx'  # 这是你自己的啊,不记得别来找我
#  # DB_URI = f'数据库的名+驱动名://{USER}:{PWD}@{HOST}:{PORT}/{DATA_BASE}'
DB_URI = f'mysql+pymysql://{USER}:{PWD}@{HOST}:{PORT}/{DATA_BASE}'
engine = create_engine(DB_URI)

sql = 'select 1;'
conn = engine.connect()
rs = conn.execute(sql)
print(rs.fetchone())

SQLAlchemy数据库

因为数据库连接是要断开的,所有采用with写法

engine = create_engine(DB_URI) # 创建引擎
sql = 'create table t_use(id int primary key auto_increment, name varchar(32));'
# 链接数据库 with写法是因为要断开链接
with engine.connect() as conn:
    conn.execute(sql)

ORM模型

from sqlalchemy import create_engine,Column,Integer,String
from sqlalchemy.ext.declarative import declarative_base
前面到创建引擎都是照常操作

# 创建一个基础类
Base = declarative_base(engine)

# 传递一个引擎过去
class Person(Base):
    __tablename__ = 't_person'
    id = Column(Integer, primary_key=True,autoincrement=True)
    name = Column(String(32))
    age = Column(Integer)
    country = Column(String(32))

# 映射表结构
Base.metadata.create_all()

就能看到flask_db下多了一个t_person的表

SQLAlchemy的增删改查

SQLAlchemy数据库

from sqlalchemy.orm import sessionmaker
前面到创建引擎都是照常操作
Session = sessionmaker(engine)

# 创建一条数据
def create_data1():
    with Session() as session:
        p1 = Person(name='pd',age=19,country="CHina")
        session.add(p1)
        session.commit()

# 创建多条数据
def create_data_many():
    with Session() as session:
        p1 = Person(name='Daijiawei',age=80,country="saibotan")
        p2 = Person(name='gouzei',age=45,country="qingchao")
        p3 = Person(name='yechui',age=35,country="evening")
        session.add_all([p1,p2,p3])
        session.commit()

def query_data():
    # 查询所有数据
    with Session() as session:
        all_person = session.query(Person).all()
        for p in all_person:
            print(p.name)

def query_data1():
    # 查询一条数据
    with Session() as session:
        a_person = session.query(Person).first()
        print(a_person.name)

def query_data_by_params():
    # 根据参数进行查询
    with Session() as session:
        a_person = session.query(Person).filter_by(name="gouzei").first()
        b_person = session.query(Person).filter(Person.name=="Daijiawei").first()
        print(a_person.age,b_person.country)

def update_data():
    # 修改数据
    with Session() as session:
        p1 = session.query(Person).filter_by(name="Daijiawei").first()
        # 发现老师的年龄过载了30,要给他加30
        p1.age += 30
        # 修改完要提交事务
        session.commit()

def delete_data():
    # 删除数据
    with Session() as session:
        # 当然也能吧表里的所有数据进行删除,先获取所有数据用all() 然后遍历删除最后提交
        a_person = session.query(Person).filter_by(name="yechui").first()
        session.delete(a_person)
        session.commit()


if __name__ == '__main__':
    query_data_by_params()

SQLAlchemy的数据类型

  • Integer:整形,映射到数据库中是int类型。
  • Float:浮点类型,映射到数据库中是float类型。他占据的32位。
  • String:可变字符类型,映射到数据库中是varchar类型.
  • Boolean:布尔类型,映射到数据库中的是tinyint类型。
  • DECIMAL:定点类型。是专门为了解决浮点类型精度丢失的问题的。在存储钱相关的字段的时候建议使用这个数据类型。
    • 这个类型使用的时候需要传递两个参数,第一个参数是用来标记这个字段总能能存储多少个数字,第二个参数表示小数点后有多少位。
  • Enum:枚举类型。指定某个字段只能是枚举中指定的几个值,不能为其他值。在ORM模型中,使用Enum来作为枚举,示例代码如下:
class News(Base):
    __tablename__ = 't_news'
    tag = Column(Enum("python",'flask','django'))

在Python3中,已经内置了enum这个枚举的模块,我们也可以使用这个模块去定义相关的字段。示例代码如下:

class TagEnum(enum.Enum):
        python = "python"
        flask = "flask"
        django = "django"
class News(Base):
        __tablename__ = 't_news'
        id = Column(Integer,primary_key=True,autoincrement=True)
        tag = Column(Enum(TagEnum))
        news = News(tag=TagEnum.flask)

Date:存储时间,只能存储年月日。映射到数据库中是date类型。在Python代码中,可以使用 datetime.date 来指定。

  • DateTime:存储时间,可以存储年月日时分秒毫秒等。映射到数据库中也是datetime类型。在Python代码中,可以使用 datetime.datetime 来指定。
  • Time:存储时间,可以存储时分秒。映射到数据库中也是time类型。在Python代码中,可以使用 datetime.time 来支持那个。示例代码如下:
class News(Base):
    __tablename__ = 't_news'
    create_time = Column(Time)
    news = News(create_time=time(hour=11,minute=11,second=11))
  • Text:存储长字符串。一般可以存储6W多个字符。如果超出了这个范围,可以使用LONGTEXT类型。映射到数据库中就是text类型。
  • LONGTEXT:长文本类型,映射到数据库中是longtext类型。

SQLAlchemy列参数

  • primary_key:True设置某个字段为主键。
  • autoincrement:True设置这个字段为自动增长的。
  • default:设置某个字段的默认值。在发表时间这些字段上面经常用,写一个函数名就可以了不用加括号
  • nullable:指定某个字段是否为空。默认值是True,就是可以为空。
  • unique:指定某个字段的值是否唯一。默认是False。
  • onupdate:在数据更新的时候会调用这个参数指定的值或者函数。在第一次插入这条数据的时候,不会用onupdate的值,只会使用default的值。常用于是 update_time 字段(每次更新数据的时候都要更新该字段值)。
  • name:指定ORM模型中某个属性映射到表中的字段名。如果不指定,那么会使用这个属性的名字来作为字段名。如果指定了,就会使用指定的这个值作为表字段名。这个参数也可以当作位置参数,在第1个参数来指定。

可以把常用的一系列语句放在一个文件中,起名为db_util.py

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine

# 数据库的变量
HOST = '127.0.0.1'  # 自己本机的就是127.0.0.1 或 localhost
PORT = 3306
DATA_BASE = 'flask_db'
USER = 'root'
PWD = 'xxx'
#  # DB_URI = f'数据库的名+驱动名://{USER}:{PWD}@{HOST}:{PORT}/{DATA_BASE}'
DB_URI = f'mysql+pymysql://{USER}:{PWD}@{HOST}:{PORT}/{DATA_BASE}'
engine = create_engine(DB_URI)
# 创建一个基础类
Base = declarative_base(engine)
Session = sessionmaker(engine)
from sqlalchemy import Column,Integer,String
from db_util import Base,Session
from datetime import datetime

def test():
    class News(Base):
        __tablename__ = 'New2'
        id = Column(Integer,primary_key=True,autoincrement=True)
        phone = Column(String(11), unique=True)
        title = Column(String(32),nullable = False)
        read_count = Column(Integer,default=1)
        create_time = Column(DateTime,default=datetime.now)
        update_time = Column(DateTime,default=datetime.now,onupdate=datetime.now)
    Base.metadata.create_all()
def update_data():
    with Session() as session:
        # 更新数据后 update_time也就会随之改变
        new1 = session.query(News).first()
        new1.read_count += 1
        session.commit()
if __name__ == '__main__':
    delete_data()

query函数的使用

  • 模型名。指定查找这个模型中所有的属性(对应查询表为全表查询) session.query(Person).all()
  • 模型中的属性。可以指定只查找某个模型的其中几个属性 session.query(Person.name,Person.age).all()
  • 聚合函数
    • func.count:统计行的数量。
    • func.avg:求平均值。
    • func.max:求最大值。
    • func.min:求最小值。
    • func.sum:求和。
from sqlalchemy import func
def query_data():
    with Session() as session:
        oldest = session.query(func.max(Person.age)).first()
        print(oldest)
        b_person = session.query(Person).filter(Person.age==oldest[0]).first()
        print(b_person.name)

filter过滤数据

  • equals
  • not equals
  • like & ilike
  • in
  • not in
  • is null
  • is not null
  • and
  • or
with Session() as session:
    dai = session.query(Person).filter(Person.name == 'Daijiawei').first()
    print(dai.name)
    not_dai = session.query(Person).filter(Person.name != 'Daijiawei').all()
    for i in not_dai:
            print(i.name)
    # 模糊匹配  %代表不知道填什么 gou% 表示 gouxxx %gou则表示xxxgou
    gou = session.query(Person).filter(Person.name.like('gou%')).first()
    print(gou.name)
    # in匹配
    d = session.quert(Person).filter(Person.name.in_(['d'])).all()
    for i in d:
        print(d.name)
    # and匹配
    re = session.query(Person).filter(and_(Person.name != 'pd',Person.age > 80))
    for i in re:
        print(i)

ORM建立表关系

表关系: 一对一, 一对多,多对多

外键

使用SQLAlchemy创建外键非常简单。在从表中增加一个字段,指定这个字段外键的是哪个表的哪个字段就可以了。从表中外键的字段,必须和主表的主键字段类型保持一致。

外键约束有以下几项

  • RESTRICT:若子表中有父表对应的关联数据,删除父表对应数据,会阻止删除。默认项
  • NO ACTION:在MySQL中,同RESTRICT。
  • CASCADE:级联删除。(删除父表,子表也删除)
  • SET NULL:父表对应数据被删除,子表对应数据项会设置为NULL。
class User(Base):
    __tablename__ = "t_user"
    id = Column(Integer,primary_key=True,autoincrement=True)
    uname = Column(String(50),nullable=False,name='name')  # 这个能将字段名改为name

    def __repr__(self) -> str:
        return f'<User: id={self.id} uname={self.uname}>'
    
class News(Base):
    __tablename__ = "t_news"
    id = Column(Integer,primary_key=True,autoincrement=True)
    title = Column(String(50),nullable=False)
    content = Column(Text,nullable=False)
    # 一个用户可以有多条新闻
    uid = Column(Integer,ForeignKey('t_user.id',ondelete='RESTRICT')) # 默认不准删除
    # uid = Column(Integer,ForeignKey('t_user.id',ondelete='NO ACTION')) # 默认不准删除
    # uid = Column(Integer,ForeignKey('t_user.id',ondelete='CASCADE')) # 级联删除
    # uid = Column(Integer,ForeignKey('t_user.id',ondelete='SET NULL')) # 父表对应数据被删除,子表对应数据项会设置为NULL。
    def __repr__(self) -> str:
        return f'<User: id={self.id} title={self.title} content={self.content} uid={self.uid}>'

def create_data2():
    class User(Base):
        __tablename__ = "t_user"
        id = Column(Integer,primary_key=True,autoincrement=True)
        uname = Column(String(50),nullable=False,name='name')  # 这个能将字段名改为name

    class News(Base):
        __tablename__ = "t_news"
        id = Column(Integer,primary_key=True,autoincrement=True)
        title = Column(String(50),nullable=False)
        content = Column(Text,nullable=False)
        # 一个用户可以有多条新闻
        uid = Column(Integer,ForeignKey('t_user.id'))

    user = User(uname='pd')
    news1 = News(title='python',content='flask',uid=1)
    news2 = News(title='MySQL',content='SQL',uid=1)
    with Session() as session:
        session.add(user)
        session.commit()
    # 先执行完主表的再执行下面的才行
    with Session() as session:
        session.add(news1)
        session.add(news2)
        session.commit()
if __name__ == '__main__':
    Base.metadata.create_all()
    create_data2()

一对多关系

一对多的时候,ForeignKey关键字要建立在多 一边

一对多的查询:

def query_data():
    # 通过news信息去获取user信息
    with Session() as session:
        news1 = session.query(News).first()
        uid = news1.uid
        user1 = session.query(User).filter(User.id == uid).first()
        print(user1)

改进查询,用relationship,这个类可以定义属性,以后在访问相关联的表的时候就直接可以通过属性访问的方式就可以访问得到了

可以通过 backref 来指定反向访问的属性名称。newss是指有多篇新闻。他们之间的关系是一个“一对多”的关系

from sqlalchemy.orm import relationship
class News(Base):
    __tablename__ = "t_news"
    id = Column(Integer,primary_key=True,autoincrement=True)
    title = Column(String(50),nullable=False)
    content = Column(Text,nullable=False)
    # 一个用户可以有多条新闻
    uid = Column(Integer,ForeignKey('t_user.id',ondelete='RESTRICT')) # 默认不准删除
    user = relationship('User')  # User对应类名 将主表的数据注入到子表

    def __repr__(self) -> str:
        return f'<User: id={self.id} title={self.title} content={self.content} uid={self.uid}>'

def query_data2():
    # 通过news信息去获取user信息
    with Session() as session:
        news1 = session.query(News).first()
        print(news1.user)

if __name__ == '__main__':
    query_data2()

如果想通过用户查找他的新闻,也可以通过类似的方式给User增加relationship来操作,但是这样的操作就类似于操作主表,检验在Newsrelationship中增加backref属性(类似于endpoint能反向查找回

user = relationship('User',backref='News')
def query_data3():
    # 通过news信息去获取user信息
    with Session() as session:
        user = session.query(User).first()
        print(user.News)
if __name__ == '__main__':
    query_data3()

一对一关系

登录表(只有用户名和密码)与用户表(用户信息表),核心语句login_user = relationship('LoginUser',backref=backref('info_user',uselist=False))

from sqlalchemy.orm import relationship,backref

class LoginUser(Base):
    __tablename__ = 't_user_login'
    id =Column(Integer,primary_key=True,autoincrement=True)
    uname =Column(String(32),nullable=False)
    passwd =Column(String(32),nullable=False)
    # user =relationship('InfoUser',uselist=False) # uselist=False 表示查找的不是列表 不友好,总有警告

    def __repr__(self) -> str:
       return f'<User: id={self.id} uname={self.uname} passwd={self.passwd}>'

class InfoUser(Base):
    __tablename__ = 't_user_info'
    id =Column(Integer,primary_key=True,autoincrement=True)
    name =Column(String(32),nullable=False,name='name')
    gender = Column(String(1))
    address = Column(String(64))
    login_id = Column(Integer,ForeignKey('t_user_login.id'))
    # 可以用InfoUser.login_user = LoginUser 也可以用 LoginUser.info_user = InfoUser来关联表
    login_user = relationship('LoginUser',backref=backref('info_user',uselist=False))

    def __repr__(self) -> str:
       return f'<User: id={self.id} name={self.name} gender={self.gender} address={self.address}>'
def create_data3():
    login = LoginUser(uname='pd', passwd='123')
    info = InfoUser(name='pd',gender='男',address='广东')
    # 可以用InfoUser.login_user = LoginUser 也可以用 LoginUser.info_user = InfoUser来关联表
    info.login_user = login  # 建立关联关系
    with Session() as session:
        session.add(login)  # 这里加哪个都可以
        session.commit()

def query_data4():
    with Session() as session:
        login_user = session.query(LoginUser).first()
        info_user = session.query(InfoUser).first()
        print('用login找info',login_user.info_user)
        print('用info找logo',info_user.login_user)

if __name__ == '__main__':
    Base.metadata.create_all()
    # create_data3()
    query_data4()

多对多关系

  • 多对多的关系需要通过一张中间表来绑定他们之间的关系。
  • 先把两个需要做多对多的模型定义出来
  • 使用Table定义一个中间表,中间表一般就是包含两个模型的外键字段就可以了,并且让他们两个来作为一个“复合主键”
  • 在两个需要做多对多的模型中随便选择一个模型,定义一个relationship属性,来绑定三者之间的关系,在使用relationship的时候,需要传入一个secondary=中间表对象名

如: 一个商品可以有多个标签,一个标签可以对应多个商品

核心关键,创建第三张表,在relationship处新增参数secondary=xxx

# 创建第三张表,来关联两个模型的数据关系
thing_tag = Table(
    't_thing_tag',  # 名字
    Base.metadata,  # 固定写法
    # 这样可以形成复合主键
    Column('things_id',Integer,ForeignKey("t_thing.id"),primary_key=True),
    Column('tag_id',Integer,ForeignKey("t_tag.id"),primary_key=True),
)

class Thing(Base):
    __tablename__ = 't_thing'
    id = Column(Integer,primary_key=True,autoincrement=True)
    title = Column(String(32),nullable=False)
    # 因为是给物品打标签而不给标签打物品,所以这样写
    tags = relationship('Tag',backref='thing',secondary=thing_tag)

    def __repr__(self) -> str:
        return f'<News: id={self.id} title={self.title}>'

class Tag(Base):
    __tablename__ = 't_tag'
    id = Column(Integer,primary_key=True,autoincrement=True)
    name = Column(String(32),nullable=False)

    def __repr__(self) -> str:
        return f'<News: id={self.id} title={self.name}>'

def create_data5():
    thing1 = Thing(title='电视机')
    thing2 = Thing(title='Python')
    tag1 = Tag(name='电器')
    tag2 = Tag(name='21世纪伟大发明')
    thing1.tags.append(tag1)
    thing1.tags.append(tag2)
    thing2.tags.append(tag2)
    with Session() as session:
        session.add(thing1)
        session.add(thing2)
        session.commit()

def query_data5():
    with  Session() as session:
        news = session.query(Thing).first()
        print(news.tags)

if __name__ == '__main__':
    Base.metadata.create_all()
    # create_data5()
    query_data5()

SQLAlchemy数据库

SQLAlchemy数据库

删除数据注意事项

  • ORM层面删除数据,会无视mysql级别的外键约束。
  • 直接会将对应的数据删除,然后将从表中的那个外键设置为NULL,也就是数据库的 SET NULL 。
  • 如果想要避免这种行为,应该将从表中的外键的 nullable=False 。

换成人话,当子表中有数据依赖主表的时候,删除主表时,允许删除,依赖字段全变为NULL,想要不允许删除,那么给子表的那个字段(外键)设置 nullable=False

# 删除的语法
with Session() as session:
    user = session.query(xxx).first()
    session.delete(user)
    session.commit()

理解级联操作操作

在SQLAlchemy,只要将一个数据添加到session中,和他相关联的数据都可以一起存入到数据库中了。

这些是怎么设置的呢?其实是通过relationship的时候,有一个关键字参数cascade可以设置这些属性

cascade属性值:

  • save-update:默认选项。在添加一条数据的时候,会把其他和他相关联的数据都添加到数据库中。这种行为就是save-update属性影响的。
  • delete:表示当删除某一个模型中的数据的时候,是否也删掉使用relationship和他关联的数据。
  • delete-orphan:表示当对一个ORM对象解除了父表中的关联对象的时候,自己便会被删除掉。当然如果父表中的数据被删除,自己也会被删除。这个选项只能用在一对多上,并且还需要在子模型中的relationship中,增加一个single_parent=True的参数。
  • merge:默认选项。当在使用session.merge,合并一个对象的时候,会将使用了relationship相关联的对象也进行merge操作。
  • expunge:移除操作的时候,会将相关联的对象也进行移除。这个操作只是从session中移除,并不会真正的从数据库中删除。
  • all:是对save-update, merge, refresh-expire, expunge, delete几种的缩写。

relationship 建在子表上的,删除主表的时候:(不推荐)

# 加入数据时只要加入User或News的一个就可以
# 但是删除主表的时候,会把子表的外键变成NULL
newss = relationship('News',backref='User',cascade='save-update') 

# 删除主表时,子表的外键也会删除
newss = relationship('News',backref='User',cascade='save-update,delete')

# 删除主表时,子表会被删除,除了删除操作以外,主表中对应子表的外键属性变为NULL时(更新操作),子表也会被删除
newss = relationship('News',backref='User',cascade='save-update,delete,delete-orphan',single_parent=True)

relationship 建在主表上的,删除子表的时候:

# 删除子表不影响
user = relationship('User',backref='News',cascade='save-update')

# 删除子表,主表被删除,另一个子表的外键变为NULL
user = relationship('User',backref='News',cascade='save-update,delete')

# 如果想实现在主表的relationship上,使用delete-orphan,可以这样写
user = relationship('User',backref=backref('News',cascade='save-update,delete,delete-orphan',single_parent=True))

ORM对数据的操作

数据排序

order_by方法排序:可以指定根据模型中某个属性进行排序,"模型名.属性名.desc()"代表的是降序排序。

relationship的方法中order_by属性:在指定relationship方法的时候,添加order_by属性来指定排序的字段。


class User(Base):
    __tablename__ = "t_user"
    id = Column(Integer,primary_key=True,autoincrement=True)
    uname = Column(String(50),nullable=False,name='name')  # 这个能将字段名改为name
    age = Column(Integer)

    def __repr__(self) -> str:
        return f'<User: id={self.id} uname={self.uname} age={self.age}>'
    
class News(Base):
    __tablename__ = "t_news"
    id = Column(Integer,primary_key=True,autoincrement=True)
    title = Column(String(50),nullable=False)
    content = Column(Text,nullable=False)
    # 一个用户可以有多条新闻
    uid = Column(Integer,ForeignKey('t_user.id',ondelete='RESTRICT')) # 默认不准删除
    user = relationship('User',backref=backref('News',cascade='save-update'))
    def __repr__(self) -> str:
        return f'<User: id={self.id} title={self.title} content={self.content} uid={self.uid}>'

def create_data8():
    Base.metadata.drop_all()  # 删除已有的表
    Base.metadata.create_all()
    with Session() as session:
        for i in range(10):
            user = User(uname=f'name{i}',age = randint(6,30))
            session.add(user)
        for j in range(10):
            news = News(title=f'title{i}',content=urandom(10),read_count=randint(5,120))
            user.News.append(news)
        session.commit()

def query_data_byage():
    with Session() as session:
        # 默认升序排列
        users = session.query(User).order_by(User.age).all()
        # 修改为降序
        users = session.query(User).order_by(User.age.desc()).all()
        for u in users:
            print(u)

if __name__ == '__main__':
    create_data8()
    query_data_byage()

如果不想每次都这样排序,一看用户就知道那个文章的阅读量最大,就像CSDN中左侧热门文章一样,可以在relationship中的backref加入order_by=read_count参数


user = relationship('User',backref=backref('News',cascade='save-update',order_by=read_count))

def query_data_News():
    with Session() as session:
        last_user = session.query(User).all()[-1]
        for i in last_user.News:
            print(i)

if __name__ == '__main__':
    query_data_News()

SQLAlchemy数据库

数据分页

  • limit:可以限制查询的时候只查询前几条数据。 属top-N查询
  • offset:可以限制查找数据的时候过滤掉前面多少条。可指定开始查询时的偏移量。
  • 切片:可以对Query对象使用切片操作,来获取想要的数据。
    • 可以使用 slice(start,stop) 方法来做切片操作。
    • 也可以使用 [start:stop] 的方式来进行切片操作。(常用)
# 其他都与上一个例子的一样
def query_by_limit():
    with Session() as session:
        news = session.query(News).limit(3).all()
        for i in news:
            print(i)

执行结果:

SQLAlchemy数据库

为什么最终的数据不是有序排列,因为那个order_by是加在User上的,只有调user时才会生效

其他两种方式

def query_by_offset():
    with Session() as session:
        news = session.query(News).offset(3).limit(3).all()
        for i in news:
            print(i)

def query_by_page():
    # 实现商品分页效果
    with Session() as session:
        temp = session.query(func.count(News.id)).first()[0]
        limit = 3
        j = 0
        while temp // limit != 0:
            news = session.query(News).offset(j*limit).limit(limit).all()
            j += 1
            temp -= limit
            for i in news:
                print(i)
            print("-------------------------------------------")
        news = session.query(News).offset(j*limit).all()
        for i in news:
                print(i)

def query_by_page1():
    # 实现商品分页效果
    with Session() as session:
        num = session.query(func.count(News.id)).first()[0]
        limit = 3
        j = 0
        while (j+1)*limit < num:
            news = session.query(News).slice(j*limit,(j+1)*limit).all()
            # news = session.query(News).all()[j*limit:(j+1)*limit]
            for i in news:
                print(i)
            print("-------------------------------------------")
            j += 1
        news = session.query(News).offset(j*limit).all()
        for i in news:
                print(i)

SQLAlchemy数据库

数据分组过滤

  • group_by 分组
  • having 分组后过滤(也可以用filter)
from db_util import Base,Session
from sqlalchemy import Column, ForeignKey,Integer,String,and_,Text,Table,func
from random import randint


class User(Base):
    __tablename__ = "t_user"
    id = Column(Integer,primary_key=True,autoincrement=True)
    uname = Column(String(50),nullable=False) 
    age = Column(Integer)

    def __repr__(self) -> str:
        return f'<User: id={self.id} uname={self.uname} age={self.age}>'


def create_user():
    Base.metadata.drop_all()  # 删除已有的表
    Base.metadata.create_all()
    with Session() as session:
        for i in range(100):
            user = User(uname=f'name{i}',age = randint(6,50))
            session.add(user)
        session.commit()

def query_data():
    # 统计每个年龄的人数有多少
    with Session() as session:
        rs = session.query(User.age,func.count(User.age)).group_by(User.age).all()
        print(rs)
    print('***********************************')
    # 基于上面加一个约束,大于18岁
    with Session() as session:
        rs = session.query(User.age,func.count(User.age)).group_by(User.age).filter(User.age>=18).all()
        rs = session.query(User.age,func.count(User.age)).group_by(User.age).having(User.age>=18).all()
        print(rs)

if __name__ == '__main__':
    create_user()
    query_data()

数据懒加载

数据懒加载主要解决的问题是: 加载主表的时候,子表不需要加载;或者加载子表的时候,把他过滤了再加载

这时候我们可以给relationship方法添加属性lazy='dynamic' ,以后通过 user.articles 获取到的就不是一个列表,而是一个AppenderQuery对象了。这样就可以对这个对象再进行一层过滤和排序等操作

lazy可用的选项:

  • select : (默认) 后台会用select语句一次性加载所有数据,即访问到属性的时候,就会全部加载该属性的数据
  • joined - 数据会被JOIN语句加载,即对关联的两个表进行join操作,从而获取到所有相关的对象
  • subquery - 数据被用subquery子查询SQL语句加载
  • dynamic :这个也是懒加载。在访问属性的时候,并不在内存中加载数据,而是返回一个 AppenderQuery 对象, 需要执行相应方法才可以获取对象。适用于数据量大的时候

我们执行以下语句会返回Query对象,Query对象到低是个啥?

with Session() as session:
    user = session.query(User)
    print(type(user))

去看源码!点进orm中,找到Query对象,他内置的方法就是我们之前调用的filter,first等过滤方法

SQLAlchemy数据库

relationship加入懒加载属性lazy='dynamic',执行下面的语句,发现返回的是一个AppenderQuery,那么AppenderQuery对象又是啥?

user = relationship('User',backref=backref('News',cascade='save-update',order_by=read_count,lazy='dynamic'))
def query_lazy():
    with Session() as session:
        user = session.query(User).all()
        news = user[-1].News
        print(type(news))

去看源码! 点进orm中,找到dyanmic.py下的AppenderQuery对象

SQLAlchemy数据库

所以我们就能对第一次筛选出来的user,进行二次筛选

def query_lazy():
    with Session() as session:
        user = session.query(User).all()
        news = user[-1].News.filter(News.read_count > 50).all()
        for i in news:
            print(i)

if __name__ == '__main__':
    query_lazy()

SQLAlchemy数据库

注意 lazy=“dynamic” 只可以用在一对多多对对关系中,不可以用在一对一和多对一中。这样也合理:如果返回结果很少的话,就没必要延迟加载数据了。

高级查询

高级查询(join)

表A 表B
AID Num BID Num
1 a123456 1 b84684
2 a123452 2 b84681
3 a123453 3 b84682
4 a123454 4 b84683
5 a123455 8 b84688

left join是以A表的记录为基础的,A可以看成左表,B可以看成右表,left join是以左表为准的.

左连接
AID Num Num BID
1 a123456 b84684 1
2 a123452 b84681 2
3 a123453 b84682 3
4 a123454 b84683 4
5 a123455 Null Null
右连接
AID Num Num BID
1 a123456 b84684 1
2 a123452 b84681 2
3 a123453 b84682 3
4 a123454 b84683 4
Null Null b84688 8

inner join(相等连接或内连接)

相等连接
AID Num Num BID
1 a123456 b84684 1
2 a123452 b84681 2
3 a123453 b84682 3
4 a123454 b84683 4

很明显,这里只显示出了 A.aID = B.bID 的记录.这说明inner join并不以谁为基础,它只显示符合条件的记录.

join的使用

  • join分为left join(左外连接)和right join(右外连接)以及内连接(等值连接)。
  • 在sqlalchemy中,使用join来完成内连接。在写join的时候,如果不写join的条件,那么默认将使
    用外键来作为条件连接。
  • 查询出来的字段,跟join后面的东西无关,而是取决于query方法中传了什么参数。(模型名=全
    表;模型名.属性=表名.字段)。
  • 在sqlalchemy中,使用outerjoin来完成外连接(默认是左外连接)。
def query_join():
    # 找到所有用户,新闻的数量,并且按照新闻的数量进行排序
    with Session() as session:
        # join里面放对象类名时,默认会以外键作为关联条件
        rs = session.query(User.uname,func.count(News.id)).join(News).group_by(User.id).order_by(func.count(News.id)).all()
        print(rs)

SQLAlchemy数据库

子查询(subquery)

子查询即select语句中还有select。

那么在sqlalchemy中,要实现一个子查询,需以下几个步骤:

  • 将子查询按照传统的方式写好查询代码,然后在 query 对象后面执行 subquery 方法,将这个查询变成一个子查询。
  • 在子查询中,将以后需要用到的字段通过 label 方法,取个别名。
  • 在父查询中,如果想要使用子查询的字段,那么可以通过子查询的返回值上的 c 属性拿到(c=Column)。

应用场景: 给一个人匹配对象,要求,城市所在地相同,且年龄相同,以前方法,先把这个人的城市与年龄查到,再去过滤数据,新方法一句话解决

from db_util import Base,Session
from sqlalchemy import Column,Integer,String,and_,Text,Table,func
from random import choice, randint


class User(Base):
    __tablename__ = "t_user"
    id = Column(Integer,primary_key=True,autoincrement=True)
    uname = Column(String(50),nullable=False) 
    city = Column(String(50),nullable=False)
    age = Column(Integer)

    def __repr__(self) -> str:
        return f'<User: id={self.id} uname={self.uname} city={self.city} age={self.age}>'

def  add_data():
    Base.metadata.drop_all()
    Base.metadata.create_all()
    name = ['Daijiawei','gouzei','yechui']
    city = ['dalian','guangdong','shanghai','shenzhen']
    
    with Session() as session:
        for i in range(30):
            user = User(uname=choice(name),age=randint(3,10),city=choice(city))
            session.add(user)
        user = User(uname='pd',age=randint(3,10),city=choice(city))
        session.add(user)
        session.commit()

def query_data():
    # 需求:找到与pd在一个城市且年龄一样的人
    with Session() as session:
        sub_sql = session.query(User.city.label('qcity'),User.age.label('qage')).filter(User.uname =='pd').subquery()
        rs = session.query(User).filter(User.city==sub_sql.c.qcity,User.age==sub_sql.c.qage).all()
        for i in rs:
            print(i)


if __name__ == '__main__':
    add_data()
    query_data()

封装与优化

FLAK-SQLAlchemy的使用

Flask-SQLAlchemy是对SQLAlchemy进行了一个简单的封装的一个插件,使得我们在flask中使用sqlalchemy更加的简单。

pip install flask-sqlalchemy

数据库连接与使用

  • 数据库初始化不再是通过create_engine。

    • 跟sqlalchemy一样,定义好数据库连接字符串DB_URI。
    • 将这个定义好的数据库连接字符串DB_URI,通过 SQLALCHEMY_DATABASE_URI 这个key名配置到 app.config中。
    • 使用 flask_sqlalchemy.SQLAlchemy 这个类定义一个对象,并将 app 传入进去。
  • 之前都是通过Base = declarative_base()来初始化一个基类,然后再继承,在Flask-SQLAlchemy中更加简单了

    • 还是跟使用sqlalchemy一样,定义模型。现在不再是需要使用 delarative_base 来创建一个基类。而是使用 db.Model 来作为基类
    • 在模型类中, ColumnStringInteger 以及 relationship 等,都不需要导入了,直接使用 db下面相应的属性名就可以了
    • 在定义模型的时候,可以不写 __tablename__ ,那么 flask_sqlalchemy 会默认使用当前的模型的名字转换成小写来作为表的名字(但还是建议自己写)
  • 写完模型类后,要将模型映射到数据库的表中,使用以下代码即可

    • 1.删除数据库表: db.drop_all()
    • 2.创建数据库表: db.create_all()
  • Session的使用:以后session也不需要使用 sessionmaker 来创建了,直接使用 db.session 就可以了,操作这个session的时候就跟之前的 sqlalchemysession 是一样一样的。

from black import read_cache
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from pkg_resources import run_script
from db_util import DB_URI
# 创建对象
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = DB_URI
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False  # vscode提示要改为false,不改不报错
# 连接数据库
db = SQLAlchemy(app)

# 创建模型类
class User(db.Model):
    __tablename__ = "t_user"
    id = db.Column(db.Integer,primary_key=True,autoincrement=True)
    uname = db.Column(db.String(50),nullable=False) 
    age = db.Column(db.Integer)

    def __repr__(self) -> str:
        return f'<User: id={self.id} uname={self.uname} age={self.age}>'

class News(db.Model):
    __tablename__ = "t_news"
    id = db.Column(db.Integer,primary_key=True,autoincrement=True)
    title = db.Column(db.String(50),nullable=False)
    content = db.Column(db.Text,nullable=False)
    read_count = db.Column(db.Integer)
    # 一个用户可以有多条新闻
    uid = db.Column(db.Integer,db.ForeignKey('t_user.id')) # 默认不准删除
    user = db.relationship('User',backref=db.backref('News'))
    def __repr__(self) -> str:
        return f'<User: id={self.id} title={self.title} content={self.content} read_count={self.read_count} uid={self.uid}>'

# 删除表
db.drop_all()
# 创建表
db.create_all()

# 增
def create_data():
    user = User(uname='pd',age=19)
    news = News(title='再学Python',content='aaaa',read_count=5)
    user.News.append(news)
    db.session.add(user)
    db.session.commit()

# 查
def query_data():
    # 查询单表的时候可以直接用类名,也可以用之前的方式
    user = User.query.all()
    print(user)
    # 查询多表
    rs = db.session.query(User,News.content).join(News,News.uid == User.id).all()
    print(rs)

# 改
def update_data():
    user = User.query.filter(User.uname=='pd').first()
    user.uname = 'laoshi'
    db.session.commit()

# 删
def delete_data():
    news = News.query.first()
    db.session.delete(news)
    db.session.commit()

if __name__ == '__main__':
    create_data()
    # query_data()
    # update_data()
    # delete_data()

数据库迁移工具ablembic

  • alembic是sqlalchemy的作者开发的,用来做ORM模型与数据库的迁移与映射,
  • alembic使用方式跟git有点了类似,(能回档)
  • alembic的所有命令都是以alembic开头,
  • alembic的迁移文件也是通过版本进行控制的,
pip install alembic

SQLAlchemy数据库

只是第一次操作的时候要执行完整步骤,后续修改的时候只需要修改模型,生成迁移文件与映射数据库即可

新建一个文件夹Alembic_demo,在命令行将目录切换到该目录下,再初始化

cd ./Alembic_demo/
alembic init alembic

操作完后,该目录下会生成一个alembic.ini的文件,点进去,找到

sqlalchemy.url = driver://user:pass@localhost/dbname
# 修改为下面
sqlalchemy.url = mysql+pymysql://root:xxx@127.0.0.1/flask_db
# flask_db是之前创建的数据库名

接着就是同步模型,在Alembic_demo文件下,创建一个models.py文件,先把flask_db中所有表都删掉,要是不删的话,重建一个库

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, ForeignKey,Integer,String,and_,Text,Table,func
from sqlalchemy import create_engine
# 数据库的变量
HOST = '127.0.0.1'  # 自己本机的就是127.0.0.1 或 localhost
PORT = 3306
DATA_BASE = 'flask_db'
USER = 'root'
PWD = 'xxx'
#  # DB_URI = f'数据库的名+驱动名://{USER}:{PWD}@{HOST}:{PORT}/{DATA_BASE}'
DB_URI = f'mysql+pymysql://{USER}:{PWD}@{HOST}:{PORT}/{DATA_BASE}'
engine = create_engine(DB_URI)
# 创建一个基础类
Base = declarative_base(engine)
Session = sessionmaker(engine)

class User(Base):
    __tablename__ = "t_user"
    id = Column(Integer,primary_key=True,autoincrement=True)
    uname = Column(String(50),nullable=False) 
    age = Column(Integer)

    def __repr__(self) -> str:
        return f'<User: id={self.id} uname={self.uname} age={self.age}>'

点开当前目录下alembic的文件,找到env.py

target_metadata = None
# 修改为
import sys,os
# 返回两次上级目录
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
import models
target_metadata = models.Base.metadata

然后再终端输入指令,进行生成迁移文件

cd ./Alembic_demo/
alembic revision --autogenerate -m 'model_file'

然后在versions中会出现一个新的python文件,里面会有版本号,还有各种操作

Revision ID: edc3c58b2c48

将生成的迁移文件映射到数据库中

cd ./Alembic_demo/
alembic upgrade head

更新model后

依次执行以下代码即可更新

cd ./Alembic_demo/
alembic revision --autogenerate -m 'model_file_0.0.1'
alembic upgrade head

如果要降级

alembic downgrade head

alembic常见错误

问题:
创建新版本时报错 FAILED: Target database is not up to date

原因:
主要是heads和current不相同。current落后于heads的版本

解决办法:
将current移动到head上。alembic upgrade head

问题
创建新版本时报错 KeyError: '087f047901d6' 或者 FAILED:Can't locate revision identified by 'da3a8bee2343

原因
数据库中存的版本号不在迁移脚本文件中

解决办法:
删除versions中所有的迁移文件,删除数据库所有表

Flask-SQLAlchemy与alembic的结合

刚才的Model.py改为

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
# 创建对象
app = Flask(__name__)

DB_URI = mysql+pymysql://root:xxx@127.0.0.1/flask_db
app.config['SQLALCHEMY_DATABASE_URI'] = DB_URI
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False  # vscode提示要改为false,不改不报错
# 连接数据库
db = SQLAlchemy(app)

# 创建模型类
class User(db.Model):
    __tablename__ = "t_user"
    id = db.Column(db.Integer,primary_key=True,autoincrement=True)
    uname = db.Column(db.String(50),nullable=False) 
    age = db.Column(db.Integer)

    def __repr__(self) -> str:
        return f'<User: id={self.id} uname={self.uname} age={self.age}>'

class News(db.Model):
    __tablename__ = "t_news"
    id = db.Column(db.Integer,primary_key=True,autoincrement=True)
    title = db.Column(db.String(50),nullable=False)
    content = db.Column(db.Text,nullable=False)
    read_count = db.Column(db.Integer)
    # 一个用户可以有多条新闻
    uid = db.Column(db.Integer,db.ForeignKey('t_user.id')) # 默认不准删除
    user = db.relationship('User',backref=db.backref('News'))
    def __repr__(self) -> str:
        return f'<User: id={self.id} title={self.title} content={self.content} read_count={self.read_count} uid={self.uid}>'

然后确保flask_db中没有表的情况下,再执行一遍上面的操作即可,唯一不同的就是修改env.py

target_metadata = None
# 修改为
import sys,os
# 返回两次上级目录
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
import models
target_metadata = models.db.Model.metadata

Flask-Migrate封装Alembic

flask-migrate是flask的一个扩展模块,主要是扩展数据库表结构的。flask-migrate是flask的一个扩展模块,主要是扩展数据库表结构的。

pip install flask-migrate

注意版本问题,flask用2以上的版本,flask-migrate用3以上的版本

SQLAlchemy数据库

# 前面内容与上面的修改后的Model.py一致
from flask_migrate import Migrate

Migrate(app,db)

初始化时,依次执行命令(还是确保flask_db中没有表)

cd ./Alembic_demo/
flask db init  // 创建迁移仓库
flask db migrate -m 'model_file_0.0.1' // 生成脚本文件
flask db upgrade  // 更新数据库

下次更新的时候只需要执行后两步就可以

其他指令

flask db current // 当前版本号
flask db history // 示head指向的脚本文件版本号
flask db heads // head指向的脚本文件版本号

flask db downgrade xxx(之前的版本号)  // 返回以前版本
上一篇:MongoDB的安装


下一篇:MongoDB Sharding Cluster部署(docker方式)