当我们需要用脚本批量处理数据的时候 如果被调用方需要我们控制下qps的话 就需要用到golang的 limiter来做一个控制
具体看下面的例子:
func main() { test() } // 首先是通用的开启N个进程来处理数据的代码 func test() { startTime := time.Now().Format("2006-01-02 15:04:05.000") //定义一个有缓冲的channel var ch = make(chan int, 200) //异步写入到这个里面 主要是这个里面开控制速率,控制写入速率就能控制请求速率 go write(ch) var wg sync.WaitGroup // 开启两个进程处理 var workNum = 2 for i := 0; i < workNum; i++ { wg.Add(1) //开启进程处理 go func(i int) { handler(ch) wg.Done() }(i) } wg.Wait() endTime := time.Now().Format("2006-01-02 15:04:05.000") fmt.Println(startTime) fmt.Println(endTime) } // time.Millisecond*2 不管几个协程都是2秒1000个 qps=500 // rate.NewLimiter() 一共两个参数,第一个参数是每隔多久可以放入一个,第二个参数可以为0,意思就是初始化的时候里面有多少个容量 func write(ch chan int) { //b:初始10个容量,每秒放入一个 可以初始的时候放入0个,我们这里初始化放入了1个 limiter := rate.NewLimiter(rate.Every(time.Millisecond*1000), 1) //也可以用下面这种写法,rate.Limit(10) 表示 每秒10个 //limiter := rate.NewLimiter(rate.Limit(10), 10) for i := 0; i < 30; i++ { ch <- i r := limiter.Reserve() //卧槽 想打印出来延迟时间 不能在sleep之后 也就说这个 r.Delay 是实时的 fmt.Println("Delay", r.Delay()) //这里用来做延迟 time.Sleep(r.Delay()) if i > 10 { //控制速率,变更为没3秒处理一个 limiter.SetLimit(rate.Every(time.Millisecond * 1000 * 3)) } } close(ch) } //这里用来处理具体的逻辑 func handler(ch chan int) { for { if item, ok := <-ch; ok { //处理一些的东西 fmt.Println("item:", item) } else { return } } }
执行结果:
item: 0 Delay 0s Delay 1s item: 1 Delay 999.908199ms item: 2 Delay 988.3924ms item: 3 Delay 989.628099ms item: 4 Delay 994.222999ms item: 5 Delay 984.920999ms item: 6 Delay 993.86ms item: 7 Delay 990.9507ms item: 8 Delay 990.275499ms item: 9 Delay 999.776299ms item: 10 Delay 998.6238ms item: 11 Delay 2.9834244s item: 12 Delay 2.999689s item: 13 Delay 2.9903908s item: 14 Delay 2.9944316s item: 15
可以看到从低11次开始 就延迟3秒才开始把数据放入到channel里面