RocketMQ

1 导入坐标

        <!--rocket mq -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>    

2 编写生产者

/**
 * 生产者 发送消息
 */
public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("39.99.224.53:9876");
        producer.start();
        for (int i = 1; i <= 10; i++) {
            //同步
            Message msg = new Message("topic1", ("同步消息 hello rocketmq " + i).getBytes());
            SendResult result = producer.send(msg);
            System.out.println("返回结果:" + result);

            //异步
//            Message msg = new Message("topic1", (" 异步消息 hello rocketmq " + i).getBytes());
//            producer.send(msg, new SendCallback() {
//                //表示成功返回结果
//                @Override
//                public void onSuccess(SendResult sendResult) {
//                    System.out.println(sendResult.toString());
//                }
//                //表示发送消息失败
//                @Override
//                public void onException(Throwable throwable) {
//                    System.out.println(throwable.toString());
//                }
//            });

            //单向消息
//            Message msg = new Message("topic1", (" 单向消息 hello rocketmq " + i).getBytes());
//            producer.sendOneway(msg);
        }
        TimeUnit.SECONDS.sleep(10);
        producer.shutdown();
    }
}

3 编写消费者

/**
 * 消费者 接收消息
 */
public class Consumer {
    public static void main(String[] args) throws Exception {
        //1.创建一个接收消息的对象Consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.设定接收的命名服务器地址
        consumer.setNamesrvAddr("39.99.224.53:9876");
        //3.设定接收消息对应的topic1,对应的sub标签为任意*
        consumer.subscribe("topic1","*");

        //设置当前消费者的消费模式  (默认:负债均衡 )
        consumer.setMessageModel(MessageModel.CLUSTERING);
        //设置当前消费者的消费模式 :广播模式 ( 所有客户端接收的消息一摸一样 )
        //consumer.setMessageModel(MessageModel.BROADCASTING);

        //4.开启监听,用于接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                //遍历消息列表
                for (MessageExt msg : msgs) {
                    System.out.println("接收的消息:" + new String( msg.getBody()) + msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动接收的服务
        consumer.start();
        System.out.println("接收消息服务,已开启运行");
    }
}

  

上一篇:JAVA代码之RocketMQ生产和消费数据


下一篇:springboot集成kafka实现producer和consumer