java练习生 - 使用Rabbit MQ

方案1:spring-cloud-starter-stream-rabbit

一、添加依赖

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

二、添加配置(application.yml)

# Application name plus the RabbitMQ connection settings under the spring.rabbitmq.* prefix.
spring:
  application:
    name: demo1
  rabbitmq:
    # Broker address and port.
    host: 10.10.10.10
    port: 5672
    # NOTE(review): guest/guest look like placeholder credentials — confirm the broker accepts them from a remote host.
    username: guest
    password: guest
    # Virtual host to connect to on the broker.
    virtualHost: /dev-test

三、创建操作类

生产者:

java练习生 - 使用Rabbit MQ
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;

/**
 * Producer that publishes text messages through the injected {@link AmqpTemplate}.
 */
@Service
@RequiredArgsConstructor
public class MqSender {
    /** Name of the exchange every message is published to. */
    private static final String EXCHANGE_NAME = "test";

    /** Routing key attached to every published message. */
    private static final String ROUTING_KEY = "hzq";

    /** Supplied through the Lombok-generated constructor. */
    final AmqpTemplate template;

    /**
     * Publishes a message to the exchange {@code "test"} with routing key {@code "hzq"}.
     *
     * @param msg payload handed to {@code convertAndSend}
     */
    public void send(String msg) {
        // Argument order: exchange name, routing key, payload.
        template.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, msg);
    }
}
View Code

消费者:

java练习生 - 使用Rabbit MQ
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * Consumer for queue {@code "test"}. The annotation declares the queue, the direct exchange
 * {@code "test"} and the binding between them, then dispatches each message to the
 * {@link RabbitHandler} method matching the payload type.
 */
@Slf4j
@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "test", autoDelete = "false"),
        exchange = @Exchange(value = "test", type = ExchangeTypes.DIRECT),
        // A direct exchange routes only on an exact key match, so the binding key has to equal
        // the routing key the producer publishes with ("hzq").
        key = "hzq"
))
public class MqListener {
    /**
     * Handles payloads that arrive as text.
     *
     * @param msg message body
     */
    @RabbitHandler
    public void process(String msg) {
        log.info("mq接收到信息:{}", msg);
    }

    /**
     * Handles payloads that arrive as raw bytes, decoding them as UTF-8.
     *
     * @param content message body bytes
     */
    @RabbitHandler
    public void process(byte[] content) {
        // Decoding with a Charset constant cannot throw a checked exception, so no try/catch
        // (and no silently swallowed failure) is needed.
        log.info("mq接收到信息:{}", new String(content, java.nio.charset.StandardCharsets.UTF_8));
    }
}
View Code

四、使用

java练习生 - 使用Rabbit MQ
/**
 * REST entry point under {@code /test} used to trigger a demo publish.
 */
@Slf4j
@RestController
@RequestMapping("/test")
public class TestController {

    @Resource
    MqSender mqSender;

    /**
     * Publishes one message carrying the current timestamp and reports completion.
     *
     * @return fixed completion text
     */
    @GetMapping("/rabbit/set")
    public String rabbitSet() {
        log.info("添加rabbit开始。");
        mqSender.send(buildMessage());
        final String finished = "添加rabbit结束。";
        log.info(finished);
        return finished;
    }

    /** Builds the demo payload containing the current epoch milliseconds. */
    private static String buildMessage() {
        return String.format("发送信息,当前时间戳:{%s}", System.currentTimeMillis());
    }
}
View Code

 

方案2:spring-boot-starter-amqp

一、添加依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

二、添加配置(application.yml)

2.1 添加连接配置信息(application.yml),支持同时连接多个 MQ 服务

# Custom connection settings for two brokers. RabbitMqConfig reads them through
# @Value("${rabbitmq.default.*}") and @Value("${rabbitmq.other.*}").
rabbitmq:
  # Broker used by the "default" connection factory.
  default:
    host: 10.10.10.10
    port: 5672
    username: guest
    password: guest
    vhost: /dev-test
  # Broker used by the "other" connection factory.
  other:
    host: 10.10.10.11
    port: 5672
    username: guest
    password: guest
    vhost: /sit-test

2.2 创建配置类

java练习生 - 使用Rabbit MQ
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

