Spring Boot 整合 RocketMQ

参考:https://www.cnblogs.com/myseries/p/13153797.html
springboot整合RocketMq
参考:https://www.cnblogs.com/qdhxhz/p/11109696.html

首先创建一个 Spring Boot 项目,并在 pom.xml 中引入 RocketMQ 的依赖:

 <!-- RocketMQ Spring Boot starter dependency -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.3</version>
        </dependency>

我们可以定义一个消息体类,用来封装消息。

/**
 * Envelope that wraps a business payload together with identifying metadata.
 *
 * <p>No accessors are written out in this class; they are expected to come from the
 * {@code @Data} annotation (presumably Lombok's).
 */
@Data
public class MessageBody {

    /** Message id. */
    private String messageId;

    /** Time at which this body was assembled, in epoch milliseconds. */
    private long timestamp;

    /** Source of the message / additional information. */
    private String msgSource;

    /** Business payload carried by the message. */
    private Object data;

    /** Creates an empty body; every field keeps its Java default value. */
    public MessageBody() {
    }

    /**
     * Creates a body stamped with the current system time.
     *
     * @param msgKey    value stored as the message id
     * @param data      business payload
     * @param msgSource source of the message / additional information
     */
    public MessageBody(String msgKey, Object data, String msgSource) {
        this.timestamp = System.currentTimeMillis();
        this.messageId = msgKey;
        this.msgSource = msgSource;
        this.data = data;
    }
}

发消息工具类:


/**
 * Helper for publishing {@link MessageBody} envelopes through {@link RocketMQTemplate}.
 *
 * <p>Every payload is wrapped in a {@link MessageBody} whose id is a freshly generated UUID;
 * the same id is set as the {@code KEYS} header of the outgoing message.
 */
@Component
public class MQService {
    private static final Logger logger = LoggerFactory.getLogger(MQService.class);

    /** Delivery modes understood by {@link #sendMsg}. */
    private enum MsgType { ONEWAY, ASYNC, SYNC }

    /**
     * Template used for every send. Left {@code public} so that any existing code reading
     * this field keeps compiling.
     */
    @Autowired
    public RocketMQTemplate rocketMQTemplate;

    /**
     * Joins topic and tag into the {@code topic:tag} destination string.
     *
     * @param topic topic name
     * @param tag   tag; when {@code null} or empty the bare topic is returned instead of
     *              producing a literal {@code "topic:null"} destination
     * @return the destination string to hand to the template
     */
    private static String buildDestination(String topic, String tag) {
        if (tag == null || tag.isEmpty()) {
            return topic;
        }
        return topic + ":" + tag;
    }

    /** Writes the "send finished" log line shared by all delivery modes. */
    private static void logEnd(String msgId, String destination, Message<MessageBody> message) {
        logger.info("消息发送 MQService 结束: msgId: {} dest: {} msg: {}", msgId, destination, message);
    }

