参考:https://www.cnblogs.com/myseries/p/13153797.html
参考:https://www.cnblogs.com/qdhxhz/p/11109696.html
首先创建一个springboot项目,引入rocketmq的依赖:
<!-- rocketmq依赖 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
我们可以定义一个消息体类,用来封装消息。
@Data
public class MessageBody {
// 消息id
private String messageId;
// body组装时间
private long timestamp;
// 来源 附加信息
private String msgSource;
// overload
private Object data;
public MessageBody() {
}
public MessageBody(String msgKey, Object data, String msgSource) {
this.messageId = msgKey;
this.data = data ;
this.msgSource = msgSource;
this.timestamp = System.currentTimeMillis();
}
}
发消息工具类:
@Component
public class MQService {
private final static Logger logger = LoggerFactory.getLogger(MQService.class);
private static enum MSG_TYPE{ ONEWAY, ASYNC, SYNC };
@Autowired
public RocketMQTemplate rocketMQTemplate;
/**
* 发送消息,通用
* @param msg_type
* @param destination
* @param payload
*/
private void sendMsg(MSG_TYPE msg_type, String destination, Object payload, String msgSource){
String msgKey = IdUtils.simpleUUID();
MessageBody msgBody = new MessageBody(msgKey, payload , msgSource);
Message<MessageBody> message = MessageBuilder.withPayload(msgBody).setHeader("KEYS",msgKey ).build();
logger.info(String.format("消息发送 MQService 开始: %s %s", destination, message));
SendResult result = null;
switch (msg_type) {
case ONEWAY:
rocketMQTemplate.sendOneWay(destination, message);
break;
case ASYNC:
rocketMQTemplate.asyncSend(destination, message,new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(Throwable throwable) {
logger.error("MQService:" + ExceptionUtils.getStackTrace(throwable));
throw new CustomException(String.format("消息发送失败 topic_tag:%s", destination ));
}
});
break;
case SYNC:
result = rocketMQTemplate.syncSend(destination, message);
break;
}
logger.info(String.format("消息发送 MQService 结束: msgId: %s dest: %s msg: %s",result != null ? result.getMsgId() : "", destination, message));
}
/**
* 同步发送消息,会确认应答
* @param destination
* @param payload
*/
public void syncSendMsg(String destination, Object payload, String msgSource){
sendMsg(MSG_TYPE.SYNC,destination, payload,msgSource) ;
}
/**
* 同步发送消息,会确认应答
* @param topic
* @param tag
* @param payload
*/
public void syncSendMsg(String topic, String tag, Object payload, String msgSource){
// 发送的消息体,消息体必须存在
// 业务主键作为消息key
String destination = topic + ":" + tag;
syncSendMsg(destination, payload,msgSource);
}
/**
* 异步消息发送,异步日志确认异常
* @param destination
* @param payload
*/
public void asyncSendMsg(String destination, Object payload, String msgSource){
sendMsg(MSG_TYPE.ASYNC,destination, payload,msgSource);
}
/**
* 异步消息发送,异步日志确认异常
* @param topic
* @param tag
* @param payload
* @return
*/
public void asyncSendMsg(String topic, String tag, Object payload, String msgSource){
// 发送的消息体,消息体必须存在
// 业务主键作为消息key
String destination = topic + ":" + tag;
asyncSendMsg(destination, payload,msgSource);
}
/**
* 单向发送消息,不关注结果
* @param destination
* @param payload
*/
public void oneWaySendMsg(String destination, Object payload, String msgSource){
sendMsg(MSG_TYPE.ONEWAY,destination, payload,msgSource);
}
/**
* 单向发送消息,不关注结果
* @param topic
* @param tag
* @param payload
*/
public void oneWaySendMsg(String topic, String tag, Object payload, String msgSource){
// 发送的消息体,消息体必须存在
// 业务主键作为消息key
String destination = topic + ":" + tag;
oneWaySendMsg(destination, payload,msgSource);
}
}
消费者:
@RocketMQMessageListener(topic = "test-topic",nameServer = "${rocketmq.nameServer}",consumerGroup = "${rocketmq.consumer.group}", selectorExpression = "test-tag")
@Component
@Slf4j
public class ComsumerListener implements RocketMQListener<MessageBody> {
@Autowired
private ItestService testService;
@Override
public void onMessage(MessageBody messageBody) {
Map<String, Object> map2=JSON.parseObject(JSON.toJSONString(messageBody.getData()),Map.class);
}
}
data的结构生产者和消费者约定好就行了。