2021-04-13-Java如何操作RabbitMQ

文章目录

Java操作RabbitMQ

  • 依赖
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.4.3</version>
</dependency>

简单模式

  • 消息提供者
// 1.连接RamabbitMQ
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.189.137"); // 连接地址
factory.setPort(5672); // 设置端口号
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/"); // 配置虚拟主机(每个团队用自己的消息队列)

Connection connection = factory.newConnection();// 创建连接

// 2.通过连接对象获取channel对象(队列,绑定,消息都是通过这个对象操作)
Channel channel = connection.createChannel();

// 3.通过channel创建一个队列(1:队列名称[如果队列不存在就创建])
channel.queueDeclare("myqueue",false,false,false,null);

// 4.发送消息到指定的队列(1:交换机,2:队列的名称,3:其他属性,4:消息体)
String  msg = "hello 中文 2";
channel.basicPublish("","myqueue",null,msg.getBytes("utf-8"));

// 5.断开连接
connection.close();
  • 消息消费者
    工具类提取
public class ConnectionUtils {

    private static ConnectionFactory factory =null;

    static {
        // 1.连接RamabbitMQ
        factory = new ConnectionFactory();
        factory.setHost("192.168.189.137"); // 连接地址
        factory.setPort(5672); // 设置端口号
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/"); // 配置虚拟主机(每个团队用自己的消息队列)

    }

    public static Connection getConnection(){
        Connection connection = null;// 创建连接
        try {
            connection = factory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return connection;
    }
}

消费者

// 1.获取连接对象
Connection connection = ConnectionUtils.getConnection();

// 2.获取Channel
Channel channel = connection.createChannel();

// 3.通过channel创建一个队列(1:队列名称)
channel.queueDeclare("myqueue",false,false,false,null);

// 3.设置监听的队列(1:设置监听的队列,2:,3:消费者的回调方法
channel.basicConsume("myqueue",true,new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("消费者:"+new String(body,"utf-8"));
    }
});
// 消费者不能关闭连接
  • 注意:
    1.如果在队列没有的创建的情况下,消费者先启动,提供者后启动这时消费者会报错,并且线程也会被阻塞主,而且还获取不到提供者发送的消息。所以在开发中遇到这样的启动顺序的问题,在消息提供者和消息消费者都进行队列的创建。
    2.消费者消费消息是一个同步的过程。开发的过程中一般都会使用一个线程池并发的消费消息。
  • 线程池并发消费消息
// 创建一个线程池
private static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws IOException {

    // 1.获取连接对象
    Connection connection = ConnectionUtils.getConnection();

    // 2.获取Channel
    Channel channel = connection.createChannel();

    // 3.通过channel创建一个队列(1:队列名称)
    channel.queueDeclare("myqueue",false,false,false,null);

    // 3.设置监听的队列(1:设置监听的队列,2:true为自动回复模式,3:消费者的回调方法)
    channel.basicConsume("myqueue",true,new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("消费者:"+new String(body,"utf-8"));
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    });
}

Work模式

work模式和简单模式类似,区别在于它有多个消费者交替获取队列中的内容。实现起来也比简单,以上面的代码为例,重新拷贝一个消费者变成两个消费者即可。

  • Fonout模式(广播类型)
  • 消息提供者
// 1.得到连接对象
Connection connection = ConnectionUtils.getConnection();

// 2.获取channel对象
Channel channel = connection.createChannel();

// 3.创建交换机(fanout:广播类型)
channel.exchangeDeclare("myexchange1","fanout");

// 4.发送消息给交换机(1:交换机名称,2:队列名称,3:其他属性,4:消息内容)
String msg ="hello RebbitMQ";

channel.basicPublish("myexchange","",null,msg.getBytes("utf-8"));

connection.close();
  • 消息消费者1
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();

// 创建一个队列
channel.queueDeclare("queue1",false,false,false,null);

// 把队列和路交换机绑定(1:队列名称,2:交换机名称,3:路由键)
channel.queueBind("queue1","myexchange1","");

channel.basicConsume("queue1",true,new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("consumer1:"+new String(body,"utf-8"));
    }
});
  • 消息消费者2
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();

// 创建一个队列
channel.queueDeclare("queue2",false,false,false,null);

// 把队列和路交换机绑定(1:队列名称,2:交换机名称,3:路由键)
channel.queueBind("queue2","myexchange1","");

// 设置监听的队列
channel.basicConsume("queue2",true,new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("consumer2:"+new String(body,"utf-8"));
    }
});

SpringBoot操作RabbitMQ

  • 之前在做商品添加的时候除了做把商品添加到数据库之外,还要添加商品信息添加到索引库,还要生成静态页面,这些都是通过同步来完成的效率比较低。现在我们学习完消息队列后可以进行改造了,利用消息队列异步来完成上面的操作从而来提高效率。
    商品添加完之后可以通过异步的方式给MQ中添加一条消息,然后索引和静态页面消费MQ中的消息即可。
  • 添加依赖
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 配置属性文件
spring.rabbitmq.host=192.168.189.137
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
  • 提供者
  • 创建队列和交换机
    在创建商品的时候要通知索引和item两个工程,所以我们要用广播模式。创建完商品给交换机发送一条消息,然后交换机把消息广播给两个工程。所以我们需要创建两个队列和一个广播。
import org.springframework.amqp.core.*;
@Configuration
public class RabbitMQConfig {

    // 创建一个搜索的队列
    @Bean
    public Queue getSeachQueue() {
        return new Queue("search_queue");
    }

    // 创建一个静态页面的广播
    @Bean
    public Queue getItemQueue(){
        return new Queue("item_queue");
    }

    // 创建一个交换机
    @Bean
    public FanoutExchange getFanoutExchange(){
        return new FanoutExchange("goods_exchange");
    }

    // 把搜索队列绑定到交换机
    @Bean
    public Binding getBinDing1(){
        return BindingBuilder.bind(getSeachQueue()).to(getFanoutExchange());
    }

    // 把详情页面绑定到交换机
    @Bean
    public Binding getBinDing2(){
        return BindingBuilder.bind(getItemQueue()).to(getFanoutExchange());
    }
}
  • 发送消息给交换机
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public int add(Goods goods) {

        int i = goodsMapper.insertSelective(goods);

        System.out.println("GoodsServiceImpl.add:"+goods.getId());

        // 2.同步到索引库
//        searchService.addGoods(goods);

        // 3.创建静态模板
//        HttpUtils.sendRequset("http://localhost:8083/createHtml?gID="+goods.getId());

        // 发送消息给交换机(1:交换机的名称,2:路由键,3:发送的对象[实现序列化接口])
        rabbitTemplate.convertAndSend("goods_exchange","",goods);

        return i;
    }
  • 消费者
    添加依赖和配置属性文件和之前都一样
  • 同步到索引库
@Component
public class MyRabbitMQListener {
    @Autowired
    private ISearchService searchService;

    @RabbitListener(queues = "search_queue")
    public void goodsSysncToSolr(Goods goods){
        searchService.addGoods(goods);
    }
}
  • 生成静态页面
@Component
public class MyRabbitListener {

    @Reference
    private IGoodsService goodsService;

    @Autowired
    private Configuration configuration;

    @RabbitListener(queues = "item_queue")
    public void createItemPage(Goods goodsParam) throws Exception {
      //生成静态页面的代码
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
上一篇:邮件传送协议SMTP


下一篇:Linux使用SMTP服务发送邮件