一、安装Erlang
1.安装erlang环境,erlang环境必须要与rabbitmq对应,需要去官网版本对照!
我的版本较新,可以跟我的对应:
erlang:otp_src_23.2.tar.gz
rabbitmq:rabbitmq-server-generic-unix-3.9.1.tar
附上资源:
链接:https://pan.baidu.com/s/1_URgCFI9eIjhNyqctxtsCw
提取码:b6n4
2.erlang准备环境
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
3.进入目录后cd otp_src_23.2
./configure --prefix=/usr/local/erlang --with-ssl --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe --without-javac
4.安装
make && make install
5.配置环境变量vim /etc/profile
export ERLANG_HOME=/usr/local/erlang
export PATH=${ERLANG_HOME}/bin:${PATH}
6.使配置生效 source /etc/profile
7.检验是否成功 erl
二、安装rabbitmq
1.解压后 配置环境vim /etc/profile
export RABBITMQ_HOME=/root/rabbitmq_server-3.9.1
export PATH=${RABBITMQ_HOME}/sbin:${PATH}
保存退出后 source /etc/profile`
进入rabbitmq解压路径
./rabbitmq-plugins enable rabbitmq_management --启动web管理插件
./rabbitmqctl add_user user password --添加用户,密码
./rabbitmqctl set_user_tags user administrator --设置user为administrator权限
./rabbitmq-server -deched --后台启动服务
./rabbitmqctl start_app --启动服务
./rabbitmqctl stop_app --关闭服务
添加一个新用户的好处就是外部访问的时候可以直接登录,默认root的那个账户只能在linux访问
netstat -anp |grep 5672
这样就启动成功了
三、springboot基础使用mq
1.springboot的yml配置
2.rabbitmq交换机、队列、路由配置
@Configuration
public class RabbitMQConfig {
//购物车交换机
public static final String SHOPCAR_EXCHANGE = "SHOPCAR_EXCHANGE";
//购物车队列
public static final String SHOPCAR_QUEUE = "SHOPCAR_QUEUE";
//购物车路由键
public static final String SHOPCAR_ROUTING_KEY = "SHOPCAR_ROUTING_KEY";
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
/**
AcknowledgeMode.NONE:不确认
AcknowledgeMode.AUTO:自动确认
AcknowledgeMode.MANUAL:手动确认
*/
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
//创建购物车交换机
@Bean
public Exchange getStoryExchange() {
return ExchangeBuilder.directExchange(STORY_EXCHANGE).durable(true).build();
}
//创建库存队列
@Bean
public Queue getStoryQueue() {
return new Queue(STORY_QUEUE);
}
//购物车交换机和购物车队列绑定
@Bean
public Binding bindStory() {
return BindingBuilder.bind(getStoryQueue()).to(getStoryExchange()).with(STORY_ROUTING_KEY).noargs();
}
}
3.消费者配置:
@Component
@Slf4j
public class ShopCarListener {
@Autowired
private ShopCarMapper shopCarMapper;
/**
* 监听队列中的消息,如果redis中修改了商品的数量,将修改信息发送到该队列中,然后再将消息同步到数据库中
*
* @param
*/
@RabbitListener(queues = SHOPCAR_QUEUE)
public void handler(Channel channel, Message message) {
String msgContent = null;
try {
if (message != null) {
//信息格式处理
msgContent = new String(message.getBody(), "utf-8");
ProductDto dto = JSON.parseObject(msgContent, ProductDto.class);
log.info(msgContent);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//配置的是手动确认,这里是消息确认
}
} catch (IOException e) {
e.printStackTrace();
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//配置的是手动确认,这里是消息确认
}
}
4.生产者发送消息
@Autowired
private RabbitTemplate rabbitTemplate;
rabbitTemplate.convertAndSend(SHOPCAR_EXCHANGE, SHOPCAR_ROUTING_KEY, JSON.toJSONString(dto));
四、延时队列搭配死信队列(更贴近业务)
1.要在linux的rabbitmq中添加x-delayed-type插件,步骤很简单,首先下载插件,然后把插件放到rabbitmq的plugins目录下,重启rabbitmq,输入命令rabbitmq-plugins enable rabbitmq_delayed_message_exchange
2.延时队列config配置
@Configuration
public class RabbitMQDelayedConfig {
/**
* 跑腿订单
* 延迟消息交换机
*/
public final static String HELP_DELAY_EXCHANGE = "HELP_DELAY_EXCHANGE";
/**
* 跑腿订单
* 队列
*/
public final static String HELP_DELAY_QUEUE = "HELP_DELAY_QUEUE";
/**
* 跑腿订单
* 路由Key
*/
public final static String HELP_DELAY_ROUTING_KEY = "HELP_DELAY_ROUTING_KEY";
/**
* 死信队列
*/
public final static String HELP_DEAD_EXCHANGE="HELP_DEAD_EXCHANGE";
public final static String HELP_DEAD_QUEUE="HELP_DEAD_QUEUE";
public final static String HELP_DEAD_ROUTING_KEY="HELP_DEAD_ROUTING_KEY";
/**
* 服务订单延迟队列
*/
@Bean
public CustomExchange delayMessageExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
//自定义交换机
return new CustomExchange(HELP_DELAY_EXCHANGE, "x-delayed-message", false, false, args);
}
@Bean
public Queue delayMessageQueue() {
Map<String, Object> arguments = new HashMap<>(2);
// 绑定死信交换机
arguments.put("x-dead-letter-exchange", HELP_DEAD_EXCHANGE);
// 绑定死信的路由key
arguments.put("x-dead-letter-routing-key", HELP_DEAD_ROUTING_KEY);
return new Queue(HELP_DELAY_QUEUE, true, false, false, arguments);
}
@Bean
public Binding bindingDelayExchangeAndQueue() {
return BindingBuilder.bind(delayMessageQueue()).to(delayMessageExchange()).with(HELP_DELAY_ROUTING_KEY).noargs();
}
/**
* 死信队列和交换器
*/
@Bean
public Queue deadHeLpLetterQueue() {
return new Queue(HELP_DEAD_QUEUE);
}
@Bean
TopicExchange deadHeLpLetterExchange() {
return new TopicExchange(HELP_DEAD_EXCHANGE);
}
@Bean
Binding bindingDeadHeLpLetterQueue() {
return BindingBuilder.bind(deadHeLpLetterQueue()).to(deadHeLpLetterExchange()).with(HELP_DEAD_ROUTING_KEY);
}
}
3.生产者发送延时消息
Integer ttl = 30 * 60 * 1000;//毫秒 30分钟
rabbitTemplate.convertAndSend(RabbitMQDelayedConfig.HELP_DELAY_EXCHANGE, RabbitMQDelayedConfig.HELP_DELAY_ROUTING_KEY, JSON.toJSONString(order), message -> {
// 设置过期时间
message.getMessageProperties().setDelay(ttl);
return message;
});
4.消费者监听队列以及死信队列监听:
@Component
@Slf4j
public class HelpDealyListener {
@Autowired
private OrderDao orderDao;
@RabbitListener(queues = {RabbitMQDelayedConfig.HELP_DELAY_QUEUE})
// @RabbitHandler
public void receiveMessage(Channel channel, Message message) throws IOException {
String msgContent = null;
OrderEntity dto = null;
log.info("收到消息:" + message.getBody());
try {
msgContent = new String(message.getBody(), "utf-8");
//手动异常进入死信队列
int k=2/0;
dto = JSON.parseObject(msgContent, OrderEntity.class);
OrderEntity orderEntityNow = orderDao.queryById(dto.getId());
if (orderEntityNow.getOrderStatus() == 1) {//如果是待付款,则取消
orderEntityNow.setOrderStatus(0);//设置取消
int result = orderDao.updateOrderStatus(dto.getId());
if (result > 0) {
log.info("[跑腿订单延时消息] - [取消待付款订单,已取消 消费时间] - [{}] - [{}]", LocalDateTime.now(), dto.getCreateTime() + ",dtoId:" + dto.getId());
} else {
log.info("[跑腿订单延时消息] - [取消待付款订单,已付款 消费时间] - [{}] - [{}]", LocalDateTime.now(), dto.getCreateTime() + ",dtoId:" + dto.getId());
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//配置的是手动确认,这里是消息确认
}
} catch (Exception e) {
e.printStackTrace();
log.info("[跑腿订单延时消息] - [消费出错 消费时间] - [{}] - [{}]", LocalDateTime.now(), dto.getCreateTime() + ",dtoId:" + dto.getId());
//异常,ture 重新入队,或者false,进入死信队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//配置的是手动确认,这里是消息确认
}
}
/**
* 死信消费者,自动签收开启状态下,超过重试次数,或者手动签收,reject或者Nack
*
* @param message
*/
@RabbitListener(queues = HELP_DEAD_QUEUE)
public void handleDeadLetterMessage(Message message, Channel channel, @Headers Map<String, Object> headers) throws IOException {
//可以考虑数据库记录,每次进来查数量,达到一定的数量,进行预警,人工介入处理
log.info("接收到跑腿订单死信消息:---{}---消息ID---{}", new String(message.getBody()), headers.get("spring_returned_message_correlation"));
//回复ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
到这里rabbitmq大致的用法已经差不多了。