一.前情提要
因需求要将生产环境多台RDS给克隆到DEV环境,因为环境在调试,在生产变动时可能要重新同步一遍,这个调试周期要一个多月,期间主要是环境的部署配置。
阿里云RDS同区域同步使用DTS操作将很方便,配置DTS的迁移按照教程就能免费又快捷的迁移了。
但当RDS很多,又要不定时去迁移一下(同步要钱),就需要通过阿里云开放的api来进行批量操作了。
需要使用python3,安装阿里云核心库和rds库pip3 install aliyun-python-sdk-core
pip3 install aliyun-python-sdk-rds
注意:
1.需要填写同步的信息,当完成前几个,后面一个失败,需要注释掉成功的实例信息
2.需要加上阿里云ak和sk
3.脚本会清空同步实例的库,不然也没法同步
二.脚本
#!/bin/python3
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
import ast, sys, pymysql, time, json
#填写信息主库和从库
#实例名,实例ID,要迁移的库,账号名,密码
rds_info = [
[("生产会员", "rm-2zeasdasdsasd", "prod_userdb", "dtscp", "123456"),("dev会员", "rm-2asdsadd", "dev_userdb", "dtscp", "123456")],
[("生产支付", "rm-2zfasda", "prod_paydb", "dtscp", "1233456"),("dev支付", "rm-2asdasdasd", "dev_paydb", "dtscp", "123456")],
]
#自动变量
dts_id = ""
dts_list = []
#资源信息
aliyun_user_ak = 'Lwewwewew'
aliyun_user_sk = 'AEwqewqrwqeq'
region_id = 'cn-beijing'
client = AcsClient(ak=aliyun_user_ak, secret=aliyun_user_sk, region_id=region_id, timeout=300)
#创建DTS
def Create_Dts(req):
global dts_id
global dts_list
req.set_action_name('CreateMigrationJob')
req.add_query_param('RegionId', "cn-hangzhou")
req.add_query_param('Region', "cn-beijing")
req.add_query_param('MigrationJobClass', "large")
req.add_query_param('ClientToken', "fj2j3kk2jasjd")
response = client.do_action(req)
false = False
true = True
response_dict = eval(response)
dts_id = response_dict['MigrationJobId']
dts_list.append(dts_id)
if response_dict['Success'] == True:
print("创建任务成功,ID为: " + dts_id)
else:
print("创建任务失败")
sys.exit()
#验证是否是空的库,0为空,1为不空
def Check_Table(cur):
sql = "show tables;"
cur.execute(sql)
table_number = cur.fetchall()
if table_number:
return 1
else:
return 0
def Create_Link(rds_list):
#部分拼接
rds_dest_ip = rds_list[1][1] + ".mysql.rds.aliyuncs.com" #目标实例IP
db = pymysql.connect(host=rds_dest_ip, port=3306, user=rds_list[1][3], passwd=rds_list[1][4], db=rds_list[1][2], charset='utf8')
cursor = db.cursor() #使用cursor方法创建一个游标
print("开始清理数据库" + rds_list[1][2])
if Check_Table(cursor) == 0:
print(rds_list[1][2] + "库已经是空的,跳过清理步骤")
cursor.close()
db.close()
else:
#这里输出drop开头的删除表命令,存到元组里
sql = "show tables ;"
cursor.execute(sql)
table_bin = cursor.fetchall()
cursor.execute('SET foreign_key_checks = 0;')
#循环执行drop删除表命令
for i in table_bin:
sql = "drop table" + '`' + i[0] + '`'
cursor.execute(sql)
cursor.execute('SET foreign_key_checks = 1;')
#检测是否清空了
if Check_Table(cursor) == 1:
print(rds_list[1][2] + "库清空失败")
else:
print(rds_list[1][2] + "清空成功")
#提交操作并关闭链接
db.commit()
cursor.close()
db.close()
def Conf_Dts(req,rds_list):
#部分拼接
rds_job_name = rds_list[0][0] + "-》" + rds_list[1][0] #rds实例名
rds_sour_ip = rds_list[0][1] + ".mysql.rds.aliyuncs.com" #源实例IP
rds_dest_ip = rds_list[1][1] + ".mysql.rds.aliyuncs.com" #目标实例IP
rds_sour_dbname = '\"%s\"' %(rds_list[0][2])
rds_dest_dbname = '\"%s\"' %(rds_list[1][2])
global dts_id
print("开始进行" + rds_job_name + "的DTS任务配置")
req.set_action_name('ConfigureMigrationJob')
req.add_query_param('RegionId', "cn-hangzhou")
req.add_query_param('MigrationJobName', rds_job_name)
req.add_query_param('SourceEndpoint.InstanceType', "RDS")
req.add_query_param('DestinationEndpoint.InstanceType', "RDS")
req.add_query_param('MigrationMode.StructureIntialization', "true")
req.add_query_param('MigrationMode.DataIntialization', "true")
req.add_query_param('MigrationMode.DataSynchronization', "false")
req.add_query_param('MigrationObject', "[{ \"DBName\": %s, \"NewDBName\": %s }]" %(rds_sour_dbname, rds_dest_dbname))
req.add_query_param('MigrationJobId', dts_id)
req.add_query_param('SourceEndpoint.InstanceID', rds_list[0][1])
req.add_query_param('SourceEndpoint.EngineName', "Mysql")
req.add_query_param('SourceEndpoint.Region', "cn-beijing")
req.add_query_param('SourceEndpoint.IP', rds_sour_ip)
req.add_query_param('SourceEndpoint.Port', "3306")
req.add_query_param('SourceEndpoint.UserName', rds_list[0][3])
req.add_query_param('SourceEndpoint.Password', rds_list[0][4])
req.add_query_param('DestinationEndpoint.InstanceID', rds_list[1][1])
req.add_query_param('DestinationEndpoint.EngineName', "Mysql")
req.add_query_param('DestinationEndpoint.Region', "cn-beijing")
req.add_query_param('DestinationEndpoint.IP', rds_dest_ip)
req.add_query_param('DestinationEndpoint.Port', "3306")
req.add_query_param('DestinationEndpoint.UserName', rds_list[1][3])
req.add_query_param('DestinationEndpoint.Password', rds_list[1][4])
response = client.do_action(req)
print(str(response, encoding = 'utf-8'))
def Check_Dts(req, dts_name):
req.set_action_name('DescribeMigrationJobStatus')
req.add_query_param('RegionId', "cn-hangzhou")
req.add_query_param('MigrationJobId', dts_name)
response = client.do_action(request)
res_dict = json.loads(response)
dts_stat = {'NotStarted':'未启动',
'Prechecking':'预检查中',
'PrecheckFailed':'预检查失败',
'Migrating':'迁移中',
'Suspending':'暂停中',
'MigrationFailed':'迁移失败',
'Finished':'完成'}
print(dts_name + "当前状态:" + dts_stat[res_dict['MigrationJobStatus']])
#开始执行
for rds_list in rds_info:
#初始化
request = CommonRequest()
request.set_accept_format('json')
request.set_domain('dts.aliyuncs.com')
request.set_method('POST')
request.set_protocol_type('https') # https | http
request.set_version('2020-01-01')
Create_Dts(request) #创建dts任务
Create_Link(rds_list) #清理数据库
Conf_Dts(request, rds_list) #配置DTS信息
print("")
#循环显示状态
while True:
for i in dts_list:
Check_Dts(request, i)
print("")
time.sleep(30)