Boto3访问S3的基本用法

一、简述Boto3

  1. Boto3有两种API,低级和高级
  • 低级API:是和AWS的HTTP接口一一对应的,通过boto3.client(“xx”)暴露;
  • 高级API:是面向对象的,通过boto3.resource(“xxx”)暴露,不一定覆盖所有API。
  1. Boto3 是整个 AWS 的 SDK, 而不只是包括 S3. 还可以用来访问 SQS, EC2 等等。
  2. boto3.resource(“s3”)例子
import boto3

s3 = boto3.resource("s3")

# 创建一个 bucket
bucket = s3.create_bucket(Bucket="my-bucket")

# 获得所有的 bucket, boto 会自动处理 API 的翻页等信息。
for bucket in s3.buckets.all():
    print(bucket.name)

# 过滤 bucket, 同样返回一个 bucket_iterator
s3.buckets.fitler()

# 生成一个 Bucket 资源对象
bucket = s3.Bucket("my-bucket")
bucket.name  # bucket 的名字
bucket.delete()  # 删除 bucket

# 删除一些对象
bucket.delete_objects(
    Delete={
        'Objects': [
            {
                'Key': 'string',
                'VersionId': 'string'
            },
        ],
        'Quiet': True|False
    },
)
# 返回结果
{
    'Deleted': [
        {
            'Key': 'string',
            'VersionId': 'string',
            'DeleteMarker': True|False,
            'DeleteMarkerVersionId': 'string'
        },
    ],
    'RequestCharged': 'requester',
    'Errors': [
        {
            'Key': 'string',
            'VersionId': 'string',
            'Code': 'string',
            'Message': 'string'
        },
    ]
}

# 下载文件
bucket.download_file(Key, Filename, ExtraArgs=None, Callback=None, Config=None)

# 下载到文件对象,可能会自动开启多线程下载
with open('filename', 'wb') as data:
    bucket.download_fileobj('mykey', data)

# 上传文件
object = bucket.put_object(Body=b"data"|file, ContentMD5="", Key="xxx")

# 这个方法会自动开启多线程上传
with open('filename', 'rb') as f:
    bucket.upload_fileobj(f, 'mykey')

# 列出所有对象
bucket.objects.all()

# 过滤并返回对象
objects = bucket.objects.filter(
    Delimiter='string',
    EncodingType='url',
    Marker='string',
    MaxKeys=123,
    Prefix='string',
    RequestPayer='requester',
    ExpectedBucketOwner='string'
)

# 创建一个对象
obj = bucket.Object("xxx")
# 或者
obj = s3.Object("my-bucket", "key")

obj.bucket_name
obj.key

# 删除对象
obj.delete()
# 下载对象
obj.download_file(path)
# 自动多线程下载
with open('filename', 'wb') as data:
    obj.download_fileobj(data)
# 获取文件内容
rsp = obj.get()
body = rsp["Body"].read()  # 文件内容
obj.put(Body=b"xxx"|file, ContentMD5="")

# 上传文件
obj.upload_file(filename)
# 自动多线程上传
obj.upload_fileobj(fileobj)

二、Low-level clients

  1. 创建clients
  • 客户端创建的方式与资源类似
import boto3

# Create a low-level client with the service name
sqs = boto3.client('sqs')
  • 也可以从现有资源访问低级客户端
# Create the resource
sqs_resource = boto3.resource('sqs')

# Get the client from the resource
sqs = sqs_resource.meta.client
  1. 服务操作(Service operations)

服务操作映射到同名客户端的方法,并通过关键字参数提供对相同操作参数的访问;

# Make a call using the low-level client
response = sqs.send_message(QueueUrl='...', MessageBody='...')
  • 从上面可以看出,方法参数直接映射到关联的 SQS API;
  • 为了让 Python 代码看起来更好看,方法名称已经被蛇形大写;
  • 参数必须作为关键字参数发送。它们不能用作位置参数。
  1. 处理响应(Handing responses)

响应作为 python 字典返回,可以遍历或以其他方式处理所需数据的响应,响应可能并不总是包含所有预期数据;

  • 在下面的示例中,response.get(‘QueueUrls’, [])用于确保始终返回列表,即使响应没有键’QueueUrls’:
# List all your queues
response = sqs.list_queues()
for url in response.get('QueueUrls', []):
    print(url)
  • 上面示例中的响应如下所示:
{ 
    "QueueUrls" :  [ 
        "http://url1" , 
        "http://url2" , 
        "http://url3" 
    ] 
}
  1. Waiters

Waiters 使用client的服务操作来轮询 AWS 资源的状态并暂停执行,直到 AWS 资源达到 Waiter 正在轮询的状态或轮询时发生故障。通过使用client,可以了解client有权访问的每个Waiter的名字:

