python使用dbutils的PooledDB连接池,操作数据库

python使用dbutils的PooledDB连接池,操作数据库

 

1、使用dbutils的PooledDB连接池,操作数据库。

这样就不需要每次执行sql后都关闭数据库连接,频繁的创建连接,消耗时间

2、如果是使用一个连接一直不关闭,多线程下,插入超长字符串到数据库,运行一段时间后很容易出现OperationalError: (2006, ‘MySQL server has gone away’)这个错误。

使用PooledDB解决。

3、记录工作中连接mysql和sqlserver的两种示例

 

 

连接mysql数据库

python使用dbutils的PooledDB连接池,操作数据库
# coding=utf-8
"""
使用DBUtils数据库连接池中的连接,操作数据库
OperationalError: (2006, ‘MySQL server has gone away’)
"""
import json
import pymysql
import datetime
from DBUtils.PooledDB import PooledDB
import pymysql


class MysqlClient(object):
    __pool = None;

    def __init__(self, mincached=10, maxcached=20, maxshared=10, maxconnections=200, blocking=True,
                 maxusage=100, setsession=None, reset=True,
                 host='127.0.0.1', port=3306, db='test',
                 user='root', passwd='123456', charset='utf8mb4'):
        """

        :param mincached:连接池中空闲连接的初始数量
        :param maxcached:连接池中空闲连接的最大数量
        :param maxshared:共享连接的最大数量
        :param maxconnections:创建连接池的最大数量
        :param blocking:超过最大连接数量时候的表现,为True等待连接数量下降,为false直接报错处理
        :param maxusage:单个连接的最大重复使用次数
        :param setsession:optional list of SQL commands that may serve to prepare
            the session, e.g. ["set datestyle to ...", "set time zone ..."]
        :param reset:how connections should be reset when returned to the pool
            (False or None to rollback transcations started with begin(),
            True to always issue a rollback for safety's sake)
        :param host:数据库ip地址
        :param port:数据库端口
        :param db:库名
        :param user:用户名
        :param passwd:密码
        :param charset:字符编码
        """

        if not self.__pool:
            self.__class__.__pool = PooledDB(pymysql,
                                             mincached, maxcached,
                                             maxshared, maxconnections, blocking,
                                             maxusage, setsession, reset,
                                             host=host, port=port, db=db,
                                             user=user, passwd=passwd,
                                             charset=charset,
                                             cursorclass=pymysql.cursors.DictCursor
                                             )
        self._conn = None
        self._cursor = None
        self.__get_conn()

    def __get_conn(self):
        self._conn = self.__pool.connection();
        self._cursor = self._conn.cursor();

    def close(self):
        try:
            self._cursor.close()
            self._conn.close()
        except Exception as e:
            print e

    def __execute(self, sql, param=()):
        count = self._cursor.execute(sql, param)
        print count
        return count

    @staticmethod
    def __dict_datetime_obj_to_str(result_dict):
        """把字典里面的datatime对象转成字符串,使json转换不出错"""
        if result_dict:
            result_replace = {k: v.__str__() for k, v in result_dict.items() if isinstance(v, datetime.datetime)}
            result_dict.update(result_replace)
        return result_dict

    def select_one(self, sql, param=()):
        """查询单个结果"""
        count = self.__execute(sql, param)
        result = self._cursor.fetchone()
        """:type result:dict"""
        result = self.__dict_datetime_obj_to_str(result)
        return count, result

    def select_many(self, sql, param=()):
        """
        查询多个结果
        :param sql: qsl语句
        :param param: sql参数
        :return: 结果数量和查询结果集
        """
        count = self.__execute(sql, param)
        result = self._cursor.fetchall()
        """:type result:list"""
        [self.__dict_datetime_obj_to_str(row_dict) for row_dict in result]
        return count, result

    def execute(self, sql, param=()):
        count = self.__execute(sql, param)
        return count

    def begin(self):
        """开启事务"""
        self._conn.autocommit(0)

    def end(self, option='commit'):
        """结束事务"""
        if option == 'commit':
            self._conn.autocommit()
        else:
            self._conn.rollback()


if __name__ == "__main__":
    mc = MysqlClient()
    sql1 = 'SELECT * FROM shiji  WHERE  id = 1'
    result1 = mc.select_one(sql1)
    print json.dumps(result1[1], ensure_ascii=False)

    sql2 = 'SELECT * FROM shiji  WHERE  id IN (%s,%s,%s)'
    param = (2, 3, 4)
    print json.dumps(mc.select_many(sql2, param)[1], ensure_ascii=False)
View Code

如果独立使用pymysql数据库,最好是配合DButils库。

 

连接sqlserver数据库

 

python使用dbutils的PooledDB连接池,操作数据库
from DBUtils.PooledDB import PooledDB


class SqlClient(object):
    """默认是连接sqlserver的客户端"""

    def __init__(self, host=None, user=None, password=None,
                 sql=None, database=None):
        if database:
            pool = PooledDB(pymssql, database=database,
                            mincached=5, maxcached=10, maxshared=5, maxconnections=10, blocking=True,
                            maxusage=100, setsession=None, reset=True, host=host,
                            user=user, password=password
                            )
        else:
            pass
        conn = pool.connection()
        cursor = conn.cursor()
        self.sql = sql
        self.conn = conn
        self.cursor = cursor

    def all(self):
        self.cursor.execute(self.sql)
        self.data = self.cursor.fetchall()
        return self.data

    def first(self):
        self.cursor.execute("select top 1 * from v_ASRS_STORE_MESVIEW")
        self.data = self.cursor.fetchone()
        return self.data

    def count(self, sql="select count(*) as count from v_ASRS_STORE_MESVIEW"):
        self.cursor.execute(sql)
        self.data = self.cursor.fetchone()
        return self.data[0]

    def close(self):
        self.conn.close()
        self.cursor.close()
client.py

 

 

python使用dbutils的PooledDB连接池,操作数据库
from client import SqlClient

CONFIG = { 

    "default": {"HOST": "10.10.10.10", "USER": "aaa", "NAME": "AAA", "PASSWORD": "123"},

}

def main():

    conf = dict(
        host=CONFIG['default']['HOST'],
        user=CONFIG['default']['USER'],
        database=CONFIG['default']['NAME'],
        password=CONFIG['default']['PASSWORD'])
 
    sc = SqlClient(sql, **conf)
    res = sc.all()
    print(res)
connect.py

 

 

 

OK, thank

 

 

上一篇:iOS 事件处理机制与图像渲染过程


下一篇:python requests库爬取网页小实例:ip地址查询