KafKa

服务注册发现的过程

KafKa

etcd由哪几部分构成?

KafKa

etcd作为一个高可用的键值存储系统,天生就是为了集群化而设计的,一般etcd推荐奇数个节点,推荐的节点数量是 357 构成一个集群。

启动etcd

安装完 etcd 以后,使用 go 进行连接

连接 etcd

import (
	"context"
	"fmt"
	"time"

	"go.etcd.io/etcd/clientv3"
)

func main() {
	// 连接etcd
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second, //超时时间
	})
	if err != nil {
		fmt.Println("connect to etcd failed ,err ", err)
		return
	}
	defer cli.Close()

	// put
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	_, err = cli.Put(ctx, "s4", "真好")
	if err != nil {
		fmt.Println("put to etcd faild err:", err)
		return
	}
	cancel() //写完关闭

	// get
	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
	gr, err := cli.Get(ctx, "s4")
	if err != nil {
		fmt.Println("Get from etcd faild err:", err)
		return
	}
	for _, ev := range gr.Kvs {
		fmt.Println("从etcd中读取成功")
		fmt.Printf("key:%s,value :%s", ev.Key, ev.Value)
	}
	cancel() //写完关闭
}

watch

用来监控etcd中key的变化(创建,更改,删除)

import (
	"context"
	"fmt"
	"time"

	"go.etcd.io/etcd/clientv3"
)

func main() {
	// 连接etcd
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second, //超时时间
	})
	if err != nil {
		fmt.Println("connect to etcd failed ,err ", err)
		return
	}
	fmt.Println("etcd 连接成功......")
	defer cli.Close()
	// watch
	watchChan := cli.Watch(context.Background(), "s4")
	for wresp := range watchChan { //如果通道里面没有值得话,就会一直在这里阻塞
		for _, evt := range wresp.Events {
			fmt.Printf("watch 事件,type:%s , key:%s , value: %s\n", evt.Type, evt.Kv.Key, evt.Kv.Value)
		}
	}
}

测试 watch

使用 etcd 的客户端进行测试

etcdctl put s4 "liudehua"

KafKa

上一篇:k8s -- etcd集群部署


下一篇:从零开始入门 K8s | 手把手带你理解 etcd