Python 操作Kubernetes集群完全指南

Python 操作Kubernetes集群完全指南

目录

  1. 基础环境准备
  2. Python Kubernetes客户端介绍
  3. 连接Kubernetes集群
  4. Pod操作实战
  5. Deployment管理
  6. Service资源操作
  7. ConfigMap和Secret管理
  8. 自定义资源定义(CRD)操作
  9. 事件监听和Watch操作
  10. 高级应用场景

基础环境准备

1. 安装必要的包

首先,我们需要安装Python的Kubernetes客户端库:

pip install kubernetes
pip install openshift # 可选,用于OpenShift集群

2. 配置文件准备

import os
from kubernetes import client, config

# Load the kubeconfig configuration
config.load_kube_config()

Python Kubernetes客户端介绍

1. 主要模块说明

from kubernetes import client, config, watch
from kubernetes.client import ApiClient
from kubernetes.client.rest import ApiException

主要模块功能:

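  • client: 提供各资源组的API操作接口(如 CoreV1Api、AppsV1Api、CustomObjectsApi)以及 V1Pod、V1Deployment 等资源模型类
  • config: 负责加载集群连接配置(如 kubeconfig 文件)
  • watch: 用于持续监听资源的变化事件
  • ApiClient: 底层API客户端,各API接口类通过它向集群发送请求
  • ApiException: API调用失败时抛出的异常类型,用于错误处理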

连接Kubernetes集群

示例1:基础连接配置

from kubernetes import client, config

def connect_kubernetes():
    """Load the local kubeconfig and return a verified ``CoreV1Api`` client.

    Connectivity is checked by listing pods across all namespaces with
    ``limit=1``. Any exception raised while loading the configuration or
    talking to the cluster is caught and reported on stdout.

    Returns:
        The ``CoreV1Api`` instance on success, ``None`` on any failure.
    """
    try:
        config.load_kube_config()
        core_api = client.CoreV1Api()

        # One small list request is enough to prove the connection works.
        probe = core_api.list_pod_for_all_namespaces(limit=1)
        pod_count = len(probe.items)
        print("连接成功!发现 {} 个Pod".format(pod_count))
    except Exception as exc:
        print(f"连接失败:{str(exc)}")
        return None

    return core_api

# Test the connection: `api` is the CoreV1Api client on success, or None on failure
api = connect_kubernetes()

示例2:多集群配置

def connect_multiple_clusters():
    """Create one ``CoreV1Api`` client per configured cluster.

    Each kubeconfig file is loaded into its own ``ApiClient`` through
    ``config.new_client_from_config`` instead of ``config.load_kube_config``.
    ``load_kube_config`` replaces the process-wide default configuration on
    every call, so clients that rely on that default are not reliably bound
    to the cluster they were created for (depending on the client library
    version they may all end up pointing at the last loaded cluster).

    Returns:
        dict mapping cluster name to its ``CoreV1Api`` instance. Clusters
        whose configuration could not be loaded are reported on stdout and
        omitted from the result.
    """
    clusters = {
        'prod': '/path/to/prod-kubeconfig',
        'dev': '/path/to/dev-kubeconfig'
    }

    apis = {}
    for cluster_name, config_file in clusters.items():
        try:
            # A dedicated ApiClient keeps this cluster's endpoint and
            # credentials isolated from every other cluster's.
            api_client = config.new_client_from_config(config_file=config_file)
            apis[cluster_name] = client.CoreV1Api(api_client=api_client)
            print(f"成功连接到{cluster_name}集群")
        except Exception as e:
            print(f"连接{cluster_name}集群失败:{str(e)}")

    return apis

Pod操作实战

示例3:创建Pod

from kubernetes import client, config

def create_pod(name, image, namespace="default"):
    """Create a single-container Pod that exposes container port 80.

    Args:
        name: Used both as the Pod name and as the container name.
        image: Container image reference.
        namespace: Namespace to create the Pod in.

    Returns:
        The object returned by ``create_namespaced_pod`` on success, or
        ``None`` when the API call raises ``ApiException`` (the error is
        printed).
    """
    container = client.V1Container(
        name=name,
        image=image,
        ports=[client.V1ContainerPort(container_port=80)],
    )
    manifest = client.V1Pod(
        metadata=client.V1ObjectMeta(name=name),
        spec=client.V1PodSpec(containers=[container]),
    )

    core_api = client.CoreV1Api()
    try:
        created = core_api.create_namespaced_pod(
            namespace=namespace,
            body=manifest,
        )
    except ApiException as exc:
        print(f"Pod创建失败:{str(exc)}")
        return None

    print(f"Pod {name} 创建成功")
    return created

# Usage example: create a pod named "nginx-pod" from the nginx:latest image
# in the default namespace
create_pod("nginx-pod", "nginx:latest")

示例4:查询Pod状态

def get_pod_status(name, namespace="default"):
    """Read a Pod and summarise its status as a plain dict.

    Args:
        name: Name of the Pod to read.
        namespace: Namespace the Pod lives in.

    Returns:
        A dict with the keys ``name``, ``status`` (the phase), ``pod_ip``,
        ``host_ip``, ``start_time`` and ``conditions`` (a list of
        ``{"type", "status"}`` dicts, empty when the Pod reports none), or
        ``None`` when the read raises ``ApiException`` (the error is
        printed).
    """
    core_api = client.CoreV1Api()
    try:
        pod = core_api.read_namespaced_pod(name=name, namespace=namespace)
    except ApiException as exc:
        print(f"获取Pod状态失败:{str(exc)}")
        return None

    pod_name = pod.metadata.name
    pod_status = pod.status

    summary = {
        "name": pod_name,
        "status": pod_status.phase,
        "pod_ip": pod_status.pod_ip,
        "host_ip": pod_status.host_ip,
        "start_time": pod_status.start_time,
    }

    # `conditions` may be None, so fall back to an empty list.
    condition_rows = []
    for cond in pod_status.conditions or []:
        condition_rows.append({"type": cond.type, "status": cond.status})
    summary["conditions"] = condition_rows

    return summary

# Usage example: prints the status summary dict, or None if the lookup failed
status = get_pod_status("nginx-pod")
print(status)

Deployment管理

示例5:创建Deployment

def create_deployment(name, image, replicas=3, namespace="default"):
    """Create a Deployment running ``replicas`` copies of one container.

    The pod template is labelled ``app=<name>`` and the Deployment selector
    matches that same label. The container is named ``name`` and exposes
    container port 80.

    Args:
        name: Deployment name; also the container name and ``app`` label.
        image: Container image reference.
        replicas: Desired number of pod replicas.
        namespace: Namespace to create the Deployment in.

    Returns:
        The object returned by ``create_namespaced_deployment`` on success,
        or ``None`` when the API call raises ``ApiException`` (the error is
        printed).
    """
    app_labels = {"app": name}

    container = client.V1Container(
        name=name,
        image=image,
        ports=[client.V1ContainerPort(container_port=80)],
    )
    pod_template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels=dict(app_labels)),
        spec=client.V1PodSpec(containers=[container]),
    )
    deployment_spec = client.V1DeploymentSpec(
        replicas=replicas,
        selector=client.V1LabelSelector(match_labels=dict(app_labels)),
        template=pod_template,
    )
    manifest = client.V1Deployment(
        metadata=client.V1ObjectMeta(name=name),
        spec=deployment_spec,
    )

    apps_api = client.AppsV1Api()
    try:
        created = apps_api.create_namespaced_deployment(
            namespace=namespace,
            body=manifest,
        )
    except ApiException as exc:
        print(f"Deployment创建失败:{str(exc)}")
        return None

    print(f"Deployment {name} 创建成功")
    return created

# Usage example: relies on the defaults (3 replicas, "default" namespace)
create_deployment("nginx-deployment", "nginx:latest")

示例6:更新Deployment

def update_deployment(name, new_image, namespace="default"):
    """Change the image of a Deployment's first container.

    The current Deployment is read, the image of the first container in its
    pod template is replaced, and the modified object is sent back with
    ``patch_namespaced_deployment``.

    Args:
        name: Name of the Deployment to update.
        new_image: Image reference to set on the first container.
        namespace: Namespace the Deployment lives in.

    Returns:
        The object returned by the patch call on success, or ``None`` when
        reading or patching raises ``ApiException`` (the error is printed).
    """
    apps_api = client.AppsV1Api()

    try:
        current = apps_api.read_namespaced_deployment(name, namespace)

        # Only the first container of the pod template is touched.
        first_container = current.spec.template.spec.containers[0]
        first_container.image = new_image

        patched = apps_api.patch_namespaced_deployment(
            name=name,
            namespace=namespace,
            body=current,
        )
    except ApiException as exc:
        print(f"Deployment更新失败:{str(exc)}")
        return None

    print(f"Deployment {name} 更新成功")
    return patched

# Usage example: switch the first container of "nginx-deployment" to nginx:1.19
update_deployment("nginx-deployment", "nginx:1.19")

Service资源操作

示例7:创建Service

def create_service(name, selector, port, target_port, namespace="default"):
    """Create a Service with a single port mapping.

    Args:
        name: Service name.
        selector: Label selector dict passed to the Service spec.
        port: Port exposed by the Service.
        target_port: Port the Service forwards to.
        namespace: Namespace to create the Service in.

    Returns:
        The object returned by ``create_namespaced_service`` on success, or
        ``None`` when the API call raises ``ApiException`` (the error is
        printed).
    """
    port_mapping = client.V1ServicePort(port=port, target_port=target_port)
    manifest = client.V1Service(
        metadata=client.V1ObjectMeta(name=name),
        spec=client.V1ServiceSpec(selector=selector, ports=[port_mapping]),
    )

    core_api = client.CoreV1Api()
    try:
        created = core_api.create_namespaced_service(
            namespace=namespace,
            body=manifest,
        )
    except ApiException as exc:
        print(f"Service创建失败:{str(exc)}")
        return None

    print(f"Service {name} 创建成功")
    return created

# Usage example: the selector matches the `app` label that
# create_deployment("nginx-deployment", ...) puts on its pods; Service
# port 80 forwards to target port 80.
create_service(
    "nginx-service",
    {"app": "nginx-deployment"},
    80,
    80
)

ConfigMap和Secret管理

示例8:创建ConfigMap

def create_configmap(name, data, namespace="default"):
    """Create a ConfigMap holding the given key/value data.

    Args:
        name: ConfigMap name.
        data: Dict passed unchanged as the ConfigMap's ``data`` field.
        namespace: Namespace to create the ConfigMap in.

    Returns:
        The object returned by ``create_namespaced_config_map`` on success,
        or ``None`` when the API call raises ``ApiException`` (the error is
        printed).
    """
    manifest = client.V1ConfigMap(
        metadata=client.V1ObjectMeta(name=name),
        data=data,
    )

    core_api = client.CoreV1Api()
    try:
        created = core_api.create_namespaced_config_map(
            namespace=namespace,
            body=manifest,
        )
    except ApiException as exc:
        print(f"ConfigMap创建失败:{str(exc)}")
        return None

    print(f"ConfigMap {name} 创建成功")
    return created

# Usage example: store a small properties file under the "app.properties"
# key. The value is written without source-code indentation so the stored
# content is exactly the two property lines, with no leading blank line or
# leading spaces.
config_data = {
    "app.properties": "app.name=myapp\napp.env=production\n"
}
create_configmap("app-config", config_data)

示例9:创建Secret

import base64

def create_secret(name, data, namespace="default"):
    """Create an ``Opaque`` Secret from plain (not yet encoded) values.

    Every value is base64-encoded before it is placed in the Secret's
    ``data`` field. Values may be ``str`` (encoded with the default UTF-8
    codec first) or ``bytes``/``bytearray`` (encoded as-is), so binary
    payloads such as certificates or keys can be stored as well.

    Args:
        name: Secret name.
        data: Dict mapping key to the plain ``str`` or ``bytes`` value.
        namespace: Namespace to create the Secret in.

    Returns:
        The object returned by ``create_namespaced_secret`` on success, or
        ``None`` when the API call raises ``ApiException`` (the error is
        printed).
    """
    encoded_data = {}
    for key, value in data.items():
        # bytes-like values have no .encode(); pass them through unchanged.
        if isinstance(value, (bytes, bytearray)):
            raw = bytes(value)
        else:
            raw = value.encode()
        encoded_data[key] = base64.b64encode(raw).decode()

    secret = client.V1Secret(
        metadata=client.V1ObjectMeta(name=name),
        type="Opaque",
        data=encoded_data
    )

    v1 = client.CoreV1Api()

    try:
        api_response = v1.create_namespaced_secret(
            namespace=namespace,
            body=secret
        )
        print(f"Secret {name} 创建成功")
        return api_response
    except ApiException as e:
        print(f"Secret创建失败:{str(e)}")
        return None

# Usage example: values are given in plain text; create_secret base64-encodes
# them before sending.
# NOTE: demo credentials only; do not hard-code real secrets in source.
secret_data = {
    "username": "admin",
    "password": "secret123"
}
create_secret("app-secrets", secret_data)

自定义资源定义(CRD)操作

示例10:操作CRD资源

def create_custom_resource(group, version, plural, namespace, body):
    """Create a namespaced custom object through ``CustomObjectsApi``.

    Args:
        group: API group of the custom resource.
        version: API version of the custom resource.
        plural: Plural resource name.
        namespace: Namespace to create the object in.
        body: The custom object manifest.

    Returns:
        The object returned by ``create_namespaced_custom_object`` on
        success, or ``None`` when the API call raises ``ApiException`` (the
        error is printed).
    """
    objects_api = client.CustomObjectsApi()

    request = {
        "group": group,
        "version": version,
        "namespace": namespace,
        "plural": plural,
        "body": body,
    }
    try:
        created = objects_api.create_namespaced_custom_object(**request)
    except ApiException as exc:
        print(f"自定义资源创建失败:{str(exc)}")
        return None

    print("自定义资源创建成功")
    return created

# Usage example: a CronTab custom object. Its apiVersion
# ("stable.example.com/v1") is the same group and version passed to
# create_custom_resource below.
custom_resource = {
    "apiVersion": "stable.example.com/v1",
    "kind": "CronTab",
    "metadata": {
        "name": "my-crontab"
    },
    "spec": {
        "cronSpec": "* * * * */5",
        "image": "my-cron-image"
    }
}

# presumably a CRD with plural "crontabs" in this group is already installed
# in the cluster — confirm before running
create_custom_resource(
    group="stable.example.com",
    version="v1",
    plural="crontabs",
    namespace="default",
    body=custom_resource
)

事件监听和Watch操作

示例11:监听Pod事件

from kubernetes import watch

def watch_pods(namespace="default"):
    """Stream Pod events for a namespace and print each one.

    For every event the event type, the Pod name and the Pod phase are
    printed, followed by a separator line. The loop runs until the stream
    ends, an ``ApiException`` is raised (reported on stdout) or the user
    interrupts it with Ctrl-C, in which case the watch is stopped.

    Args:
        namespace: Namespace whose Pods are watched.
    """
    core_api = client.CoreV1Api()
    watcher = watch.Watch()
    event_stream = watcher.stream(
        core_api.list_namespaced_pod,
        namespace=namespace,
    )

    try:
        for evt in event_stream:
            pod_obj = evt['object']
            kind_of_event = evt['type']

            print(f"事件类型: {kind_of_event}")
            print(f"Pod名称: {pod_obj.metadata.name}")
            print(f"Pod状态: {pod_obj.status.phase}")
            print("-------------------")
    except KeyboardInterrupt:
        watcher.stop()
        print("监听已停止")
    except ApiException as exc:
        print(f"监听失败:{str(exc)}")

# 使用示例
# watch_pods()  # 此函数会持续运行直到被中断

高级应用场景

示例12:批量操作和错误处理

def batch_create_resources(resources):
    """Create a batch of Deployments and Services, collecting per-item results.

    Each entry of ``resources`` is a dict with the keys ``kind``
    (``'Deployment'`` or ``'Service'``), ``namespace`` and ``spec`` (the
    client model object to submit, whose ``metadata.name`` is used for
    reporting). A failure of one entry does not stop the batch.

    Args:
        resources: Iterable of resource description dicts as described above.

    Returns:
        dict with two lists:
          * ``success``: ``{'kind', 'name'}`` for every created resource.
          * ``failed``: ``{'kind', 'name', 'error'}`` for every entry whose
            API call raised ``ApiException`` or whose ``kind`` is not
            supported.
    """
    results = {
        'success': [],
        'failed': []
    }

    # API clients are created on first use and then reused for the batch.
    apps_v1 = None
    core_v1 = None

    for resource in resources:
        kind = resource['kind']
        try:
            if kind == 'Deployment':
                if apps_v1 is None:
                    apps_v1 = client.AppsV1Api()
                apps_v1.create_namespaced_deployment(
                    namespace=resource['namespace'],
                    body=resource['spec']
                )
            elif kind == 'Service':
                if core_v1 is None:
                    core_v1 = client.CoreV1Api()
                core_v1.create_namespaced_service(
                    namespace=resource['namespace'],
                    body=resource['spec']
                )
            else:
                # Report unsupported kinds instead of skipping them silently.
                # The name is looked up defensively because the shape of
                # `spec` is unknown for kinds this function does not handle.
                metadata = getattr(resource.get('spec'), 'metadata', None)
                results['failed'].append({
                    'kind': kind,
                    'name': getattr(metadata, 'name', None),
                    'error': f"不支持的资源类型: {kind}"
                })
                continue

            results['success'].append({
                'kind': kind,
                'name': resource['spec'].metadata.name
            })
        except ApiException as e:
            results['failed'].append({
                'kind': kind,
                'name': resource['spec'].metadata.name,
                'error': str(e)
            })

    return results

# Usage example: submit one Deployment through the batch helper and print
# the per-resource outcome.
resources = [
    {
        'kind': 'Deployment',
        'namespace': 'default',
        'spec': client.V1Deployment(
            metadata=client.V1ObjectMeta(name="nginx-deployment"),
            spec=client.V1DeploymentSpec(
                replicas=3,
                selector=client.V1LabelSelector(
                    match_labels={"app": "nginx"}
                ),
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(
                        labels={"app": "nginx"}
                    ),
                    spec=client.V1PodSpec(
                        containers=[
                            client.V1Container(
                                name="nginx",
                                image="nginx:latest"
                            )
                        ]
                    )
                )
            )
        )
    }
]

batch_results = batch_create_resources(resources)
print(batch_results)
		
示例13:资源清理和垃圾回收

def cleanup_resources(namespace="default", label_selector=None):
    """
    清理指定命名空间下的资源
    """
    v1 = client.CoreV1Api()
    apps_v1 = client.AppsV1Api()
    
    cleanup_results = {
        'pods': [],
        'deployments': [],
        'services': [],
        'errors': []
    }
    
    try:
        # 删除Pod
        pods = v1.list_namespaced_pod(
            namespace=namespace,
            label_selector=label_selector
        )
        for pod in pods.items:
            try:
                v1.delete_namespaced_pod(
                    name=pod.metadata.name,
                    namespace=namespace
                )
                cleanup_results['pods'].append(pod.metadata.name)
            except ApiException as e:
                cleanup_results['errors'].append(f"Pod {pod.metadata.name}: {str(e)}")
        
        # 删除Deployment
        deployments = apps_v1.list_namespaced_deployment(
            namesp
上一篇:论文解读:CARAT


下一篇:Qt生成coredump文件(支持arm和x86架构)