/**
 * RabbitMQ configuration for two independent brokers ("default" and "other").
 *
 * <p>Each broker gets its own {@link ConnectionFactory}, {@link RabbitTemplate} and
 * {@link SimpleRabbitListenerContainerFactory}. The "default" connection factory and template
 * are marked {@link Primary}.
 */
@Configuration
@Slf4j
public class RabbitMqConfig {

    // Connection settings of the default broker (rabbitmq.default.*).
    @Value("${rabbitmq.default.host}")
    private String host;

    @Value("${rabbitmq.default.port}")
    private int port;

    @Value("${rabbitmq.default.username}")
    private String username;

    @Value("${rabbitmq.default.password}")
    private String password;

    @Value("${rabbitmq.default.vhost}")
    private  String vhost;

    // Connection settings of the other broker (rabbitmq.other.*).
    @Value("${rabbitmq.other.host}")
    private String otherHost;

    @Value("${rabbitmq.other.port}")
    private int otherPort;

    @Value("${rabbitmq.other.username}")
    private String otherUserName;

    @Value("${rabbitmq.other.password}")
    private String otherPassWord;

    @Value("${rabbitmq.other.vhost}")
    private String otherVhost;

    /**
     * Retry setting: initial back-off interval, defaulting to 6000 when the property is absent.
     */
    @Value("${spring.rabbitmq.listener.simple.retry.initial-interval:6000}")
    private Long initialInterval;
    /**
     * Retry setting: maximum number of attempts, defaulting to 3 when the property is absent.
     */
    @Value("${spring.rabbitmq.listener.simple.retry.max-attempts:3}")
    private Integer maxAttempts;

    /**
     * Listener container factory for the default broker; referenced from
     * {@code @RabbitListener(containerFactory = "defaultFactory")}.
     *
     * @param configurer        Spring Boot configurer applied to the new factory
     * @param connectionFactory connection factory of the default broker
     * @return the configured listener container factory
     */
    @Bean(name = "defaultFactory")
    public SimpleRabbitListenerContainerFactory defaultFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                                               @Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
//        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }

    /**
     * Primary template for publishing to the default broker, with the shared retry settings.
     *
     * @return the template bound to the default connection factory
     */
    @Bean(name="defaultRabbitTemplate")
    @Primary
    public RabbitTemplate defaultRabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(defaultConnectionFactory());
        template.setRetryTemplate(retryTemplate());
//        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

    /**
     * Primary connection factory, pointing at the default broker, with publisher confirms
     * enabled.
     *
     * <p>Note: this class does not configure the consumer acknowledge mode or a prefetch count;
     * if fair dispatch (for example prefetch = 1) is wanted it has to be set on the listener
     * container factory.
     *
     * @return ConnectionFactory for the default broker
     */
    @Bean(name="defaultConnectionFactory")
    @Primary
    public ConnectionFactory defaultConnectionFactory() {
        return newConnectionFactory(host, port, username, password, vhost);
    }

    /**
     * Listener container factory for the other broker; referenced from
     * {@code @RabbitListener(containerFactory = "otherFactory")}.
     *
     * @param configurer        Spring Boot configurer applied to the new factory
     * @param connectionFactory connection factory of the other broker
     * @return the configured listener container factory
     */
    @Bean(name = "otherFactory")
    public SimpleRabbitListenerContainerFactory vbsFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                                           @Qualifier("otherConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    /**
     * Template for publishing to the other broker, with the shared retry settings.
     *
     * @return the template bound to the other connection factory
     */
    @Bean(name="otherRabbitTemplate")
    public RabbitTemplate vbsrabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(otherConnectionFactory());
        template.setRetryTemplate(retryTemplate());
        return template;
    }

    /**
     * Connection factory pointing at the other broker, with publisher confirms enabled.
     *
     * @return ConnectionFactory for the other broker
     */
    @Bean(name="otherConnectionFactory")
    public ConnectionFactory otherConnectionFactory() {
        return newConnectionFactory(otherHost, otherPort, otherUserName, otherPassWord, otherVhost);
    }

