1 导入坐标
<!--rocket mq --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.8.0</version> </dependency>
2 编写生产者
/** * 生产者 发送消息 */ public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.setNamesrvAddr("39.99.224.53:9876"); producer.start(); for (int i = 1; i <= 10; i++) { //同步 Message msg = new Message("topic1", ("同步消息 hello rocketmq " + i).getBytes()); SendResult result = producer.send(msg); System.out.println("返回结果:" + result); //异步 // Message msg = new Message("topic1", (" 异步消息 hello rocketmq " + i).getBytes()); // producer.send(msg, new SendCallback() { // //表示成功返回结果 // @Override // public void onSuccess(SendResult sendResult) { // System.out.println(sendResult.toString()); // } // //表示发送消息失败 // @Override // public void onException(Throwable throwable) { // System.out.println(throwable.toString()); // } // }); //单向消息 // Message msg = new Message("topic1", (" 单向消息 hello rocketmq " + i).getBytes()); // producer.sendOneway(msg); } TimeUnit.SECONDS.sleep(10); producer.shutdown(); } }
3 编写消费者
/** * 消费者 接收消息 */ public class Consumer { public static void main(String[] args) throws Exception { //1.创建一个接收消息的对象Consumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.设定接收的命名服务器地址 consumer.setNamesrvAddr("39.99.224.53:9876"); //3.设定接收消息对应的topic1,对应的sub标签为任意* consumer.subscribe("topic1","*"); //设置当前消费者的消费模式 (默认:负债均衡 ) consumer.setMessageModel(MessageModel.CLUSTERING); //设置当前消费者的消费模式 :广播模式 ( 所有客户端接收的消息一摸一样 ) //consumer.setMessageModel(MessageModel.BROADCASTING); //4.开启监听,用于接收消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { //遍历消息列表 for (MessageExt msg : msgs) { System.out.println("接收的消息:" + new String( msg.getBody()) + msg); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.启动接收的服务 consumer.start(); System.out.println("接收消息服务,已开启运行"); } }