基础操作
import sqlalchemy import threading import datetime from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import create_engine from sqlalchemy import Column,Integer,String,Text,ForeignKey,DateTime,UniqueConstraint,Table from sqlalchemy.orm import sessionmaker,relationship engine = create_engine('mysql+mysqlconnector://root:123456@127.0.0.1:3306/pysql?charset=utf8', max_overflow=1, # 超过连接池大小外最多创建的连接 pool_size=5, pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1) # 多久之后对线程池中的线程进行一次连接的回收(重置) Base = declarative_base() # def task(arg): # conn = engine.raw_connection() # cursor = conn.cursor() # cursor.execute( # "select * from role" # ) # result = cursor.fetchall() # print(result) # cursor.close() # conn.close() # # for i in range(20): # t = threading.Thread(target=task, args=(i,)) # t.start() class UserInfo(Base): __tablename__ = 'user_info' id = Column(Integer,primary_key=True,autoincrement=True) user_name = Column(String(64),index=True,nullable=False) email = Column(String(64), unique=True) # datetime.datetime.now不能加括号,加了括号,以后永远是当前时间 create_time = Column(DateTime, default=datetime.datetime.now()) update_time = Column(DateTime, default=datetime.datetime.now) role_id = Column(Integer,ForeignKey('role.id')) # extra = Column(Text, nullable=True) role = relationship('Role', backref='user') # __table_args__ = ( # UniqueConstraint('id', 'name', name='uix_id_name'), #联合唯一 # Index('ix_id_name', 'name', 'email'), #索引 # ) def __repr__(self): return '<UserInfo: user_name:%s,email:%s >' %(self.user_name,self.email) class Role(Base): __tablename__ = 'role' id = Column(Integer,primary_key=True,autoincrement=True) name = Column(String(64)) status = Column(Integer) def __repr__(self): return '{"name":"%s","status":%d}'%(self.name,self.status) role_authority = Table('role_authority',Base.metadata, Column('role_id',Integer,ForeignKey('role.id')), Column('api_authority_id',Integer,ForeignKey('api_authority.id')) ) class ApiAuthority(Base): __tablename__ = 'api_authority' id = Column(Integer,primary_key=True,autoincrement=True) url = Column(String(128),nullable=False,unique=True) method = Column(String(16),nullable=False) status = Column(Integer) roles = relationship("Role",secondary=role_authority,backref="authorities") # Base.metadata.create_all(engine) Connection = sessionmaker(bind=engine) session = Connection() # 新增 # role = Role(name='admin',status=0) # role1 = Role(name='sys',status=1) # role2 = Role(name='user',status=2) # # user = UserInfo(user_name='Tom',email='354782154@qq.com',role=role) # user1 = UserInfo(user_name='Tom',email='354782156@qq.com',role=role1) # user2 = UserInfo(user_name='Tom',email='354782178@qq.com',role=role2) # # session.add_all([role,role1,role2,user,user1,user2]) # # session.commit() # session.close() '''修改1''' # user = session.query(UserInfo).get(3) # user.user_name = 'Jack' # print(user) '''修改2''' # user2 = session.query(UserInfo).filter(UserInfo.id==5).update({'user_name':'Lili'}) '''根据外键关系映射获取role对象''' # user2 = session.query(UserInfo).filter_by(id=5).first() # print(user2.role) '''根据外键关系映射获取user对象''' # role = session.query(Role).get(4) # print(role.user) '''and 查询 or 查询 条件查询 ''' # 不等于 # roles = session.query(Role).filter(Role.status != 0).all() # print(roles) # 模糊查询 # roles = session.query(Role).filter(Role.name.like('%se%')).all() # print(roles) # in查询 # roles = session.query(Role).filter(Role.status.in_([1,2])).all() # print(roles) # is null # users = session.query(UserInfo).filter(UserInfo.user_name.is_(None)).all() # print(users) # is not null # users = session.query(UserInfo).filter(UserInfo.user_name.isnot(None)).all() # print(users) # and # from sqlalchemy import and_ # users = session.query(UserInfo).filter(and_(UserInfo.user_name=='Jack',UserInfo.role_id==4)).all() # print(users) # or # from sqlalchemy import or_ # users = session.query(UserInfo).filter(or_(UserInfo.role_id==4,UserInfo.role_id==5)).all() # print(users) # 组合and or # from sqlalchemy import or_,and_ # # 查询邮箱号以35开头并且role_id为4或者5 # users = session.query(UserInfo).filter(and_(UserInfo.email.like('35%'),or_(UserInfo.role_id==4,UserInfo.role_id==5))).all() # print(users) '''分页''' # users = session.query(UserInfo).filter(UserInfo.id >= 2).limit(2).offset((1 - 1) * 2).all() # print(users) # 删除 # res = session.query(UserInfo).filter(UserInfo.id==3).delete() # print(res) session.commit() session.close()
源码解读