1、 rabbitmq简介
rabbitmq是一个消息代理,或者讲是一个消息中间件。主要是用来接收和转发信息的,它是对消息不做任何处理的。MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。MQ和JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。
用来发送消息的程序我们称为生产者:
用来存储信息的我们称为队列,队列只受到内存和磁盘的限制,是一个大的消息缓存区。生产者可以发送消息到队列中,而消费者从队列中接收消息。
等待接收消息的程序我们称为消费者:
下面图中, “P”是生产者, “C”是消费者,中间的框是队列——代表是消息的缓冲区。
下面我们开始我们的简单程序hello world
2 、sending(生产者)
生产者的代码如下:
package rabbitmq.main; import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; import rabbitmq.utils.ConnectionUtils; public class Send { private static final String QUEUE_NAME = "rabbitmq_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "hello world";
//往队列里发送消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//关闭通道
channel.close();
//关闭连接
connection.close();
}
}
获取连接类的代码如下:
package rabbitmq.utils; import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class ConnectionUtils { //返回一个连接类
public static Connection getConnection() throws IOException, TimeoutException {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("localhost");
//设置用户
factory.setUsername("guest");
//密码
factory.setPassword("guest");
//设置端口 不设置默认是5672
factory.setPort(5672);
//创建一个新的连接
Connection connection = factory.newConnection();
return connection;
} }
执行代码后,在rabbitmq的界面会看到你创建的队列,并且里面有一条消息,如下
3、 Receiving(消费者)
消费者从队列中接收信息,消费者监听消息队列,一旦队列中有消息,队列将消息发送到消费者,消息就从队列中删除。
消费者代码如下:
package rabbitmq.main; import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties; import rabbitmq.utils.ConnectionUtils; public class Recv {
private static final String QUEUE_NAME = "rabbitmq_queue"; public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//创建 管道
Channel channel = connection.createChannel();
//创建声明队列(可有可无)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
//监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);
} }
执行后,将收到一个消息并打印到控制台,消息队列中就没有消息了,如下: