RabbitMQ 四种 Exchange 模式的简单使用(direct、fanout、topic、headers 实例)

关键字 

  • Exchange :交换机

  • routing key:路由键


 四种Exchange 模式

  1. direct :需要生产者和消费者绑定相同的Exchange和routing key。

  2. fanout:广播模式需要生产者消费者绑定相同的Exchange。

  3. topic:支持模糊匹配的模式。routing key 由若干个以点(.)分隔的单词组成,绑定时 * 匹配恰好一个单词,# 匹配任意数量(零个或多个)单词;例如 *.my 可以匹配 a.my,#.com 可以匹配 www.baidu.com。

  4. headers:不使用 routing key,而是根据生产者消息的 headers 与消费者绑定时指定的 headers 进行匹配,性能较差。绑定参数 x-match 决定匹配方式:all 表示所有 header 都必须匹配,any 表示任意一个匹配即可。

1:首先定义一个 ConnectionUtil类

public class ConnectionUtil {

    /**
     * Opens a new connection to the RabbitMQ broker used by the examples.
     *
     * <p>Every call builds a fresh {@code ConnectionFactory} and asks it for a
     * new connection; nothing is cached or closed here.
     *
     * @return the connection created by the factory
     * @throws Exception if the factory fails to create the connection
     */
    public static Connection getConnection() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Broker address and port.
        connectionFactory.setHost("192.168.26.134");
        connectionFactory.setPort(5672);
        // Login credentials.
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        // Obtain the connection from the factory and hand it straight back.
        return connectionFactory.newConnection();
    }
}

2:定义一个公共的Produce 类和Consumer 类

import com.mq.rabbit.Test.ConnectionUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * Created by Administrator on 2019/7/7.
 */
public class Produce {
    /**
     * Declares an exchange and publishes one message to it over a short-lived
     * connection.
     *
     * <p>The message text is encoded as UTF-8, which is the charset the
     * companion consumer uses when decoding the body. The connection is closed
     * in a {@code finally} block so it is released even when declaring the
     * exchange or publishing fails.
     *
     * @param exchanageType exchange type (for example "direct", "fanout", "topic", "headers")
     * @param exchanageName exchange name
     * @param direct         routing key
     * @param properties     AMQP.BasicProperties sent with the message; may be {@code null}
     * @param message        text of the message to send
     * @throws Exception if connecting, declaring the exchange, publishing or closing fails
     */
    public static void senInfo(String exchanageType, String exchanageName, String direct, AMQP.BasicProperties properties, String message) throws Exception {
        System.out.println("生产者:" + message);
        Connection connection = ConnectionUtil.getConnection();
        try {
            // Open a channel on the connection.
            Channel channel = connection.createChannel();
            // Make sure the exchange exists with the requested type.
            channel.exchangeDeclare(exchanageName, exchanageType);
            // Publish with an explicit charset instead of the platform default.
            channel.basicPublish(exchanageName, direct, properties, message.getBytes("UTF-8"));
            channel.close();
        } finally {
            // Always release the connection; per the RabbitMQ client docs this
            // also closes any channel still open on it.
            connection.close();
        }
    }
}

import com.mq.rabbit.Test.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;

/**
 * Created by Administrator on 2019/6/30.
 */
public class Consumer {
    /**
     * Binds a new server-named queue to the given exchange and starts a
     * consumer that prints every received message to standard output.
     *
     * <p>The connection and channel opened here are not closed by this method,
     * so the consumer stays registered after the method returns.
     *
     * @param exchanageType exchange type
     * @param exchanageName exchange name
     * @param direct         routing key used for the queue binding
     * @param headers        binding arguments (header information); may be {@code null}
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void readInfo(String exchanageType, String exchanageName, String direct, Map<String, Object> headers) throws Exception {
        // Open a connection and a channel on it.
        Channel channel = ConnectionUtil.getConnection().createChannel();
        // Make sure the exchange exists with the requested type.
        channel.exchangeDeclare(exchanageName, exchanageType);
        // Non-durable, exclusive, auto-delete queue whose name is chosen by the server.
        String boundQueue = channel.queueDeclare().getQueue();
        // Arguments: queue name, exchange name, routing key, binding arguments.
        channel.queueBind(boundQueue, exchanageName, direct, headers);
        // Callback that decodes each delivery as UTF-8 text and prints it.
        DefaultConsumer printer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者收到消息:  " + new String(body, "UTF-8"));
            }
        };
        // Subscribe with automatic acknowledgement.
        channel.basicConsume(boundQueue, true, printer);
    }
}

3:direct、fanout、topic、headers 实例


import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import org.junit.jupiter.api.Test;

import java.util.Hashtable;
import java.util.Map;

/**
 * Created by Administrator on 2019/7/7.
 */
/**
 * Examples for the four RabbitMQ exchange types: direct, fanout, topic and
 * headers. Each test first registers one or more consumers and then publishes
 * messages to the same exchange.
 *
 * <p>NOTE(review): the consumers print from their own callback; a test method
 * may return before every delivery has been printed.
 */
public class TestExchange {

    /** Direct exchange: producer and consumer use the same exchange and routing key. */
    @Test
    public void direct() throws Exception {
        String exchanageType = BuiltinExchangeType.DIRECT.getType(); // direct type
        String exchanageName = "direct_logs1"; // exchange name
        String direct = "direct_01"; // routing key
        String message = "direct 类型消息"; // message text
        Map<String, Object> headers = null; // no binding arguments
        AMQP.BasicProperties properties = null; // no message properties
        Consumer.readInfo(exchanageType, exchanageName, direct, headers);
        for (int i = 0; i < 5; i++) {
            Produce.senInfo(exchanageType, exchanageName, direct, properties, message);
        }
    }

    /** Fanout exchange: every queue bound to the exchange receives the message. */
    @Test
    public void fanout() throws Exception {
        String exchanageType = BuiltinExchangeType.FANOUT.getType(); // fanout type
        String exchanageName = "fanout_logs1"; // exchange name
        String direct = "fanout_01"; // routing key
        String message = "fanout 类型消息"; // message text
        Map<String, Object> headers = null; // no binding arguments
        AMQP.BasicProperties properties = null; // no message properties
        // Fanout is a broadcast, so start several consumers.
        Consumer.readInfo(exchanageType, exchanageName, direct, headers);
        Consumer.readInfo(exchanageType, exchanageName, direct, headers);
        Produce.senInfo(exchanageType, exchanageName, direct, properties, message);
    }

    /** Topic exchange: bindings use dot-separated patterns with {@code *} and {@code #}. */
    @Test
    public void topic() throws Exception {
        String exchanageType = BuiltinExchangeType.TOPIC.getType(); // topic type
        String exchanageName = "topic_logs1"; // exchange name
        Map<String, Object> headers = null; // no binding arguments
        AMQP.BasicProperties properties = null; // no message properties
        // "*" matches exactly one word, "#" matches zero or more words; words
        // are separated by dots, e.g. com.*.test or com.#
        Consumer.readInfo(exchanageType, exchanageName, "*.my", headers);
        Consumer.readInfo(exchanageType, exchanageName, "#.com", headers);
        Consumer.readInfo(exchanageType, exchanageName, "#", headers);
        Produce.senInfo(exchanageType, exchanageName, "a.my", properties, "消息a.my");
        Produce.senInfo(exchanageType, exchanageName, "www.baidu.com", properties, "消息www.baidu.com");
        // The message text now names the routing key it is actually sent with.
        Produce.senInfo(exchanageType, exchanageName, "taobao.com", properties, "消息taobao.com");
    }

    /**
     * Headers exchange: messages are matched against the header values given
     * in the binding arguments.
     *
     * <p>NOTE: the exchange must be declared with the headers type for the
     * binding arguments to take part in matching. If the broker still holds a
     * "header_logs1" exchange declared with a different type by an earlier
     * run, it presumably has to be deleted first, because redeclaring an
     * existing exchange with another type is rejected by the broker.
     */
    @Test
    public void header() throws Exception {
        String exchanageType = BuiltinExchangeType.HEADERS.getType(); // headers type
        String exchanageName = "header_logs1"; // exchange name
        String direct = "header_01"; // routing key
        String message = "header 类型消息"; // message text
        // Consumer side: binding arguments.
        Map<String, Object> consumerHeaders = new Hashtable<>();
        consumerHeaders.put("x-match", "any"); // all: every header must match, any: one is enough
        consumerHeaders.put("type", "json");
        consumerHeaders.put("name", "baidu");
        Consumer.readInfo(exchanageType, exchanageName, direct, consumerHeaders);
        // Producer side: headers attached to the message.
        Map<String, Object> producerHeaders = new Hashtable<>();
        producerHeaders.put("type", "json");
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .headers(producerHeaders)
                .build();
        Produce.senInfo(exchanageType, exchanageName, direct, props, message);
    }
}

 

 

 

 

上一篇:RabbitMQ学习系列(一): 介绍


下一篇:SpringBoot成神之路--21.消息队列简介及RabbitMQ的安装与使用(一)