import pymysql
pymysql.install_as_MySQLdb()
from sqlalchemy import create_engine
from contextlib import contextmanager

# NOTE: the connection URL is a placeholder; replace the user, password, host,
# port and database parts with real values before running.
engine = create_engine('mysql://username:passwd@uri:port/database')


@contextmanager
def db_scope(engine, sql_type=None):
    """Provide a transactional scope around a series of operations.

    Obtains a raw DB-API connection from ``engine``, yields a cursor to the
    ``with`` body, commits when the body finishes normally, rolls back and
    re-raises when the body (or the commit) raises, and always closes the
    cursor and the connection afterwards.

    Args:
        engine: Object exposing ``raw_connection()`` (a SQLAlchemy engine in
            this file).
        sql_type: When equal to ``'mysql'`` the cursor is created with
            ``pymysql.cursors.DictCursor``; any other value uses the
            driver's default cursor.

    Yields:
        The open cursor.

    Raises:
        Exception: Whatever the ``with`` body or the commit raised, re-raised
            unchanged after the rollback.
    """
    db = engine.raw_connection()
    try:
        # Cursor creation sits inside the outer try so that a failure here
        # still closes the connection instead of leaking it.
        if sql_type == 'mysql':
            cursor = db.cursor(cursor=pymysql.cursors.DictCursor)
        else:
            # Original hint for this branch (presumably for PostgreSQL):
            # cursor_factory=psycopg2.extras.RealDictCursor
            cursor = db.cursor()
        try:
            yield cursor
            db.commit()
        except Exception:
            db.rollback()
            # Bare raise keeps the original exception and traceback intact.
            raise
        finally:
            cursor.close()
    finally:
        db.close()
下面可以使用 with 语句，通过 SQLAlchemy 执行原生 SQL 操作：
sql_type = 'mysql' with db_scope(self.engine, sql_type) as cursor: cursor.execute(sql) result = cursor.fetchall() logger.info(sql + ' 执行成功') return result