使用SQLAlchemy进行ORM操作

一、参考文档

http://www.pythondoc.com/flask-sqlalchemy/quickstart.html  (官方文档)

https://www.jianshu.com/p/f454a1aa760c  

二、ORM解释

Object (code) - Relational (Database) - Mapping

数据库的表(table) --> 类(class)

记录(record,行数据)--> 对象(object)

字段(field)--> 对象的属性(attribute)

三、封装工作

1.Database → Class

使用以下指令生成models文件

sqlacodegen mysql+pymysql://用户名:密码@主机:端口号/数据库名 > 数据库对象存放路径

2.数据库连接

import pymysql

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

class DBSession(object):
    def __init__(self, db_name=database):
        engine = create_engine('mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8'.format(user, passwd, host, port, db_name),
                               echo=False)
        Session = sessionmaker()
        Session.configure(bind=engine)

        self.session = Session(autocommit=True)

# 可以这样用
db = DBSession()
db.session.query(AppNotTest).filter(AppNotTest.appId == app_id).update({AppNotTest.status: 1,
                                                                        AppNotTest.updatedBy: 'tester'},
                                                                       synchronize_session=False)
db.session.flush()   # 数据提交
db.session.close()   # 数据库连接关闭

3.装饰器组装

def db_fixture(db_name='ibupltplatformdb'):
    class DBFixture():
        def __init__(self):
            self.db_name = db_name

        def __db_conn(self):
            self.db = DBSession(self.db_name)
            self.db_session = self.db.session
            return self.db_session

        def __db_finalizer(self):
            self.db_session.flush()
            self.db_session.close()

        def __call__(self, func):
            def wrapper(*args, **kwargs):
                session = self.__db_conn()
                try:
                    res = func(session=session, *args, **kwargs)
                    return res
                except Exception as ex:
                    logging.error('[DB ERROR] {}'.format(ex))
                    session.rollback()
                finally:
                    self.__db_finalizer()

            return wrapper

    return DBFixture()

四、使用

1.查

@db_fixture(db_name='platformdb')
def query_app_not_test_by_id(session, app_id):
    """
    1. 查:根据app_id, 等同于SELECT * FROM app_not_test WHERE app_id= ..;
    :param session: 由装饰器传,调用该方法时无需传此参数
    :param app_id: 调用该方法时传此参数
    :return:
    """
    record = session.query(AppNotTest).filter(AppNotTest.appId == app_id).scalar()        #scalar():返回结果小于等于1
    return record


@db_fixture()
def query_app_not_test_by_app_list(session, app_list):
    # 2. 批量查询
    records = session.query(AppNotTest).filter(AppNotTest.appId.in_(app_list)).all()
    return records


@db_fixture()
def query_app_not_test_with_group(session, app_id):
    # 3. 联表查询
    record = session.query(AppNotTest.appId.label('app'), Appgroup.groupname.label('group')).\
        filter(AppNotTest.appId == app_id, AppNotTest.appId == Appgroup.appid).all()
    return record


@db_fixture()
def query_distinct_group_name(session):
    # 4. group by使用
    records = session.query(Appgroup.groupname, func.count(func.distinct(Appgroup.groupname))).group_by(Appgroup.groupname).all()
    return records


@db_fixture()
def query_app_not_test_by_pagination(session, limit, offset):
    # 5. 分页查询
    records = session.query(AppNotTest).order_by(AppNotTest.datachange_lasttime.desc()).offset(offset).limit(limit).all()
    return records


@db_fixture()
def query_by_sql(session, app_id):
    # 6. sql查询,结果是元组而非object
    sql = 'SELECT * FROM app_not_test WHERE appId = :app_id'
    records = session.execute(text(sql), {'app_id': app_id}).fetchall()
    return records  

2. 增

@db_fixture()
def add_app_not_test(session, app_id, status):
    record = app_not_test_assembler(app_id, status)
    session.add(record)


def app_not_test_assembler(app_id, status):
    app_not_test = AppNotTest()
    app_not_test.appId = app_id
    app_not_test.status = status
    app_not_test.createdBy = 'tester'
    app_not_test.updatedBy = 'tester'

    return app_not_test

3.改

@db_fixture()
def update_app_not_test_status(session, app_id, status):
    session.query(AppNotTest).filter(AppNotTest.appId == app_id).update({AppNotTest.status: status},
                                                                        synchronize_session=False)

4.删

@db_fixture()
def delete_app_not_test_by_app_id(session, app_id):
    session.query(AppNotTest).filter(AppNotTest.appId == app_id).delete(synchronize_session='fetch')

五、其它

1、在使用 create_engine创建引擎时,如果默认不指定连接池设置的话,一般情况下,SQLAlchemy会使用一个 QueuePool绑定在新创建的引擎上。并附上合适的连接池参数。

2、在这种情况下,当你使用了session后就算显式地调用session.close(),也不能把连接关闭。连接会由QueuePool连接池进行管理并复用。

3、如果想禁用SQLAlchemy提供的数据库连接池,只需要在调用create_engine是指定连接池为NullPool,SQLAlchemy就会在执行session.close()后立刻断开数据库连接。当然,如果session对象被析构但是没有被调用session.close(),则数据库连接不会被断开,直到程序终止。

from sqlalchemy.pool import NullPool

#连接数据库
def getEngine():
    engine = create_engine('mysql://root:123456@127.0.0.1/baa?charset=utf8mb4', poolclass=NullPool, echo=False)
    #print(engine)
    return engine

def getSession():
    engine = getEngine()
    BaseMode.metadata.create_all(engine)## 数据库生成表
    # Session = sessionmaker(bind=engine)
    # session = Session()

    DBSession = sessionmaker(bind=engine)
    session = DBSession()

    return session

 

上一篇:mongodb 插入/更新/删除 文档


下一篇:修改设置了表与表之间外键记录时存在的坑