class Database:
    """Small helper around one SQLAlchemy engine and one target table.

    The target table is ``{save_schema}.{save_table}``. Connection settings are
    the keyword arguments of a SQLAlchemy ``URL`` (``drivername``, ``username``,
    ``password``, ``host``, ``port``, ...).
    """

    # NOTE(review): credentials should not be committed to source control;
    # pass ``mysql_dwd_config`` explicitly (e.g. built from environment
    # variables) instead of relying on these placeholder defaults.
    _DEFAULT_MYSQL_DWD_CONFIG = {
        'drivername': 'mysql+pymysql',
        'username': 'user_a',
        'password': 'xxx@#$xxx',
        'host': 'am-xxxxx.ads.aliyuncs.com',
        'port': 3306,
    }

    def __init__(self, *, save_schema='dwd', save_table='dwd_xxx', mysql_dwd_config=None):
        """Record the target table and build the SQLAlchemy engine.

        Args:
            save_schema: Schema (database) containing the target table.
            save_table: Name of the target table.
            mysql_dwd_config: Keyword arguments for the SQLAlchemy URL. When
                None, the class-level placeholder config is used.
        """
        self.save_schema = save_schema
        self.save_table = save_table
        # Copy so neither the class default nor the caller's dict is shared.
        source_config = (
            self._DEFAULT_MYSQL_DWD_CONFIG if mysql_dwd_config is None else mysql_dwd_config
        )
        self.mysql_dwd_config = dict(source_config)

        url_cls = sqlalchemy.engine.url.URL
        if hasattr(url_cls, 'create'):
            # SQLAlchemy >= 1.4: URL.create receives each component separately,
            # so a password containing '@' (or other reserved characters) is
            # taken verbatim and needs no escaping.
            self.mysql_engine_url = url_cls.create(**self.mysql_dwd_config)
        else:
            # SQLAlchemy < 1.4: the URL constructor accepts the same fields.
            # Building a URL *string* here would break when the password
            # contains '@', because the parser would split on it.
            self.mysql_engine_url = url_cls(**self.mysql_dwd_config)
        self.mysql_dwd_engine = sqlalchemy.create_engine(self.mysql_engine_url)

    @Usual.time_stat
    def drop_mysql(self):
        """Drop ``{save_schema}.{save_table}`` if it exists, then print a marker."""
        sql = f"DROP TABLE IF EXISTS {self.save_schema}.{self.save_table};"
        # engine.begin() commits when the block exits; SQLAlchemy 2.0 removed
        # library-level autocommit, so a bare connect() is not relied upon.
        with self.mysql_dwd_engine.begin() as conn:
            conn.execute(sqlalchemy.text(sql))
        print('drop_table_done.')

    @Usual.time_stat
    def get_mysql_data(self, sql, chunksize=None):
        """Run ``sql`` on this instance's engine and return all rows as one DataFrame.

        Args:
            sql: Query accepted by ``pd.read_sql`` (string or SQLAlchemy selectable).
            chunksize: If truthy, fetch the result in chunks of this many rows
                and concatenate them; otherwise read the result in one call.

        Returns:
            pandas.DataFrame containing every result row (index reset to 0..n-1
            in the chunked path).
        """
        if not chunksize:
            new_df = pd.read_sql(sql=sql, con=self.mysql_dwd_engine, chunksize=None)
        else:
            chunks = list(pd.read_sql(sql=sql, con=self.mysql_dwd_engine, chunksize=chunksize))
            # A single concat avoids re-copying the accumulated frame on every
            # chunk (DataFrame.append in a loop is quadratic and was removed in
            # pandas 2.0). pd.concat rejects an empty list, hence the guard.
            new_df = pd.concat(chunks, ignore_index=True) if chunks else pd.DataFrame()
        print(f'get_df shape: {new_df.shape}')
        return new_df

    @Usual.time_stat
    def write_mysql_data(self, df, if_exists='replace'):
        """Write ``df`` to ``{save_schema}.{save_table}`` in batches of 10000 rows.

        Args:
            df: DataFrame to write. Its index is not written.
            if_exists: Passed through to ``DataFrame.to_sql``
                ('fail', 'replace' or 'append').
        """
        df.to_sql(
            con=self.mysql_dwd_engine,
            schema=self.save_schema,
            name=self.save_table,
            if_exists=if_exists,
            # index_label is only used when index=True, so it is not passed.
            index=False,
            chunksize=10000,
        )
-- Problems inserting Chinese text are not only about the read/write engine's
-- encoding=utf8mb4, which is usually the default nowadays;
-- the underlying cause is how the schema itself was created.
CREATE SCHEMA your_schema COLLATE utf8mb4_unicode_ci;