import pymysql from pymysql.cursors import DictCursor from queue import Queue import threading class Pool: def __init__(self,size,*args,**kwargs): self._size=size self._pool=Queue(size) self.local=threading.local() for _ in range(size): conn=pymysql.connect(*args,**kwargs) self._pool.put(conn) @property def size(self): return self._pool.qsize() def acquire(self): return self._pool.get() def release(self,conn:pymysql.connections.Connection): if isinstance(conn,pymysql.connections.Connection): self._pool.put(conn) def __enter__(self): if getattr(self.local,'conn',None) is None: self.local.conn=self.acquire() return self.local.conn.cursor(cursor=DictCursor) def __exit__(self,exc_type,exc_value,exc_tb): if exc_type: self.local.conn.rollback() else: self.local.conn.commit() self.release(self.local.conn) self.local.conn=None pool=Pool(5,'localhost','root','cruces','uranus') with pool as cursor: with cursor: sql='select * from pp where id in (%(a)s,%(b)s,%(c)s)' cursor.execute(sql,{'a':3,'b':6,'c':7}) print(cursor.fetchall()) sql='show processlist' cursor.execute(sql,args=None) for b in cursor: print(b) # conn=pool.acquire() # print(conn) # print(pool.size) # cursor=conn.cursor(cursor=DictCursor) # cursor.execute('select * from pp') # # print(cursor.fetchall()) # pool.release(conn) # print(pool.size)
import pymysql from pymysql.cursors import DictCursor conn=None cursor=None try: conn=pymysql.connect('localhost','root','cruces','uranus') print(conn.password) print(conn.ping(False)) conn.__ent with conn.cursor(cursor=DictCursor) as cursor: d={'id':'5 or 1'} sql='select * from pp where id=%(id)s' print(cursor.execute(sql,d)) cursor.rownumber=-cursor.rowcount print(cursor.fetchone()) cursor.execute('select 1') conn.commit() # except: # conn.rollback() finally: if conn: conn.close()