python pymysql操作数据库

import pymysql


def creatDB(dbName):
    """
    dbName:数据库名称
    创建数据库
    """
    conn = pymysql.connect(host=localhost, port=3306, user=root, password=root, charset=utf8)
    myCursor = conn.cursor()
    myCursor.execute("CREATE DATABASE {}".format(str(dbName)))
    myCursor.close()
    conn.close()


def creatTable(dbName, tableName):
    """
    dbName:数据库名称
    tableName:表名称
    创建表
    """
    conn = pymysql.connect(
        host=localhost,
        port=3306,
        user=root,
        password=root,
        database=str(dbName),
        charset=utf8
    )
    myCursor = conn.cursor()
    sql = "CREATE TABLE {} ("           "id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, "           "name VARCHAR(255) UNIQUE, "           "age TINYINT DEFAULT NULL, "           "gender boolean DEFAULT FALSE, "           "testText TEXT, "           "img mediumblob DEFAULT NULL)".format(str(tableName))
    myCursor.execute(sql)
    myCursor.close()
    conn.close()


def getAllTables(dbName):
    """
    dbName:数据库名称
    tableName:表名称
    dataDic:一个字典,键是表的字段,值是要插入的值
    获取所有的表名称,返回值是一个包含所有表名称的列表
    """
    tableList = []
    conn = pymysql.connect(host=localhost, port=3306, user=root, password=root, database=str(dbName),
                           charset=utf8)
    myCursor = conn.cursor()
    myCursor.execute("show tables")
    tableNames = myCursor.fetchall()
    for tableName in tableNames:
        tableList.append(tableName[0])
    myCursor.close()
    conn.close()

    return tableList


def insertIntoTable(dbName, tableName, dataDic):
    """
    dbName:数据库名称
    tableName:表名称
    dataDic:一个字典,键是表的字段,值是要插入的值
    往数据库里插入一条数据
    """
    conn = pymysql.connect(host=localhost, port=3306, user=root, password=root, database=str(dbName),
                           charset=utf8)
    myCursor = conn.cursor()

    keys = , .join(dataDic.keys())
    values = , .join([%s] * len(dataDic))
    sql = INSERT INTO {table} ({keys}) VALUES ({values}).format(table=tableName, keys=keys, values=values)
    try:
        if myCursor.execute(sql, tuple(dataDic.values())):
            print(插入数据成功)
            conn.commit()
            lastID = myCursor.lastrowid

            return lastID

    except Exception as e:
        print(e)
        print(插入数据失败)
        conn.rollback()
    finally:
        myCursor.close()
        conn.close()


def updateIntoTable(dbName, tableName, dataDic):
    """
    dbName:数据库名称
    tableName:表名称
    dataDic:一个字典,键是表的字段,值是要插入的值
    如果数据库里面有这条数据,就对这条数据进行修改操作,如果没有,就增加一条数据,进行修改操作时,ID不变。
    返回值是最新插入数据的ID或被修改数据的原ID。
    """

    conn = pymysql.connect(host=localhost, port=3306, user=root, password=root, database=str(dbName),
                           charset=utf8)
    myCursor = conn.cursor()
    keys = , .join(dataDic.keys())
    values = , .join([%s] * len(dataDic))
    sql = INSERT INTO {table}({keys}) VALUES ({values}) ON DUPLICATE KEY UPDATE.format(table=tableName, keys=keys,
                                                                                         values=values)
    update = ,.join([" {key} = %s".format(key=key) for key in dataDic])
    sql += update
    try:
        if myCursor.execute(sql, tuple(dataDic.values()) * 2):
            print(插入数据成功)
            conn.commit()
            lastID = myCursor.lastrowid

            return lastID

    except Exception as e:
        print(e)
        print(插入数据失败)
        conn.rollback()
    finally:
        myCursor.close()
        conn.close()


def replaceIntoTable(dbName, tableName, dataDic):
    """
    dbName:数据库名称
    tableName:表名称
    dataDic:一个字典,键是表的字段,值是要插入的值
    如果数据库里面有这条数据,先删除原来的数据在进行增加,如果没有,就增加一条数据,ID改变。
    返回值是最新插入数据的ID。
    """
    conn = pymysql.connect(host=localhost, port=3306, user=root, password=root, database=str(dbName),
                           charset=utf8)
    myCursor = conn.cursor()

    keys = , .join(dataDic.keys())
    values = , .join([%s] * len(dataDic))
    sql = REPLACE INTO {table} ({keys}) VALUES ({values}).format(table=tableName, keys=keys, values=values)
    try:
        if myCursor.execute(sql, tuple(dataDic.values())):
            print(插入数据成功)
            conn.commit()
            lastID = myCursor.lastrowid

            return lastID

    except Exception as e:
        print(e)
        print(插入数据失败)
        conn.rollback()
    finally:
        myCursor.close()
        conn.close()


def delData(dbName, tableName, condition):
    """
    dbName:数据库名称
    tableName:表名称
    condition:删除的条件
    删除数据
    """
    conn = pymysql.connect(host=localhost, port=3306, user=root, password=root, database=str(dbName),
                           charset=utf8)
    myCursor = conn.cursor()
    sql = DELETE FROM {table} WHERE {condition}.format(table=tableName, condition=condition)
    try:
        if myCursor.execute(sql):
            print(删除数据成功)
            conn.commit()
    except Exception as e:
        print(e)
        print(删除数据失败)
        conn.rollback()
    finally:
        myCursor.close()
        conn.close()


def selectDataAll(dbName, tableName):
    """
    dbName:数据库名称
    tableName:表名称
    查询所有的数据
    """
    conn = pymysql.connect(host=localhost, port=3306, user=root, password=root, database=str(dbName),
                           charset=utf8)
    myCursor = conn.cursor()
    sql = SELECT * FROM {table}.format(table=tableName)
    if myCursor.execute(sql):
        allData = myCursor.fetchall()
        return allData
    else:
        return None


def selectDataOne(dbName, tableName):
    """
    dbName:数据库名称
    tableName:表名称
    查询第一条数据
    """
    conn = pymysql.connect(host=localhost, port=3306, user=root, password=root, database=str(dbName),
                           charset=utf8)
    myCursor = conn.cursor()
    sql = SELECT * FROM {table}.format(table=tableName)
    if myCursor.execute(sql):
        oneData = myCursor.fetchone()
        return oneData
    else:
        return None


def selectDataMany(dbName, tableName, size):
    """
    dbName:数据库名称
    tableName:表名称
    size:想要查询的数据 int类型
    查询多条数据
    """
    conn = pymysql.connect(host=localhost, port=3306, user=root, password=root, database=str(dbName),
                           charset=utf8)
    myCursor = conn.cursor()
    sql = SELECT * FROM {table}.format(table=tableName)
    if myCursor.execute(sql):
        manyData = myCursor.fetchmany(size)
        return manyData
    else:
        return None

 

python pymysql操作数据库

上一篇:关于vue ui运行项目无反应的问题


下一篇:MySQL - 数据库设计的三范式