Python 访问 MySQL

# -*- coding: UTF-8 -*-
import time

import phoenixdb
import psycopg2
import pymysql
from urllib.parse import urlparse

"""
将数据库的连接信息组装成一个connection对象
"""


class Connection(object):
    """Plain holder for the settings needed to reach one database.

    The instance attributes mirror the constructor arguments one-to-one:
    ``conn_type`` (the URL scheme, e.g. ``mysql``), ``host``, ``port``,
    ``schema``, ``username`` and ``password``.
    """

    def __init__(self, conn_type, host, port, schema, username, password):
        # Assignment order is kept stable because __str__/__repr__ render
        # ``self.__dict__`` and therefore expose this ordering.
        self.host = host
        self.port = port
        self.conn_type = conn_type
        self.schema = schema
        self.username = username
        self.password = password

    @classmethod
    def create(cls, url: str, username=None, password=None):
        """Build a Connection from a URL, optionally prefixed with ``jdbc:``.

        Args:
            url: Something like ``jdbc:mysql://host:3306/schema`` or the same
                URL without the ``jdbc:`` prefix.
            username: Stored on the instance as-is.
            password: Stored on the instance as-is.

        Returns:
            A new instance whose ``conn_type``, ``host``, ``port`` and
            ``schema`` are taken from the URL's scheme, hostname, port and
            path (without the leading ``/``).
        """
        # Drop the 5-character "jdbc:" prefix so that the remaining scheme
        # (e.g. "mysql") is what urlparse reports.
        result = urlparse(url[5:]) if url.startswith("jdbc:") else urlparse(url)
        conn_dict = {
            "host": result.hostname,
            "port": result.port,
            "conn_type": result.scheme,
            "schema": result.path[1:],
            "username": username,
            "password": password,
        }
        return cls(**conn_dict)

    def __str__(self):
        # __str__ must return a str; returning the raw dict makes str(obj),
        # print(obj) and "{}".format(obj) raise TypeError.
        # NOTE(review): the rendered dict includes the password in clear text.
        return str(self.__dict__)

    def __repr__(self):
        return str(self)

    @property
    def jdbc_url(self):
        """Return the JDBC URL for this connection with fixed query options."""
        return "jdbc:{conn_type}://{host}:{port}/{schema}?useCompression=true&socketTimeout=1200000".format(
            conn_type=self.conn_type,
            host=self.host,
            port=self.port,
            schema=self.schema,
        )

# 连接数据库和数仓
class DbConnCursor(object):

    def __init__(self, conn_type, host, port, schema, username, password):
        if conn_type.upper() == MYSQL:
            self.conn = pymysql.connect(host=host,port=port,user=username,password=password,database=schema,connect_timeout=31536000)
                    
        elif conn_type.upper() == POSTGRESQL:
            self.conn = psycopg2.connect(host=host, port=port, user=username, password=password, database=schema)
        elif conn_type.upper() == HBASE:
            self.conn = phoenixdb.connect("http://{host}:{port}".format(host=host, port=port), autocommit=True)
        
        self.cur = self.conn.cursor()

    # 通过传入一个conn对象构造db_cursor
    @classmethod
    def create(cls, conn: Connection):
        return cls(**conn.__dict__)

    def __enter__(self):
        return self.cur

    def __exit__(self, exc_type, exc_value, exc_trace):
        self.conn.commit()
        self.cur.close()
        self.conn.close()

 

Python 访问 MySQL

上一篇:2020/5/20数据库mysql学习通用数据库的对象中英文介绍笔记


下一篇:将Date存入数据库的两种方式