python之内置sqlite3

#!/usr/bin/python
# -*- coding: utf-8 -*-
# @Author  : Yunhgu
# @Software: Vscode
# @Time    : 2021-04-20 08:17:24

# 导入python3内置数据库驱动模块
import sqlite3
import os
from logTool import logTool
logger = logTool()


class optionSqlite():

    def __init__(self, dbname="default.db"):
        # 连接到数据库,如果不存在就创建
        self.conn = sqlite3.connect(os.path.join(
            os.path.dirname(__file__), dbname))
        # 创建游标
        self.cur = self.conn.cursor()
        # 如果不存在便创建表
        self.cur.execute(‘‘‘
        CREATE TABLE if not exists exchanges  (
        exchange_name varchar(50)  NOT NULL PRIMARY KEY,
        exchange_type varchar(15)  NOT NULL,
        bind_queue varchar(50),
        create_time datetime(6) NOT NULL,
        comment varchar(50)
        );
        ‘‘‘)
        self.cur.execute(‘‘‘CREATE TABLE if not exists queues  (
        virtual_host varchar(50)  NOT NULL PRIMARY KEY,
        queue_name varchar(50)  NOT NULL,
        create_time datetime(6) NOT NULL,
        comment varchar(50)
        );
        ‘‘‘)

    # 将 db.row_factory 方法重写为 dict_factory 方法
    def dict_factory(self, cursor, row):
        d = {}
        for index, col in enumerate(cursor.description):
            d[col[0]] = row[index]
        return d

    # 插入数据
    def insert(self, sql):
        try:
            self.cur.execute(sql)  # 执行sql
            self.conn.commit()  # 增删改操作完数据库后,需要执行提交操作
            return True
        except Exception as e:
            logger.error("sqlite3 insert error and had rollback:%s", e)
            # 发生错误时回滚
            self.conn.rollback()
            return False

    # 查询数据
    def search(self, sql):
        try:
            # 调用重写的字典工厂方法,格式化查询的结果
            self.conn.row_factory = self.dict_factory
            cur = self.conn.cursor()
            queryResult = cur.execute(sql).fetchall()
            return queryResult
        except Exception as e:
            logger.error("sqlite3 insert error and had rollback:%s", e)
            return None

    # 更新数据
    def update(self, sql):
        try:
            self.cur.execute(sql)  # 执行sql
            self.conn.commit()  # 增删改操作完数据库后,需要执行提交操作
            return True
        except Exception as e:
            logger.error("sqlite3 update error and had rollback:%s", e)
            # 发生错误时回滚
            self.conn.rollback()
            return False

    # 删除数据
    def delete(self, sql):
        try:
            self.cur.execute(sql)  # 执行sql
            self.conn.commit()  # 增删改操作完数据库后,需要执行提交操作
            return True
        except Exception as e:
            logger.error("sqlite3 delete error and had rollback:%s", e)
            # 发生错误时回滚
            self.conn.rollback()
            return False
    
    # 关闭数据库链接
    def close(self):
        self.cur.commit()
        self.conn.close()


if __name__ == __main__:
    ops = optionSqlite()
    ops.insert(
        "INSERT INTO exchanges(exchange_name,exchange_type,create_time) VALUES(‘efg‘,‘direct‘,datetime(‘now‘, ‘localtime‘));")
    result = ops.search(
        "select exchanges.exchange_name,exchanges.exchange_type,exchanges.create_time from exchanges;")
    print(result)

 

python之内置sqlite3

上一篇:使用hbase,hive,hbase与建立宽表和ES 的分布式架构,实时监听到 db 的字段变更,再将变更的内容实时同步到 ES 和宽表设计天生支持海量数据查询


下一篇:sql:with as的用法