csv文件导入Sybase数据库

csv文件批量插入sybase数据库

# -*- coding:utf-8 -*-
import os
import pyodbc
import pandas as pd
from math import ceil

from pandas.api.types import is_datetime64_any_dtype
from pandas.api.types import is_float_dtype
from pandas.api.types import is_integer_dtype
from pandas.api.types import is_object_dtype


class CsvToSybase(object):
    def __init__(self, DSN, UID, pwd):
        # connect to sybase and get cursor
        try:
            self.conn = pyodbc.connect('DSN={}; UID={}; pwd={}'.format(DSN, UID, pwd))
        except Exception as e:
            print(e)
        self.cursor = self.conn.cursor()

    def read_csv(self, filename):
        # read csv file and make table name
        df = pd.read_csv(filename, keep_default_na=False, encoding='utf-8')
        table_name = '[' + os.path.split(filename)[-1].split('.')[0].replace(' ', '_') + ']'
        self.csv_to_sybase(table_name=table_name, df=df)

    def make_table_sql(self, df):
        # field type conversion
        columns = df.columns.tolist()
        types = df.ftypes
        make_table = []
        make_field = []

        for item in columns:
            item1 = '[' + item.replace(' ', '_') + ']'
            if is_integer_dtype(types[item]):
                char = item1 + ' INT'
            elif is_float_dtype(types[item]):
                char = item1 + ' FLOAT'
            elif is_object_dtype(types[item]):
                char = item1 + ' VARCHAR(255)'
            elif is_datetime64_any_dtype(types[item]):
                char = item1 + ' DATETIME'
            else:
                char = item1 + ' VARCHAR(254)'

            make_table.append(char)
            make_field.append(item1)
        return ','.join(make_table), ','.join(make_field)

    def csv_to_sybase(self, db_name, table_name, df):
        # create table and insert data
        field1, field2 = self.make_table_sql(df)
        drop_table_sql = "DROP TABLE {}".format(table_name)
        create_table_sql = "CREATE TABLE {}({})".format(table_name, field1)

        try:
            self.cursor.execute(drop_table_sql)
        except Exception:
            ...

        self.cursor.execute(create_table_sql)

        values = df.values.tolist()
        s = ','.join(["?" for _ in range(len(df.columns))])

        try:
            times = ceil(len(values) / 10000)
            for v in range(times):
                self.cursor.executemany('INSERT INTO {} ({}) values ({})'.format(table_name, field2, s),
                                        values[v * 10000:(v + 1) * 10000])
                self.conn.commit()
        except Exception as e:
            print(e)
        finally:
            self.conn.commit()
            self.cursor.close()
            self.conn.close()


if __name__ == "__main__":
    obj = CsvToSybase(DSN="***", UID="***", pwd="***")
    # csv文件目录
    csv_file_dir = 'path'
    file_list = os.listdir(csv_file_dir)
    for i in range(len(file_list)):
        file_path = os.path.join(csv_file_dir, file_list[i])
        if os.path.isfile(file_path):
            obj.read_csv(file_path)
上一篇:.NET Core Dapper使用Sybase数据库中文乱码问题(cp850)


下一篇:Sybase获取所有主键和索引,并删除