go 限流

1.官方包中原生简单限流:限流方式为简单粗暴的设置最大处理任务数,当达到限流链接数时,过载链接不会被丢弃,而是排队处理,这导致客户端关闭后,服务器还会排队处理。

l, _ := net.Listen("tcp", "127.0.0.1:8001")
defer l.Close()
//限流监听
l2 := netutil.LimitListener(l, 4)
l2.Addr()
http.Handle("/test", http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
	rand.Seed(time.Now().UnixNano())
	sleep := time.Duration(rand.Int() % 15 + 1) * 100
	log.Println("===========开始处理",request.URL, sleep)
	time.Sleep(time.Millisecond * sleep)
	log.Println("===========处理完成",request.URL, sleep)
	writer.Write([]byte("hello:" + strconv.Itoa(int(sleep)) + "     -> " + request.URL.String()))
}))
http.Serve(l2, nil)

2.官方包令牌桶限流(golang.org/x/time/rate)

func TestLimiterServerByLimiter(t *testing.T) {
	//表示令牌桶容量为10,每隔100ms放一个令牌到桶里面
	limiter := rate.NewLimiter(rate.Every(time.Second / 10), 10)
	l, _ := net.Listen("tcp", "127.0.0.1:8001")
	defer l.Close()
	http.Handle("/test", http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
		reservation := limiter.Reserve()
		waitTime := reservation.Delay().Milliseconds()
		log.Println("等待毫秒", waitTime, limiter.Limit(), limiter.Burst(),"   ", request.URL)
		if waitTime > 1 { //如果有等待时间,则放弃处理当前请求
			reservation.Cancel()
			writer.WriteHeader(http.StatusGatewayTimeout)
			writer.Write([]byte("Error Logic:" + strconv.Itoa(int(waitTime))  + "     -> " + request.URL.String()))
			return
		}
		//模拟业务处理
		rand.Seed(time.Now().UnixNano())
		sleep := time.Duration(rand.Int() % 15 + 1) * 100
		time.Sleep(time.Millisecond * sleep)
		writer.Write([]byte("hello:" + strconv.Itoa(int(sleep)) + "     -> " + request.URL.String()))
	}))
	http.Serve(l, nil)
}

模拟链接测试:

func TestLimiterClient(t *testing.T) {
	for i := 0; i < 300; i++ {
		tag := i
		if i % 20 == 0 {
			time.Sleep(time.Second)
		}
		go func() {
			client := &http.Client{Timeout: 100 * time.Second}
			resp, err := client.Get("http://127.0.0.1:8001/test?" + strconv.Itoa(tag))
			if err != nil {
				log.Println("Err:",tag, err)
			}else{
				var buffer [512]byte
				result := bytes.NewBuffer(nil)
				n, _ := resp.Body.Read(buffer[0:])
				result.Write(buffer[0:n])
				log.Println("请求成功", result)
			}
			defer client.CloseIdleConnections()
		}()
	}
	time.Sleep(time.Second * 200)
}

模拟每秒发送20个请求,可以看到几乎是10个请求成功,10个请求失败。

3.uber开源限流(https://github.com/uber-go/ratelimit),太暴力,获取请求时,如果没有令牌,直接sleep????

4.go-zero限流源码:

import (
	"errors"
	"github.com/tal-tech/go-zero/core/lang"
)
var ErrLimitReturn = errors.New("discarding limited token, resource pool is full, someone returned multiple times")
type Limit struct {
	pool chan lang.PlaceholderType
}
func NewLimit(n int) Limit {
	return Limit{
		pool: make(chan lang.PlaceholderType, n),
	}
}
func (l Limit) Borrow() {
	l.pool <- lang.Placeholder
}
func (l Limit) Return() error {
	select {
	case <-l.pool:
		return nil
	default:
		return ErrLimitReturn
	}
}
func (l Limit) TryBorrow() bool {
	select {
	case l.pool <- lang.Placeholder:
		return true
	default:
		return false
	}
}

使用方式:

latch := syncx.NewLimit(n)
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			if latch.TryBorrow() {
				defer func() {
					if err := latch.Return(); err != nil {
						logx.Error(err)
					}
				}()

				next.ServeHTTP(w, r)
			} else {
				internal.Errorf(r, "concurrent connections over %d, rejected with code %d",
					n, http.StatusServiceUnavailable)
				w.WriteHeader(http.StatusServiceUnavailable)
			}
		})

详情参见go-zero源码。
官方包中原生限流和go-zero类似,只不过go-zero支持达到限流请求时,立即返回,而官方是等待处理。说白了,
go-zero就是多了个select语句,有default 返回error分支而已。

上一篇:java线程常用方法


下一篇:C++制作鼠标连点器