python orm之sqlalchemy

基础操作

import sqlalchemy
import threading
import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy import Column,Integer,String,Text,ForeignKey,DateTime,UniqueConstraint,Table
from sqlalchemy.orm import sessionmaker,relationship

engine = create_engine('mysql+mysqlconnector://root:123456@127.0.0.1:3306/pysql?charset=utf8',
                       max_overflow=1, # 超过连接池大小外最多创建的连接
                       pool_size=5,
                       pool_timeout=30, #  池中没有线程最多等待的时间,否则报错
                       pool_recycle=-1) # 多久之后对线程池中的线程进行一次连接的回收(重置)


Base = declarative_base()



# def task(arg):
#     conn = engine.raw_connection()
#     cursor = conn.cursor()
#     cursor.execute(
#         "select * from role"
#     )
#     result = cursor.fetchall()
#     print(result)
#     cursor.close()
#     conn.close()
#
# for i in range(20):
#     t = threading.Thread(target=task, args=(i,))
#     t.start()


class UserInfo(Base):
    __tablename__ = 'user_info'
    id = Column(Integer,primary_key=True,autoincrement=True)
    user_name = Column(String(64),index=True,nullable=False)

    email = Column(String(64), unique=True)
    # datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
    create_time = Column(DateTime, default=datetime.datetime.now())
    update_time = Column(DateTime, default=datetime.datetime.now)

    role_id = Column(Integer,ForeignKey('role.id'))
    # extra = Column(Text, nullable=True)
    role = relationship('Role', backref='user')

    # __table_args__ = (
        # UniqueConstraint('id', 'name', name='uix_id_name'), #联合唯一
        # Index('ix_id_name', 'name', 'email'), #索引
    # )

    def __repr__(self):
        return '<UserInfo: user_name:%s,email:%s >' %(self.user_name,self.email)

class Role(Base):
    __tablename__ = 'role'
    id = Column(Integer,primary_key=True,autoincrement=True)
    name = Column(String(64))
    status = Column(Integer)

    def __repr__(self):
        return '{"name":"%s","status":%d}'%(self.name,self.status)

role_authority = Table('role_authority',Base.metadata,
                        Column('role_id',Integer,ForeignKey('role.id')),
                        Column('api_authority_id',Integer,ForeignKey('api_authority.id'))
                        )

class ApiAuthority(Base):
    __tablename__ = 'api_authority'
    id = Column(Integer,primary_key=True,autoincrement=True)
    url = Column(String(128),nullable=False,unique=True)
    method = Column(String(16),nullable=False)
    status = Column(Integer)
    roles = relationship("Role",secondary=role_authority,backref="authorities")


# Base.metadata.create_all(engine)


Connection = sessionmaker(bind=engine)

session = Connection()
# 新增
# role = Role(name='admin',status=0)
# role1 = Role(name='sys',status=1)
# role2 = Role(name='user',status=2)
#
# user = UserInfo(user_name='Tom',email='354782154@qq.com',role=role)
# user1 = UserInfo(user_name='Tom',email='354782156@qq.com',role=role1)
# user2 = UserInfo(user_name='Tom',email='354782178@qq.com',role=role2)
#
# session.add_all([role,role1,role2,user,user1,user2])
#
# session.commit()
# session.close()

'''修改1'''
# user = session.query(UserInfo).get(3)
# user.user_name = 'Jack'

# print(user)


'''修改2'''
# user2 = session.query(UserInfo).filter(UserInfo.id==5).update({'user_name':'Lili'})
'''根据外键关系映射获取role对象'''
# user2 = session.query(UserInfo).filter_by(id=5).first()
# print(user2.role)
'''根据外键关系映射获取user对象'''
# role = session.query(Role).get(4)
# print(role.user)

'''and 查询 or 查询 条件查询 '''

# 不等于
# roles = session.query(Role).filter(Role.status != 0).all()
# print(roles)

# 模糊查询
# roles = session.query(Role).filter(Role.name.like('%se%')).all()
# print(roles)

# in查询
# roles = session.query(Role).filter(Role.status.in_([1,2])).all()
# print(roles)

# is null
# users = session.query(UserInfo).filter(UserInfo.user_name.is_(None)).all()
# print(users)

# is not null
# users = session.query(UserInfo).filter(UserInfo.user_name.isnot(None)).all()
# print(users)

# and
# from sqlalchemy import and_
# users = session.query(UserInfo).filter(and_(UserInfo.user_name=='Jack',UserInfo.role_id==4)).all()
# print(users)

# or
# from sqlalchemy import or_
# users = session.query(UserInfo).filter(or_(UserInfo.role_id==4,UserInfo.role_id==5)).all()
# print(users)
# 组合and or
# from sqlalchemy import or_,and_
# # 查询邮箱号以35开头并且role_id为4或者5
# users = session.query(UserInfo).filter(and_(UserInfo.email.like('35%'),or_(UserInfo.role_id==4,UserInfo.role_id==5))).all()
# print(users)

'''分页'''
# users = session.query(UserInfo).filter(UserInfo.id >= 2).limit(2).offset((1 - 1) * 2).all()
# print(users)

# 删除
# res = session.query(UserInfo).filter(UserInfo.id==3).delete()
# print(res)



session.commit()
session.close()

  

源码解读

 

上一篇:微信小程序实现获取用户高清头像


下一篇:django-model操作数据库(增删该查)