容器实例管理python sdk封装
测试结果
说明
- 这是根据我的需求写的,所以有些参数是写死的,比如cpu核数和内存,你可以根据你的需要自行修改。
- 前置条件:
当前环境已安装python3.8以上版本和azure cli并且已经登陆到你的账户
依赖安装
requirments.txt
azure-mgmt-resource
azure-identity
azure-mgmt-containerinstance
pip install -r requirments.txt
PS: 安装不上就多安几次或使用魔法
containerhelpor.py
from azure.identity import DefaultAzureCredential
from azure.mgmt.containerinstance import ContainerInstanceManagementClient
from azure.mgmt.containerinstance.models import ContainerGroup, Container, ContainerPort, IpAddress, ResourceRequests, ResourceRequirements
class ContainerHelpor:
def create(subscription_id:str, resource_group:str, location:str, container_name:str, container_image:str, no_port:int, protocol: str='TCP'):
# 使用默认 Azure 凭据进行验证
credentials = DefaultAzureCredential()
# 创建容器实例管理客户端实例
container_client = ContainerInstanceManagementClient(credentials, subscription_id)
# 容器端口和 IP 地址配置
port1 = ContainerPort(port=no_port, protocol=protocol)
ports = [port1]
# port2 = ContainerPort(port=no_port + 1, protocol=protocol)
# ports = [port1, port2]
ip_address = IpAddress(ports=ports, type='Public')
# 容器资源请求
requests = ResourceRequests(memory_in_gb=1.0, cpu=1.0)
requirements = ResourceRequirements(requests=requests)
# 创建容器
container = Container(
name=container_name,
image=container_image,
resources=requirements,
ports=ports
)
# 容器组(容器实例)
container_group = ContainerGroup(
location=location,
containers=[container],
os_type='Linux',
ip_address=ip_address
)
# 创建容器实例
container_group_result = container_client.container_groups.begin_create_or_update(resource_group, container_name, container_group)
print(f"Container instance {container_name} created successfully.")
return container_group_result
def stop(subscription_id:str, resource_group:str, container_name:str):
# 使用默认凭证进行身份验证
credential = DefaultAzureCredential()
# 创建容器实例管理客户端
client = ContainerInstanceManagementClient(credential, subscription_id)
# 停止容器实例
client.container_groups.stop(resource_group, container_name)
print(f"Container instance {container_name} stoped successfully.")
def start(subscription_id:str, resource_group:str, container_name:str):
# 使用默认凭证进行身份验证
credential = DefaultAzureCredential()
# 创建容器实例管理客户端
client = ContainerInstanceManagementClient(credential, subscription_id)
# 停止容器实例
client.container_groups.begin_start(resource_group, container_name)
print(f"Container instance {container_name} started successfully.")
def remove(subscription_id:str, resource_group:str, container_name:str):
# 创建 Azure 认证凭证
credentials = DefaultAzureCredential()
# 创建 ContainerInstanceManagementClient
container_client = ContainerInstanceManagementClient(credentials, subscription_id)
# 删除容器实例
container_client.container_groups.begin_delete(resource_group, container_name)
print(f"Container instance {container_name} deleted successfully.")
def query_ip(subscription_id:str, resource_group:str, container_name:str):
credential = DefaultAzureCredential()
client = ContainerInstanceManagementClient(credential, subscription_id)
container_group = client.container_groups.get(resource_group, container_name)
return container_group.ip_address
测试代码
import sys
import os
import time
# 此处根据你的项目选择包含目录以及导入导出模块
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
sys.path.append(parent_dir)
from core.containerhelpor import ContainerHelpor
def main():
print("containerhelpor test begin")
# 这里填你的订阅id
sub_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
container_name = 'test' # 你的容器名称
resource_group = 'jp' # 你的资源组组名
print("ContainerHelpor.create begin")
ContainerHelpor.create(subscription_id=sub_id,
no_port=8080,
resource_group=resource_group,
location = 'Japan West',
container_name = container_name,
# 镜像名称以及tag默认指向dockerhub的latset
container_image = ''
)
print("ContainerHelpor.stop begin")
ContainerHelpor.stop(sub_id,resource_group,container_name)
print("ContainerHelpor.start begin")
is_not_start = True
retry_cnt = 0
while is_not_start:
try:
ContainerHelpor.start(sub_id,resource_group,container_name)
is_not_start = False
except Exception as e:
time.sleep(2)
retry_cnt += 1
if retry_cnt > 3:
print(e)
ContainerHelpor.remove(sub_id,resource_group,container_name)
print("containerhelpor test failed")
return
print("ContainerHelpor.query_ip begin")
retry_cnt = 0
ip = None
while ip is None:
ipaddr = ContainerHelpor.query_ip(sub_id,resource_group,container_name)
ip = ipaddr.ip
time.sleep(2)
retry_cnt += 1
if retry_cnt > 10:
print("query_ip failed, try it later")
break
print(ip)
print("ContainerHelpor.remove begin")
ContainerHelpor.remove(sub_id,resource_group,container_name)
print("containerhelpor test pass")
if __name__ == "__main__":
main()