5.java操作RabbitMQ-简单队列

1.引入依赖

<!--rabbitmq依赖客户端-->
<dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
</dependency>

 操作文件的依赖

<!--操作文件流的一个依赖-->
<dependency>
   <groupId>commons-io</groupId>
   <artifactId>commons-io</artifactId>
   <version>2.6</version>
</dependency>

 信道channel声明队列方法详解:

信道channel发送消息:

2.生产者代码

package com.xkj.org.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生产者:发消息
 */
public class Producer {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.171.128");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123");
        //创建连接
        Connection connection = connectionFactory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //采用默认交换机,所以可以先不定义
        /**声明队列 */
        //第一个参数,队列名称
        //第二个参数,队列里面的消息是否持久化,是表示存于磁盘上,默认存在内存中。
        //第三个参数,是否共享,该队列是否只供一个消费者进行消费,是否消息共享,true表示可以被多个消费者消费
        //第四个参数,是否自动删除队列,最后一个消费者断开连接以后,该队列是否自动删除,true自动删除
        //第五个参数,其他参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //发送的消息内容
        String message = "hello world rabbit mq";
        /**发送消息 */
        //第一个参数:发送到那个交换机,本次不写,使用默认交换机
        //第一个参数:路由的key值routingKey,本次是队列名称
        //第一个参数:其他参数
        //第一个参数:消息内容
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("消息发送完毕");
    }
}

生产者发送消息成功以后:

可以看出队列hello中有一个消息正准备被消费。

3.消费者代码

package com.xkj.org.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者
 */
public class Consumer {

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.171.128");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // 声明接收消息的回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("接收到消息:" + new String(message.getBody()));
        };
        // 取消消息时的回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消息消费被中断");
        };
        /**
         * 消费者消费消息
         * 1.第一个参数,消费那个队列
         * 2.第二个参数,消费成功之后是否要自动应答,true表示自动应答,false表示手动应答
         * 3.第三个参数,消费者成功消费的回调
         * 4.第四个参数,消费者取消消费的回调
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

上一篇:KU FPGA FLASH boot失败debug


下一篇:UI设计中的响应式布局策略:让您的界面在各种设备上都表现出色