描述:说到请求限流,一般都会用到MQ,无论何种MQ,都需要生产者和消费者才能发挥MQ的强大作用。但在对接项目,可能就会出现对接方不能够配合使用MQ的情况。此时,使用线程池做限流也是一种可行的思路。
流程:
1.需手动实现一个线程池。说到线程池,要考虑的因素有:核心线程数,任务队列,最大线程数,线程空闲时间,保留策略。
①开启线程池,接受任务,每接受一个任务创建一条线程。
②当线程数达到核心线程数时,之后的任务放入任务队列中。建议使用阻塞队列,防止内存溢出。
③当任务队列饱和,会在线程池中创建额外的线程来处理任务,直至达到最大线程数。
④当在线程池中的这部分额外线程处于空闲状态,并且达到线程空闲时间的要求,这部分线程会被销毁。
⑤当达到最大线程数,依然有后续的任务要处理,此时就要对这部分任务的去留做出决策。提供三种保留策略:
Ⅰ.直接丢弃,不予处理。
Ⅱ.开辟脱离线程池的线程来处理。
Ⅲ.将任务队列中等待时间久的任务丢弃,加入后续任务。
2.请求限流,先要了解server的运行原理
①服务端需要有一个监听器用来监听请求连接。当客户端发送来一个请求,服务端会先和客户端建立tcp连接。
②开辟一条线程用来单独处理这条tcp连接中发送来的http请求,直至http请求读取完毕,返回响应。默认tcp连接会存活90秒。我们要执行请求限流的操作便在此处进行,详细操作看代码。
//线程池 package myroutine import ( "fmt" "strconv" ) /** * @ Author : jgbb * @ Date : Created in 2019/9/4 13:19 * @ Description : TODO 线程池 * @ Modified by : * @ Version : 1.0 */ func Init(poolSize int,name string) *RoutinePool{ pool := &RoutinePool{ Queue:make(chan func()), PoolSize:poolSize, Name:name, } defer pool.ExeTask() return pool } type RoutinePool struct { //缓存任务 Queue chan func() PoolSize int Name string } // 添加任务到线程池 func (pool *RoutinePool) AddTask(task func()){ pool.Queue <- task } //执行任务 func (pool *RoutinePool) ExeTask(){ counter := make(chan int) for i:=0;i<pool.PoolSize;i++ { go func() { j := <- counter//哪条线程 var count int64= 0//计数(线程跑了多少次) var stdout =pool.Name+"\t线程"+strconv.Itoa(j)+"\t" for task := range pool.Queue{ count++ fmt.Printf("%p\t%s\n",pool,stdout+strconv.FormatInt(count,10)) task() } }() counter <- i } }
//对server源码修改
const(
DefaultPoolSize = 10
)
//每个请求对应的线程池 var PoolMap = make(map[string]*myroutine.RoutinePool) //golang源码 func (srv *Server) Serve(l net.Listener) error { defer l.Close() if fn := testHookServerServe; fn != nil { fn(srv, l) } var tempDelay time.Duration // how long to sleep on accept failure if err := srv.setupHTTP2_Serve(); err != nil { return err } srv.trackListener(l, true) defer srv.trackListener(l, false) baseCtx := context.Background() // base is always background, per Issue 16220 ctx := context.WithValue(baseCtx, ServerContextKey, srv) for { rw, e := l.Accept() if e != nil { select { case <-srv.getDoneChan(): return ErrServerClosed default: } if ne, ok := e.(net.Error); ok && ne.Temporary() { if tempDelay == 0 { tempDelay = 5 * time.Millisecond } else { tempDelay *= 2 } if max := 1 * time.Second; tempDelay > max { tempDelay = max } srv.logf("http: Accept error: %v; retrying in %v", e, tempDelay) time.Sleep(tempDelay) continue } return e } tempDelay = 0 c := srv.newConn(rw) /***********************修改开始*******************************/ //将c.server(ctx)的处理过程放入线程池中 //首先需要请求path,根据path获取对应的线程池 c.r = &connReader{conn: c} c.r.setReadLimit(c.server.initialReadLimitSize()) //若不setReadLimit,无法读取到缓冲流中的数据 c.bufr = newBufioReader(c.r)//用来读取流 s,err := c.bufr.Peek(100)//缓冲流使用peek(),游标不会进行计数,这样才能流中的数据在后面的处理中复用。否则后续读取流会从游标开始 news := make([]byte,0) for i:=0;i<100;i++ { news = append(news,s[i]) if s[i] == 10 { //10表示换行符,到此获取到所需信息 break } } if err != nil { fmt.Errorf("my err:%v",err) } newss := string(news) //请求path当作线程池名称 poolName := newss[strings.Index(newss,"/"):strings.LastIndex(newss," ")] c.setState(c.rwc, StateNew) // before Serve can return //go c.serve(ctx) //源码 //放入线程池处理请求 putPoolMap(poolName).AddTask(func() { c.serve(ctx) }) /***********************修改结束*******************************/ } } //生成线程池 //-参数1:线程池大小 //-参数2:线程池名称 func PutPoolMap(poolSize int,name string) *myroutine.RoutinePool{ if _,ok := PoolMap[name]; !ok { //如果不存在对应的线程池,则生成一个 PoolMap[name] = myroutine.Init(poolSize,name) } //返回对应的线程池 return PoolMap[name] } //默认使用此方法生成线程池 //-参数1:线程池名称 func putPoolMap(name string) *myroutine.RoutinePool{ return PutPoolMap(DefaultPoolSize,name) }