Wrapper
Wrapper提供了一种包装机制,使得在执行某方法前先执行Wrapper,优点Filter的意思;因此可以在客户端和服务器做很多功能:熔断限流、Filter、Auth等。
client代码如下:调用greeter.Hello时先执行logWrap.Call方法,再调用RPC请求。
// log wrapper logs every time a request is made
type logWrapper struct {
client.Client
}
func (l *logWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
fmt.Printf("[wrapper] client request service: %s method: %s\n", req.Service(), req.Method())
return l.Client.Call(ctx, req, rsp)
}
// Implements client.Wrapper as logWrapper
func logWrap(c client.Client) client.Client {
return &logWrapper{c}
}
func main() {
service := micro.NewService(
micro.Name("greeter.client"),
micro.Registry(mdns.NewRegistry()),
// wrap the client
micro.WrapClient(logWrap),
)
service.Init()
greeter := proto.NewGreeterService("greeter", service.Client())
rsp, err := greeter.Hello(context.TODO(), &proto.HelloRequest{Name: "John"})
if err != nil {
fmt.Println(err)
return
}
fmt.Println(rsp.Greeting)
}
server代码如下:当RPC调用进来时先执行logWrapper,再执行Hello
type Greeter struct{}
func (g *Greeter) Hello(ctx context.Context, req *proto.HelloRequest, rsp *proto.HelloResponse) error {
rsp.Greeting = "Hello " + req.Name
log.Println("in Hello")
return nil
}
// logWrapper is a handler wrapper
func logWrapper(fn server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
log.Printf("[wrapper] server request: %v", req.Method())
err := fn(ctx, req, rsp)
return err
}
}
func main() {
service := micro.NewService(
micro.Name("greeter"),
// wrap the handler
micro.WrapHandler(logWrapper),
micro.Registry(mdns.NewRegistry()),
)
service.Init()
proto.RegisterGreeterHandler(service.Server(), new(Greeter))
if err := service.Run(); err != nil {
fmt.Println(err)
}
}
熔断
Micro提供了两种实现,gobreaker和hystrix,熔断是在客户端实现。先看看 hystrix:
var (
// DefaultTimeout is how long to wait for command to complete, in milliseconds
DefaultTimeout = 1000
// DefaultMaxConcurrent is how many commands of the same type can run at the same time
DefaultMaxConcurrent = 10
// DefaultVolumeThreshold is the minimum number of requests needed before a circuit can be tripped due to health
DefaultVolumeThreshold = 20
// DefaultSleepWindow is how long, in milliseconds, to wait after a circuit opens before testing for recovery
DefaultSleepWindow = 5000
// DefaultErrorPercentThreshold causes circuits to open once the rolling measure of errors exceeds this percent of requests
DefaultErrorPercentThreshold = 50
// DefaultLogger is the default logger that will be used in the Hystrix package. By default prints nothing.
DefaultLogger = NoopLogger{}
)
type Settings struct {
Timeout time.Duration
MaxConcurrentRequests int
RequestVolumeThreshold uint64
SleepWindow time.Duration
ErrorPercentThreshold int
}
hystrix会根据这5个参数(超时时间、并发请求数、请求量、空歇床、错误率)来选择合适的服务进行调度,目前是使用的 hystrix提供的默认参数,不支持自定义参数,示例:
func TestBreaker(t *testing.T) {
// setup
r := mock.NewRegistry()
s := selector.NewSelector(selector.Registry(r))
c := client.NewClient(
// set the selector
client.Selector(s),
// add the breaker wrapper
client.Wrap(NewClientWrapper()),
)
req := c.NewRequest("test.service", "Test.Method", map[string]string{
"foo": "bar",
}, client.WithContentType("application/json"))
var rsp map[string]interface{}
// Force to point of trip
for i := 0; i < (hystrix.DefaultVolumeThreshold * 3); i++ {
c.Call(context.TODO(), req, rsp)
}
err := c.Call(context.TODO(), req, rsp)
if err == nil {
t.Error("Expecting tripped breaker, got nil error")
}
if err.Error() != "hystrix: circuit open" {
t.Errorf("Expecting tripped breaker, got %v", err)
}
}
gobreaker方案与hystrix类似,可以自定义参数。
限流
ratelimit可以在客户端做,也可以在服务端做;micro提供了两种方案:juju/ratelimit
、uber/ratelimit
。
客户端实现:
func TestRateClientLimit(t *testing.T) {
// setup
r := mock.NewRegistry()
s := selector.NewSelector(selector.Registry(r))
testRates := []int{1, 10, 20, 100}
for _, limit := range testRates {
b := ratelimit.NewBucketWithRate(float64(limit), int64(limit))
c := client.NewClient(
// set the selector
client.Selector(s),
// add the breaker wrapper
client.Wrap(NewClientWrapper(b, false)),//fasle=快速失败?
)
req := c.NewRequest(
"test.service",
"Test.Method",
&TestRequest{},
client.WithContentType("application/json"),
)
rsp := TestResponse{}
for j := 0; j < limit; j++ {
err := c.Call(context.TODO(), req, &rsp)
e := errors.Parse(err.Error())
if e.Code == 429 {
t.Errorf("Unexpected rate limit error: %v", err)
}
}
err := c.Call(context.TODO(), req, rsp)
e := errors.Parse(err.Error())
if e.Code != 429 {
t.Errorf("Expected rate limit error, got: %v", err)
}
}
}
- NewBucketWithRate入参为速率(QPS)和容量(CAP),比如每秒5个请求,最大保持50个活动的请求
- NewClientWrapper第二个参数wait,指示当受到限流时是否等待,如果是false即快速失败,返回(429,too mant request)
服务端实现(以下代码包含了客户端测试代码):
func TestRateServerLimit(t *testing.T) {
// setup
r := mock.NewRegistry()
s := selector.NewSelector(selector.Registry(r))
testRates := []int{1, 10, 20}
for _, limit := range testRates {
b := ratelimit.NewBucketWithRate(float64(limit), int64(limit))
c := client.NewClient(client.Selector(s))
name := fmt.Sprintf("test.service.%d", limit)
s := server.NewServer(
server.Name(name),
// add registry
server.Registry(r),
// add the breaker wrapper
server.WrapHandler(NewHandlerWrapper(b, false)),
)
type Test struct {
*testHandler
}
s.Handle(
s.NewHandler(&Test{new(testHandler)}),
)
if err := s.Start(); err != nil {
t.Fatalf("Unexpected error starting server: %v", err)
}
if err := s.Register(); err != nil {
t.Fatalf("Unexpected error registering server: %v", err)
}
req := c.NewRequest(name, "Test.Method", &TestRequest{}, client.WithContentType("application/json"))
rsp := TestResponse{}
for j := 0; j < limit; j++ {
if err := c.Call(context.TODO(), req, &rsp); err != nil {
t.Fatalf("Unexpected request error: %v", err)
}
}
err := c.Call(context.TODO(), req, &rsp)
if err == nil {
t.Fatalf("Expected rate limit error, got nil: rate %d, err %v", limit, err)
}
e := errors.Parse(err.Error())
if e.Code != 429 {
t.Fatalf("Expected rate limit error, got %v", err)
}
s.Deregister()
s.Stop()
// artificial test delay
time.Sleep(time.Millisecond * 20)
}
}
client.NewClient支持多个Wrapper,将熔断限流功能都添加上
import "github.com/micro/go-plugins/wrapper/breaker/hystrix"
import "github.com/micro/go-plugins/wrapper/ratelimiter/ratelimit"
c := client.NewClient(
client.Wrap(ratelimit.NewClientWrapper(b, false)),
client.Wrap(hystrixNewClientWrapper()),
)