名词解释
RAM (Resource Access Management)和STS(Security Token Service)是阿里云提供的权限管理系统。
RAM主要的作用是控制账号系统的权限。您可以使用RAM在主账号的权限范围内创建子用户,给不同的子用户分配不同的权限从而达到授权管理的目的。STS是一个安全凭证(Token)的管理系统。您可以使用STS来完成对于临时用户的访问授权。
- 子账号(RAM account):从阿里云的主账号中创建出来的子账号,在创建的时候可以分配独立的密码和权限,每个子账号拥有自己AccessKey,可以和阿里云主账号一样正常的完成有权限的操作。一般来说,这里的子账号可以理解为具有某种权限的用户,可以被认为是一个具有某些权限的操作发起者。
-
角色(Role):表示某种操作权限的虚拟概念,但是没有独立的登录密码和AccessKey。
说明 子账号可以扮演角色,扮演角色时候的权限是该角色自身的权限。 - 授权策略(Policy):用来定义权限的规则,比如允许用户读取或写入某些资源。
- 资源(Resource):代表用户可访问的云资源,比如OSS所有的Bucket、OSS的某个Bucket,或OSS的某个Bucket下面的某个Object等。
- 扮演角色(Assume role)扮演角色是实体用户获取角色身份的安全令牌的 方法。一个实体用户调用STS API AssumeRole可以获得角色的安全令牌,使用安全令牌可以访问云服务API。
这里将手动定义 授权策略(Policy),将 授权策略 授权给 角色 ,然后子账号(RAM account)通过 扮演角色_方法 _获取 角色 的 安全令牌 对 资源 进行操作.
RAM 用户 可以使用 API 扮演 RAM 角色。当 RAM 用户被授予 AliyunSTSAssumeRoleAccess
权限策略 之后,可以使用其访问密钥调用 STS API AssumeRole 接口,以获取某个角色的 安全令牌,从而使用安全令牌访问资源。
新建用户,角色,授权策略
新建RAM用户
- 登录 RAM 控制台。
- 在左侧导航栏的人员管理菜单下,单击用户。
- 单击新建用户,输入登录名称和显示名称。
说明 单击添加用户,可一次性创建多个 RAM 用户。 -
在访问方式区域下,选择控制台密码登录或编程访问。
- 控制台密码登录:可以完成对登录安全的基本设置,包括自动生成或自定义登录密码、是否要求下次登录时重置密码以及是否要求开启多因素认证。
- 编程访问:将会自动为 RAM 用户创建访问密钥(AccessKey)。RAM 用户可以通过 API 或其他开发工具访问阿里云。
- 说明 为了保障账号安全,建议仅为 RAM 用户选择一种登录方式。避免 RAM 用户离开组织后仍可以通过访问密钥访问阿里云资源。
- 单击确认。
授权RAM用户AssumeRole接口权限
- 登录 RAM 控制台。
- 在左侧导航栏的人员管理菜单下,单击用户。
- 在用户列表中找到刚才创建的用户,单击列表【操作】栏下的【添加权限】
- 在弹出对话框中的【系统权限策略】搜索并添加 _AliyunSTSAssumeRoleAccess_
- 单击确认。
新建自定义权限策略test_policy
- 登录 RAM 控制台。
- 在左侧导航栏的权限管理菜单下,单击权限策略管理。
- 单击新建权限策略。
- 填写策略名称和备注。
-
配置模式选择可视化配置或脚本配置。
- 若选择可视化配置:单击添加授权语句,根据界面提示,对权限效力、操作名称和资源等进行配置。
- 若选择脚本配置,请参考语法结构编辑策略内容。
- 单击确认。
下面为OSS所有权限语法示例
{
"Statement": [
{
"Action": "oss:*",
"Effect": "Allow",
"Resource": "*"
}
],
"Version": "1"
}
新建角色testRole
注:这里角色有三种类型,创建可信实体为阿里云账号的RAM角色
- 云账号登录RAM控制台。
- 在左侧导航栏,单击RAM角色管理。
- 单击新建RAM角色。
- 选择可信实体类型为阿里云账号,单击下一步。
- 输入角色名称和备注。
- 选择云账号后,单击完成。
将自定义权限策略授权给角色
- 登录 RAM 控制台。
- 在左侧导航栏的权限管理菜单下,单击授权。
- 单击新增授权。
- 在被授权主体区域下,输入 RAM 角色名称后,单击需要授权的 RAM 角色。
- 在左侧权限策略名称列表下,单击需要授予 RAM 角色的权限策略。
说明 在右侧区域框,选择某条策略并单击 ×,可撤销该策略。 - 单击确定。
完成授权后可在角色管理中查看授权详情
注:这里策略主体类型有两种,权限策略也可直接授权给用户
通过STS完成临时授权
STS基本概念
阿里云临时安全令牌(Security Token Service,STS)是阿里云提供的一种临时访问权限管理服务。
-
RAM角色(RAM role)一种虚拟的RAM用户。RAM角色的全局资源描述符(Role ARN)Role ARN是角色的全局资源描述符(Aliyun Resource Name,简称ARN),用来指定具体角色。每个角色都有一个唯一的全局资源描述符。格式:
acs:ram::$accountID:role/$roleName
。 - 可信实体(Trusted entity)角色的可信实体是指可以扮演角色的实体用户身份。创建角色时必须指定可信实体,角色只能被受信的主体扮演。可信实体可以是受信的阿里云账号、受信的阿里云服务或身份提供商。
- 扮演角色(Assume role)扮演角色是实体用户获取角色身份的安全令牌的方法。一个实体用户调用STS API AssumeRole可以获得角色的安全令牌,使用安全令牌可以访问云服务API。
注:这里需要区分下RAM角色与扮演角色的区别,可信实体 通过 扮演角色 _方法__ _获取 RAM角色 的安全令牌
扮演角色的API接口概览
STS提供API调用接口每个请求都需要指定如下信息:
- 要执行的操作:Action参数。
- 每个操作接口都需要包含的公共请求参数。
- 操作接口所特有的请求参数。
调用AssumeRole接口获取一个临时身份。参考API
- RoleArn表示的是需要扮演的角色ID,角色的ID可以在角色管理 > 角色详情中找到。
- RoleSessionName是一个用来标示临时凭证的名称,一般来说建议使用不同的应用程序用户来区分。
- Policy表示的是在扮演角色的时候额外加上的一个权限限制。此参数可以限制生成的STS token的权限,若不指定则返回的token拥有指定角色的所有权限。
- DurationSeconds指的是临时凭证的有效期,单位是s,最小为900,最大为3600。
- id和secret表示的是需要扮演角色的子账号的AccessKey。
AssumeRole接口请求示例
https://sts.aliyuncs.com?Action=AssumeRole
&RoleArn=acs:ram::123456789012****:role/adminrole
&RoleSessionName=alice
&DurationSeconds=3600
&Policy=<url_encoded_policy>
&<公共请求参数>
AssumeRole接口返回格式(json)
{
"Credentials": {
"AccessKeyId": "STS.L4aBSCSJVMuKg5U1****",
"AccessKeySecret": "wyLTSmsyPGP1ohvvw8xYgB29dlGI8KMiH2pK****",
"Expiration": "2015-04-09T11:52:19Z",
"SecurityToken": "********"
},
"AssumedRoleUser": {
"arn": "acs:sts::123456789012****:assumed-role/AdminRole/alice",
"AssumedRoleUserId":"34458433936495****:alice"
},
"RequestId": "6894B13B-6D71-4EF5-88FA-F32781734A7F"
}
通过STS_SDK完成角色扮演
这里将通过官方提供的SDK模块进行具体的角色扮演
安装相关SDK包
pip install oss2
pip install aliyun-python-sdk-core
pip install aliyun-python-sdk-sts
sts sdk github 地址
获取临时身份信息
# 在控制台将 AliyunSTSAssumeRoleAccess 权限授权给子用户testRole,testRole操作AssumeRole接口,获取临时身份
def fetch_sts_info(access_key_id, access_key_secret, sts_role_arn):
"""子用户角色扮演获取临时身份的信息
:param access_key_id: 子用户的 access key id
:param access_key_secret: 子用户的 access key secret
:param sts_role_arn: STS角色的Arn
:return StsInfo 返回临时身份信息对象
"""
# 配置要访问的STS endpoint
REGIONID = 'cn-hongkong'
ENDPOINT = 'sts.cn-hongkong.aliyuncs.com'
region_provider.add_endpoint('Sts', REGIONID, ENDPOINT)
clt = client.AcsClient(access_key_id, access_key_secret, 'cn-hongkong')
request = AssumeRoleRequest.AssumeRoleRequest()
#request.set_accept_format('json')
#指定角色ARN
request.set_RoleArn(sts_role_arn)
#设置会话名称,审计服务使用此名称区分调用者
request.set_RoleSessionName('oss-python-sdk-example')
#发起请求,并得到response
response = clt.do_action_with_exception(request)
#格式化输出返回结果,将字符串结果转化为字典类型
i = json.loads(oss2.to_unicode(response))
#实例化StsInfo类,并将通过sts获取的临时身份信息存入
global StsInfo
StsInfo = StsInfo()
StsInfo.access_key_id = i['Credentials']['AccessKeyId']
StsInfo.access_key_secret = i['Credentials']['AccessKeySecret']
StsInfo.security_token = i['Credentials']['SecurityToken']
StsInfo.request_id = i['RequestId']
StsInfo.expiration = oss2.utils.to_unixtime(i['Credentials']['Expiration'], '%Y-%m-%dT%H:%M:%SZ')
#返回StsInfo对象
return StsInfo
将临时身份信息存入json文件
根据需求,可将临时身份信息存储到json文件中,等到临时身份过期后再重新请求,避免重复请求,造成临时身份泛滥,引发资源匮乏与安全隐患
def save_info():
#存储临时身份信息
with open('StsInfo.json','w',encoding='utf-8') as f:
data = {'sts_key_id':StsInfo.access_key_id,'sts_key_secret':StsInfo.access_key_secret,'sts_secrity_token':StsInfo.security_token,'sts_expire_date':StsInfo.expiration,'sts_reques_id':StsInfo.request_id}
json.dump(data,f,ensure_ascii=False)
f.close
def open_info():
#读取临时身份信息
with open('./StsInfo.json','r',encoding='utf-8') as f:
global STSINFO
STSINFO = json.load(f)
return STSINFO
通过临时身份操作Bucket资源
def buck_put_object():
#打印验证临时身份信息
print(StsInfo)
print('key id:',StsInfo.access_key_id)
print("key_secret:",StsInfo.access_key_secret)
print("secrity_token:",StsInfo.security_token)
print("request_id:",StsInfo.request_id)
print("expiration:",StsInfo.expiration)
#实例化Bucket对象,并上传字符串
auth = oss2.StsAuth(StsInfo.access_key_id, StsInfo.access_key_secret, StsInfo.security_token)
bucket = oss2.Bucket(auth,endpoint,'fralychen')
result = bucket.put_object('fralychen','good good study day day up')
全部代码
# -*- conding:utf-8 -*-
import json
import os
import oss2
from aliyunsdkcore import client
from aliyunsdkcore.profile import region_provider
from aliyunsdksts.request.v20150401 import AssumeRoleRequest
##定义一些变量
access_key_id = 'LTAI4FoMe6umpCSFQdEC9neg'
access_key_secret = 'jgEvtFOqIAGqKZve7zMvg8dJhSZv9J'
bucket_name = 'fralychen'
endpoint = 'oss-cn-hongkong.aliyuncs.com'
sts_role_arn = 'acs:ram::1149877324567510:role/testrole'
# 确认上面的参数都填写正确了
for param in (access_key_id, access_key_secret, bucket_name, endpoint, sts_role_arn):
assert '<' not in param, '请设置参数:' + param
#创建StsToken类方便用来存储临时身份信息
class StsInfo(object):
"""AssumeRole返回的临时身份信息
:param str access_key_id: 临时身份的access key id
:param str access_key_secret: 临时身份的access key secret
:param int expiration: 过期时间,UNIX时间,自1970年1月1日UTC零点的秒数
:param str security_token: 临时身份Token
:param str request_id: 请求ID
"""
def __init__(self):
self.access_key_id = ''
self.access_key_secret = ''
self.expiration = 0
self.security_token = ''
self.request_id = ''
# 在控制台将 AliyunSTSAssumeRoleAccess 权限授权给RAM子用户之后才能通过RAM用户获取临时身份信息
def fetch_sts_info(access_key_id, access_key_secret, sts_role_arn):
"""子用户角色扮演获取临时身份的信息
:param access_key_id: 子用户的 access key id
:param access_key_secret: 子用户的 access key secret
:param sts_role_arn: STS角色的Arn
:return StsInfo 返回临时身份信息对象
"""
# 配置要访问的STS endpoint
_REGIONID = 'cn-hongkong'
_ENDPOINT = 'sts.cn-hongkong.aliyuncs.com'
region_provider.add_endpoint('Sts', _REGIONID, _ENDPOINT)
clt = client.AcsClient(access_key_id, access_key_secret, 'cn-hongkong')
request = AssumeRoleRequest.AssumeRoleRequest()
#request.set_accept_format('json')
#指定角色ARN
request.set_RoleArn(sts_role_arn)
#设置会话名称,审计服务使用此名称区分调用者
request.set_RoleSessionName('oss-python-sdk-example')
#设置临时身份过期时间
request.set_DurationSeconds(DurationSeconds)
#发起请求,并得到response
response = clt.do_action_with_exception(request)
#格式化输出返回结果,将字符串结果转化为字典类型
i = json.loads(oss2.to_unicode(response))
#实例化StsInfo类并将临时身份信息存入对象
global StsInfo
StsInfo = StsInfo()
StsInfo.access_key_id = i['Credentials']['AccessKeyId']
StsInfo.access_key_secret = i['Credentials']['AccessKeySecret']
StsInfo.security_token = i['Credentials']['SecurityToken']
StsInfo.request_id = i['RequestId']
StsInfo.expiration = oss2.utils.to_unixtime(i['Credentials']['Expiration'], '%Y-%m-%dT%H:%M:%SZ')
#存储临时身份信息
save_info()
#使用sts授权的临时身份上传文件到bucket
def buck_put_object(sts_key_id, sts_key_secret, sts_secrity_token):
"""上传字符串到资源
:param sts_key_id: 临时身份的 access key id
:param sts_key_secret: 临时身份的 access key secret
:param sts_secrity_token: 临时身份的 secrity token
:retu
"""
#实例化Bucket对象,并上传字符串
auth = oss2.StsAuth(sts_key_id, sts_key_secret, sts_secrity_token)
bucket = oss2.Bucket(auth,endpoint,'fralychen')
result = bucket.put_object('fralychen','good good study day day up')
#根据需求,可将临时身份信息存储到json文件中,等到临时身份过期后再重新请求,避免重复请求,用户泛滥
def save_info():
#存储临时身份信息
with open('StsInfo.json','w',encoding='utf-8') as f:
data = {'sts_key_id':StsInfo.access_key_id,'sts_key_secret':StsInfo.access_key_secret,'sts_secrity_token':StsInfo.security_token,'sts_expire_date':StsInfo.expiration,'sts_reques_id':StsInfo.request_id}
json.dump(data,f,ensure_ascii=False)
f.close
def open_info():
#读取临时身份信息
with open('./StsInfo.json','r',encoding='utf-8') as f:
global STSINFO
STSINFO = json.load(f)
return STSINFO
#定义临时身份过期时间
DurationSeconds = 900
try:
open_info()
except IOError:
print("Error: 没有用户信息文件或文件读取失败")
print("初始化身份信息,临时身份信息存储中....")
fetch_sts_info(access_key_id, access_key_secret, sts_role_arn)
print("临时身份信息存储完毕,当前目录下StsInfo.json")
else:
if oss2.utils.http_to_unixtime(oss2.utils.http_date()) + DurationSeconds > STSINFO["sts_expire_date"]:
buck_put_object(sts_key_id = STSINFO["sts_key_id"],sts_key_secret = STSINFO["sts_key_secret"], sts_secrity_token = STSINFO["sts_secrity_token"])
print("上传成功,good_lucky")
else:
print("更新临时用户信息,请稍后")
fetch_sts_info(access_key_id, access_key_secret, sts_role_arn)
buck_put_object(sts_key_id = STSINFO["sts_key_id"],sts_key_secret = STSINFO["sts_key_secret"], sts_secrity_token = STSINFO["sts_secrity_token"])
print("上传成功,good_lucky")