1、安装 RocketMQ
下载:rocketmq-all-4.8.0-bin-release.zip
1、传入 Linux 服务器
2、解压缩
unzip rocketmq-all-4.8.0-bin-release.zip
3、调整启动参数,
cd rocketmq-all-4.8.0-bin-release/bin
修改默认启动参数,默认启动的最大内存为4G,比较大,修改小一点,否则如果服务器内存不够会启动失败
调整namesrv
vim runserver.sh
调整如下
调整broker
vim runbroker.sh
调整如下
4、启动namesrv和启动broker
启动navmesrv
nohup sh mqnamesrv &
启动broker,注意ip为公网ip,端口为navmesrv的默认端口9876
nohup ./mqbroker -n localhost:9876 &
5、检查是否启动成功
jps -l
也可以查看日志
tail -f ~/logs/rocketmqlogs/broker.log
启动成功
6、测试 RocketMQ
消息发送
export NAMESRV_ADDR=localhost:
./tools.sh org.apache.rocketmq.example.quickstart.Producer
消息接收
./tools.sh org.apache.rocketmq.example.quickstart.Consumer
7、关闭 RocketMQ
./mqshutdown broker
./mqshutdown namesrv
2、安装 RocketMQ 控制台
git clone https://github.com/apache/rocketmq-externals.git
1、进入到rocketmq-console的配置文件,修改如下:
2、打包
mvn clean package -Dmaven.test.skip=true
3、进入 target 启动 jar
java -jar rocketmq-console-ng-2.0.0.jar
打开浏览器访问 localhost:9877,如果报错
这是因为我们的 RocketMQ 安装在 Linux 中,控制台在 windows,Linux 需要开放端口才能访问,开放 10909 和 9876 端口
firewall-cmd --zone=public --add-port=10909/tcp --permanent
firewall-cmd --zone=public --add-port=10911/tcp --permanent
firewall-cmd --zone=public --add-port=9876/tcp --permanent
systemctl restart firewalld.service
firewall-cmd --reload
重新启动控制台项目
3、Java 实现消息发送
1、pom.xml 中引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
2、生产消息
package com.godfrey;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
@SpringBootTest
class ProviderApplicationTests {
@Test
@DisplayName("测试RocketMQ消息发送")
void test3() throws Exception {
//创建消息生产者
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
//设置NameServer
producer.setNamesrvAddr("39.106.41.184:9876");
//启动生产者
producer.start();
//构建消息对象
Message message = new Message("myTopic", "myTag", ("Test MQ").getBytes());
//发送消息
SendResult result = producer.send(message, 1000);
System.out.println(result);
//关闭生产者
producer.shutdown();
}
}
3、直接运行,如果报错 sendDefaultImpl call timeout,可以开放 10911 端口
firewall-cmd --zone=public --add-port=10911/tcp --permanent
systemctl restart firewalld.service
firewall-cmd --reload
打开 RocketMQ 控制台,可查看消息。
4、Java 实现消息消费
package com.godfrey;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
@SpringBootTest
class ProviderApplicationTests {
@Test
@DisplayName("测试RocketMQ消息接收")
void test4() throws MQClientException {
//创建消息消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");
//设置NameServer
consumer.setNamesrvAddr("39.106.41.184:9876");
//指定订阅的主题和标签
consumer.subscribe("myTopic", "*");
//回调函数
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
consumer.start();
}
}
5、Spring Boot 整合 RocketMQ
provider
1、pom.xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
2、application.yml
rocketmq:
name-server: 39.106.41.184:9876
producer:
group: myprovider
3、Order
package com.godfrey.entity;
import java.io.Serializable;
import java.util.Date;
/**
* @author godfrey
* @since 2020-12-27
*/
public class Order implements Serializable {
private static final long serialVersionUID = -5397628182599822017L;
private Integer id;
private String buyerName;
private String buyerTel;
private String address;
private Date createDate;
public Order() {
}
public Order(Integer id, String buyerName, String buyerTel, String address, Date createDate) {
this.id = id;
this.buyerName = buyerName;
this.buyerTel = buyerTel;
this.address = address;
this.createDate = createDate;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getBuyerName() {
return buyerName;
}
public void setBuyerName(String buyerName) {
this.buyerName = buyerName;
}
public String getBuyerTel() {
return buyerTel;
}
public void setBuyerTel(String buyerTel) {
this.buyerTel = buyerTel;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public Date getCreateDate() {
return createDate;
}
public void setCreateDate(Date createDate) {
this.createDate = createDate;
}
@Override
public String toString() {
return "Order{" +
"id=" + id +
", buyerName='" + buyerName + '\'' +
", buyerTel='" + buyerTel + '\'' +
", address='" + address + '\'' +
", createDate=" + createDate +
'}';
}
}
4、Controller
private RocketMQTemplate rocketMQTemplate;
@Autowired
public ProviderController(RocketMQTemplate rocketMQTemplate) {
this.rocketMQTemplate = rocketMQTemplate;
}
@GetMapping("/create")
public Order create(){
Order order = new Order(
1,
"张三",
"123123",
"软件园",
new Date()
);
this.rocketMQTemplate.convertAndSend("myTopic",order);
return order;
}
consumer
1、pom.xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
2、application.yml
rocketmq:
name-server: 39.106.41.184:9876
3、Order
package com.godfrey.entity;
import java.io.Serializable;
import java.util.Date;
/**
* @author godfrey
* @since 2020-12-27
*/
public class Order implements Serializable {
private static final long serialVersionUID = -5397628182599822017L;
private Integer id;
private String buyerName;
private String buyerTel;
private String address;
private Date createDate;
public Order() {
}
public Order(Integer id, String buyerName, String buyerTel, String address, Date createDate) {
this.id = id;
this.buyerName = buyerName;
this.buyerTel = buyerTel;
this.address = address;
this.createDate = createDate;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getBuyerName() {
return buyerName;
}
public void setBuyerName(String buyerName) {
this.buyerName = buyerName;
}
public String getBuyerTel() {
return buyerTel;
}
public void setBuyerTel(String buyerTel) {
this.buyerTel = buyerTel;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public Date getCreateDate() {
return createDate;
}
public void setCreateDate(Date createDate) {
this.createDate = createDate;
}
@Override
public String toString() {
return "Order{" +
"id=" + id +
", buyerName='" + buyerName + '\'' +
", buyerTel='" + buyerTel + '\'' +
", address='" + address + '\'' +
", createDate=" + createDate +
'}';
}
}
4、Service
package com.godfrey.service;
import com.godfrey.entity.Order;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
/**
* @author godfrey
* @since 2020-12-27
*/
@Service
@RocketMQMessageListener(consumerGroup = "myConsumer", topic = "myTopic")
public class SmsService implements RocketMQListener<Order> {
private static final Logger log = LoggerFactory.getLogger(SmsService.class);
@Override
public void onMessage(Order order) {
log.info("新订单{},发短信通知用户", order);
}
}