import boto3

s3 = boto3.client('s3')
sqs = boto3.client('sqs')

# List all of the possible waiters for both clients
print("s3 waiters:")
s3.waiter_names

print("sqs waiters:")
sqs.waiter_names
  • 如果client没有任何Waiter,则在访问其waiter_names属性时将返回一个空列表;
s3 waiters:
[u'bucket_exists', u'bucket_not_exists', u'object_exists', u'object_not_exists']
sqs waiters:
[]
  • 使用客户端的get_waiter()方法,可以从可能的等待者列表中获取特定的waiter;
# Retrieve waiter instance that will wait till a specified
# S3 bucket exists
s3_bucket_exists_waiter = s3.get_waiter('bucket_exists')
  • 然后要开始等待,必须使用传入方法的适当参数调用服务员的wait()方法;
# Begin waiting for the S3 bucket, mybucket, to exist
s3_bucket_exists_waiter.wait(Bucket='mybucket')
  1. 客户端的多线程或多处理(Multithreading or multiprocessing with clients)

多处理(Multi-Processing):虽然客户端是线程安全的,但由于它们的网络实现,它们不能跨进程共享。这样做可能会导致调用服务时响应顺序不正确;

  • 共享元数据(Shared Metadata):客户端通过一些属性(即meta、exceptions和waiter_names)向最终用户公开元数据。这些读取是安全的,但不应将任何突变视为线程安全的;
  • 自定义 Botocore 事件(Custom Botocore Events):Botocore(构建 Boto3 的库)允许高级用户提供他们自己的自定义事件挂钩,这些挂钩可以与 boto3 的客户端交互。大多数用户将不需要使用这些接口,但那些使用这些接口的用户在没有仔细审查的情况下不应再考虑他们的客户端线程安全。
  • 示例
import boto3.session
from concurrent.futures import ThreadPoolExecutor

def do_s3_task(client, task_definition):
    # Put your thread-safe code here

def my_workflow():
    # Create a session and use it to make our client
    session = boto3.session.Session()
    s3_client = session.client('s3')

    # Define some work to be done, this can be anything
    my_tasks = [ ... ]

    # Dispatch work tasks with our s3_client
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = [executor.submit(do_s3_task, s3_client, task) for task in my_tasks]

三、resource

  1. 简述
  • resource代表 Amazon Web Services (AWS) 的面向对象接口;
  • 提供了比服务客户端进行的原始数据低级调用更高级别的抽象;
  • 要使用资源,请调用Session的 resource()方法 并传入服务名称
# Get resources from the default session
sqs = boto3.resource('sqs')
s3 = boto3.resource('s3')
  • 每个资源实例都有许多属性和方法。在概念上可以分为标识符、属性、动作、引用、子资源和集合;
  • 资源本身也可以在概念上分为服务资源(如sqs、s3、ec2等)和单个资源(如 sqs.Queue或s3.Bucket);
  • 服务资源没有标识符或属性。否则,两者共享相同的组件。
  1. 标识符和属性
  • 标识符是用于对资源调用操作的唯一值。资源必须至少有一个标识符,除了*服务资源(例如sqs或s3);
  • 标识符是在实例创建时设置的,如果在实例化期间未能提供所有必要的标识符,将导致异常。
  • 标识符示例:
# SQS Queue (url is an identifier)
queue = sqs.Queue(url='http://...')
print(queue.url)

# S3 Object (bucket_name and key are identifiers)
obj = s3.Object(bucket_name='boto3', key='test.py')
print(obj.bucket_name)
print(obj.key)

# Raises exception, missing identifier: key!
obj = s3.Object(bucket_name='boto3')
  • 标识符也可以作为位置参数传递:
# SQS Queue
queue = sqs.Queue('http://...')

# S3 Object
obj = s3.Object('boto3', 'test.py')

# Raises exception, missing key!
obj = s3.Object('boto3')
  • 标识符也在资源实例平等中发挥作用。要使资源的两个实例被视为相等,它们的标识符必须相等:
>>> bucket1 = s3.Bucket('boto3')
>>> bucket2 = s3.Bucket('boto3')
>>> bucket3 = s3.Bucket('some-other-bucket')

>>> bucket1 == bucket2
True
>>> bucket1 == bucket3
False
  • 资源也可能具有属性,它们是实例上的延迟加载属性。它们可以在创建时根据对另一个资源的操作的响应进行设置,也可以在访问时或通过显式调用加载或重新加载操作进行设置。
  • 属性示例:
# SQS Message
message.body

