1.官方包中原生简单限流:限流方式为简单粗暴的设置最大处理任务数,当达到限流链接数时,过载链接不会被丢弃,而是排队处理,这导致客户端关闭后,服务器还会排队处理。
l, _ := net.Listen("tcp", "127.0.0.1:8001")
defer l.Close()
//限流监听
l2 := netutil.LimitListener(l, 4)
l2.Addr()
http.Handle("/test", http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
rand.Seed(time.Now().UnixNano())
sleep := time.Duration(rand.Int() % 15 + 1) * 100
log.Println("===========开始处理",request.URL, sleep)
time.Sleep(time.Millisecond * sleep)
log.Println("===========处理完成",request.URL, sleep)
writer.Write([]byte("hello:" + strconv.Itoa(int(sleep)) + " -> " + request.URL.String()))
}))
http.Serve(l2, nil)
2.官方包令牌桶限流(golang.org/x/time/rate)
func TestLimiterServerByLimiter(t *testing.T) {
//表示令牌桶容量为10,每隔100ms放一个令牌到桶里面
limiter := rate.NewLimiter(rate.Every(time.Second / 10), 10)
l, _ := net.Listen("tcp", "127.0.0.1:8001")
defer l.Close()
http.Handle("/test", http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
reservation := limiter.Reserve()
waitTime := reservation.Delay().Milliseconds()
log.Println("等待毫秒", waitTime, limiter.Limit(), limiter.Burst()," ", request.URL)
if waitTime > 1 { //如果有等待时间,则放弃处理当前请求
reservation.Cancel()
writer.WriteHeader(http.StatusGatewayTimeout)
writer.Write([]byte("Error Logic:" + strconv.Itoa(int(waitTime)) + " -> " + request.URL.String()))
return
}
//模拟业务处理
rand.Seed(time.Now().UnixNano())
sleep := time.Duration(rand.Int() % 15 + 1) * 100
time.Sleep(time.Millisecond * sleep)
writer.Write([]byte("hello:" + strconv.Itoa(int(sleep)) + " -> " + request.URL.String()))
}))
http.Serve(l, nil)
}
模拟链接测试:
func TestLimiterClient(t *testing.T) {
for i := 0; i < 300; i++ {
tag := i
if i % 20 == 0 {
time.Sleep(time.Second)
}
go func() {
client := &http.Client{Timeout: 100 * time.Second}
resp, err := client.Get("http://127.0.0.1:8001/test?" + strconv.Itoa(tag))
if err != nil {
log.Println("Err:",tag, err)
}else{
var buffer [512]byte
result := bytes.NewBuffer(nil)
n, _ := resp.Body.Read(buffer[0:])
result.Write(buffer[0:n])
log.Println("请求成功", result)
}
defer client.CloseIdleConnections()
}()
}
time.Sleep(time.Second * 200)
}
模拟每秒发送20个请求,可以看到几乎是10个请求成功,10个请求失败。
3.uber开源限流(https://github.com/uber-go/ratelimit),太暴力,获取请求时,如果没有令牌,直接sleep????
4.go-zero限流源码:
import (
"errors"
"github.com/tal-tech/go-zero/core/lang"
)
var ErrLimitReturn = errors.New("discarding limited token, resource pool is full, someone returned multiple times")
type Limit struct {
pool chan lang.PlaceholderType
}
func NewLimit(n int) Limit {
return Limit{
pool: make(chan lang.PlaceholderType, n),
}
}
func (l Limit) Borrow() {
l.pool <- lang.Placeholder
}
func (l Limit) Return() error {
select {
case <-l.pool:
return nil
default:
return ErrLimitReturn
}
}
func (l Limit) TryBorrow() bool {
select {
case l.pool <- lang.Placeholder:
return true
default:
return false
}
}
使用方式:
latch := syncx.NewLimit(n)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if latch.TryBorrow() {
defer func() {
if err := latch.Return(); err != nil {
logx.Error(err)
}
}()
next.ServeHTTP(w, r)
} else {
internal.Errorf(r, "concurrent connections over %d, rejected with code %d",
n, http.StatusServiceUnavailable)
w.WriteHeader(http.StatusServiceUnavailable)
}
})
详情参见go-zero源码。
官方包中原生限流和go-zero类似,只不过go-zero支持达到限流请求时,立即返回,而官方是等待处理。说白了,
go-zero就是多了个select语句,有default 返回error分支而已。