ssh通道连接mysql

import pymysql
import os,sys
from sshtunnel import SSHTunnelForwarder
sys.path.append(os.path.join(os.getcwd()))
import readConfig
from common.log import logger

localReadConfig = readConfig.ReadConfig()
print(localReadConfig.get_mysql("username"))

class DataBaseHandle(object):
    def __init__(self):
        self.server = self.get_server()
        self.server.start()
        self.db = pymysql.connect(host="127.0.0.1",user=localReadConfig.get_mysql("username"),password=localReadConfig.get_mysql("password"),database=localReadConfig.get_mysql("database"),port=self.server.local_bind_port,charset='utf8')
        self.cursor = None

    def get_server(self):
        ssh_address = localReadConfig.get_ssh("host") ##服务器地址
        ssh_port = int(localReadConfig.get_ssh("port"))
        server_user = localReadConfig.get_ssh("username") ##登录服务器的用户
        server_password = localReadConfig.get_ssh("password") ##登录服务器的密码
        server = SSHTunnelForwarder(
            ssh_address_or_host = (ssh_address, ssh_port),
            ssh_password=server_password,
            ssh_username=server_user,
            remote_bind_address=(localReadConfig.get_mysql("host"),int(localReadConfig.get_mysql("port"))),
            local_bind_address=("0.0.0.0", 9527) #绑定到本地的端口
            )
        return server  

    def execute_sql(self, sql):
        try:
            self.cursor = self.db.cursor()
            self.cursor.execute(sql)
            self.db.commit()
        except Exception as err:
            self.db.rollback()

    def query_one(self, sql):
        try:
            self.cursor = self.db.cursor()
            self.cursor.execute(sql)
            data = self.cursor.fetchone()
            return data
        except Exception as err:
            logger.error(err)
    
    def query_all(self, sql):
        try:
            self.cursor = self.db.cursor()
            self.cursor.execute(sql)
            datas = self.cursor.fetchall()
            return datas
        except Exception as err:
            logger.error(err)

    def closedb(self):
        """关闭数据库连接"""
        if self.cursor: self.cursor.close() 
        if self.db: self.db.close()
        self.server.stop()

if __name__ == "__main__":
    net = DataBaseHandle()
    s = net.query_one("select * from xxx")
    print(s)
    net.closedb()
上一篇:css cursor鼠标样式


下一篇:登录