Python 操作Kubernetes集群完全指南
目录
- 基础环境准备
- Python Kubernetes客户端介绍
- 连接Kubernetes集群
- Pod操作实战
- Deployment管理
- Service资源操作
- ConfigMap和Secret管理
- 自定义资源定义(CRD)操作
- 事件监听和Watch操作
- 高级应用场景
基础环境准备
1. 安装必要的包
首先,我们需要安装Python的Kubernetes客户端库:
pip install kubernetes
pip install openshift # 可选,用于OpenShift集群
2. 配置文件准备
import os
from kubernetes import client, config
# 加载kubeconfig配置
config.load_kube_config()
Python Kubernetes客户端介绍
1. 主要模块说明
from kubernetes import client, config, watch
from kubernetes.client import ApiClient
from kubernetes.client.rest import ApiException
主要模块功能:
-
client
: 提供各种API操作接口 -
config
: 处理配置文件加载 -
watch
: 用于监控资源变化 -
ApiClient
: 底层API客户端 -
ApiException
: 异常处理
连接Kubernetes集群
示例1:基础连接配置
from kubernetes import client, config
def connect_kubernetes():
try:
# 加载本地kubeconfig
config.load_kube_config()
# 创建API客户端
v1 = client.CoreV1Api()
# 测试连接
ret = v1.list_pod_for_all_namespaces(limit=1)
print("连接成功!发现 {} 个Pod".format(len(ret.items)))
return v1
except Exception as e:
print(f"连接失败:{str(e)}")
return None
# 测试连接
api = connect_kubernetes()
示例2:多集群配置
def connect_multiple_clusters():
clusters = {
'prod': '/path/to/prod-kubeconfig',
'dev': '/path/to/dev-kubeconfig'
}
apis = {}
for cluster_name, config_file in clusters.items():
try:
config.load_kube_config(config_file=config_file)
apis[cluster_name] = client.CoreV1Api()
print(f"成功连接到{cluster_name}集群")
except Exception as e:
print(f"连接{cluster_name}集群失败:{str(e)}")
return apis
Pod操作实战
示例3:创建Pod
from kubernetes import client, config
def create_pod(name, image, namespace="default"):
# 创建Pod对象
pod = client.V1Pod(
metadata=client.V1ObjectMeta(name=name),
spec=client.V1PodSpec(
containers=[
client.V1Container(
name=name,
image=image,
ports=[client.V1ContainerPort(container_port=80)]
)
]
)
)
# 获取API实例
v1 = client.CoreV1Api()
try:
# 创建Pod
api_response = v1.create_namespaced_pod(
namespace=namespace,
body=pod
)
print(f"Pod {name} 创建成功")
return api_response
except ApiException as e:
print(f"Pod创建失败:{str(e)}")
return None
# 使用示例
create_pod("nginx-pod", "nginx:latest")
示例4:查询Pod状态
def get_pod_status(name, namespace="default"):
v1 = client.CoreV1Api()
try:
pod = v1.read_namespaced_pod(name=name, namespace=namespace)
return {
"name": pod.metadata.name,
"status": pod.status.phase,
"pod_ip": pod.status.pod_ip,
"host_ip": pod.status.host_ip,
"start_time": pod.status.start_time,
"conditions": [
{
"type": condition.type,
"status": condition.status
}
for condition in pod.status.conditions or []
]
}
except ApiException as e:
print(f"获取Pod状态失败:{str(e)}")
return None
# 使用示例
status = get_pod_status("nginx-pod")
print(status)
Deployment管理
示例5:创建Deployment
def create_deployment(name, image, replicas=3, namespace="default"):
# 创建Deployment对象
deployment = client.V1Deployment(
metadata=client.V1ObjectMeta(name=name),
spec=client.V1DeploymentSpec(
replicas=replicas,
selector=client.V1LabelSelector(
match_labels={"app": name}
),
template=client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(
labels={"app": name}
),
spec=client.V1PodSpec(
containers=[
client.V1Container(
name=name,
image=image,
ports=[client.V1ContainerPort(container_port=80)]
)
]
)
)
)
)
# 获取API实例
apps_v1 = client.AppsV1Api()
try:
# 创建Deployment
api_response = apps_v1.create_namespaced_deployment(
namespace=namespace,
body=deployment
)
print(f"Deployment {name} 创建成功")
return api_response
except ApiException as e:
print(f"Deployment创建失败:{str(e)}")
return None
# 使用示例
create_deployment("nginx-deployment", "nginx:latest")
示例6:更新Deployment
def update_deployment(name, new_image, namespace="default"):
apps_v1 = client.AppsV1Api()
try:
# 获取现有deployment
deployment = apps_v1.read_namespaced_deployment(name, namespace)
# 更新镜像
deployment.spec.template.spec.containers[0].image = new_image
# 应用更新
api_response = apps_v1.patch_namespaced_deployment(
name=name,
namespace=namespace,
body=deployment
)
print(f"Deployment {name} 更新成功")
return api_response
except ApiException as e:
print(f"Deployment更新失败:{str(e)}")
return None
# 使用示例
update_deployment("nginx-deployment", "nginx:1.19")
Service资源操作
示例7:创建Service
def create_service(name, selector, port, target_port, namespace="default"):
# 创建Service对象
service = client.V1Service(
metadata=client.V1ObjectMeta(name=name),
spec=client.V1ServiceSpec(
selector=selector,
ports=[client.V1ServicePort(
port=port,
target_port=target_port
)]
)
)
v1 = client.CoreV1Api()
try:
# 创建Service
api_response = v1.create_namespaced_service(
namespace=namespace,
body=service
)
print(f"Service {name} 创建成功")
return api_response
except ApiException as e:
print(f"Service创建失败:{str(e)}")
return None
# 使用示例
create_service(
"nginx-service",
{"app": "nginx-deployment"},
80,
80
)
ConfigMap和Secret管理
示例8:创建ConfigMap
def create_configmap(name, data, namespace="default"):
# 创建ConfigMap对象
configmap = client.V1ConfigMap(
metadata=client.V1ObjectMeta(name=name),
data=data
)
v1 = client.CoreV1Api()
try:
# 创建ConfigMap
api_response = v1.create_namespaced_config_map(
namespace=namespace,
body=configmap
)
print(f"ConfigMap {name} 创建成功")
return api_response
except ApiException as e:
print(f"ConfigMap创建失败:{str(e)}")
return None
# 使用示例
config_data = {
"app.properties": """
app.name=myapp
app.env=production
"""
}
create_configmap("app-config", config_data)
示例9:创建Secret
import base64
def create_secret(name, data, namespace="default"):
# 编码数据
encoded_data = {
k: base64.b64encode(v.encode()).decode()
for k, v in data.items()
}
# 创建Secret对象
secret = client.V1Secret(
metadata=client.V1ObjectMeta(name=name),
type="Opaque",
data=encoded_data
)
v1 = client.CoreV1Api()
try:
# 创建Secret
api_response = v1.create_namespaced_secret(
namespace=namespace,
body=secret
)
print(f"Secret {name} 创建成功")
return api_response
except ApiException as e:
print(f"Secret创建失败:{str(e)}")
return None
# 使用示例
secret_data = {
"username": "admin",
"password": "secret123"
}
create_secret("app-secrets", secret_data)
自定义资源定义(CRD)操作
示例10:操作CRD资源
def create_custom_resource(group, version, plural, namespace, body):
# 获取CustomObjectsApi
custom_api = client.CustomObjectsApi()
try:
# 创建自定义资源
api_response = custom_api.create_namespaced_custom_object(
group=group,
version=version,
namespace=namespace,
plural=plural,
body=body
)
print(f"自定义资源创建成功")
return api_response
except ApiException as e:
print(f"自定义资源创建失败:{str(e)}")
return None
# 使用示例
custom_resource = {
"apiVersion": "stable.example.com/v1",
"kind": "CronTab",
"metadata": {
"name": "my-crontab"
},
"spec": {
"cronSpec": "* * * * */5",
"image": "my-cron-image"
}
}
create_custom_resource(
group="stable.example.com",
version="v1",
plural="crontabs",
namespace="default",
body=custom_resource
)
事件监听和Watch操作
示例11:监听Pod事件
from kubernetes import watch
def watch_pods(namespace="default"):
v1 = client.CoreV1Api()
w = watch.Watch()
try:
for event in w.stream(v1.list_namespaced_pod, namespace=namespace):
pod = event['object']
event_type = event['type']
print(f"事件类型: {event_type}")
print(f"Pod名称: {pod.metadata.name}")
print(f"Pod状态: {pod.status.phase}")
print("-------------------")
except ApiException as e:
print(f"监听失败:{str(e)}")
except KeyboardInterrupt:
w.stop()
print("监听已停止")
# 使用示例
# watch_pods() # 此函数会持续运行直到被中断
高级应用场景
示例12:批量操作和错误处理
def batch_create_resources(resources):
results = {
'success': [],
'failed': []
}
for resource in resources:
try:
if resource['kind'] == 'Deployment':
apps_v1 = client.AppsV1Api()
response = apps_v1.create_namespaced_deployment(
namespace=resource['namespace'],
body=resource['spec']
)
results['success'].append({
'kind': 'Deployment',
'name': resource['spec'].metadata.name
})
elif resource['kind'] == 'Service':
v1 = client.CoreV1Api()
response = v1.create_namespaced_service(
namespace=resource['namespace'],
body=resource['spec']
)
results['success'].append({
'kind': 'Service',
'name': resource['spec'].metadata.name
})
except ApiException as e:
results['failed'].append({
'kind': resource['kind'],
'name': resource['spec'].metadata.name,
'error': str(e)
})
return results
# 使用示例
resources = [
{
'kind': 'Deployment',
'namespace': 'default',
'spec': client.V1Deployment(
metadata=client.V1ObjectMeta(name="nginx-deployment"),
spec=client.V1DeploymentSpec(
replicas=3,
selector=client.V1LabelSelector(
match_labels={"app": "nginx"}
),
template=client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(
labels={"app": "nginx"}
),
spec=client.V1PodSpec(
containers=[
client.V1Container(
name="nginx",
image="nginx:latest"
)
]
)
)
)
)
}
]
### 示例13:资源清理和垃圾回收
```python
def cleanup_resources(namespace="default", label_selector=None):
"""
清理指定命名空间下的资源
"""
v1 = client.CoreV1Api()
apps_v1 = client.AppsV1Api()
cleanup_results = {
'pods': [],
'deployments': [],
'services': [],
'errors': []
}
try:
# 删除Pod
pods = v1.list_namespaced_pod(
namespace=namespace,
label_selector=label_selector
)
for pod in pods.items:
try:
v1.delete_namespaced_pod(
name=pod.metadata.name,
namespace=namespace
)
cleanup_results['pods'].append(pod.metadata.name)
except ApiException as e:
cleanup_results['errors'].append(f"Pod {pod.metadata.name}: {str(e)}")
# 删除Deployment
deployments = apps_v1.list_namespaced_deployment(
namesp