    /**
     * Builds a caching connection factory for one broker.
     *
     * @param brokerHost  broker host name or address
     * @param brokerPort  broker port
     * @param user        login user
     * @param secret      login password
     * @param virtualHost virtual host to connect to
     * @return the new connection factory
     */
    private static ConnectionFactory newConnectionFactory(String brokerHost, int brokerPort, String user,
                                                          String secret, String virtualHost) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(brokerHost, brokerPort);
        connectionFactory.setUsername(user);
        connectionFactory.setPassword(secret);
        connectionFactory.setVirtualHost(virtualHost);
        // Enables publisher confirms (broker-side acknowledgement of published messages) so that
        // a RabbitTemplate ConfirmCallback can be notified. This is not the consumer ack mode.
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

    /**
     * Builds the retry template used by both RabbitTemplates: exponential back-off starting at
     * {@code initialInterval}, giving up after {@code maxAttempts} attempts.
     *
     * @return RetryTemplate
     */
    private RetryTemplate retryTemplate(){
        RetryTemplate retryTemplate = new RetryTemplate();
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(initialInterval);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(maxAttempts));
        return retryTemplate;
    }

    // Disabled example below: declare the exchange, queues and bindings of the default broker
    // at startup through RabbitAdmin.
//
//    /**
//     * Initializes the routing configuration of the default broker.
//     */
//    @Bean
//    public RabbitAdmin rabbitAdmin() {
//        RabbitAdmin rabbitAdmin = new RabbitAdmin(defaultConnectionFactory());
//        rabbitAdmin.setAutoStartup(true);
//        // create the queues and the exchange, then bind them
//        creactQueueAndChange(rabbitAdmin);
//        return rabbitAdmin;
//    }
//
//    /**
//     * Routing configuration (JSON) of every queue on the default broker.
//     */
//    @Value("${rabbitInfo:{}}")
//    private String rabbitInfo;
//
//    /**
//     * Name of the default exchange.
//     */
//    private static final String defualtExchange = "test";

//    /**
//     * Creates the queues and the exchange, then binds them.
//     * @param rabbitAdmin
//     */
//    private void creactQueueAndChange(RabbitAdmin rabbitAdmin){
//        rabbitAdmin.declareExchange(new DirectExchange(defualtExchange,true,false));
//        Map<String, Object> arguments = new HashMap<>();
//        arguments.put("x-max-priority",10); // queue priority level
//        List<RabbitObject> rabbitObjectDTOList = JSONObject.parseArray(rabbitInfo,RabbitObject.class);
//        for(RabbitObject rabbitObjectDTO:rabbitObjectDTOList){
//            rabbitAdmin.declareQueue(new Queue(rabbitObjectDTO.queueName,true,false,false,arguments));
//            rabbitAdmin.declareBinding(new Binding(rabbitObjectDTO.queueName, Binding.DestinationType.QUEUE, rabbitObjectDTO.exchangeName, rabbitObjectDTO.routingKey, null));
//        }
//    }
}
View Code

 

三、创建操作类

生产者:

java练习生 - 使用Rabbit MQ
import com.alibaba.fastjson.JSONObject;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

import java.util.UUID;

/**
 * rabbit 发送实例
 */
@Service
@Log4j2
public class RabbitSendServiceImpl implements RabbitSendService, RabbitTemplate.ConfirmCallback {

    private RabbitTemplate rabbitTemplate;

    /**
     * 构造方法注入rabbitTemplate
     */
    @Autowired
    public RabbitSendServiceImpl(@Qualifier("defaultRabbitTemplate") RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
    }

    @Override
    public void sendMsg(String exchangeName, String routeName, Object message, String messageId) {
        CorrelationData correlationId = new CorrelationData(messageId);
        rabbitTemplate.convertAndSend(exchangeName, routeName, JSONObject.toJSONString(message), correlationId);
    }

//    @Override
//    public void sendMsg(MqInfoEnum mqInfoEnum, Object message, String messageId) {
//        CorrelationData correlationId = new CorrelationData(messageId);
//        rabbitTemplate.convertAndSend(mqInfoEnum.getExchangeName(), mqInfoEnum.getRouteName(), JSONObject.toJSONString(message),
//                correlationId);
//    }

    @Override
    public void sendMsg(String exchangeName, String routeName, Object message) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(exchangeName, routeName, JSONObject.toJSONString(message), correlationId);
    }

//    @Override
//    public void sendMsg(MqInfoEnum mqInfoEnum, Object message) {
//        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
//        rabbitTemplate.convertAndSend(mqInfoEnum.getExchangeName(), mqInfoEnum.getRouteName(), JSONObject.toJSONString(message),
//                correlationId);
//    }


