# Bulk-insert CSV files into a Sybase database
# -*- coding:utf-8 -*-
import os
import pyodbc
import pandas as pd
from math import ceil
from pandas.api.types import is_datetime64_any_dtype
from pandas.api.types import is_float_dtype
from pandas.api.types import is_integer_dtype
from pandas.api.types import is_object_dtype
class CsvToSybase(object):
    """Load CSV files into Sybase tables through a pyodbc connection.

    Each CSV file becomes one table: an existing table of the same name is
    dropped, the table is recreated with column types derived from the pandas
    dtypes, and the rows are inserted in batches.

    The connection stays open between files so that one instance can load
    many files in a row.  Call :meth:`close` once when finished, or use the
    instance as a context manager.
    """

    # Maximum number of rows sent per ``executemany`` call.
    BATCH_SIZE = 10000

    def __init__(self, DSN, UID, pwd):
        """Open the ODBC connection and create a cursor on it.

        Args:
            DSN: Name of the ODBC data source.
            UID: Database user name.
            pwd: Password for ``UID``.

        Raises:
            pyodbc.Error: Propagated from ``pyodbc.connect`` when the
                connection cannot be established.
        """
        # Connection errors are allowed to propagate.  Printing and carrying
        # on would only fail one line later with an AttributeError on the
        # missing ``self.conn``, hiding the real cause.
        self.conn = pyodbc.connect('DSN={}; UID={}; pwd={}'.format(DSN, UID, pwd))
        self.cursor = self.conn.cursor()

    def __enter__(self):
        """Return the loader itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the database resources; exceptions are not suppressed."""
        self.close()

    def close(self):
        """Close the cursor and then the connection."""
        self.cursor.close()
        self.conn.close()

    def read_csv(self, filename):
        """Read one CSV file and load it into a table named after the file.

        The table name is the part of the file name before the first ``.``,
        with spaces replaced by underscores, wrapped in square brackets.

        Args:
            filename: Path of the UTF-8 encoded CSV file.
        """
        # keep_default_na=False keeps empty cells as empty strings instead of
        # converting them to NaN.
        df = pd.read_csv(filename, keep_default_na=False, encoding='utf-8')
        stem = os.path.basename(filename).split('.')[0]
        table_name = '[' + stem.replace(' ', '_') + ']'
        # db_name is required by csv_to_sybase's signature but is not used
        # there; pass it explicitly so the call does not raise TypeError.
        self.csv_to_sybase(db_name=None, table_name=table_name, df=df)

    def make_table_sql(self, df):
        """Build the column fragments for CREATE TABLE and INSERT statements.

        Column names get spaces replaced by underscores and are wrapped in
        square brackets.  Integer dtypes map to ``INT``, float dtypes to
        ``FLOAT``, object dtypes to ``VARCHAR(255)``, datetime dtypes to
        ``DATETIME`` and anything else to ``VARCHAR(254)``.

        Args:
            df: DataFrame whose columns describe the table.

        Returns:
            A tuple ``(definitions, names)`` of comma-joined strings:
            ``definitions`` is ``"[col] TYPE,..."`` and ``names`` is
            ``"[col],..."``.
        """
        definitions = []
        names = []
        # ``DataFrame.ftypes`` no longer exists in current pandas; ``dtypes``
        # carries the per-column dtype that the is_*_dtype helpers expect.
        for column, dtype in df.dtypes.items():
            quoted = '[' + str(column).replace(' ', '_') + ']'
            if is_integer_dtype(dtype):
                sql_type = ' INT'
            elif is_float_dtype(dtype):
                sql_type = ' FLOAT'
            elif is_object_dtype(dtype):
                sql_type = ' VARCHAR(255)'
            elif is_datetime64_any_dtype(dtype):
                sql_type = ' DATETIME'
            else:
                sql_type = ' VARCHAR(254)'
            definitions.append(quoted + sql_type)
            names.append(quoted)
        return ','.join(definitions), ','.join(names)

    def csv_to_sybase(self, db_name, table_name, df):
        """Recreate ``table_name`` and insert every row of ``df`` into it.

        Rows are sent in batches of ``BATCH_SIZE`` and committed per batch.
        If a batch fails, the error is printed, the open transaction is
        rolled back and loading of this table stops; earlier batches stay
        committed.  The connection is left open for further tables.

        Args:
            db_name: Not used; kept so existing callers keep working.
            table_name: Already-quoted table name, e.g. ``"[my_table]"``.
            df: DataFrame holding the rows to insert.
        """
        column_defs, column_names = self.make_table_sql(df)
        try:
            self.cursor.execute("DROP TABLE {}".format(table_name))
        except pyodbc.Error:
            # Best effort: the table usually just does not exist yet.
            pass
        self.cursor.execute("CREATE TABLE {}({})".format(table_name, column_defs))
        # Commit the DDL on its own so an empty CSV still yields a table.
        self.conn.commit()

        placeholders = ','.join('?' for _ in df.columns)
        insert_sql = 'INSERT INTO {} ({}) values ({})'.format(
            table_name, column_names, placeholders)
        rows = df.values.tolist()
        try:
            for start in range(0, len(rows), self.BATCH_SIZE):
                self.cursor.executemany(insert_sql, rows[start:start + self.BATCH_SIZE])
                self.conn.commit()
        except Exception as e:
            # Report and move on so one bad file does not stop a bulk load,
            # but never commit a half-written batch.
            print(e)
            self.conn.rollback()
if __name__ == "__main__":
    # Script entry point: load every regular file found directly inside
    # csv_file_dir into the database, one table per file.
    obj = CsvToSybase(DSN="***", UID="***", pwd="***")
    # Directory containing the CSV files
    csv_file_dir = 'path'
    # Iterate over the directory entries directly instead of indexing
    # through range(len(...)).
    for entry in os.listdir(csv_file_dir):
        file_path = os.path.join(csv_file_dir, entry)
        # Skip sub-directories and anything else that is not a regular file.
        if os.path.isfile(file_path):
            obj.read_csv(file_path)