Integrating Spring Boot with RabbitMQ

1. API-based approach
First, write a test class.
The code is as follows:
package com.example.demo;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import com.example.demo.eight.User;

// JUnit 5 test: @SpringBootTest already registers the Spring extension,
// so the JUnit 4 @RunWith(SpringRunner.class) annotation is not needed
@SpringBootTest
public class HeimaApplicationTests {

@Autowired
private AmqpAdmin amqpAdmin;

@Test
public void amqpAdmin() {
	// Create a fanout exchange
	amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange"));
	// Declare two durable queues
	amqpAdmin.declareQueue(new Queue("fanout_queue_email"));
	amqpAdmin.declareQueue(new Queue("fanout_queue_sms"));

	// Bind each queue to the exchange
	amqpAdmin.declareBinding(
			new Binding("fanout_queue_email", Binding.DestinationType.QUEUE, "fanout_exchange", "", null));
	amqpAdmin.declareBinding(
			new Binding("fanout_queue_sms", Binding.DestinationType.QUEUE, "fanout_exchange", "", null));

}

}
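Then open the RabbitMQ management console and confirm that everything was created successfully; the new exchange is the last entry in the list.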
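To make the effect of these bindings concrete, here is a minimal plain-Java model of fanout routing. It is only a sketch and does not use the RabbitMQ or Spring AMQP API: the class `FanoutSketch` and its methods are hypothetical names introduced for illustration. It shows the one property that matters here: every bound queue receives its own copy of each message, and the routing key is ignored.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a fanout exchange (hypothetical, for illustration only).
class FanoutSketch {
    // Queue name -> messages delivered to that queue.
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    // Bind a queue to the exchange; a fanout binding needs no routing key.
    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Publish a message: each bound queue gets a copy, the routing key is ignored.
    void publish(String routingKey, String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    // Messages currently held by a queue (empty if the queue was never bound).
    List<String> messagesIn(String queueName) {
        return boundQueues.getOrDefault(queueName, Collections.emptyList());
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("fanout_queue_email");
        exchange.bind("fanout_queue_sms");

        exchange.publish("", "new user registered");

        System.out.println("email queue: " + exchange.messagesIn("fanout_queue_email"));
        System.out.println("sms queue:   " + exchange.messagesIn("fanout_queue_sms"));
    }
}
```

This is why the bindings above pass an empty string as the routing key: for a fanout exchange the key plays no part in routing.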
Next, the message publisher sends a message. The message is carried by an entity class, so create one:
package com.example.demo.eight;

public class User {
    private Integer id;
    private String username;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    @Override
    public String toString() {
        return "User [id=" + id + ", username=" + username + "]";
    }
}
Then add the following to the test class:
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void psubPublisher() {
	User user = new User();
	user.setId(1);
	user.setUsername("石头");
	rabbitTemplate.convertAndSend("fanout_exchange", "", user);
}
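Running this test fails: sending an entity object as the message body throws an exception, because the default message converter only handles String, byte[] and Serializable payloads, and User is none of these. A custom message converter is therefore needed: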
package com.example.demo.eight;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
// Custom message converter: serializes message bodies as JSON
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
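Why the original send failed can be sketched without a broker. The snippet below is a plain-Java approximation, assuming the default converter accepts only String, byte[] and Serializable payloads (as Spring AMQP's SimpleMessageConverter does). The class `ConverterCheckSketch`, the method `supportedBySimpleConverter` and the two stand-in user types are hypothetical names, not part of any library.

```java
import java.io.Serializable;

// Plain-Java approximation of the payload check (hypothetical, for illustration only).
class ConverterCheckSketch {

    // Stand-in for the User entity as written in this article: not Serializable.
    static class PlainUser {
        Integer id;
        String username;
    }

    // Stand-in for an alternative User that implements Serializable.
    static class SerializableUser implements Serializable {
        private static final long serialVersionUID = 1L;
        Integer id;
        String username;
    }

    // Returns true if a converter limited to String, byte[] and Serializable
    // payloads could handle the given object.
    static boolean supportedBySimpleConverter(Object payload) {
        return payload instanceof String
                || payload instanceof byte[]
                || payload instanceof Serializable;
    }

    public static void main(String[] args) {
        System.out.println("String payload:      " + supportedBySimpleConverter("hello"));
        System.out.println("PlainUser payload:   " + supportedBySimpleConverter(new PlainUser()));
        System.out.println("Serializable user:   " + supportedBySimpleConverter(new SerializableUser()));
    }
}
```

Making the entity implement Serializable would be another way around the error, but the body would then be Java-serialized bytes that only Java consumers can read. The JSON converter configured above keeps the body readable as text, which is what the consumers in this article print.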

The message consumers receive the message:
package com.example.demo.eight;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class RabbitMQService {
    // Publish/subscribe mode: receive messages and handle the email business
    @RabbitListener(queues = "fanout_queue_email")
    public void psubConsumerEmail(Message message) {
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println("Email service received message: " + s);
    }

    // Publish/subscribe mode: receive messages and handle the SMS business
    @RabbitListener(queues = "fanout_queue_sms")
    public void psubConsumeSms(Message message) {
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println("SMS service received message: " + s);
    }

}
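Both listeners turn the raw message body into text. A minimal sketch of that decoding step with the charset named explicitly is shown below; `new String(bytes)` without a charset falls back to the platform default, which can garble non-ASCII usernames on some machines. The class `BodyDecodingSketch` is a hypothetical name, and the JSON payload is an assumption about roughly what the JSON converter would produce for the User sent earlier.

```java
import java.nio.charset.StandardCharsets;

// Decoding a message body with an explicit charset (hypothetical helper, for illustration only).
class BodyDecodingSketch {

    // Decode the raw body as UTF-8 instead of relying on the platform default charset.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Assumed payload: id 1 and the username from the test, written with
        // Unicode escapes (\u77f3\u5934) so the source file stays ASCII-only.
        String json = "{\"id\":1,\"username\":\"\u77f3\u5934\"}";
        byte[] body = json.getBytes(StandardCharsets.UTF_8);

        System.out.println("Email service received message: " + decode(body));
    }
}
```

In the listeners this would amount to replacing `new String(body)` with `new String(body, StandardCharsets.UTF_8)`, assuming the publisher encodes the JSON as UTF-8.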

Then run the main application.
The integration works as expected.
