参考了:https://www.cnblogs.com/mfrank/p/11260355.html#autoid-0-8-0
demo:
package new_rabbitmq import ( "fmt" "github.com/streadway/amqp" "time" ) var ( dm *RabbitMQ dl *RabbitMQ err error url = "amqp://guest:guest@localhost:5672/?heartbeat=5" ) // 获取rabbitmq连接 type RabbitMQ struct { conn *amqp.Connection channel *amqp.Channel //队列名称 QueueName string //交换机 Exchange string //key key string //连接信息 Url string } //创建RabbitMQ结构体实例 func NewRabbitMQ(url, queueName, exchange, key string) (*RabbitMQ, error) { rabbitmq := &RabbitMQ{QueueName: queueName, Exchange: exchange, key: key, Url: url} var err error //创建RabbitMQ连接 rabbitmq.conn, err = amqp.Dial(rabbitmq.Url) //rabbitmq.failOnErr(err, "创建连接错误!") if err != nil { return nil, err } rabbitmq.channel, err = rabbitmq.conn.Channel() if err != nil { return nil, err } return rabbitmq, nil } // new一个延迟队列 绑定交换机和queue func NewDelayMq() *RabbitMQ { dm, err = NewRabbitMQ(url, "delay_queue", "delay_exchange", "delay_key") if err != nil { fmt.Println("connect err:" + err.Error()) } dm.SetExchange() dm.SetAndBindQueue(map[string]interface{}{ "x-message-ttl": 10000, // 10s超时 "x-dead-letter-exchange": "dead_letter_exchange", "x-dead-letter-routing-key": "dead_letter_key", }) return dm } // new死信队列 绑定交换机和queue func NewDeadLetterMq() *RabbitMQ { dl, err = NewRabbitMQ(url, "dead_letter_queue", "dead_letter_exchange", "dead_letter_key") if err != nil { fmt.Println("connect err:" + err.Error()) } dl.SetExchange() dl.SetAndBindQueue(nil) return dl } // 延时队列推消息 func DelayPublish() { dm.DelayPublish("test delay1") dm.DelayPublish("test delay2") } // 死信消费 func DeadLetterConsume() { dl.DeadLetterConsume() } func (r *RabbitMQ) SetExchange() { err := r.channel.ExchangeDeclare(r.Exchange, "direct", true, false, false, false, nil) if err != nil { fmt.Println("exchange declare err" + err.Error()) } } func (r *RabbitMQ) SetAndBindQueue(args amqp.Table) { _, err := r.channel.QueueDeclare(r.QueueName, true, false, false, false, args) if err != nil { fmt.Println("queue declare err" + err.Error()) } err = r.channel.QueueBind(r.QueueName, r.key, r.Exchange, false, nil) if err != nil { fmt.Println("queue bind err" + err.Error()) } } func (r *RabbitMQ) DelayPublish(msgStr string) { msg := amqp.Publishing{ DeliveryMode: amqp.Persistent, Timestamp: time.Now(), ContentType: "text/plain", Body: []byte(msgStr), } err := r.channel.Publish(r.Exchange, r.key, false, false, msg) if err != nil { fmt.Println("DelayPublish err" + err.Error()) } fmt.Println("delay queue push msg: " + msgStr) } func (r *RabbitMQ) DeadLetterConsume() { _ = r.channel.Qos(1, 0, false) msgChan, err := r.channel.Consume(r.QueueName, r.key, false, false, false, false, nil, ) if err != nil { fmt.Println("dead consume err:" + err.Error()) } for delivery := range msgChan { _ = delivery.Ack(true) msg := string(delivery.Body) fmt.Println("dead letter get msg:" + msg) } }
test:
package new_rabbitmq import "testing" func TestDelay(t *testing.T) { NewDelayMq() NewDeadLetterMq() DelayPublish() DeadLetterConsume() }