一、DBUtils
DBUtils 是一套允许线程化 Python 程序可以安全和有效的访问数据库的模块,DBUtils提供两种外部接口: PersistentDB :提供线程专用的数据库连接,并自动管理连接。 PooledDB :提供线程间可共享的数据库连接,并自动管理连接。
操作数据库模板:
import datetime
import sys
import os
import configparser
import logging
import psycopg2 from DBUtils.PooledDB import PooledDB class DatabaseOperator(object):
'''
class for database operator
''' def __init__(self,
database_config_path, database_config=None):
'''
Constructor
'''
self._database_config_path = database_config_path # load database configuration
if not database_config :
self._database_config = self.parse_postgresql_config(database_config_path)
else:
self._database_config = database_config
self._pool = None def database_config_empty(self):
if self._database_config:
return False
else:
return True def parse_postgresql_config(self, database_config_path=None):
'''解析pei数据库配置文件
参数
---------
arg1 : conf_file
数据库配置文件路径
返回值
--------
dict
解析配置属性dict--config 示例
--------
无
'''
if database_config_path == None and self._database_config_path != None:
database_config_path = self._database_config_path
if not os.path.isfile(database_config_path):
sys.exit("ERROR: Could not find configuration file: {0}".format(database_config_path))
parser = configparser.SafeConfigParser()
parser.read(database_config_path)
config = {}
config['database'] = parser.get('UniMonDB', 'Database')
config['db_user'] = parser.get('UniMonDB', 'UserName')
config['db_passwd'] = parser.get('UniMonDB', 'Password')
config['db_port'] = parser.getint('UniMonDB', 'Port')
config['db_host'] = parser.get('UniMonDB', 'Servername')
self._database_config = config return config def get_pool_conn(self): if not self._pool:
self.init_pgsql_pool()
return self._pool.connection() def init_pgsql_pool(self):
'''利用数据库属性连接数据库
参数
---------
arg1 : config
数据库配置属性
返回值
-------- 示例
--------
无
'''
# 字典config是否为空
config = self.parse_postgresql_config()
POSTGREIP = config['db_host']
POSTGREPORT = config['db_port']
POSTGREDB = config['database']
POSTGREUSER = config['db_user']
POSTGREPASSWD = config['db_passwd']
try:
logging.info('Begin to create {0} postgresql pool on:{1}.\n'.format(POSTGREIP, datetime.datetime.now())) pool = PooledDB(
creator=psycopg2, # 使用链接数据库的模块mincached
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=1, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=4, # 链接池中最多闲置的链接,0和None不限制
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。
host=POSTGREIP,
port=POSTGREPORT,
user=POSTGREUSER,
password=POSTGREPASSWD,
database=POSTGREDB)
self._pool = pool
logging.info('SUCCESS: create postgresql success.\n') except Exception as e:
logging.error('ERROR: create postgresql pool failed:{0}\n')
self.close_db_cursor()
sys.exit('ERROR: create postgresql pool error caused by {0}'.format(str(e))) def pg_select_operator(self, sql):
'''进行查询操作,函数返回前关闭cursor,conn
参数
---------
arg1 : sql查询语句
返回值
--------
list:result
类型为list的查询结果:result 示例
--------
无
'''
# 执行查询
try:
conn = self.get_pool_conn()
cursor = conn.cursor()
cursor.execute(sql)
result = cursor.fetchall()
except Exception as e:
logging.error('ERROR: execute {0} causes error'.format(sql))
sys.exit('ERROR: load data from database error caused {0}'.format(str(e)))
finally:
cursor.close()
conn.close()
return result def test_pool_con(self):
sql = 'select * from tbl_devprofile'
result = self.pg_select_operator(sql)
print(result) def pg_insert_operator(self, sql): result = False
try:
conn = self.get_pool_conn()
cursor = conn.cursor()
cursor.execute(sql)
result = True
except Exception as e:
logging.error('ERROR: execute {0} causes error'.format(sql))
sys.exit('ERROR: insert data from database error caused {0}'.format(str(e)))
finally:
cursor.close()
conn.commit()
conn.close()
return result def pg_update_operator(self, sql): result = False
try:
conn = self.get_pool_conn()
cursor = conn.cursor()
cursor.execute(sql)
result = True
except Exception as e:
logging.error('ERROR: execute {0} causes error'.format(sql))
sys.exit('ERROR: update data from database error caused {0}'.format(str(e)))
finally:
cursor.close()
conn.commit()
conn.close()
return result def pg_delete_operator(self, sql):
result = False
# 执行查询
try:
conn = self.get_pool_conn()
cursor = conn.cursor()
cursor.execute(sql)
result = True
except Exception as e:
logging.error('ERROR: execute {0} causes error'.format(sql))
sys.exit('ERROR: delete data from database error caused {0}'.format(str(e)))
finally:
cursor.close()
conn.commit()
conn.close()
return result def close_pool(self):
'''关闭pool
参数
---------
无 返回值
--------
无
示例
--------
无
'''
if self._pool != None:
self._pool.close() if __name__ == '__main__':
path = "E:\\Users\\Administrator\\eclipse-workspace\\com.leagsoft.basemodule\\base\\config\\sql_conf.conf"
db = DatabaseOperator(
database_config_path=path)
db.test_pool_con()
二、多线程
原理:创建多个线程类,多个线程类共享一个队里Queue,每一个线程类可以操作数据库
from threading import Thread class Worker(Thread):
def __init__(self, queue):
Thread.__init__(self)
self.queue = queue def run(self):
while True:
# Get the work from the queue and expand the tuple
# 从队列中获取任务
database_operator, device, stand_alone_result = self.queue.get()
operateResult(database_operator, device, stand_alone_result)
# 任务执行完之后要通知队列
self.queue.task_done()
填充队列
# 使用队列多线程
logging.info('begin to update all device risk score by multi_processing.\n')
from queue import Queue
queue = Queue()
# 六个线程,每个线程共享一个队列
for _ in range(6):
worker = Worker(queue)
worker.setDaemon(True)
worker.start() for record in all_devid:
device = record[0]
devtype = record[1]
all_countlist = all_dict.get(device)
stand_alone_result = device_assess(all_countlist)
if (devtype in (server_devtype + computer_devtype)) and (stand_alone_result < 100):
stand_alone_result *= 0.8
# 将设备风险评分数据保存到数据库中
queue.put((database_operator, device, stand_alone_result)) #等待队列任务执行完
queue.join() def operateResult(database_operator, device, stand_alone_result):
'''
函数名称: device_assess
描述: 保存单台设备分数到数据库
调用: 无
被调用: main
被访问的表: tbl_devprofile
被修改的表: 无
输入参数: database_operator, device:设备uid, stand_alone_result:单台设备风险分数
输出参数:无
返回值: 单台设备风险分数值
其它: 无
'''
import time
find_profile_sql = "SELECT uiddevrecordid FROM tbl_devprofile WHERE uiddevrecordid='{0}';".format(device)
isExistRecord = database_operator.pg_select_operator(find_profile_sql)
#currentTime=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
currentTime=time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))
if len(isExistRecord) > 0:
updata_profile_sql = "UPDATE tbl_devprofile SET irisklevel={0}, dtrisktime='{1}' \
WHERE uiddevrecordid='{2}';".format(stand_alone_result, currentTime, device)
database_operator.pg_update_operator(updata_profile_sql)
else:
insert_profile_sql = "INSERT INTO tbl_devprofile VALUES('{0}',NULL,NULL,NULL,NULL,NULL,NULL,NULL,{1},'{2}');".format(
device, stand_alone_result, currentTime)
database_operator.pg_insert_operator(insert_profile_sql)
使用单线程时,执行完代码花费20s左右,使用多线程时花费5s左右。
Reference:
[1] https://blog.csdn.net/zhaihaifei/article/details/54016939
[2] https://www.cnblogs.com/hao-ming/p/7215050.html?utm_source=itdadao&utm_medium=referral
[3] https://www.cnblogs.com/wozijisun/p/6160065.html (多线程)
[4] http://www.lpfrx.com/archives/4431/
[5] https://www.cnblogs.com/95lyj/p/9047554.html