golang limiter

当我们需要用脚本批量处理数据的时候  如果被调用方需要我们控制下qps的话 就需要用到golang的 limiter来做一个控制
具体看下面的例子:
func main() {
	test()
}

// 首先是通用的开启N个进程来处理数据的代码
func test() {
	startTime := time.Now().Format("2006-01-02 15:04:05.000")
	//定义一个有缓冲的channel
	var ch = make(chan int, 200)
	//异步写入到这个里面 主要是这个里面开控制速率,控制写入速率就能控制请求速率
	go write(ch)
	var wg sync.WaitGroup
	// 开启两个进程处理
	var workNum = 2
	for i := 0; i < workNum; i++ {
		wg.Add(1)
		//开启进程处理
		go func(i int) {
			handler(ch)
			wg.Done()
		}(i)
	}
	wg.Wait()
	endTime := time.Now().Format("2006-01-02 15:04:05.000")
	fmt.Println(startTime)
	fmt.Println(endTime)
}

// time.Millisecond*2 不管几个协程都是2秒1000个 qps=500
// rate.NewLimiter() 一共两个参数,第一个参数是每隔多久可以放入一个,第二个参数可以为0,意思就是初始化的时候里面有多少个容量
func write(ch chan int) {
	//b:初始10个容量,每秒放入一个 可以初始的时候放入0个,我们这里初始化放入了1个
	limiter := rate.NewLimiter(rate.Every(time.Millisecond*1000), 1)
	//也可以用下面这种写法,rate.Limit(10) 表示 每秒10个
	//limiter := rate.NewLimiter(rate.Limit(10), 10)
	for i := 0; i < 30; i++ {
		ch <- i
		r := limiter.Reserve()
		//卧槽 想打印出来延迟时间 不能在sleep之后 也就说这个 r.Delay 是实时的
		fmt.Println("Delay", r.Delay())
		//这里用来做延迟
		time.Sleep(r.Delay())
		if i > 10 {
			//控制速率,变更为没3秒处理一个
			limiter.SetLimit(rate.Every(time.Millisecond * 1000 * 3))
		}
	}
	close(ch)
}

//这里用来处理具体的逻辑
func handler(ch chan int) {
	for {
		if item, ok := <-ch; ok {
			//处理一些的东西
			fmt.Println("item:", item)
		} else {
			return
		}
	}
}

  执行结果:

item: 0
Delay 0s
Delay 1s
item: 1
Delay 999.908199ms
item: 2
Delay 988.3924ms
item: 3
Delay 989.628099ms
item: 4
Delay 994.222999ms
item: 5
Delay 984.920999ms
item: 6
Delay 993.86ms
item: 7
Delay 990.9507ms
item: 8
Delay 990.275499ms
item: 9
Delay 999.776299ms
item: 10
Delay 998.6238ms
item: 11
Delay 2.9834244s
item: 12
Delay 2.999689s
item: 13
Delay 2.9903908s
item: 14
Delay 2.9944316s
item: 15

可以看到从低11次开始 就延迟3秒才开始把数据放入到channel里面

golang limiter

上一篇:网页的动态刷新


下一篇:集合——ArrayList