话不多少,直接上代码。
import time
import pymysql
from DBUtils.PooledDB import PooledDB, SharedDBConnection
from log import logger
from config import configs, APP_ENV
DB_HOST = configs[APP_ENV].DB_HOST
DB_PORT = configs[APP_ENV].DB_PORT
DB_USER = configs[APP_ENV].DB_USER
DB_PASSWORD = configs[APP_ENV].DB_PASSWORD
DB_DEFAULT = configs[APP_ENV].DB_DEFAULT
DB_CHARSET = configs[APP_ENV].DB_CHARSET
def exe_time(func):
def new_func(*args, **args2):
t0 = time.time()
t0local = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
back = func(*args, **args2)
t1 = time.time()
t1local = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
msg = "Function: %s, start: @%s, end: @%s, elapsed: @%.4fs." % (func.__name__, t0local, t1local, t1 - t0)
logger.info(msg)
return back
return new_func
class MSC(object):
"""the class to connect mysql database and execute sql"""
def __init__(self, host, port, dbuser, password, db, charset):
self.pool = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host=host,
port=int(port),
user=dbuser,
password=password,
database=db,
charset=charset
)
def get_cursor(self):
try:
conn = self.pool.connection()
cursor = conn.cursor()
except Exception as e:
logger.error("Database Connection error: " + str(e))
return None
return conn, cursor
def get_cursor2(self):
try:
conn = self.pool.connection()
cursor = conn.cursor(pymysql.cursors.SSDictCursor)
except Exception as e:
logger.error("Database Connection error: " + str(e))
return None
return conn, cursor
@exe_time
def execute(self, sql, param=None):
"""执行单条SQL。用于:delete、update、insert。
:param sql: 标准SQL
:param param: SQL参数
:return: 影响行数
"""
logger.info(sql)
conn, cursor = self.get_cursor()
try:
cursor.execute(sql, param)
conn.commit()
affected_row = cursor.rowcount
except Exception as e:
logger.error("mysql execute error: " + str(e))
raise Exception("mysql execute error: " + str(e))
finally:
cursor.close()
conn.close()
return affected_row
@exe_time
def executemany(self, sql, param=None):
"""执行多条SQL。
:param sql: 标准SQL
:param param: SQL参数
:return: 影响行数
"""
logger.info(sql)
conn, cursor = self.get_cursor()
try:
cursor.executemany(sql, param)
conn.commit()
affected_row = cursor.rowcount
except Exception as e:
logger.error("mysql execute error: " + str(e))
raise Exception("mysql execute error: " + str(e))
finally:
cursor.close()
conn.close()
return affected_row
@exe_time
def query(self, sql):
"""查询SQL。执行select语句。
:param sql: 标准SQL
:return: 返回元组类型的结构
"""
logger.info(sql)
conn, cursor = self.get_cursor()
try:
cursor.execute(sql, None)
result = cursor.fetchall()
except Exception as e:
logger.error("mysql query error: " + str(e))
return None
finally:
cursor.close()
conn.close()
return result
@exe_time
def query2(self, sql):
"""查询SQL。执行select语句。
:param sql: 标准SQL
:return: 返回SQL执行的记录,列表类型的结构,键值对形式
"""
logger.info(sql)
conn, cursor = self.get_cursor2()
try:
cursor.execute(sql, None)
result = cursor.fetchall()
except Exception as e:
logger.error("database query error: " + str(e))
return None
finally:
cursor.close()
conn.close()
return result
cursor_mysql = MSC(DB_HOST, DB_PORT, DB_USER, DB_PASSWORD, DB_DEFAULT, DB_CHARSET) # 实例化对象
insert_sql = """insert into users(username, password) values(‘zhangsan‘, ‘123456‘);"""
print(insert_sql)
cursor_mysql.execute(insert_sql)
select_sql = """select * from users;"""
res = cursor_mysql.query(select_sql)
print(res)
select_sql = """select * from users;"""
res2 = cursor_mysql.query2(select_sql)
print(res2)
以上。
参考大佬DBUtils使用:
Python 使用 PyMysql、DBUtils 创建连接池,提升性能
Python数据库连接池DBUtils(基于pymysql模块连接数据库)
本博文代码属本文博主所有,转载请注明博文出处。