pymysql操作数据库【进阶版】

配置文件,名称为config.py

#config.py
# Connection settings handed to DB(...), which forwards host, user,
# password, port and database to pymysql.connect().
rds_v1 = dict(
    host="127.0.0.1",
    user="root",
    password="12345678",
    database="users",
    port=3306,
)

数据库操作类,文件名称为database.py

#database.py
import pymysql
import pandas as pd
from config import *



class DB:
    """Thin convenience wrapper around a single pymysql connection.

    Query results are returned as pandas DataFrames.  Table names, database
    names and raw conditions are interpolated directly into the SQL text
    (identifiers cannot be sent as query parameters), so they must come from
    trusted code and never from end-user input.

    The object can be used as a context manager; the connection is closed
    when the ``with`` block exits.
    """

    def __init__(self, engine):
        """Open a connection described by *engine*.

        Args:
            engine: Mapping providing the keys ``host``, ``user``,
                ``password``, ``port`` and ``database``.
        """
        self._engine = engine
        self._conn = pymysql.connect(host=self._engine['host'],
                                     user=self._engine['user'],
                                     password=self._engine['password'],
                                     port=self._engine['port'],
                                     database=self._engine['database'],
                                     charset='utf8')

    def __enter__(self):
        """Return the wrapper itself so it can be bound by ``with ... as``."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection; exceptions from the block are not suppressed."""
        self.close()
        return False

    def insert(self, dataframe, table):
        """Insert every row of *dataframe* into *table*, updating on key clash.

        The statement is ``insert ... on duplicate key update`` with one
        ``col = values(col)`` assignment per dataframe column.
        NOTE(review): the ``values()`` function in this clause is reported as
        deprecated by recent MySQL releases - confirm against the server
        version in use.

        Args:
            dataframe: Object exposing ``columns``, ``shape`` and ``values``
                (a pandas DataFrame); column labels must match the table's
                column names.
            table: Name of the target table (trusted input only).

        Returns:
            A success message containing the row count and table name.

        Raises:
            Exception: If executing or committing fails; the transaction is
                rolled back first and the driver error is chained as
                ``__cause__``.
        """
        # str() lets non-string column labels (e.g. integers) be joined.
        columns = [str(i) for i in dataframe.columns]
        col = ','.join(columns)
        par = ','.join(['%s'] * dataframe.shape[1])
        keys = ','.join([f'{i} = values({i})' for i in columns])
        sql = f'insert into {table} ({col}) values({par}) on duplicate key update {keys};'
        # Rows come from the argument itself, not from a module-level `df`.
        rows = [tuple(i) for i in dataframe.values]
        with self._conn.cursor() as cursor:
            try:
                # Execute inside the try so a failed write is rolled back too.
                cursor.executemany(sql, rows)
                self._conn.commit()
            except Exception as err:
                self._conn.rollback()
                msg = "数据写入失败!!!"
                raise Exception(msg) from err
        return f"""数据写入成功:共{dataframe.shape[0]}条数据写入{table}表!!!"""

    def select(self, sql, kind=True):
        """Execute *sql* and optionally return its result set.

        Args:
            sql: Complete SQL statement to execute.
            kind: When true, fetch all rows and return them as a DataFrame
                whose columns are named after the cursor description.  When
                false, commit and return the result of ``commit()``.

        Raises:
            Exception: If execution, fetching or committing fails; the
                transaction is rolled back and the original error is chained.
        """
        with self._conn.cursor() as cursor:
            try:
                cursor.execute(sql)
                if kind:
                    result = cursor.fetchall()
                    cols = cursor.description
                    return pd.DataFrame(result, columns=[i[0] for i in cols])
                return self._conn.commit()
            except Exception as err:
                self._conn.rollback()
                msg = f"ERROR - {self._engine['host']} session init failed: {err}" + "\n"
                raise Exception(msg) from err

    def tables(self):
        """List the tables of the current database as a DataFrame."""
        sql = "show tables;"
        return self.select(sql)

    def drop(self, table):
        """Drop *table* (name is interpolated verbatim; trusted input only)."""
        sql = f"drop table {table};"
        return self.select(sql, kind=False)

    def delete(self, table, tag='1=1'):
        """Delete rows from *table* that match the raw SQL condition *tag*.

        The default condition ``1=1`` deletes every row.  *tag* is placed in
        the statement verbatim, so it must never contain untrusted input.
        """
        sql = f"delete from {table} where {tag};"
        return self.select(sql, kind=False)

    def desc(self, table):
        """Return the column description of *table* as a DataFrame."""
        sql = f"desc {table};"
        return self.select(sql)

    def process(self):
        """Return the server process list as a DataFrame."""
        sql = r"show processlist;"
        return self.select(sql)

    def kill(self, process_id):
        """Issue ``kill`` for the server process identified by *process_id*."""
        sql = f"kill {process_id};"
        return self.select(sql, kind=False)

    def use(self, database):
        """Switch the connection to *database*."""
        sql = f"use {database};"
        return self.select(sql, kind=False)

    def close(self):
        """Close the underlying connection."""
        self._conn.close()

    def create_sql(self, table):
        """Print the ``create table`` statement of *table*; returns None."""
        sql = f"""show create table {table}"""
        dataframe = self.select(sql)
        return print(dataframe['Create Table'][0])


if __name__ == '__main__':
    # Demo run: connect with the rds_v1 settings and list the tables.
    db = DB(rds_v1)
    try:
        df = db.tables()
    finally:
        # Release the connection even if the query raises.
        db.close()

pymysql操作数据库【进阶版】

上一篇:性能优化之数据库优化


下一篇:MySQL常见错误总结