Oss文件存储

包含文件的上传下载和生成临时的url

# -*- coding: utf-8 -*-

import os
import oss2
import configparser
from Config import * class AliOss:
def __init__(self):
# 读取配置文件
self._cf = configparser.ConfigParser()
self._cf.read(ConfigPath) # 读取环境变量或定义常量
self._access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', self._cf.get("oss","OSS_TEST_ACCESS_KEY_ID"))
self._access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', self._cf.get("oss","OSS_TEST_ACCESS_KEY_SECRET"))
self._bucket_name = os.getenv('OSS_TEST_BUCKET', self._cf.get("oss","OSS_TEST_BUCKET"))
self._endpoint = os.getenv('OSS_TEST_ENDPOINT', self._cf.get("oss","OSS_TEST_ENDPOINT"))
# self._filepath = self._cf.get("oss","FILEPATH") # OSS认证信息
auth = oss2.Auth(self._access_key_id, self._access_key_secret) # 创建Bucket对象,所有Object相关的接口都可以通过Bucket对象来进行
self._bucket = oss2.Bucket(auth, self._endpoint, self._bucket_name) # 上传文件至oss
#
# [Params]
# from_file: 上传对象文件
# to_oss: 上传至oss的文件名
# remove_from_file: 是否删除原文件
#
async def upload(self, from_file, to_oss=None, remove_from_file=False):
# 上传oss的文件名未指定,则默认为当前文件名
oss_file_name = os.path.basename(from_file) if to_oss is None else to_oss
self._bucket.put_object_from_file(oss_file_name, from_file) # 删除原文件
if remove_from_file:
os.remove(from_file) return True # 下载文件
#
# [Params]
# from_oss: 上传对象文件
# out_path: 上传至oss的文件名
#
def download(self, from_oss, out_path):
# 文件不存在返回False
if not self.exists(from_oss):
return False # 未指定输出路径,默认为当前路径下保存
out = from_oss if out_path is None else out_path
self._bucket.get_object_to_file(from_oss, out) return True # Stream
#
# [Params]
# oss_key: 上传对象Key
# oss_value: 上传内容
#
def put(self, oss_key, oss_value):
self._bucket.put_object(oss_key, oss_value)
return True # Stream
#
# [Params]
# oss_key: 上传对象Key
#
def stream(self, oss_key):
# oss对象不存在则返回None
if not self.exists(oss_key):
return None # 返回流式对象
return self._bucket.get_object(oss_key) # 文件是否存在
#
# [Params]
# oss_file_name: oss的文件名
#
def exists(self, oss_file_name):
return self._bucket.object_exists(oss_file_name) # 删除文件
#
# [Params]
# oss_file_name: oss的文件名
#
async def delete(self, oss_file_name):
self._bucket.delete_object(oss_file_name) return True def signedUrl(self, oss_file_name, HTTP_METHOD='GET', expiredTime=60):
result = self._bucket.sign_url(HTTP_METHOD, oss_file_name, expiredTime)
result.replace("-internal","")
return result

对上面的函数进行解释和说明

上传文件

self._bucket.put_object_from_file(oss_file_name, from_file) 接受的是本地的文件名称
可以这样使用
bucket.put_object_from_file('./example.jpg','./example.jpg')

  

下载文件

out = from_oss if out_path is None else out_path
self._bucket.get_object_to_file(from_oss, out)

第一个参数是fromOss,第二个参数是本地的输出路径

bucket.get_object_to_file('example.jpg', 'example2.jpg')

  

上传文件(以流的形式)

self._bucket.put_object(oss_key, oss_value)
需要打开的操作
with open(oss2.to_unicode('本地座右铭.txt'), 'rb') as f:
bucket.put_object('云上座右铭.txt', f)

下载文件(以流的形式)

def stream(self, oss_key):
# oss对象不存在则返回None
if not self.exists(oss_key):
return None # 返回流式对象
return self._bucket.get_object(oss_key)
直接去指定oss文件上的文件名称
result=bucket.get_object('5a055c56705deb3b31d3bcab.json')
print(result.read())

判断oss上是否存在这个文件 

直接把需要判断的文件名当参数进行判断即可 

def exists(self, oss_file_name):
return self._bucket.object_exists(oss_file_name)

删除oss上的文件

async def delete(self, oss_file_name):
self._bucket.delete_object(oss_file_name)

生成一个临时的签名url供客户端下载

bucket.sign_url('请求的方法get或post', 'oss上的文件名', '过期的时间')
    def signedUrl(self, oss_file_name, HTTP_METHOD='GET', expiredTime=60):
result = self._bucket.sign_url(HTTP_METHOD, oss_file_name, expiredTime)
result.replace("-internal","")
return result

上一篇:ECLIPSE中反编译插件JAD的配置安装,轻松查看JAVA源代码


下一篇:sql语句限制小数点后后两位,sql语句字符拼接