First, a MongoDB replica set has to be set up before incremental backups are possible, because the oplog used for incremental backups only exists on replica set members. Setting up the replica set is not covered here.
The idea is to run one backup per day. Whether that backup is full or incremental depends on whether the oplog entry at the timestamp of the last full backup has already been overwritten (the oplog is capped in size, so new operations overwrite the oldest ones). If it has been overwritten, a full backup is taken; otherwise an incremental backup is taken starting from that timestamp. For convenience, after every full backup the resulting oplog.bson is copied to a fixed location.
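To see how far back the oplog currently reaches, and therefore whether a given timestamp is still covered, you can query the first and last entries of local.oplog.rs. Below is a minimal sketch using pymongo; the host, port, and lack of authentication are assumptions to adjust for your deployment:

from pymongo import MongoClient

# connect to a replica set member (assumed host/port, no auth)
client = MongoClient('127.0.0.1', 27017)
oplog = client['local']['oplog.rs']
# oplog.rs is a capped collection, so $natural order is insertion order
first_ts = oplog.find_one(sort=[('$natural', 1)])['ts']
last_ts = oplog.find_one(sort=[('$natural', -1)])['ts']
print('oplog covers from {} to {}'.format(first_ts, last_ts))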
The code is as follows:
# encoding: utf-8
import argparse
import calendar
import json
import logging
import logging.handlers
import os
import shutil
import traceback
from datetime import datetime
from bson import Timestamp
from pymongo import MongoClient
class MyLogger(object):
    """Logging helper that writes to a rotating file and to the console."""

    def __init__(self, log_file="logs/run.log"):
        # main logger: rotating file handler plus console handler for errors
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.DEBUG)
        # file handler
        fh = logging.handlers.RotatingFileHandler(log_file, 'a', 1000000, 5)
        fh.setLevel(logging.DEBUG)
        formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
        fh.setFormatter(formatter)
        self.logger.addHandler(fh)
        ch = logging.StreamHandler()
        ch.setLevel(logging.ERROR)
        ch.setFormatter(formatter)
        self.logger.addHandler(ch)
        # separate logger for plain console output
        self.s_logger = logging.getLogger("display")
        self.s_logger.setLevel(logging.INFO)
        # stream handler
        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)
        formatter_stream = logging.Formatter('%(message)s')
        ch.setFormatter(formatter_stream)
        self.s_logger.addHandler(ch)

    def log_exit(self, msg):
        """Log the error and exit immediately."""
        self.logger.error(msg)
        os._exit(1)

    def warning(self, msg):
        """Log a warning message."""
        self.logger.warning(msg)

    def error(self, msg):
        """Log an error message."""
        self.logger.error(msg)

    def critical(self, msg):
        """Log a critical message."""
        self.logger.critical(msg)

    def debug(self, msg):
        """Log a debug message."""
        self.logger.debug(msg)

    def info(self, msg):
        """Log an info message."""
        self.logger.info(msg)
def load_config():
    # Load the basic configuration from the JSON file next to the script
    with open('./warning_config.json', 'r') as fp:
        config = json.load(fp)
    return config
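# The expected structure of warning_config.json looks roughly like this
# (an assumed example; values must be adapted to your own deployment):
# {
#     "mongo": {
#         "db": "mydb",
#         "host": "127.0.0.1",
#         "port": 27017,
#         "username": "backup_user",
#         "password": "backup_pass",
#         "auth_db": "admin"
#     },
#     "mongo_back_up": "/data/mongo_backup"
# }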
class BackupMongodbClass(object):
    """Runs full and incremental backups against a MongoDB replica set member."""

    def __init__(self, config, log, date_str=None):
        self.db = config['mongo']['db']
        self.host = config['mongo'].get('host', '127.0.0.1')
        self.port = config['mongo'].get('port', 27017)
        self.username = config['mongo'].get('username')
        self.password = config['mongo'].get('password')
        self.auth_db = config['mongo'].get('auth_db') or self.db
        self.base_backup_dir = config['mongo_back_up']
        # fixed location of the oplog.bson copied after the last full backup
        self.oplog_path = os.path.join(self.base_backup_dir, 'oplog.bson')
        self.log = log
        self.client = self.connect_db()
        if not date_str:
            self.date_str = BackupMongodbClass.get_date()
        else:
            self.date_str = date_str
    @staticmethod
    def get_date():
        now_ = datetime.now()
        date_str = now_.strftime("%Y%m%d")
        return date_str
    def read_back_up_time(self):
        """Read the last timestamp of the previous full backup from oplog.bson."""
        try:
            # bsondump prints one extended-JSON document per line; the last line
            # holds the newest oplog entry captured by the full backup
            cmd = "bsondump %s" % (self.oplog_path)
            ret = os.popen(cmd)
            res = ret.read()
            lines = res.splitlines()
            val = lines[-1]
            bsondump_jsonresult = json.loads(val)
            backup_time_read = bsondump_jsonresult["ts"]
            return backup_time_read
        except Exception as e:
            print('Failed to read the last backup timestamp: {}'.format(e))
            self.log.error('Failed to read the last backup timestamp: {}'.format(e))
            return False
    def full_backup(self):
        """Run a full backup and copy oplog.bson to the fixed location self.oplog_path."""
        dir_name = self.date_str + '_full'
        back_path = os.path.join(self.base_backup_dir, dir_name)
        if self.password and self.auth_db and self.username:
            cmd = "mongodump -h {}:{} --authenticationDatabase {} -u {} -p {} --oplog -o={}".format(
                self.host, self.port, self.auth_db, self.username, self.password, back_path)
        else:
            cmd = "mongodump -h {}:{} --oplog -o={}".format(self.host, self.port, back_path)
        print('Running full backup command')
        print(cmd)
        self.log.info('Running full backup command: {}'.format(cmd))
        if os.path.exists(back_path):
            shutil.rmtree(back_path)
        ret = os.system(cmd)
        if ret == 0:
            print('{} full backup succeeded'.format(self.date_str))
            self.log.info('{} full backup succeeded'.format(self.date_str))
            # keep the newest oplog.bson in the fixed location so the next run
            # knows where the incremental backup should start from
            newest_oplog_path = os.path.join(back_path, 'oplog.bson')
            try:
                shutil.copy(newest_oplog_path, self.oplog_path)
            except Exception as e:
                print('Failed to copy oplog.bson: {}'.format(e))
                self.log.error('Failed to copy oplog.bson: {}'.format(e))
        else:
            print('{} full backup failed'.format(self.date_str))
            self.log.error('{} full backup failed'.format(self.date_str))
    def increment_backup(self, backup_time_read=None):
        """Run an incremental backup by dumping oplog.rs entries since the last backup timestamp."""
        if not backup_time_read:
            backup_time_read = self.read_back_up_time()
        if not backup_time_read:
            self.log.error('No previous backup timestamp found, cannot run an incremental backup')
            return
        t = backup_time_read["$timestamp"]["t"]
        i = backup_time_read["$timestamp"]["i"]
        latest_back_up_date = str(datetime.fromtimestamp(t).date()).replace('-', '')
        dir_name = self.date_str + '_inc_' + latest_back_up_date
        back_path = os.path.join(self.base_backup_dir, dir_name)
        if self.password and self.auth_db and self.username:
            cmd = """mongodump -h %s:%s --authenticationDatabase %s -u %s -p %s -d local -c oplog.rs -q "{ts: {'\$gte': Timestamp(%s, %s)}}" -o=%s """ % (
                self.host, self.port, self.auth_db, self.username, self.password, t, i, back_path)
        else:
            cmd = """mongodump -h %s:%s -d local -c oplog.rs -q "{ts: {'\$gte': Timestamp(%s, %s)}}" -o=%s """ % (
                self.host, self.port, t, i, back_path)
        print('Running incremental backup command')
        print(cmd)
        self.log.info('Running incremental backup command: {}'.format(cmd))
        if os.path.exists(back_path):
            shutil.rmtree(back_path)
        ret = os.system(cmd)
        if ret == 0:
            print('{} incremental backup succeeded'.format(self.date_str))
            self.log.info('{} incremental backup succeeded'.format(self.date_str))
        else:
            print('{} incremental backup failed'.format(self.date_str))
            self.log.error('{} incremental backup failed'.format(self.date_str))
    def connect_db(self):
        """Connect to MongoDB, authenticating when credentials are configured."""
        try:
            if self.password and self.username and self.auth_db:
                client = MongoClient(self.host, self.port,
                                     username=self.username,
                                     password=self.password,
                                     authSource=self.auth_db)
            else:
                client = MongoClient(self.host, self.port)
            return client
        except Exception as e:
            self.log.error('Failed to connect to the database: {}'.format(e))
            traceback.print_exc()
    def back_up_data(self):
        """Run a full backup when there is no previous full backup, when the backup
        day is a Monday, or when the timestamp of the last full backup can no longer
        be found in the oplog; otherwise run an incremental backup."""
        year = int(self.date_str[0:4])
        month = int(self.date_str[4:6])
        day = int(self.date_str[6:])
        currentday = calendar.weekday(year, month, day)
        if currentday == 0:  # Monday: always take a fresh full backup
            self.full_backup()
            return
        if not os.path.exists(self.oplog_path):
            self.full_backup()
        else:
            latest_back_up = self.read_back_up_time()
            if latest_back_up is False:
                self.full_backup()
                return
            ts = latest_back_up["$timestamp"]["t"]
            i = latest_back_up["$timestamp"]["i"]
            db = self.client['local']
            # if the entry at that timestamp has already been rotated out of the
            # capped oplog, an incremental backup would leave a gap, so fall back
            # to a full backup
            count = db['oplog.rs'].count_documents({'ts': Timestamp(ts, i)})
            if count == 0:
                self.full_backup()
            else:
                self.increment_backup(latest_back_up)
def parse_args():
    """Parse command-line arguments."""
    parser = argparse.ArgumentParser(description='MongoDB full and incremental backup tool.')
    parser.add_argument('-a', '--action', choices=['full', 'increment', 'casual'], dest='action',
                        help='The action you would like to perform.', required=True)
    args = parser.parse_args()
    return args
def main():
    log_file = 'logs/mongo_backup.log'
    logger = MyLogger(log_file)
    config = load_config()
    backup_tool = BackupMongodbClass(config, logger)
    args = parse_args()
    if args.action == 'full':
        backup_tool.full_backup()
    elif args.action == 'increment':
        backup_tool.increment_backup()
    else:
        # 'casual' lets the script decide between full and incremental
        backup_tool.back_up_data()


if __name__ == '__main__':
    main()
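To put the script to work, save it next to warning_config.json (mongo_backup.py is an assumed file name), make sure the logs/ directory exists, and run it manually or from cron. A possible set of invocations, with assumed paths:

python mongo_backup.py -a casual      # let the script choose between full and incremental
python mongo_backup.py -a full        # force a full backup
python mongo_backup.py -a increment   # force an incremental backup
# example crontab entry: run the daily backup at 02:00
0 2 * * * cd /opt/mongo_backup && python mongo_backup.py -a casual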