RDS一键同步到新实例

一.前情提要

因需求要将生产环境多台RDS给克隆到DEV环境,因为环境在调试,在生产变动时可能要重新同步一遍,这个调试周期要一个多月,期间主要是环境的部署配置。

阿里云RDS同区域同步使用DTS操作将很方便,配置DTS的迁移按照教程就能免费又快捷的迁移了。

但当RDS很多,又要不定时去迁移一下(同步要钱),就需要通过阿里云开放的api来进行批量操作了。

需要使用python3,安装阿里云核心库和rds库
pip3 install aliyun-python-sdk-core
pip3 install aliyun-python-sdk-rds
RDS一键同步到新实例

注意:
1.需要填写同步的信息,当完成前几个,后面一个失败,需要注释掉成功的实例信息
2.需要加上阿里云ak和sk
3.脚本会清空同步实例的库,不然也没法同步

二.脚本

#!/bin/python3
 
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
import ast, sys, pymysql, time, json
 
#填写信息主库和从库
#实例名,实例ID,要迁移的库,账号名,密码
rds_info = [
    [("生产会员", "rm-2zeasdasdsasd", "prod_userdb", "dtscp", "123456"),("dev会员", "rm-2asdsadd", "dev_userdb", "dtscp", "123456")],
    [("生产支付", "rm-2zfasda", "prod_paydb", "dtscp", "1233456"),("dev支付", "rm-2asdasdasd", "dev_paydb", "dtscp", "123456")],
    ]


#自动变量
dts_id = ""
dts_list = []

#资源信息
aliyun_user_ak = 'Lwewwewew'
aliyun_user_sk = 'AEwqewqrwqeq'
region_id = 'cn-beijing'
client = AcsClient(ak=aliyun_user_ak, secret=aliyun_user_sk, region_id=region_id, timeout=300)


#创建DTS
def Create_Dts(req):
    global dts_id
    global dts_list
    req.set_action_name('CreateMigrationJob')
    req.add_query_param('RegionId', "cn-hangzhou")
    req.add_query_param('Region', "cn-beijing")
    req.add_query_param('MigrationJobClass', "large")
    req.add_query_param('ClientToken', "fj2j3kk2jasjd")
 
    response = client.do_action(req)
    false = False
    true = True
    response_dict = eval(response)
    dts_id = response_dict['MigrationJobId']
    dts_list.append(dts_id)
 
    if response_dict['Success'] == True:
        print("创建任务成功,ID为: " + dts_id)
    else:
        print("创建任务失败")
        sys.exit()
 
#验证是否是空的库,0为空,1为不空
def Check_Table(cur):
    sql = "show tables;"
    cur.execute(sql)
    table_number = cur.fetchall()
    if table_number:
        return 1
    else:
        return 0
 
def Create_Link(rds_list):
    #部分拼接
    rds_dest_ip = rds_list[1][1] + ".mysql.rds.aliyuncs.com" #目标实例IP
 
    db = pymysql.connect(host=rds_dest_ip, port=3306, user=rds_list[1][3], passwd=rds_list[1][4], db=rds_list[1][2], charset='utf8')
    cursor = db.cursor() #使用cursor方法创建一个游标
 
    print("开始清理数据库" + rds_list[1][2])
    if Check_Table(cursor) == 0:
        print(rds_list[1][2] + "库已经是空的,跳过清理步骤")
        cursor.close()
        db.close()
    else:
        #这里输出drop开头的删除表命令,存到元组里
        sql = "show tables ;"
        cursor.execute(sql)
        table_bin = cursor.fetchall()
        cursor.execute('SET foreign_key_checks = 0;')
 
        #循环执行drop删除表命令
        for i in table_bin:
            sql = "drop table" + '`' + i[0] + '`'
            cursor.execute(sql)
        cursor.execute('SET foreign_key_checks = 1;')
 
        #检测是否清空了
        if Check_Table(cursor) == 1:
            print(rds_list[1][2] + "库清空失败")
        else:
            print(rds_list[1][2] + "清空成功")
 
        #提交操作并关闭链接
        db.commit()
        cursor.close()
        db.close()
 
