1.首先建立一个空py文件(这里命名为connect.py):
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker # 配置信息 HOSTNAME = ‘127.0.0.1‘ PORT = ‘3306‘ DATABASE = ‘t_test‘ USERNAME = ‘root‘ PASSWORD = ‘qwe123‘ # 然后设置一个字符串的格式: db_url=‘mysql+pymysql://{}:{}@{}/{}?charset=utf8‘.format( USERNAME, PASSWORD, HOSTNAME, DATABASE ) # 创建一个引擎: engine = create_engine(db_url) # 将引擎作为参数导入declarative_base()方法,返回一个类: Base = declarative_base(engine) # 同时需要创建一个会话窗,即映射: Session = sessionmaker(engine) session = Session() # 验证是否成功可以在尾端进行如下操作: if __name__==‘__main__‘: print(dir(Base)) print(dir(session))
2.新建一个py文件(这里命名为user_modules.py),用来创建一个user表单。
首先从SQLAlchemy导入创建数据库记录的格式,同时将上个connect.py文件里创建的Base和session导入进来,下面也需要用到datetime的模块,将其也导入进来:
from datetime import datetime from connect import Base,session from sqlalchemy import Column,Integer,String,DateTime,Boolean # 定义一个User类,继承Base class User(Base): __tablename__ = ‘user‘ #表格名字 id = Column(Integer,primary_key=True,autoincrement=True) username = Column(String(20),nullable=False) password = Column(String(50)) creatime = Column(DateTime,default=datetime.now) _locked = Column(Boolean,default=False,nullable=False) # 自己封装查询方法 @classmethod def get_all(cls): return session.query(cls).all() @classmethod def get_id(cls,id): return session.query(cls).filter_by(id=id).all() @classmethod def get_username(cls,username): return session.query(cls).filter_by(username=username).all() @property def locked(self): return self._locked def __repr__(self): return "<User(id=‘%s‘,username=‘%s‘,password=‘%s‘,creatime=‘%s‘,_locked=‘%s‘)>"%( self.id, self.username, self.password, self.creatime, self._locked ) # 最后再执行创建 if __name__==‘__main__‘: Base.metadata.create_all()
可以登录数据库查看一下user表是否被创建出来。
3.最后再创建一个py文件(这里命名为test_user.py),用来测试对user表的“增删改查”
from user_modules import User from connect import session #增 def add_user(): # person = User(username=‘suyn‘,password=‘123456‘) # session.add(person) session.add_all([User(username=‘der‘,password=‘asdfas‘),User(username=‘sa‘,password=‘mima99‘)]) session.commit() #删 def delete_user(): row = session.query(User).filter_by(username=‘der‘)[0] print(row) session.delete(row) session.commit() #改 def update_user(): session.query(User).filter_by(id=3).update({User.password:‘abcdefg‘}) session.commit() #查 def query_user(): row = session.query(User).filter_by(username=‘suyn‘).all() row = session.query(User).filter(User.username==‘suyn‘).all() print(row[0]._locked) session.commit() # 最后可以进行验证: if __name__==‘__main__‘: # 验证 # add_user() # delete_user() # update_user() # query_user() print(User.get_id(1)) # 对上页封装的查询进行验证