Go语言学习之12 etcd、contex、kafka消费实例、logagent

本节内容:
    1. etcd介绍与使用
    2. ElastcSearch介绍与使用

1. etcd介绍与使用
    概念:高可用的分布式key-value存储,可以使用配置共享和服务发现
    类似项目:zookeeper和consul
    开发语言:Go
    接口:提供restful的http接口,使用简单
    实现算法:基于raft算法的强一致性、高可用的服务存储目录

2. etcd的应用场景
    a. 服务发现和服务注册
    b. 配置中心
    c. 分布式存储
    d. master选举

3. etcd搭建
    a. 下载etcd release版本:https://github.com/etcd-io/etcd/releases 版本
    b. 解压后,进入到etcd的根目录,直接执行./etcd 可以启动etcd
    c. 使用etcdctl工具更改配置

4. context使用介绍
    a. 如何控制goroutine
    b. 如何保存上下文数据

 (1)使用context处理超时
    ctx, cancel = context.With.Timeout(context.Background(), 2*time.Second)

示例是通过设置ctx超时时间为2s,如果2s类无法接收到baidu的请求返回,则超时异常。

 package main

 import (
"context"
"fmt"
"io/ioutil"
"net/http"
"time"
)
type Result struct {
r *http.Response
err error
} func process() {
ctx, cancel := context.WithTimeout(context.Background(), *time.Second)
defer cancel()
tr := &http.Transport{}
client := &http.Client{Transport: tr}
c := make(chan Result, )
req, err := http.NewRequest("GET", "http://www.baidu.com", nil)
if err != nil {
fmt.Println("http request failed, err:", err)
return
}
go func() {
resp, err := client.Do(req)
pack := Result{r: resp, err: err}
c <- pack
}()
select {
case <-ctx.Done():
tr.CancelRequest(req)
res := <-c
fmt.Println("Timeout! err:", res.err)
case res := <-c:
defer res.r.Body.Close()
out, _ := ioutil.ReadAll(res.r.Body)
fmt.Printf("Server Response: %s", out)
}
return
}
func main() {
process()
}

ctx_timeout

(2) 使用context保存上下文
    利用context来保存上下文值:

 package main

 import (
"context"
"fmt"
) func process(ctx context.Context) {
ret,ok := ctx.Value("trace_id").(int)
if !ok {
ret =
} fmt.Printf("ret:%d\n", ret) s , _ := ctx.Value("session").(string)
fmt.Printf("session:%s\n", s)
} func main() {
ctx := context.WithValue(context.Background(), "trace_id", )
ctx = context.WithValue(ctx, "session", "sdlkfjkaslfsalfsafjalskfj")
process(ctx)
}

ctx_value

同时还有context ctx_cancel 和 ctx_deadline

 package main

 import (
"context"
"fmt"
"time"
) func gen(ctx context.Context) <-chan int {
dst := make(chan int)
n :=
go func() {
for {
select {
case <-ctx.Done(): //当43行的test函数执行结束之后,执行defer cancel(),则会触发该行
fmt.Println("i exited")
return // returning not to leak the goroutine
case dst <- n:
n++
}
}
}()
return dst
} func test() {
// gen generates integers in a separate goroutine and
// sends them to the returned channel.
// The callers of gen need to cancel the context once
// they are done consuming generated integers not to leak
// the internal goroutine started by gen.
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // cancel when we are finished consuming integers
intChan := gen(ctx)
for n := range intChan {
fmt.Println(n)
if n == {
break
}
}
}
func main() {
test()
time.Sleep(time.Hour)
}

ctx_cancel

 package main

 import (
"context"
"fmt"
"time"
) func main() {
d := time.Now().Add( * time.Millisecond)
ctx, cancel := context.WithDeadline(context.Background(), d) // Even though ctx will be expired, it is good practice to call its
// cancelation function in any case. Failure to do so may keep the
// context and its parent alive longer than necessary.
defer cancel() select {
case <-time.After( * time.Second):
fmt.Println("overslept")
case <-ctx.Done():
fmt.Println(ctx.Err()) //context deadline exceeded
} }

ctx_deadline

5. etcd介绍与使用
    etcd使用示例 (由于虚拟机出现问题,后面的程序全在Windows上面操作):

(1)客户端连接 etcd server端

 package main

 import (
"fmt"
//etcd_client "github.com/coreos/etcd/clientv3"
etcd_client "go.etcd.io/etcd/clientv3"
"time"
) func main() { cli, err := etcd_client.New(etcd_client.Config{
Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"},
DialTimeout: * time.Second,
})
if err != nil {
fmt.Println("connect failed, err:", err)
return
} fmt.Println("connect succ")
defer cli.Close()
}

etcd_conn

(2)put 和 get

 package main

 import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
) func main() { cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"},
DialTimeout: * time.Second,
})
if err != nil {
fmt.Println("connect failed, err:", err)
return
} fmt.Println("connect succ")
defer cli.Close()
//设置1秒超时,访问etcd有超时控制
ctx, cancel := context.WithTimeout(context.Background(), *time.Second)
//操作etcd
_, err = cli.Put(ctx, "/logagent/conf/", "192.168.0.1")
//操作完毕,取消etcd
cancel()
if err != nil {
fmt.Println("put failed, err:", err)
return
}
//取值,设置超时为1秒
ctx, cancel = context.WithTimeout(context.Background(), *time.Second)
resp, err := cli.Get(ctx, "/logagent/conf/")
cancel()
if err != nil {
fmt.Println("get failed, err:", err)
return
}
for _, ev := range resp.Kvs {
fmt.Printf("%s : %s\n", ev.Key, ev.Value)
}
}

