Prerequisite:
系统上的nsq服务已经起来
Create a consumer (consumer.go):
package main
import (
"log"
"sync"
"github.com/nsqio/go-nsq"
)
// main subscribes to topic "My_NSQ_Topic" on channel "My_NSQ_Channel" through
// the nsqd instance at 127.0.0.1:4150 and logs the body of every message it
// receives. It panics, including the underlying error, if the consumer cannot
// be created or the connection cannot be established.
func main() {
	// Nothing ever calls wg.Done, so wg.Wait at the end blocks indefinitely.
	// That keeps the process alive while the handler runs in the background;
	// terminate the program externally (e.g. Ctrl+C).
	wg := &sync.WaitGroup{}
	wg.Add(1)

	decodeConfig := nsq.NewConfig()
	c, err := nsq.NewConsumer("My_NSQ_Topic", "My_NSQ_Channel", decodeConfig)
	if err != nil {
		// Include the cause so a bad topic/channel name or config is diagnosable.
		log.Panicf("Could not create consumer: %v", err)
	}

	// MaxInFlight is left at its default of 1 (per the original note).
	c.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
		log.Println("NSQ message received:")
		log.Println(string(message.Body))
		// The handler reports success for every message.
		return nil
	}))

	err = c.ConnectToNSQD("127.0.0.1:4150")
	if err != nil {
		// Include the cause so connection problems are diagnosable.
		log.Panicf("Could not connect: %v", err)
	}

	log.Println("Awaiting messages from NSQ topic \"My NSQ Topic\"...")
	wg.Wait()
}
go run consumer.go
2022/01/13 09:13:08 INF 1 [My_NSQ_Topic/My_NSQ_Channel] (127.0.0.1:4150) connecting to nsqd
2022/01/13 09:13:08 Awaiting messages from NSQ topic "My NSQ Topic"...
Create a producer (producer.go):
package main
import (
"log"
"github.com/nsqio/go-nsq"
)
// main publishes a single message, "sample NSQ message", to topic
// "My_NSQ_Topic" using a producer configured for the nsqd instance at
// 127.0.0.1:4150. It panics with the underlying error if the producer cannot
// be created or the publish fails.
func main() {
	config := nsq.NewConfig()
	p, err := nsq.NewProducer("127.0.0.1:4150", config)
	if err != nil {
		log.Panic(err)
	}
	// Stop the producer before the program exits instead of abandoning it.
	// Registered only after the error check above, and it still runs if the
	// log.Panic below fires.
	defer p.Stop()

	err = p.Publish("My_NSQ_Topic", []byte("sample NSQ message"))
	if err != nil {
		log.Panic(err)
	}
}
go run producer.go
2022/01/13 09:16:29 INF 1 (127.0.0.1:4150) connecting to nsqd
The consumer terminal now shows:
2022/01/13 09:13:08 INF 1 [My_NSQ_Topic/My_NSQ_Channel] (127.0.0.1:4150) connecting to nsqd
2022/01/13 09:13:08 Awaiting messages from NSQ topic "My NSQ Topic"...
2022/01/13 09:16:29 NSQ message received:
2022/01/13 09:16:29 sample NSQ message
查看 nsqadmin 的 UI 界面:
http://192.168.31.128:4171/topics/My_NSQ_Topic