python数据库连接池
一丶持久数据库 (persistent_db)
# 1. dbutils.persistent_db 中的类 PersistentDB使用任何 DB-API2 数据库模块
# 2. 实现到数据库的稳定、线程仿射、持久连接。
# 3. “线程仿射”和“持久”意味着各个数据库连接保持分配给各自的线程,并且在线程的生命周期内不会关闭
#
1. 每当线程第一次打开数据库连接时,将打开一个到数据库的新连接,该连接将从现在开始用于该特定线程
2. 当线程关闭数据库连接时,它仍然保持打开状态,以便下次同一个线程请求连接时,可以使用这个已经打开的连接
3. 当线程死亡时,连接将自动关闭
简而言之:
persistent_db尝试回收数据库连接以提高线程应用程序的整体数据库访问性能,但它确保线程之间永远不会共享连接
二丶池化数据库 (pooled_db)
三丶持久数据库 (persistent_db)
### persistent_db 的参数
- creator : 返回新的 DB-API 2 连接对象的任意函数或符合 DB-API 2 的数据库模块
- maxusage : 单个连接的最大重用次数(默认为0或None表示无限重用)
每当达到限制时,连接将被重置
- setsession : 可用于准备会话的 SQL 命令的可选列表,例如["set datestyle to German", ...]
- failures : 如果默认值(OperationalError,InterfaceError,InternalError)不适用于使用的数据库模块,则应应用连接故障转移机制的可选异常类或异常类元组
- ping : 一个可选标志,用于控制何时使用ping()方法检查连接,如果这种方法可用(0 =无= 从不,1 = 默认 = 每当请求时, 2 = 创建游标时,4 = 当执行查询, 7 = 总是,以及这些值的所有其他位组合)
- closeable : 如果设置为 true,则允许关闭连接,但默认情况下,这将被忽略
- threadlocal : 一个可选类,用于表示将使用的线程本地数据,而不是我们的 Python 实现(threading.local 更快,但不能在所有情况下都使用)
### 与本地数据库mydb的每个连接都被重用 1000 次
import pgdb # import used DB-API 2 module
from dbutils.persistent_db import PersistentDB
persist = PersistentDB(pgdb, 1000, database='mydb')
### 这些参数设置生成器后,您可以请求此类数据库连接
db = persist.connection()
四丶池化数据库 (pooled_db)
# pooled_db 的参数
- creator : 返回新的 DB-API 2 连接对象的任意函数或符合 DB-API 2 的数据库模块
- mincached : 池中的初始空闲连接数(默认为0表示启动时不建立连接)
- maxcached : 池中的最大空闲连接数(默认值0或None表示无限池大小)
- maxshared : 允许的最大共享连接数(默认值0或None表示所有连接都是专用的)
当达到此最大数量时,如果连接被请求为可共享,则连接将被共享。
- maxconnections : 一般允许的最大连接数(默认值0或None表示任意数量的连接)
- blocking : 确定超过最大值时的行为
- maxusage : 单个连接的最大重用次数(默认为0或None表示无限重用), 当达到此连接的最大使用次数时,连接会自动重置(关闭并重新打开)
- setsession : 可用于准备会话的 SQL 命令的可选列表,例如["set datestyle to German", ...]
- reset : 返回池时应如何重置连接(False或None回滚以begin() 开始的事务,默认值True出于安全考虑总是发出回滚)
- failures : 如果默认值(OperationalError,InterfaceError,InternalError)不适用于使用的数据库模块,则应应用连接故障转移机制的可选异常类或异常类元组
- ping : 一个可选标志,用于控制何时使用ping()方法检查连接(如果此类方法可用)(0 =无= 从不,1 = 默认 = 每当从池中获取时, 2 = 创建游标时,4 = 何时执行查询, 7 = 总是,以及这些值的所有其他位组合)
### 想要一个与本地数据库mydb的至少五个连接的数据库连接池
import pgdb # import used DB-API 2 module
from dbutils.pooled_db import PooledDB
pool = PooledDB(pgdb, 5, database='mydb')
### 设置连接池后,可以从该池请求数据库连接:
db = pool.connection()
### 设置非零maxshared参数, 默认情况下连接可能会与其他线程共享。如果您想拥有专用连接
db = pool.connection(shareable=False)
↓
db = pool.dedicated_connection() # 专用连接
### 如果您不再需要它,您应该立即使用 db.close() 将其返回到池中。您可以以相同的方式获得另一个连接
### 连接不是线程安全的,这将过早释放连接以供重用
db = pool.connection()
cur = db.cursor()
cur.execute(...)
res = cur.fetchone()
cur.close() # or del cur
db.close() # or del db
### 以将上下文管理器用于更简单的代码
with pool.connection() as db:
with db.cursor() as cur:
cur.execute(...)
res = cur.fetchone()
五丶 代码+官网
# 官网
https://webwareforpython.github.io/DBUtils/main.html
### 脚本
# -*- coding: utf-8 -*-
import os
import logging
import pymysql
from dbutils.pooled_db import PooledDB
base_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
logging.basicConfig(
level=30,
filename=os.path.join(base_path, 'log', 'db_process.log'),
filemode='a',
)
class DbTool(object):
"""
# 单例模式 + 数据连接池
"""
mincached = 10 # 连接池种空闲连接的初始数量
maxcached = 20 # 连接池种空闲连接的最大数量
maxshared = 10 # 共享连接的最大数量
maxconnections = 200 # 创建连接池的最大数量
blocking = True # 超过最大连接数量的时. True等待连接数下降,False直接报错处理
maxusage = 100 # 单个连接的最大重复使用次数
setsession = None #
reset = True #
_pool = None
__instance = None
def __init__(self, db_config):
host = db_config['host']
port = db_config['port']
user = db_config['user']
password = db_config['password']
db = db_config['db']
if not self._pool:
self._pool = PooledDB(
creator=pymysql,
maxconnections=self.maxconnections,
mincached=self.mincached,
maxcached=self.maxcached,
blocking=self.blocking,
maxusage=self.maxusage,
setsession=self.setsession,
host=host,
port=port,
user=user,
password=password,
database=db,
)
logging.info('SUCCESS: create postgresql success.\n')
def __new__(cls, *args, **kwargs):
"""
:param args:
:param kwargs:
"""
if not cls.__instance:
cls.__instance = super(DbTool, cls).__new__(cls)
return cls.__instance
def pg_select_operator(self, sql):
'''
# 查询 SQL
:param sql: sql语句
:return: sql结果
'''
conn = self._pool.connection()
cursor = conn.cursor()
result = False
try:
cursor.execute(sql)
result = cursor.fetchall()
except Exception as e:
logging.error('ERROR:', e)
finally:
cursor.close()
conn.close()
return result
def pg_update_operator(self, sql):
result = False
conn = self._pool.connection()
cursor = conn.cursor()
try:
cursor.execute(sql)
result = True
except Exception as e:
logging.error('ERROR:', e)
finally:
cursor.close()
conn.commit()
conn.close()
return result
def pg_insert_operator(self, sql):
result = False
conn = self._pool.connection()
cursor = conn.cursor()
try:
cursor.execute(sql)
result = True
except Exception as e:
logging.error('ERROR:', e)
finally:
# 关闭连接钱提交
cursor.close()
conn.commit()
conn.close()
return result
def close_pool(self):
'''
关闭连接池
:return:
'''
if self._pool != None:
self._pool.close()
if __name__ == '__main__':
db_config = {
"host": "127.0.0.1",
"port": 3306,
"user": "root",
"password": "",
"db": "testdb"
}
obj = DbTool(db_config)
sql = """
INSERT INTO `testdb`.`p_user`(`id`, `user_id`, `user_name`, `user_sex`, `user_phone`, `user_email`, `user_pwd`, `isadmin`, `create_time`, `end_time`, `user_origal`) VALUES (12, '10000001', 'admin_x', '1', '13811111112', 'admin_x@163.com', 'pbkdf2:sha256:150000$EU5aTt0N$555f3c7a0d28c092f8b5c3402f0218fffd51b6fa83bab54fed1670f969898c55', '1', '2021-08-01 11:34:07', NULL, NULL);
"""
print('id:', id(obj))
try:
obj.pg_insert_operator(sql)
except Exception as e:
print(e)