import pymysql
import os,sys
from sshtunnel import SSHTunnelForwarder
sys.path.append(os.path.join(os.getcwd()))
import readConfig
from common.log import logger
localReadConfig = readConfig.ReadConfig()
print(localReadConfig.get_mysql("username"))
class DataBaseHandle(object):
def __init__(self):
self.server = self.get_server()
self.server.start()
self.db = pymysql.connect(host="127.0.0.1",user=localReadConfig.get_mysql("username"),password=localReadConfig.get_mysql("password"),database=localReadConfig.get_mysql("database"),port=self.server.local_bind_port,charset='utf8')
self.cursor = None
def get_server(self):
ssh_address = localReadConfig.get_ssh("host") ##服务器地址
ssh_port = int(localReadConfig.get_ssh("port"))
server_user = localReadConfig.get_ssh("username") ##登录服务器的用户
server_password = localReadConfig.get_ssh("password") ##登录服务器的密码
server = SSHTunnelForwarder(
ssh_address_or_host = (ssh_address, ssh_port),
ssh_password=server_password,
ssh_username=server_user,
remote_bind_address=(localReadConfig.get_mysql("host"),int(localReadConfig.get_mysql("port"))),
local_bind_address=("0.0.0.0", 9527) #绑定到本地的端口
)
return server
def execute_sql(self, sql):
try:
self.cursor = self.db.cursor()
self.cursor.execute(sql)
self.db.commit()
except Exception as err:
self.db.rollback()
def query_one(self, sql):
try:
self.cursor = self.db.cursor()
self.cursor.execute(sql)
data = self.cursor.fetchone()
return data
except Exception as err:
logger.error(err)
def query_all(self, sql):
try:
self.cursor = self.db.cursor()
self.cursor.execute(sql)
datas = self.cursor.fetchall()
return datas
except Exception as err:
logger.error(err)
def closedb(self):
"""关闭数据库连接"""
if self.cursor: self.cursor.close()
if self.db: self.db.close()
self.server.stop()
if __name__ == "__main__":
net = DataBaseHandle()
s = net.query_one("select * from xxx")
print(s)
net.closedb()