前面我们了解了 etcd 的集群搭建模式,也了解了如何在 Kubernetes 集群中来部署 etcd 集群,要开发一个对应的 Operator 其实也就是让我们用代码去实现 etcd 的这一系列的运维工作而已,说白了就是把 StatefulSet 中的启动脚本翻译成我们的 golang 代码。这里我们分成不同的版本来渐进式开发,首先第一个版本我们开发一个最简单的 Operator,直接用我们的 Operator 去生成前面的 StatefulSet 模板即可。
项目初始化
同样在开发 Operator 之前我们需要先提前想好我们的 CRD 资源对象,比如我们想要通过下面的 CR 资源来创建对应的 etcd 集群:
apiVersion: etcd.ydzs.io/v1alpha1
kind: EtcdCluster
metadata:
name: demo
spec:
size: 3 # 副本数量
image: cnych/etcd:v3.4.13 # 镜像
因为其他信息都是通过脚本获取的,所以基本上我们通过 size 和 image 两个字段就可以确定一个 Etcd 集群部署的样子了,所以我们的第一个版本非常简单,只要能够写出正确的部署脚本即可,然后我们在 Operator 当中根据上面我们定义的 EtcdCluster 这个 CR 资源来组装一个 游戏StatefulSet 和 Headless SVC 对象就可以了。
首先初始化项目,这里我们使用 kubebuilder 来构建我们的脚手架:
➜ kubebuilder init --domain ydzs.io --owner cnych --repo github.com/cnych/etcd-operator
Writing scaffold for you to edit...
Get controller runtime:
$ go get sigs.k8s.io/controller-runtime@v0.5.0
Update go.mod:
$ go mod tidy
Running make:
$ make
/Users/ych/devs/projects/go/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go build -o bin/manager main.go
Next: define a resource with:
$ kubebuilder create api
项目脚手架创建完成后,然后定义资源 API:
➜ kubebuilder create api --group etcd --version v1alpha1 --kind EtcdCluster
Create Resource [y/n]
y
Create Controller [y/n]
y
Writing scaffold for you to edit...
api/v1alpha1/etcdcluster_types.go
controllers/etcdcluster_controller.go
Running make:
$ make
/Users/ych/devs/projects/go/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go build -o bin/manager main.go
这样我们的项目就初始化完成了,整体的代码结构如下所示:
➜ etcd-operator tree -L 2
.
├── Dockerfile
├── Makefile
├── PROJECT
├── api
│ └── v1alpha1
├── bin
│ └── manager
├── config
│ ├── certmanager
│ ├── crd
│ ├── default
│ ├── manager
│ ├── prometheus
│ ├── rbac
│ ├── samples
│ └── webhook
├── controllers
│ ├── etcdcluster_controller.go
│ └── suite_test.go
├── go.mod
├── go.sum
├── hack
│ └── boilerplate.go.txt
└── main.go
14 directories, 10 files
然后根据我们上面设计的 EtcdCluster 这个对象来编辑 Operator 的结构体即可,修改文件www.sangpi.com 中的 EtcdClusterSpec 结构体:
// api/v1alpha1/etcdcluster_types.go
// EtcdClusterSpec defines the desired state of EtcdCluster
type EtcdClusterSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file
Size uint `json:"size"`
Image string `json:"image"`
}
要注意每次修改完成后需要执行 make 命令重新生成代码:
➜ make
/Users/ych/devs/projects/go/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go build -o bin/manager main.go
接下来我们就可以去控制器的 Reconcile 函数中来实现我们自己的业务逻辑了。
业务逻辑
首先在目录 controllers 下面创建一个 resource.go
文件,用来根据我们定义的 EtcdCluster 对象生成对应的 StatefulSet 和 Headless SVC 对象。
// controllers/resource.go
package controllers
import (
"strconv"
"github.com/cnych/etcd-operator/api/v1alpha1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
var (
EtcdClusterLabelKey = "etcd.ydzs.io/cluster"
EtcdClusterCommonLabelKey = "app"
EtcdDataDirName = "datadir"
)
func MutateStatefulSet(cluster *v1alpha1.EtcdCluster, sts *appsv1.StatefulSet) {
sts.Labels = map[string]string{
EtcdClusterCommonLabelKey: "etcd",
}
sts.Spec = appsv1.StatefulSetSpec{
Replicas: cluster.Spec.Size,
ServiceName: cluster.Name,
Selector: &metav1.LabelSelector{MatchLabels: map[string]string{
EtcdClusterLabelKey: cluster.Name,
}},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
EtcdClusterLabelKey: cluster.Name,
EtcdClusterCommonLabelKey: "etcd",
},
},
Spec: corev1.PodSpec{
Containers: newContainers(cluster),
},
},
VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: EtcdDataDirName,
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{
corev1.ReadWriteOnce,
},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: resource.MustParse("1Gi"),
},
},
},
},
},
}
}
func newContainers(cluster *v1alpha1.EtcdCluster) []corev1.Container {
return []corev1.Container{
corev1.Container{
Name: "etcd",
Image: cluster.Spec.Image,
Ports: []corev1.ContainerPort{
corev1.ContainerPort{
Name: "peer",
ContainerPort: 2380,
},
corev1.ContainerPort{
Name: "client",
ContainerPort: 2379,
},
},
Env: []corev1.EnvVar{
corev1.EnvVar{
Name: "INITIAL_CLUSTER_SIZE",
Value: strconv.Itoa(int(*cluster.Spec.Size)),
},
corev1.EnvVar{
Name: "SET_NAME",
Value: cluster.Name,
},
corev1.EnvVar{
Name: "POD_IP",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "status.podIP",
},
},
},
corev1.EnvVar{
Name: "MY_NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
},
VolumeMounts: []corev1.VolumeMount{
corev1.VolumeMount{
Name: EtcdDataDirName,
MountPath: "/var/run/etcd",
},
},
Command: []string{
"/bin/sh", "-ec",
"HOSTNAME=$(hostname)\n\n ETCDCTL_API=3\n\n eps() {\n EPS=\"\"\n for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do\n EPS=\"${EPS}${EPS:+,}http://${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2379\"\n done\n echo ${EPS}\n }\n\n member_hash() {\n etcdctl member list | grep -w \"$HOSTNAME\" | awk '{ print $1}' | awk -F \",\" '{ print $1}'\n }\n\n initial_peers() {\n PEERS=\"\"\n for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do\n PEERS=\"${PEERS}${PEERS:+,}${SET_NAME}-${i}=http://${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2380\"\n done\n echo ${PEERS}\n }\n\n # etcd-SET_ID\n SET_ID=${HOSTNAME##*-}\n\n # adding a new member to existing cluster (assuming all initial pods are available)\n if [ \"${SET_ID}\" -ge ${INITIAL_CLUSTER_SIZE} ]; then\n # export ETCDCTL_ENDPOINTS=$(eps)\n # member already added?\n\n MEMBER_HASH=$(member_hash)\n if [ -n \"${MEMBER_HASH}\" ]; then\n # the member hash exists but for some reason etcd failed\n # as the datadir has not be created, we can remove the member\n # and retrieve new hash\n echo \"Remove member ${MEMBER_HASH}\"\n etcdctl --endpoints=$(eps) member remove ${MEMBER_HASH}\n fi\n\n echo \"Adding new member\"\n\n etcdctl member --endpoints=$(eps) add ${HOSTNAME} --peer-urls=http://${HOSTNAME}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2380 | grep \"^ETCD_\" > /var/run/etcd/new_member_envs\n\n if [ $? -ne 0 ]; then\n echo \"member add ${HOSTNAME} error.\"\n rm -f /var/run/etcd/new_member_envs\n exit 1\n fi\n\n echo \"==> Loading env vars of existing cluster...\"\n sed -ie \"s/^/export /\" /var/run/etcd/new_member_envs\n cat /var/run/etcd/new_member_envs\n . /var/run/etcd/new_member_envs\n\n exec etcd --listen-peer-urls http://${POD_IP}:2380 \\\n --listen-client-urls http://${POD_IP}:2379,http://127.0.0.1:2379 \\\n --advertise-client-urls http://${HOSTNAME}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2379 \\\n --data-dir /var/run/etcd/default.etcd\n fi\n\n for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do\n while true; do\n echo \"Waiting for ${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local to come up\"\n ping -W 1 -c 1 ${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local > /dev/null && break\n sleep 1s\n done\n done\n\n echo \"join member ${HOSTNAME}\"\n # join member\n exec etcd --name ${HOSTNAME} \\\n --initial-advertise-peer-urls http://${HOSTNAME}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2380 \\\n --listen-peer-urls http://${POD_IP}:2380 \\\n --listen-client-urls http://${POD_IP}:2379,http://127.0.0.1:2379 \\\n --advertise-client-urls http://${HOSTNAME}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2379 \\\n --initial-cluster-token etcd-cluster-1 \\\n --data-dir /var/run/etcd/default.etcd \\\n --initial-cluster $(initial_peers) \\\n --initial-cluster-state new",
},
Lifecycle: &corev1.Lifecycle{
PreStop: &corev1.Handler{
Exec: &corev1.ExecAction{
Command: []string{
"/bin/sh", "-ec",
"HOSTNAME=$(hostname)\n\n member_hash() {\n etcdctl member list | grep -w \"$HOSTNAME\" | awk '{ print $1}' | awk -F \",\" '{ print $1}'\n }\n\n eps() {\n EPS=\"\"\n for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do\n EPS=\"${EPS}${EPS:+,}http://${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2379\"\n done\n echo ${EPS}\n }\n\n export ETCDCTL_ENDPOINTS=$(eps)\n SET_ID=${HOSTNAME##*-}\n\n # Removing member from cluster\n if [ \"${SET_ID}\" -ge ${INITIAL_CLUSTER_SIZE} ]; then\n echo \"Removing ${HOSTNAME} from etcd cluster\"\n etcdctl member remove $(member_hash)\n if [ $? -eq 0 ]; then\n # Remove everything otherwise the cluster will no longer scale-up\n rm -rf /var/run/etcd/*\n fi\n fi",
},
},
},
},
},
}
}
func MutateHeadlessSvc(cluster *v1alpha1.EtcdCluster, svc *corev1.Service) {
svc.Labels = map[string]string{
EtcdClusterCommonLabelKey: "etcd",
}
svc.Spec = corev1.ServiceSpec{
ClusterIP: corev1.ClusterIPNone,
Selector: map[string]string{
EtcdClusterLabelKey: cluster.Name,
},
Ports: []corev1.ServicePort{
corev1.ServicePort{
Name: "peer",
Port: 2380,
},
corev1.ServicePort{
Name: "client",
Port: 2379,
},
},
}
}
上面的代码虽然很多,但逻辑很简单,就是根据我们的 EtcdCluter 去构造 StatefulSet 和 Headless SVC 资源对象,构造完成后,当我们创建 EtcdCluster 的时候就可以在控制器的 Reconcile 函数中去进行逻辑处理了,这里我们也可以使用前面示例中的代码来简单处理即可,代码如下所示:
// controllers/etcdcluster_controller.go
func (r *EtcdClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
ctx := context.Background()
log := r.Log.WithValues("etcdcluster", req.NamespacedName)
// 首先我们获取 EtcdCluster 实例
var etcdCluster etcdv1alpha1.EtcdCluster
if err := r.Client.Get(ctx, req.NamespacedName, &etcdCluster); err != nil {
// EtcdCluster was deleted,Ignore
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// 得到 EtcdCluster 过后去创建对应的StatefulSet和Service
// CreateOrUpdate
// (就是观察的当前状态和期望的状态进行对比)
// 调谐,获取到当前的一个状态,然后和我们期望的状态进行对比是不是就可以
// CreateOrUpdate Service
var svc corev1.Service
svc.Name = etcdCluster.Name
svc.Namespace = etcdCluster.Namespace
or, err := ctrl.CreateOrUpdate(ctx, r, &svc, func() error {
// 调谐必须在这个函数中去实现
MutateHeadlessSvc(&etcdCluster, &svc)
return controllerutil.SetControllerReference(&etcdCluster, &svc, r.Scheme)
})
if err != nil {
return ctrl.Result{}, err
}
log.Info("CreateOrUpdate", "Service", or)
// CreateOrUpdate StatefulSet
var sts appsv1.StatefulSet
sts.Name = etcdCluster.Name
sts.Namespace = etcdCluster.Namespace
or, err = ctrl.CreateOrUpdate(ctx, r, &sts, func() error {
// 调谐必须在这个函数中去实现
MutateStatefulSet(&etcdCluster, &sts)
return controllerutil.SetControllerReference(&etcdCluster, &sts, r.Scheme)
})
if err != nil {
return ctrl.Result{}, err
}
log.Info("CreateOrUpdate", "StatefulSet", or)
return ctrl.Result{}, nil
}
这里我们就是去对我们的 EtcdCluster 对象进行调谐,然后去创建或者更新对应的 StatefulSet 或者 Headless SVC 对象,逻辑很简单,这样我们就实现我们的第一个版本的 etcd-operator。
调试
接下来我们首先安装我们的 CRD 对象,让我们的 Kubernetes 系统识别我们的 EtcdCluster 对象:
➜ make install
/Users/ych/devs/projects/go/bin/controller-gen "crd:trivialVersions=true" rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases
kustomize build config/crd | kubectl apply -f -
customresourcedefinition.apiextensions.k8s.io/etcdclusters.etcd.ydzs.io configured
然后运行控制器:
➜ make run
/Users/ych/devs/projects/go/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
/Users/ych/devs/projects/go/bin/controller-gen "crd:trivialVersions=true" rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases
go run ./main.go
2020-11-20T17:44:48.222+0800 INFO controller-runtime.metrics metrics server is starting to listen {"addr": ":8080"}
2020-11-20T17:44:48.223+0800 INFO setup starting manager
2020-11-20T17:44:48.223+0800 INFO controller-runtime.manager starting metrics server {"path": "/metrics"}
2020-11-20T17:44:48.223+0800 INFO controller-runtime.controller Starting EventSource {"controller": "etcdcluster", "source": "kind source: /, Kind="}
2020-11-20T17:44:48.326+0800 INFO controller-runtime.controller Starting Controller {"controller": "etcdcluster"}
2020-11-20T17:44:48.326+0800 INFO controller-runtime.controller Starting workers {"controller": "etcdcluster", "worker count": 1}
控制器启动成功后我们就可以去创建我们的 Etcd 集群了,将示例 CR 资源清单修改成下面的 YAML:
apiVersion: etcd.ydzs.io/v1alpha1
kind: EtcdCluster
metadata:
name: etcd-sample
spec:
size: 3
image: cnych/etcd:v3.4.13
另外开启一个终端创建上面的资源对象:
➜ kubectl apply -f config/samples/etcd_v1alpha1_etcdcluster.yaml
etcdcluster.etcd.ydzs.io/etcd-sample created
创建完成后我们可以查看对应的 EtcdCluster 对象:
➜ kubectl get etcdcluster
NAME AGE
etcd-sample 2m35s
对应也会自动创建我们的 StatefulSet 和 Service 资源清单:
➜ kubectl get all -l app=etcd
NAME READY STATUS RESTARTS AGE
pod/etcd-sample-0 1/1 Running 0 85s
pod/etcd-sample-1 1/1 Running 0 71s
pod/etcd-sample-2 1/1 Running 0 66s
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/etcd-sample ClusterIP None <none> 2380/TCP,2379/TCP 86s
NAME READY AGE
statefulset.apps/etcd-sample 3/3 87s
到这里我们的 Etcd 集群就启动起来了,我们是不是只通过简单的几行代码就实现了一个 etcd-operator。
当然还有很多细节没有处理,比如还没有添加对 StatefulSet 和 Headless SVC 的 RBAC 权限声明以及这两个资源对象变更的 Watch,这个前面我们已经讲解过了,大家可以试着完善这块实现。不过这里我们实现 etcd operator 的方式比较讨巧,我们需要提前去编写启动脚本,这个当然不算一个常规的方式,但是我们知道了如果去启动 etcd 集群了,后续也就可以用 golang 代码去实现了,所以这只是一个一个过程的实现而已~