    /**
     * Common send routine behind all public methods.
     *
     * @param msgType     delivery mode
     * @param destination destination string passed to the template
     * @param payload     business payload placed in {@link MessageBody}
     * @param msgSource   source / additional info placed in {@link MessageBody}
     * @throws IllegalArgumentException if {@code msgType} is not a known delivery mode
     */
    private void sendMsg(MsgType msgType, final String destination, Object payload, String msgSource) {
        final String msgKey = IdUtils.simpleUUID();
        MessageBody msgBody = new MessageBody(msgKey, payload, msgSource);
        final Message<MessageBody> message =
                MessageBuilder.withPayload(msgBody).setHeader("KEYS", msgKey).build();
        logger.info("消息发送 MQService 开始: {} {}", destination, message);

        switch (msgType) {
            case ONEWAY:
                // One-way sends return nothing, so there is no message id to report.
                rocketMQTemplate.sendOneWay(destination, message);
                logEnd("", destination, message);
                break;
            case ASYNC:
                rocketMQTemplate.asyncSend(destination, message, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        // The outcome is only known here, so the end-of-send line is logged
                        // from the callback rather than right after dispatch.
                        logEnd(sendResult != null ? sendResult.getMsgId() : "", destination, message);
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        // This runs in the callback, not in the caller of asyncSendMsg, so an
                        // exception thrown here could never reach that caller. Log the failure
                        // with its full stack trace instead.
                        logger.error("MQService: 消息发送失败 topic_tag:{} msgKey:{}",
                                destination, msgKey, throwable);
                    }
                });
                break;
            case SYNC:
                SendResult result = rocketMQTemplate.syncSend(destination, message);
                logEnd(result != null ? result.getMsgId() : "", destination, message);
                break;
            default:
                throw new IllegalArgumentException("Unsupported message type: " + msgType);
        }
    }

    /**
     * Sends a message with {@code RocketMQTemplate.syncSend}.
     *
     * @param destination destination string, e.g. {@code topic:tag}
     * @param payload     business payload
     * @param msgSource   source / additional info recorded in the message body
     */
    public void syncSendMsg(String destination, Object payload, String msgSource) {
        sendMsg(MsgType.SYNC, destination, payload, msgSource);
    }

    /**
     * Sends a message with {@code RocketMQTemplate.syncSend} to {@code topic:tag}.
     *
     * @param topic     topic name
     * @param tag       tag; {@code null} or empty sends to the bare topic
     * @param payload   business payload
     * @param msgSource source / additional info recorded in the message body
     */
    public void syncSendMsg(String topic, String tag, Object payload, String msgSource) {
        syncSendMsg(buildDestination(topic, tag), payload, msgSource);
    }

    /**
     * Sends a message with {@code RocketMQTemplate.asyncSend}; success and failure are
     * reported through the log only.
     *
     * @param destination destination string, e.g. {@code topic:tag}
     * @param payload     business payload
     * @param msgSource   source / additional info recorded in the message body
     */
    public void asyncSendMsg(String destination, Object payload, String msgSource) {
        sendMsg(MsgType.ASYNC, destination, payload, msgSource);
    }

    /**
     * Sends a message with {@code RocketMQTemplate.asyncSend} to {@code topic:tag}; success
     * and failure are reported through the log only.
     *
     * @param topic     topic name
     * @param tag       tag; {@code null} or empty sends to the bare topic
     * @param payload   business payload
     * @param msgSource source / additional info recorded in the message body
     */
    public void asyncSendMsg(String topic, String tag, Object payload, String msgSource) {
        asyncSendMsg(buildDestination(topic, tag), payload, msgSource);
    }

    /**
     * Sends a message with {@code RocketMQTemplate.sendOneWay}; no result is obtained.
     *
     * @param destination destination string, e.g. {@code topic:tag}
     * @param payload     business payload
     * @param msgSource   source / additional info recorded in the message body
     */
    public void oneWaySendMsg(String destination, Object payload, String msgSource) {
        sendMsg(MsgType.ONEWAY, destination, payload, msgSource);
    }

    /**
     * Sends a message with {@code RocketMQTemplate.sendOneWay} to {@code topic:tag}; no
     * result is obtained.
     *
     * @param topic     topic name
     * @param tag       tag; {@code null} or empty sends to the bare topic
     * @param payload   business payload
     * @param msgSource source / additional info recorded in the message body
     */
    public void oneWaySendMsg(String topic, String tag, Object payload, String msgSource) {
        oneWaySendMsg(buildDestination(topic, tag), payload, msgSource);
    }
}

消费者:

/**
 * Listener registered for topic {@code test-topic} with selector expression {@code test-tag}.
 *
 * <p>Each incoming {@link MessageBody} has its {@code data} converted to a generic map; the
 * map is not used any further in this snippet.
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "test-topic",
        selectorExpression = "test-tag",
        consumerGroup = "${rocketmq.consumer.group}",
        nameServer = "${rocketmq.nameServer}")
public class ComsumerListener implements RocketMQListener<MessageBody> {

    /** Injected service; not referenced by {@link #onMessage} in this snippet. */
    @Autowired
    private ItestService testService;

    /**
     * Handles one delivered message.
     *
     * @param messageBody the received envelope
     */
    @Override
    public void onMessage(MessageBody messageBody) {
        // Serialize the payload to JSON text, then parse that text back as a map.
        String payloadJson = JSON.toJSONString(messageBody.getData());
        Map<String, Object> payload = JSON.parseObject(payloadJson, Map.class);
    }
}

data 字段的具体结构由生产者和消费者事先约定好即可:生产者把业务对象放入 MessageBody 的 data,消费者再按约定的结构进行解析。

上一篇:Python编程:pycharm2017设置默认字体大小


下一篇:OSSIM系统启动故障处理方法