# S3 Object
obj.last_modified
obj.e_tag
  • 警告:
    • 属性在第一次访问时可能会引发加载操作。如果延迟是一个问题,那么手动调用load将允许准确控制何时调用加载操作(以及延迟)。每个资源的文档都明确列出了其属性。

    • 此外,在对资源执行操作后可能会重新加载属性。例如,如果加载了 S3 对象的last_modified属性,然后调用了放置操作,那么下次访问last_modified 时,它将重新加载对象的元数据。

  1. 动作(Actions)

动作是调用服务的方法。操作可能会返回低级响应、新资源实例或新资源实例列表。动作自动将资源标识符设置为参数,但允许您通过关键字参数传递其他参数。

  • 动作示例:
# SQS Queue
messages = queue.receive_messages()

# SQS Message
for message in messages:
    message.delete()

# S3 Object
obj = s3.Object(bucket_name='boto3', key='test.py')
response = obj.get()
data = response['Body'].read()
  • 发送附加参数的示例:
# SQS Service
queue = sqs.get_queue_by_name(QueueName='test')

# SQS Queue
queue.send_message(MessageBody='hello')
  • Parameters must be passed as keyword arguments. They will not work as positional arguments.
  1. 子资源(Sub-resources)

子资源类似于引用,但它是一个相关的类而不是一个实例。子资源在实例化时与其父资源共享标识符。这是一种严格的亲子关系。在关系方面,这些可以被认为是一对多的。

  • 子资源示例:
# SQS
queue = sqs.Queue(url='...')
message = queue.Message(receipt_handle='...')
print(queue.url == message.queue_url)
print(message.receipt_handle)

# S3
obj = bucket.Object(key='new_file.txt')
print(obj.bucket_name)
print(obj.key)
  1. Waiters
  • Waiter类似于一个动作。Waiter将轮询资源的状态并暂停执行,直到资源达到正在轮询的状态或轮询时发生故障。Waiters 自动将资源标识符设置为参数,但允许您通过关键字参数传递其他参数。
  • Waiter的例子包括:
# S3: Wait for a bucket to exist.
bucket.wait_until_exists()

# EC2: Wait for an instance to reach the running state.
instance.wait_until_running()
  1. 多线程或多处理资源(Multithreading or multiprocessing with resources)
  • 资源实例不是安全的线程,不应跨线程或进程共享。这些特殊类包含无法共享的附加元数据。
  • 为每个线程或进程创建一个新的资源:
import boto3
import boto3.session
import threading

class MyTask(threading.Thread):
    def run(self):
        # Here we create a new session per thread
        session = boto3.session.Session()

        # Next, we create a resource client using our thread's session object
        s3 = session.resource('s3')

        # Put your thread-safe code here
  • 在上面的示例中,每个线程都有自己的 Boto3 会话和自己的 S3 资源实例;
  • 因为资源在加载和调用操作、访问属性或手动加载或重新加载资源时包含共享数据可以修改此数据。

四、session(会话)

  1. 默认会话(Default session)
  • Boto3 充当默认会话的代理。这是在创建低级客户端或资源客户端时自动创建的:
import boto3

# Using the default session
sqs = boto3.client('sqs')
s3 = boto3.resource('s3')
  1. 自定义会话(Custom session)
  • 可以管理自己的会话并从中创建低级客户端或资源客户端:
import boto3
import boto3.session

# Create your own session
my_session = boto3.session.Session()

# Now we can create low-level clients or resource clients from our custom session
sqs = my_session.client('sqs')
s3 = my_session.resource('s3')
  1. 会话配置(Session configurations)

使用特定凭证、AWS 区域信息或配置文件配置每个会话;

  • 最常见配置是:
    • aws_access_key_id - 特定的 AWS 访问密钥 ID。
    • aws_secret_access_key - 特定的 AWS 秘密访问密钥。
    • region_name - 您要在其中创建新连接的 AWS 区域。
    • profile_name - 创建会话时使用的配置文件。
  • 仅当会话需要特定配置文件时才设置profile_name参数。要使用默认配置文件,根本不要设置profile_name参数。如果未设置profile_name参数并且没有默认配置文件,则将使用空的配置字典。
  1. 使用会话进行多线程或多处理
  • 与Resource对象类似,Session对象不是线程安全的,不应跨线程和进程共享;
  • 为每个线程或进程创建一个新的Session对象:
import boto3
import boto3.session
import threading

class MyTask(threading.Thread):
    def run(self):
        # Here we create a new session per thread
        session = boto3.session.Session()

        # Next, we create a resource client using our thread's session object
        s3 = session.resource('s3')

        # Put your thread-safe code here
上一篇:【E-26】ERROR: Could not install packages due to an EnvironmentError: [Errno 2] No such file or direct


下一篇:python-使用boto3检查EC2实例的停止时间