//    @Override
//    public void sendDefaultMsg(Object message, String messageId) {
//        CorrelationData correlationId = new CorrelationData(messageId);
//        rabbitTemplate.convertAndSend(MqInfoEnum.VTS_DEFAULT.getExchangeName(), MqInfoEnum.VTS_DEFAULT.getRouteName(),
//                JSONObject.toJSONString(message),
//                correlationId);
//    }

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        if (b) {
            log.info("回调[{}]结果:消息成功消费", correlationData);
        } else {
            log.info("回调[{}]结果:消息消费失败:{}", correlationData, s);
        }
    }
}
View Code

消费者:

公共类

java练习生 - 使用Rabbit MQ
import hzq.maven.demo.service.rabbit.RabbitListenService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.stereotype.Service;

import java.io.UnsupportedEncodingException;

/**
 * Base class for queue listeners: receives the payload as text or bytes and hands it to
 * {@link #execute(String)}, which subclasses implement.
 */
@Service
public abstract class RabbitListenServiceImpl implements RabbitListenService {

    /**
     * Handles a text payload.
     *
     * @param content MQ message
     */
    @RabbitHandler
    public void process(String content) {
        try {
            execute(content);
        } catch (Exception e) {
            // Hook: persist the failed message to a compensation table here if required.
            // The exception must propagate, otherwise the listener container treats the
            // delivery as successful and the message is never un-acked.
            throw e;
        }
    }

    /**
     * Handles a byte payload by decoding it as UTF-8 and delegating to {@link #process(String)}.
     *
     * @param content MQ message bytes
     */
    @RabbitHandler
    public void process(byte[] content) {
        // Decoding with a Charset constant cannot throw UnsupportedEncodingException, so there
        // is no failure path that could be swallowed before the message reaches execute().
        process(new String(content, java.nio.charset.StandardCharsets.UTF_8));
    }

    /**
     * Processes one received message.
     *
     * @param message message body
     */
    public abstract void execute(String message);

}
View Code

子类

java练习生 - 使用Rabbit MQ
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import hzq.maven.demo.model.dto.TestMqDataDTO;
import hzq.maven.demo.service.business.TestService;
import hzq.maven.demo.service.rabbit.impl.RabbitListenServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * 测试消息队列
 */
@Slf4j
@Component
@RabbitListener(queues = "test",containerFactory="defaultFactory")
public class TestListen extends RabbitListenServiceImpl {
    @Resource
    private TestService testService;

    public void execute(String message) {
        TestMqDataDTO rabbitMqDataDTO;
        if(JSON.isValid(message)){
            JSONObject jsonObject = JSON.parseObject(message);
            rabbitMqDataDTO = jsonObject.toJavaObject(TestMqDataDTO.class);
        }
        else{
            rabbitMqDataDTO = new TestMqDataDTO();
            rabbitMqDataDTO.setName(message);
        }

        testService.mqListen(rabbitMqDataDTO);
    }
}
View Code

四、使用

java练习生 - 使用Rabbit MQ
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * Demo business service: publishes a DTO to RabbitMQ and handles DTOs received from it.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class TestServiceImpl implements TestService {
    @Resource
    private RabbitSendService rabbitSendService;

    /**
     * Publishes the DTO to exchange {@code "test"} with routing key {@code "hzq"}.
     *
     * @param testMqDataDTO payload to publish
     */
    @Override
    public void mqSend(TestMqDataDTO testMqDataDTO){
        rabbitSendService.sendMsg("test","hzq",testMqDataDTO);
        // Parameterized logging: no NullPointerException on a null payload and no string
        // building when INFO is disabled.
        log.info("mq发送数据成功:{}", testMqDataDTO);
    }

    /**
     * Handles a DTO received from the queue.
     *
     * @param testMqDataDTO payload received from MQ
     */
    @Override
    public void mqListen(TestMqDataDTO testMqDataDTO){
        log.info("mq消费数据成功:{}", testMqDataDTO);
        // do...
    }
}
View Code

 

上一篇:二本4年Java经验,五面阿里艰苦经历(定薪45K),回馈一波心得体会


下一篇:EF总结