准备数据
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column
from sqlalchemy import Integer, String, Text, Date, DateTime, ForeignKey, UniqueConstraint, Index
from sqlalchemy import create_engine
from sqlalchemy.orm import relationship Base = declarative_base() class Depart(Base):
__tablename__ = 'depart'
id = Column(Integer, primary_key=True)
title = Column(String(32), index=True, nullable=False) class Users(Base):
__tablename__ = 'users' id = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=False)
depart_id = Column(Integer, ForeignKey("depart.id")) # 用于链表操作 与表的创建无关
dp = relationship("Depart", backref='pers') class Student(Base):
__tablename__ = 'student'
id = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=False) course_list = relationship('Course', secondary='student2course', backref='student_list') class Course(Base):
__tablename__ = 'course'
id = Column(Integer, primary_key=True)
title = Column(String(32), index=True, nullable=False) class Student2Course(Base):
__tablename__ = 'student2course'
id = Column(Integer, primary_key=True, autoincrement=True)
student_id = Column(Integer, ForeignKey('student.id'))
course_id = Column(Integer, ForeignKey('course.id')) __table_args__ = (
UniqueConstraint('student_id', 'course_id', name='uix_stu_cou'), # 联合唯一索引
# Index('ix_id_name', 'name', 'extra'), # 联合索引
) def create_all():
engine = create_engine(
"mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
) Base.metadata.create_all(engine) def drop_all():
engine = create_engine(
"mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Base.metadata.drop_all(engine) if __name__ == '__main__':
# drop_all()
create_all()
models.py
基本操作
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users, Student, Depart engine = create_engine(
"mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
SessionFactory = sessionmaker(bind=engine) # 从连接池获取一个连接
session = SessionFactory() # ############################## 基本增删改查 ###############################
# 1. 增加
obj = Users(name='tang')
session.add(obj)
session.commit() # 批量增加
session.add_all([
Users(name='tang'),
Users(name='chen')
])
session.commit() # 2. 查
result = session.query(Users).all()
for row in result:
print(row.id,row.name) # sqlalchemy 的语法跟Python很相似
result = session.query(Users).filter(Users.id >= 2)
for row in result:
print(row.id,row.name) # 获取第一个
result = session.query(Users).filter(Users.id >= 2).first()
print(result) # 3.删
session.query(Users).filter(Users.id >= 2).delete()
session.commit() # 4.改 通过字典
session.query(Users).filter(Users.id == 4).update({Users.name:'tang'})
session.query(Users).filter(Users.id == 4).update({'name':'tang'})
session.query(Users).filter(Users.id == 4).update({'name':Users.name+"_lao"},synchronize_session=False)
session.commit() # ############################## 其他常用 ###############################
# 1. 指定列 去别名
# 对应原生SQL:select id,name as cname from users;
result = session.query(Users.id,Users.name.label('cname')).all()
for item in result:
print(item[0],item.id,item.cname) # 2. 默认条件and
session.query(Users).filter(Users.id > 1, Users.name == 'eric').all() # 3. between
session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all() # 4. in
session.query(Users).filter(Users.id.in_([1,3,4])).all()
# not in
session.query(Users).filter(~Users.id.in_([1,3,4])).all() # 5. 子查询
session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name=='tang'))).all() # 6. and 和 or
from sqlalchemy import and_, or_
session.query(Users).filter(Users.id > 3, Users.name == 'tang').all()
session.query(Users).filter(and_(Users.id > 3, Users.name == 'tang')).all()
session.query(Users).filter(or_(Users.id < 2, Users.name == 'tang')).all()
session.query(Users).filter(or_(Users.id < 2,and_(Users.name == 'eric', Users.id > 3),Users.extra != "")).all() # 7. filter_by 只需字段名
session.query(Users).filter_by(name='alex').all() # 8. 通配符
ret = session.query(Users).filter(Users.name.like('e%')).all()
ret = session.query(Users).filter(~Users.name.like('e%')).all() # 9. 切片
result = session.query(Users)[1:2] # 10.排序
ret = session.query(Users).order_by(Users.name.desc()).all()
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 11. group by
from sqlalchemy.sql import func ret = session.query(Users.depart_id,func.count(Users.id),).group_by(Users.depart_id).all()
for item in ret:
print(item)
#
# from sqlalchemy.sql import func
# 分组之后再进行查询
ret = session.query(
Users.depart_id,
func.count(Users.id),
).group_by(Users.depart_id).having(func.count(Users.id) >= 2).all()
for item in ret:
print(item) # 12.union 和 union all
"""
select id,name from users
UNION
select id,name from users;
"""
"""
select id,name from users
UNION ALL
select id,name from users;
"""
q1 = session.query(Depart.title).filter(Depart.id > 2)
q2 = session.query(Student.name).filter(Student.id < 2)
ret = q1.union(q2).all()
#
# q1 = session.query(Users.name).filter(Users.id > 2)
# q2 = session.query(Favor.caption).filter(Favor.nid < 2)
# ret = q1.union_all(q2).all() """
union 和 union_all 的区别
union 去重
union_all 不去重 相同点:合并的两张表的列要相同
""" """
union 和 join的区别
union是垂直合并成一张表
join是水平合并成一张表
""" """
查看原生sql 打印不获取结果的语句就可以
sql = session.query(Users).filter(Users.id==1)
print(sql)
""" session.close()
链表操作 与 外键relation字段的使用
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users,Depart engine = create_engine(
"mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
) SessionFactory = sessionmaker(bind=engine)
session = SessionFactory() # 单表操作
ret = session.query(Users).all()
for row in ret:
print(row.id,row.name, row.depart_id) # 链表操作
ret = session.query(Users.id, Users.name, Depart.title).join(Depart,Users.depart_id==Depart.id).all()
for row in ret:
print(row.id, row.name, row.title) # isouter 表示 left join 没有right join 只能调换查询顺序
ret = session.query(Users.id, Users.name, Depart.title).join(Users,isouter=True).all()
# print(ret)
for row in ret:
print(row.id, row.name, row.title) # 3. relation字段:查询所有用户+所属部门名称
ret = session.query(Users).all()
for row in ret:
# relation dp的作用
print(row.id,row.name,row.depart_id, row.dp.title) # 4. relation字段:查询销售部所有的人员
ret = session.query(Depart).filter(Depart.title=='销售部').first()
for row in ret.pers:
print(row.id, row.name, ret.title) # 5. 创建一个名称叫:IT部门,再在该部门中添加一个员工:tanglaoer
u1 = Users(name='tanglaoer',dp=Depart(title='IT'))
session.add(u1)
session.commit() # 6. 创建一个名称叫:技术部,再在该部门中添加一个员工:tang lao san
d1 = Depart(title='技术部')
d1.pers = [Users(name='tang'),Users(name='lao'), Users(name='san')]
session.add(d1)
session.commit() # 在已存在的技术部 添加几名员工
d1 = session.query(Depart).filter(Depart.title == '技术部').first()
d1.pers = [Users(name='LIN'), Users(name='WU'),Users(name='SEN')]
session.add(d1)
session.commit() session.close()
Foreign and join
多对多操作
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Student, Course, Student2Course engine = create_engine(
"mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
) SessionFactory = sessionmaker(bind=engine) session = SessionFactory()
# 1. 录入数据
session.add_all([
Student(name='tang'),
Student(name='chen'),
Course(title='生物'),
Course(title='体育'),
])
session.commit() # 可批量增加多对多外键
session.add_all([
Student2Course(student_id=2,course_id=1),
Student2Course(student_id=1,course_id=1),
Student2Course(student_id=1,course_id=2),
]) # 2. 三张表关联
ret = session.query(Student2Course.id, Student.name, Course.title, Course.id).join(Student,Student2Course.student_id==Student.id).join(Course,Student2Course.course_id==Course.id).order_by(Course.id.asc()).all()
print(ret)
session.commit() # 3. “tang”选的所有课
ret = session.query(Student2Course.id, Student.name, Course.title, Course.id).join(Student,Student2Course.student_id==Student.id).join(Course,Student2Course.course_id==Course.id).filter(Student.name=='tang').all()
print(ret) # relation 字段的使用
ret = session.query(Student).filter(Student.name== 'tang').first()
for row in ret.course_list:
print(row.title) # 4. 选了“生物”的所有人
# relation 字段的方向使用
ret = session.query(Course).filter(Course.title == '生物').first()
for row in ret.student_list:
print(row.name, ret.title) # 5. 创建一个课程,创建2学生,两个学生选新创建的课程。
obj = Course(title='英语')
obj.student_list = [Student(name='lin'), Student(name='wu')]
session.add(obj)
session.commit() # 创建一个学生,加入多门新创建课程
stu = Student(name='tang')
stu.course_list = [Course(title='数学'), Course(title='地理')]
session.add(stu)
session.commit() # 把tang添加到已存在的课程中
from sqlalchemy import or_
stu = session.query(Student).filter(Student.name=='tang').first()
stu.course_list = session.query(Course).filter(or_(Course.id == 1, Course.id ==3)).all()
print(stu.course_list)
session.add(stu)
session.commit() session.close()
many2many
sqlalchemy 连接与多线程的操作
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Student
engine = create_engine(
"mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
SessionFactory = sessionmaker(bind=engine) def task():
# 去连接池中获取一个连接
# 第一版本
session = SessionFactory() ret = session.query(Student).all()
print(ret)
# 将连接交还给连接池
session.close() from threading import Thread for i in range(20):
t = Thread(target=task)
t.start()
第一版本
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import Student,Course,Student2Course engine = create_engine(
"mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
SessionFactory = sessionmaker(bind=engine)
session = scoped_session(SessionFactory)
# scoped_session 里面有threading.local
# 为每个线程赋予一个连接 def task():
ret = session.query(Student).all()
print(ret)
# 将连接交还给连接池
session.remove() from threading import Thread for i in range(20):
t = Thread(target=task)
t.start()
第二版本scoped_session
sqlalchemy 写原生SQL语句
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session engine = create_engine(
"mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
SessionFactory = sessionmaker(bind=engine)
session = scoped_session(SessionFactory) def task():
""""""
# 方式一:
# 查询
cursor = session.execute('select * from users')
result = cursor.fetchall()
print(result) # 添加 参数通过"冒号"
cursor = session.execute('INSERT INTO users(name) VALUES(:value)', params={"value": 'tanglaoer'})
session.commit()
print(cursor.lastrowid) # 方式二:
# 与pymysql的链接一模一样
conn = engine.raw_connection()
cursor = conn.cursor()
cursor.execute(
"select * from users"
)
result = cursor.fetchall()
print(result)
cursor.close()
conn.close() # 将连接交还给连接池
session.remove() from threading import Thread for i in range(20):
t = Thread(target=task)
t.start()
原生SQL