output-http-queue-len
output_http.go 各种初始化
func NewHTTPOutput(address string, config *HTTPOutputConfig) PluginReadWriter { o := new(HTTPOutput) var err error config.url, err = url.Parse(address) if err != nil { log.Fatal(fmt.Sprintf("[OUTPUT-HTTP] parse HTTP output URL error[%q]", err)) } if config.url.Scheme == "" { config.url.Scheme = "http" } config.rawURL = config.url.String() if config.Timeout < time.Millisecond*100 { config.Timeout = time.Second } if config.BufferSize <= 0 { config.BufferSize = 100 * 1024 // 100kb } if config.WorkersMin <= 0 { config.WorkersMin = 1 } if config.WorkersMin > 1000 { config.WorkersMin = 1000 } if config.WorkersMax <= 0 { config.WorkersMax = math.MaxInt32 // idealy so large } if config.WorkersMax < config.WorkersMin { config.WorkersMax = config.WorkersMin } if config.QueueLen <= 0 { config.QueueLen = 1000 } if config.RedirectLimit < 0 { config.RedirectLimit = 0 } if config.WorkerTimeout <= 0 { config.WorkerTimeout = time.Second * 2 } o.config = config o.stop = make(chan bool) if o.config.Stats { o.queueStats = NewGorStat("output_http", o.config.StatsMs) } o.queue = make(chan *Message, o.config.QueueLen) if o.config.TrackResponses { o.responses = make(chan *response, o.config.QueueLen) } // it should not be buffered to avoid races o.stopWorker = make(chan struct{}) if o.config.ElasticSearch != "" { o.elasticSearch = new(ESPlugin) o.elasticSearch.Init(o.config.ElasticSearch) } o.client = NewHTTPClient(o.config) o.activeWorkers += int32(o.config.WorkersMin) for i := 0; i < o.config.WorkersMin; i++ { go o.startWorker() } go o.workerMaster() return o }
队列用在哪儿
func (o *HTTPOutput) startWorker() { for { select { case <-o.stopWorker: return case msg := <-o.queue: o.sendRequest(o.client, msg) } } } // PluginWrite writes message to this plugin func (o *HTTPOutput) PluginWrite(msg *Message) (n int, err error) { if !isRequestPayload(msg.Meta) { return len(msg.Data), nil } select { case <-o.stop: return 0, ErrorStopped case o.queue <- msg: } if o.config.Stats { o.queueStats.Write(len(o.queue)) } if len(o.queue) > 0 { // try to start a new worker to serve if atomic.LoadInt32(&o.activeWorkers) < int32(o.config.WorkersMax) { go o.startWorker() atomic.AddInt32(&o.activeWorkers, 1) } } return len(msg.Data) + len(msg.Meta), nil }