自动解压
函数计算实现 oss 上传超大 zip 压缩文件的自动解压处理
姊妹篇(非oss触发器使用案例):
使用函数计算流式打包OSS文件
object 备份
oss 跨 region 备份或者 NAS 备份
下面代码代码示例,oss 触发器触发的函数, 自动将该 object 从本region复制到另外一个region, 或者说备份到nas
# -*- coding: utf-8 -*-
import oss2, json
import os
import logging
OSS_DEST_REGION = os.environ["OSS_DEST_REGION"]
OSS_DEST_BUCKET = os.environ["OSS_DEST_BUCKET"]
OSS_DEST_AK_ID = os.environ["OSS_DEST_AK_ID"]
OSS_DEST_AK_SK = os.environ["OSS_DEST_AK_SK"]
OSS_DEST_CALLBACK = os.environ["OSS_DEST_CALLBACK"]
# Close the info log printed by the oss SDK
logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)
def handler(event, context):
evt_lst = json.loads(event)
print(evt_lst)
creds = context.credentials
auth=oss2.StsAuth(
creds.access_key_id,
creds.access_key_secret,
creds.security_token)
evt = evt_lst['events'][0]
bucket_name = evt['oss']['bucket']['name']
endpoint = 'oss-' + evt['region'] + '-internal.aliyuncs.com'
bucket = oss2.Bucket(auth, endpoint, bucket_name)
object_name = evt['oss']['object']['key']
r = bucket.get_object(object_name)
headers = None
# copy object 去另外一个region
''' 如果有自定义的callback, 可以参考如下code
call_back_event = {} # 用户自定义的参数,这里示例是一个空的dict
callback_dict = {}
callback_dict['callbackUrl'] = OSS_DEST_CALLBACK
callback_body = '&'.join(['{0!s}={1!s}'.format(k, v) for k, v in call_back_event.items()])
callback_dict['callbackBody'] = callback_body
callback_dict['callbackBodyType'] = 'application/x-www-form-urlencoded'
callback_param = json.dumps(callback_dict).strip()
base64_callback_body = base64.b64encode(bytes(callback_param, encoding='utf-8'))
base64_callback_body = str(base64_callback_body, encoding='utf-8')
headers = {'x-oss-callback': base64_callback_body}
'''
# 备份到另外一个region的bucket
oss_dest_auth = oss2.Auth(OSS_DEST_AK_ID, OSS_DEST_AK_SK)
oss_dest_bucket = oss2.Bucket(oss_dest_auth, OSS_DEST_REGION, OSS_DEST_BUCKET)
oss_dest_bucket.put_object(filename, r, headers)
# 备份到NAS
# 写入函数计算挂载的nas,假设挂载的目录为 /mnt/nas_dir
with open("/mnt/nas_dir","w") as f:
f.write(r.read())
oss object 自动上传 FTP 备份
以下是这样的一个代码示例, 用户上传了一个文件到 bucket, 自动触发函数, 并且将这个文件的 md5 值传到某个服务用于校验, 同时将这个文件上传到 ftp 服务器
# -*- coding: utf-8 -*-
import oss2, json
import os
import logging
import requests
from hashlib import md5
import datetime, time
from ftplib import FTP
GMT_FORMAT = '%a, %d %b %Y %H:%M:%S GMT'
LOGGER = logging.getLogger()
MD5_HTTP_URL = "<http server url>"
# 将对应的ftp 信息改写正确
FTP_ADDR = "<ftp addr>"
FTP_PORT = 21
FTP_USER = "<ftp-user-name>"
FTP_PWD = "123456"
FTP_SAVE_ROOT_PATH = "/home/remote1/"
# Close the info log printed by the oss SDK
logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)
def handler(event, context):
evt_lst = json.loads(event)
#print(evt_lst)
creds = context.credentials
auth=oss2.StsAuth(
creds.access_key_id,
creds.access_key_secret,
creds.security_token)
evt = evt_lst['events'][0]
bucket_name = evt['oss']['bucket']['name']
endpoint = 'oss-' + evt['region'] + '-internal.aliyuncs.com'
bucket = oss2.Bucket(auth, endpoint, bucket_name)
object_name = evt['oss']['object']['key']
r = bucket.get_object(object_name)
m = md5()
while 1:
data = r.read(4096)
if not data:
break
m.update(data)
md5_value = m.hexdigest()
objectmeta = bucket.head_object(object_name)
last_modified = objectmeta.headers['Last-Modified']
# convert timeformat from GMT to YYYYMMDDHHmmss
dt = datetime.datetime.strptime(last_modified, GMT_FORMAT)
time_t = time.strftime("%Y%m%d%H%M%S", dt.utctimetuple())
payload = {'md5': md5_value, 'time': time_t, 'filePath': object_name, 'bucketName':bucket_name}
# LOGGER.info(payload)
r = requests.get(MD5_HTTP_URL, params=payload)
# 异常处理
if r.status_code >= 400:
LOGGER.error("upload md5 fail, payload = {} , detail = {}".format(payload, r.text))
# upload file to ftp server
ftp = FTP()
#ftp.set_debuglevel(2)
ftp.connect(FTP_ADDR, FTP_PORT)
ftp.login(FTP_USER, FTP_PWD)
# 生成对应的目录, object key中有/, 即有目录,这边也处理成有目录
ftp.cwd(FTP_SAVE_ROOT_PATH)
parent_dir = os.path.dirname(object_name)
if parent_dir:
try:
ftp.mkd(parent_dir)
except:
pass
sever_will_savefile = FTP_SAVE_ROOT_PATH + object_name
try:
remote_stream=bucket.get_object(object_name)
ftp.storbinary('STOR ' + sever_will_savefile, remote_stream)
except Exception as e:
LOGGER.error("upload ftp server fail, savefile ={}, detail = {}".format(sever_will_savefile, str(e)))
finally:
ftp.close()
return "OK"
持续更新中 ...