获取mq连接
/**
 * Opens a new connection to the MQ broker for the given virtual host.
 *
 * <p>The broker address (127.0.0.1:5672) and the account ("user1") are fixed
 * in this method; only the virtual host varies per call.
 *
 * @param virtualHost the virtual host to set on the connection factory
 * @return a newly created connection obtained from the factory
 * @throws IOException      propagated from {@code ConnectionFactory.newConnection()}
 * @throws TimeoutException propagated from {@code ConnectionFactory.newConnection()}
 */
public Connection getConnection(String virtualHost) throws IOException, TimeoutException {
    final ConnectionFactory connectionFactory = new ConnectionFactory();

    // Account used to log in to the broker.
    connectionFactory.setUsername("user1");
    connectionFactory.setPassword("user1");

    // Broker endpoint: service address and port.
    connectionFactory.setHost("127.0.0.1");
    connectionFactory.setPort(5672);

    // Virtual host (path) chosen by the caller.
    connectionFactory.setVirtualHost(virtualHost);

    final Connection brokerConnection = connectionFactory.newConnection();
    return brokerConnection;
}
定义生产者
// Injected helper that supplies broker connections.
@Autowired
private MessageUtils messageUtils;

/**
 * Publishes a single message to the named queue and returns the message.
 *
 * <p>A fresh connection (virtual host "user_num1") and channel are opened for
 * each call and are always closed before the method returns, including when
 * declaring the queue or publishing fails.
 *
 * @param queueName name of the queue to declare and publish to
 * @param msg       message text; sent as {@code msg.getBytes()}
 * @return the same {@code msg} that was passed in
 * @throws IOException      if connecting, declaring, publishing or closing fails
 * @throws TimeoutException if connecting or closing the channel times out
 */
public Object setMessage(String queueName, String msg) throws IOException, TimeoutException {
    // Obtain a connection for this publish.
    Connection connection = messageUtils.getConnection("user_num1");
    try {
        // Obtain a channel from the connection.
        Channel channel = connection.createChannel();
        try {
            // Declare the queue (all three boolean flags false, no extra arguments).
            channel.queueDeclare(queueName, false, false, false, null);
            // Empty exchange name: the message is routed by queue name.
            // NOTE(review): getBytes() uses the platform default charset; the consumer
            // must decode with the same charset. Consider an explicit charset on both sides.
            channel.basicPublish("", queueName, null, msg.getBytes());
        } finally {
            // Release the channel even if declare/publish threw; skip if already closed
            // so a close error does not mask the original failure.
            if (channel.isOpen()) {
                channel.close();
            }
        }
    } finally {
        // Release the connection on every path.
        if (connection.isOpen()) {
            connection.close();
        }
    }
    return msg;
}
定义消费者
// Injected helper that supplies broker connections.
@Autowired
private MessageUtils messageUtils;

/**
 * Blocks until one message is available on the named queue and returns its body
 * as a string.
 *
 * <p>A fresh connection (virtual host "user_num1") and channel are opened for
 * each call and are always closed before the method returns. The consumer is
 * limited to one unacknowledged message and acknowledges that message
 * explicitly, so closing the channel does not discard other messages that
 * would otherwise have been pushed to this short-lived consumer.
 *
 * @param queueName name of the queue to read from
 * @return the body of the received message, decoded with the platform default charset
 * @throws IOException          if connecting, consuming, acknowledging or closing fails
 * @throws TimeoutException     if connecting or closing the channel times out
 * @throws InterruptedException if the thread is interrupted while waiting for a delivery
 */
public Object getMessage(String queueName) throws IOException, TimeoutException, InterruptedException {
    // Obtain a connection for this read.
    Connection connection = messageUtils.getConnection("user_num1");
    try {
        // Create a channel on the connection.
        Channel channel = connection.createChannel();
        try {
            // Allow at most one unacknowledged delivery: only one message is read here.
            channel.basicQos(1);
            // Consumer that queues deliveries for nextDelivery().
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // Listen on the queue with manual acknowledgement (autoAck = false).
            channel.basicConsume(queueName, false, consumer);
            // Wait for the single delivery this call returns.
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            // Acknowledge only this delivery.
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            return new String(delivery.getBody());
        } finally {
            // Release the channel on every path; skip if it was already shut down.
            if (channel.isOpen()) {
                channel.close();
            }
        }
    } finally {
        // Release the connection on every path.
        if (connection.isOpen()) {
            connection.close();
        }
    }
}