def Conf_Dts(req,rds_list):
    #部分拼接
    rds_job_name = rds_list[0][0] + "-》" + rds_list[1][0] #rds实例名
    rds_sour_ip = rds_list[0][1] + ".mysql.rds.aliyuncs.com" #源实例IP
    rds_dest_ip = rds_list[1][1] + ".mysql.rds.aliyuncs.com" #目标实例IP
    rds_sour_dbname = '\"%s\"' %(rds_list[0][2])
    rds_dest_dbname = '\"%s\"' %(rds_list[1][2])
    global dts_id
    print("开始进行" + rds_job_name + "的DTS任务配置")
 
    req.set_action_name('ConfigureMigrationJob')
    req.add_query_param('RegionId', "cn-hangzhou")
    req.add_query_param('MigrationJobName', rds_job_name)
    req.add_query_param('SourceEndpoint.InstanceType', "RDS")
    req.add_query_param('DestinationEndpoint.InstanceType', "RDS")
    req.add_query_param('MigrationMode.StructureIntialization', "true")
    req.add_query_param('MigrationMode.DataIntialization', "true")
    req.add_query_param('MigrationMode.DataSynchronization', "false")
    req.add_query_param('MigrationObject', "[{     \"DBName\": %s,     \"NewDBName\": %s }]" %(rds_sour_dbname, rds_dest_dbname))
    req.add_query_param('MigrationJobId', dts_id)
    req.add_query_param('SourceEndpoint.InstanceID', rds_list[0][1])
    req.add_query_param('SourceEndpoint.EngineName', "Mysql")
    req.add_query_param('SourceEndpoint.Region', "cn-beijing")
    req.add_query_param('SourceEndpoint.IP', rds_sour_ip)
    req.add_query_param('SourceEndpoint.Port', "3306")
    req.add_query_param('SourceEndpoint.UserName', rds_list[0][3])
    req.add_query_param('SourceEndpoint.Password', rds_list[0][4])
    req.add_query_param('DestinationEndpoint.InstanceID', rds_list[1][1])
    req.add_query_param('DestinationEndpoint.EngineName', "Mysql")
    req.add_query_param('DestinationEndpoint.Region', "cn-beijing")
    req.add_query_param('DestinationEndpoint.IP', rds_dest_ip)
    req.add_query_param('DestinationEndpoint.Port', "3306")
    req.add_query_param('DestinationEndpoint.UserName', rds_list[1][3])
    req.add_query_param('DestinationEndpoint.Password', rds_list[1][4])
 
    response = client.do_action(req)
    print(str(response, encoding = 'utf-8'))
 
def Check_Dts(req, dts_name):
    req.set_action_name('DescribeMigrationJobStatus')
    req.add_query_param('RegionId', "cn-hangzhou")
    req.add_query_param('MigrationJobId', dts_name)
    response = client.do_action(request)
    res_dict = json.loads(response)
 
    dts_stat = {'NotStarted':'未启动',
        'Prechecking':'预检查中',
        'PrecheckFailed':'预检查失败',
        'Migrating':'迁移中',
        'Suspending':'暂停中',
        'MigrationFailed':'迁移失败',
        'Finished':'完成'}
 
    print(dts_name + "当前状态:" + dts_stat[res_dict['MigrationJobStatus']])

#开始执行
for rds_list in rds_info:
    #初始化
    request = CommonRequest()
    request.set_accept_format('json')
    request.set_domain('dts.aliyuncs.com')
    request.set_method('POST')
    request.set_protocol_type('https') # https | http
    request.set_version('2020-01-01')

    Create_Dts(request) #创建dts任务
    Create_Link(rds_list) #清理数据库
    Conf_Dts(request, rds_list) #配置DTS信息
    print("")

#循环显示状态
while True:
    for i in dts_list:
        Check_Dts(request, i)

    print("")
    time.sleep(30)
上一篇:持续时间Table.AddColumn(Power Query 之 M 语言)


下一篇:算法分析与设计——算法问题求解基础