Golang 通过 AMQP 协议使用 RabbitMQ 消息中间件

"github.com/streadway/amqp"

Publish发布

// amqp://<user>:<password>@<ip>:<port>
var addr = "amqp://test:test123@127.0.0.1:5672" //test func main() { // 建立连接
conn, err := amqp.Dial(addr)
if nil != err {
logs.Error(err)
return
} defer conn.Close()
// 申请通道
ch, err := conn.Channel()
if nil != err {
logs.Error(err)
return
} defer ch.Close()
// 定义交换“direct”、“fanout”、“topic”和“headers”
err = ch.ExchangeDeclare("happy", amqp.ExchangeTopic, true, false, false, false, nil)
if nil != err {
logs.Error(err)
return
} data = fmt.Sprintf("hello,world!!!")
//a.b.c.d.e 为发布key,以.分割;
err = ch.Publish("happy", "a.b.c.d.e", false, false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(data),
DeliveryMode: amqp.Transient, // 1=non-persistent, 2=persistent
})
if nil != err {
logs.Error(err)
return
} }

Receive 接收

// amqp://<user>:<password>@<ip>:<port>
var addr = "amqp://test:test123@127.0.0.1:5672" //test func main() { // 建立连接
conn, err := amqp.Dial(addr)
if nil != err {
logs.Error(err)
return
} defer conn.Close()
// 申请通道
ch, err := conn.Channel()
if nil != err {
logs.Error(err)
return
} defer ch.Close()
// 定义交换
err = ch.ExchangeDeclare("happy", amqp.ExchangeTopic, true, false, false, false, nil)
if nil != err {
logs.Error(err)
return
} queName := "test.test1.test2"
topic := "a.#" // 定义通道
que, err := ch.QueueDeclare(queName, false, false, false, false, nil)
if nil != err {
logs.Error(err)
} err = ch.QueueBind(que.Name, topic, "happy", false, nil)
if nil != err {
logs.Error(err)
return
} msges, err := ch.Consume(que.Name, "", true, false, false, false, nil)
if nil != err {
logs.Error(err)
return
} logs.Info("start recv") for msg := range msges {
fmt.Println(">>> %s", string(msg.Body)) } }

RabbitMQ 的 HTTP 管理界面默认端口是 15672(需要启用 rabbitmq_management 插件)。

注:队列必须先声明并绑定到交换机(即先运行一次接收端),之后发布的消息才会被路由到该队列;在绑定之前发布的消息没有可投递的队列,会被丢弃。

上一篇:http笔记汇总


下一篇:jquery过滤器之:contains()、.filter()