一、参考文档
http://www.pythondoc.com/flask-sqlalchemy/quickstart.html (官方文档)
https://www.jianshu.com/p/f454a1aa760c
二、ORM解释
Object (code) - Relational (Database) - Mapping
数据库的表(table) --> 类(class)
记录(record,行数据)--> 对象(object)
字段(field)--> 对象的属性(attribute)
三、封装工作
1.Database → Class
使用以下指令生成models文件
sqlacodegen mysql+pymysql://用户名:密码@主机:端口号/数据库名 > 数据库对象存放路径
2.数据库连接
import pymysql from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker class DBSession(object): def __init__(self, db_name=database): engine = create_engine('mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8'.format(user, passwd, host, port, db_name), echo=False) Session = sessionmaker() Session.configure(bind=engine) self.session = Session(autocommit=True) # 可以这样用 db = DBSession() db.session.query(AppNotTest).filter(AppNotTest.appId == app_id).update({AppNotTest.status: 1, AppNotTest.updatedBy: 'tester'}, synchronize_session=False) db.session.flush() # 数据提交 db.session.close() # 数据库连接关闭
3.装饰器组装
def db_fixture(db_name='ibupltplatformdb'): class DBFixture(): def __init__(self): self.db_name = db_name def __db_conn(self): self.db = DBSession(self.db_name) self.db_session = self.db.session return self.db_session def __db_finalizer(self): self.db_session.flush() self.db_session.close() def __call__(self, func): def wrapper(*args, **kwargs): session = self.__db_conn() try: res = func(session=session, *args, **kwargs) return res except Exception as ex: logging.error('[DB ERROR] {}'.format(ex)) session.rollback() finally: self.__db_finalizer() return wrapper return DBFixture()
四、使用
1.查
@db_fixture(db_name='platformdb') def query_app_not_test_by_id(session, app_id): """ 1. 查:根据app_id, 等同于SELECT * FROM app_not_test WHERE app_id= ..; :param session: 由装饰器传,调用该方法时无需传此参数 :param app_id: 调用该方法时传此参数 :return: """ record = session.query(AppNotTest).filter(AppNotTest.appId == app_id).scalar() #scalar():返回结果小于等于1 return record @db_fixture() def query_app_not_test_by_app_list(session, app_list): # 2. 批量查询 records = session.query(AppNotTest).filter(AppNotTest.appId.in_(app_list)).all() return records @db_fixture() def query_app_not_test_with_group(session, app_id): # 3. 联表查询 record = session.query(AppNotTest.appId.label('app'), Appgroup.groupname.label('group')).\ filter(AppNotTest.appId == app_id, AppNotTest.appId == Appgroup.appid).all() return record @db_fixture() def query_distinct_group_name(session): # 4. group by使用 records = session.query(Appgroup.groupname, func.count(func.distinct(Appgroup.groupname))).group_by(Appgroup.groupname).all() return records @db_fixture() def query_app_not_test_by_pagination(session, limit, offset): # 5. 分页查询 records = session.query(AppNotTest).order_by(AppNotTest.datachange_lasttime.desc()).offset(offset).limit(limit).all() return records @db_fixture() def query_by_sql(session, app_id): # 6. sql查询,结果是元组而非object sql = 'SELECT * FROM app_not_test WHERE appId = :app_id' records = session.execute(text(sql), {'app_id': app_id}).fetchall() return records
2. 增
@db_fixture() def add_app_not_test(session, app_id, status): record = app_not_test_assembler(app_id, status) session.add(record) def app_not_test_assembler(app_id, status): app_not_test = AppNotTest() app_not_test.appId = app_id app_not_test.status = status app_not_test.createdBy = 'tester' app_not_test.updatedBy = 'tester' return app_not_test
3.改
@db_fixture() def update_app_not_test_status(session, app_id, status): session.query(AppNotTest).filter(AppNotTest.appId == app_id).update({AppNotTest.status: status}, synchronize_session=False)
4.删
@db_fixture() def delete_app_not_test_by_app_id(session, app_id): session.query(AppNotTest).filter(AppNotTest.appId == app_id).delete(synchronize_session='fetch')
五、其它
1、在使用 create_engine创建引擎时,如果默认不指定连接池设置的话,一般情况下,SQLAlchemy会使用一个 QueuePool绑定在新创建的引擎上。并附上合适的连接池参数。
2、在这种情况下,当你使用了session后就算显式地调用session.close(),也不能把连接关闭。连接会由QueuePool连接池进行管理并复用。
3、如果想禁用SQLAlchemy提供的数据库连接池,只需要在调用create_engine是指定连接池为NullPool,SQLAlchemy就会在执行session.close()后立刻断开数据库连接。当然,如果session对象被析构但是没有被调用session.close(),则数据库连接不会被断开,直到程序终止。
from sqlalchemy.pool import NullPool #连接数据库 def getEngine(): engine = create_engine('mysql://root:123456@127.0.0.1/baa?charset=utf8mb4', poolclass=NullPool, echo=False) #print(engine) return engine def getSession(): engine = getEngine() BaseMode.metadata.create_all(engine)## 数据库生成表 # Session = sessionmaker(bind=engine) # session = Session() DBSession = sessionmaker(bind=engine) session = DBSession() return session