put_get

(3)watch(观测key值发生变化)

 package main

 import (
"context"
"fmt"
"time" etcd_client "go.etcd.io/etcd/clientv3"
) func main() { cli, err := etcd_client.New(etcd_client.Config{
Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"},
DialTimeout: * time.Second,
})
if err != nil {
fmt.Println("connect failed, err:", err)
return
}
defer cli.Close() fmt.Println("connect succ") ctx, cancel := context.WithTimeout(context.Background(), *time.Second)
_, err = cli.Put(ctx, "/logagent/conf/", "")
if err != nil {
fmt.Println("put failed, err:", err)
return
}
cancel() fmt.Println("put succ") for {
rch := cli.Watch(context.Background(), "/logagent/conf/")
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}
}

watch

运行上面的watch程序监控key(/logagent/conf/)操作的变化,然后再运行(2)的程序,结果如下:

Go语言学习之12 etcd、contex、kafka消费实例、logagent

kafka消费示例代码:

 package main

 import (
"fmt"
"strings"
"sync" "github.com/Shopify/sarama"
) var (
wg sync.WaitGroup
) func main() { consumer, err := sarama.NewConsumer(strings.Split("192.168:30.136:9092", ","), nil)
if err != nil {
fmt.Println("Failed to start consumer: %s", err)
return
}
partitionList, err := consumer.Partitions("nginx_log")
if err != nil {
fmt.Println("Failed to get the list of partitions: ", err)
return
} fmt.Println(partitionList) for partition := range partitionList {
pc, err := consumer.ConsumePartition("nginx_log", int32(partition), sarama.OffsetNewest)
if err != nil {
fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
return
}
defer pc.AsyncClose() go func(pc sarama.PartitionConsumer) {
wg.Add()
for msg := range pc.Messages() {
fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
fmt.Println()
}
wg.Done()
}(pc)
}
//time.Sleep(time.Hour)
wg.Wait()
consumer.Close()
}

kafka消费示例代码

6. sync.WaitGroup介绍
1)等待一组groutine结束
2)使用Add方法设置等待的数量加1
3)使用Delete方法设置等待的数量减1
4)当等待的数量等于0,Wait函数返回

sync.WaitGroup实例:

 package main

 import (
"fmt"
"sync"
"time"
) func main() {
wg := sync.WaitGroup{}
for i := ; i < ; i++ {
wg.Add()
go calc(&wg, i)
} wg.Wait() //阻塞,等待所有groutine结束
fmt.Println("all goroutine finish")
} func calc(w *sync.WaitGroup, i int) {
//注意: wg.Add(1) 放到这会有问题,也就是main函数结束比wg.Add(1)要快
fmt.Println("calc:", i)
time.Sleep(time.Second)
w.Done()
}

waitGroup示例

7. ElastcSearch安装及go操作es

(1)安装 es
   1)下载ES,下载地址:https://www.elastic.co/products/elasticsearch,我下载的是 elasticsearch-6.7.1.zip。
   2)修改在解压后根目录下的 /config/elasticsearch.yml 配置:

放开注释并将 youIP换成你自己机器的 ip

cluster.name: my-application
node.name: node-
network.host: youIP
http.port:

3)修改 /config/jvm.options 文件,当然如果机器性能好也可以不用修改:

-Xms512m
-Xmx512m

4)进入根目录,启动es,.\bin\elasticsearch.bat

(2)go 操作 es 示例

安装第三方插件:

go get gopkg.in/olivere/elastic.v2

示例:注意将程序里面的 url = "http://yourIP:9200/",yourIP替换为你安装es机器的 ip:

 package main

 import (
"fmt" elastic "gopkg.in/olivere/elastic.v2"
) type Tweet struct {
User string
Message string
} var (
url = "http://yourIP:9200/"
) func main() {
client, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(url))
if err != nil {
fmt.Println("connect es error", err)
return
} fmt.Println("conn es succ") tweet := Tweet{User: "olivere", Message: "Take Five"}
_, err = client.Index().
Index("twitter").
Type("tweet").
Id("").
BodyJson(tweet).
Do()
if err != nil {
// Handle error
panic(err)
return
} fmt.Println("insert succ")
}

es示例

链式存储:

 package main

 import "fmt"

 type Stu struct {
Name string
Age int
} func (p *Stu) SetName(name string) *Stu {
p.Name = name
return p
} func (p *Stu) SetAge(age int) *Stu {
p.Age = age
return p
} func (p *Stu) Print() {
fmt.Printf("age:%d name:%s\n", p.Age, p.Name)
} func main() {
stu := &Stu{}
stu.SetAge().SetName("stu01").Print()
//stu.SetName("stu01")
//stu.Print()
}

链式存储示例

上一篇:loj #6.Guess Number


下一篇:长沙理工大学第十二届ACM大赛-重现赛 K - 大家一起来数二叉树吧