Dumping a 2.4 TB MySQL table to files with multiple threads
One of our MySQL clusters was recently scheduled to be shut down, and the data in one of its tables had to be dumped to files first: eight nodes, roughly 300 GB per node, about 2.4 TB in total. Since we had no server passwords (no shell access to the machines), the only option was to pull the data out with a script.
from multiprocessing.dummy import Pool as TPool
import traceback
import MySQLdb
import pandas as pd
from dbutils.pooled_db import PooledDB
class LoadFromMysql:
    def __init__(self):
        # Connection pool; each query borrows its own connection from the pool,
        # so worker threads never share a single connection object.
        self.mysql_pool = PooledDB(
            creator=MySQLdb, mincached=5, maxcached=10,
            host='******', port=****, user='*****', passwd='*****', charset='utf8'
        )
    # Run a query and return the tuples it yields as a DataFrame.
    def get_df_from_db(self, sql):
        conn = self.mysql_pool.connection()
        cursor = conn.cursor()
        try:
            cursor.execute(sql)
            data = cursor.fetchall()
            # cursor.description has one entry per result column; take the names
            column_names = [desc[0] for desc in cursor.description]
            return pd.DataFrame([list(row) for row in data], columns=column_names)
        finally:
            cursor.close()
            conn.close()
    def extract_data(self, wtid, protocolid):
        # Column descriptions for this protocol, in storage order. The regexp keeps
        # only the statistics we need: avg/min/max wind speed, active power and
        # reactive power (the patterns are the Chinese column descriptions).
        sql_pro = (
            'select descrcn from config.propaths where transtype = 2 and descrcn regexp '
            '"平均风速|最小风速|最大风速|平均有功功率|最小有功功率|最大有功功率|平均无功功率|最小无功功率|最大无功功率" '
            'and protocolid = {protocolid} order by offsets ASC'
        ).format(protocolid=protocolid)
        pro_df = self.get_df_from_db(sql_pro)
        if not pro_df.empty:
            columns = ['wfid', 'wtid', 'rectime']
            for descrcn in pro_df["descrcn"].values:
                columns.append(descrcn)
            # number of columns we actually need
            columns_size = len(columns)
            statisticdata_sql = (
                'select wfid,wtid,rectime,id0,id1,id2,id3,id4,id5,id6,id7,id8,id9,id10,id11,id12 '
                'from dbo.statisticdata where wtid = {wtid}'
            ).format(wtid=wtid)
            sta_df = self.get_df_from_db(statisticdata_sql)
            # keep only the columns the protocol describes
            extract_df = sta_df.iloc[:, 0:columns_size]
            # rename the generic id columns to their descriptions
            extract_df.columns = columns
            return (extract_df, wtid)
    def write_data(self, df, wtid):
        # Write one wtid's data out as a CSV file.
        # Output paths used on the server: /data/wff_algo/data_file/
        # /home/lijiapeng/projects/mysql_cluster_load/data/
        df.to_csv(path_or_buf='E:\\{wtid}.csv'.format(wtid=wtid), sep=',', index=False, na_rep='',
                  header=True, encoding="utf_8_sig")
    def load_from_mysql(self):
        # Fetch every wtid together with its protocol id.
        wtinfo_sql = 'select wtid,protocolid from config.wtinfo'
        wtinfo_df = self.get_df_from_db(wtinfo_sql)
        pool = TPool(6)
        for wtid, protocolid in wtinfo_df.values:
            try:
                # extract_data runs in a worker thread; get() waits for its result
                # while write_data tasks for earlier wtids keep running in the pool.
                res = pool.apply_async(self.extract_data, (wtid, protocolid))
                kw_tup = res.get()
                if kw_tup:
                    pool.apply_async(self.write_data, (kw_tup[0], kw_tup[1]))
            except Exception:
                print("Exception while dumping wtid {wtid}".format(wtid=wtid))
                traceback.print_exc()
        pool.close()
        pool.join()
if __name__ == '__main__':
    load = LoadFromMysql()
    load.load_from_mysql()
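With roughly 300 GB per node, the result set for a single wtid can still be large, and fetchall() plus a DataFrame keeps all of it in memory at once. Below is a minimal sketch of a streaming alternative that uses MySQLdb's server-side cursor (MySQLdb.cursors.SSCursor) to pull rows from the server in chunks and append them to the CSV as they arrive. The function name dump_wtid_streaming, the CHUNK_ROWS constant and the connect_kwargs parameter are illustrative and not part of the original script; pass the same connection parameters you use for the pool above.

import csv

import MySQLdb
import MySQLdb.cursors

CHUNK_ROWS = 50000  # rows fetched per round trip; tune to the available memory

def dump_wtid_streaming(wtid, columns, out_path, connect_kwargs):
    # connect_kwargs: the same host/port/user/passwd/charset used for PooledDB above.
    conn = MySQLdb.connect(cursorclass=MySQLdb.cursors.SSCursor, **connect_kwargs)
    cursor = conn.cursor()
    try:
        cursor.execute(
            'select wfid,wtid,rectime,id0,id1,id2,id3,id4,id5,id6,id7,id8,'
            'id9,id10,id11,id12 from dbo.statisticdata where wtid = %s', (wtid,))
        with open(out_path, 'w', newline='', encoding='utf_8_sig') as f:
            writer = csv.writer(f)
            writer.writerow(columns)  # header built from config.propaths, as above
            while True:
                rows = cursor.fetchmany(CHUNK_ROWS)
                if not rows:
                    break
                # keep only the columns described by the protocol, like extract_df above
                writer.writerows(row[:len(columns)] for row in rows)
    finally:
        cursor.close()
        conn.close()

One caveat with SSCursor: the whole result set must be consumed before the same connection can run another query, which is why the sketch opens a dedicated connection per wtid instead of reusing one from the pool.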