RocketMQ 使用记录

官方网站

http://rocketmq.apache.org/

中文说明网页

https://github.com/apache/rocketmq/tree/master/docs/cn

官方代码实践说明

https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md

 

自己项目实践代码

Maven 依赖(pom 坐标)

<!--rocketmq-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.0</version>
        </dependency>

生产者代码

/**
	 * NameServer address, injected from the space.config.rocketmq.namesrvAddr property.
	 */
	@Value("${space.config.rocketmq.namesrvAddr}")
	private String namesrvAddr;

	/** Topic name, injected from the space.config.rocketmq.topic property. */
	@Value("${space.config.rocketmq.topic}")
	private String topic;

	/** Group name passed to the DefaultMQProducer constructor. */
	private String producerGroup = "space_producer_group";

	/** Producer instance; assigned in defaultMQProducer() and used by sendMsg(). */
	private DefaultMQProducer producer;

	
	/**
	 * Exposes the producer held by this bean.
	 *
	 * @return the current value of the producer field; {@code null} until
	 *         defaultMQProducer() has assigned it
	 */
	public DefaultMQProducer getProducer() {
		return this.producer;
	}

	/**
	 * Creates and starts the producer after bean construction, then sends ten
	 * sample messages to the configured topic and prints each send result.
	 *
	 * <p>Any failure while starting or sending is printed and not rethrown, so
	 * bean initialization continues even if the broker is unreachable.
	 */
	@PostConstruct
	public void defaultMQProducer() {
		// Producer group name.
		producer = new DefaultMQProducer(producerGroup);

		// NameServer address; separate multiple addresses with ';'.
		producer.setNamesrvAddr(namesrvAddr);
		producer.setVipChannelEnabled(false);

		try {
			// start() must be called once before the producer is used;
			// never call it again for each message that is sent.
			producer.start();

			for (int i = 0; i < 10; i++) {
				String messageBody = "我是消息内容:" + i;

				// Encode exactly once, explicitly as UTF-8. Round-tripping through the
				// platform default charset corrupts non-ASCII text on non-UTF-8 JVMs.
				byte[] payload = messageBody.getBytes(java.nio.charset.StandardCharsets.UTF_8);

				// Build the message: topic, tag, keys, body.
				Message msg = new Message(topic, "push", "key_" + i, payload);

				// Synchronous send.
				SendResult result = producer.send(msg);

				System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
			}
		} catch (Exception e) {
			if (e instanceof InterruptedException) {
				// Preserve the interrupt status for whoever owns this thread.
				Thread.currentThread().interrupt();
			}
			e.printStackTrace();
		}
		// The producer is intentionally NOT shut down here: sendMsg() keeps using it.
	}

	/**
	 * Sends the given text to the configured topic with tag "push", using the
	 * producer created in defaultMQProducer().
	 *
	 * <p>Send failures are printed and not rethrown (best-effort delivery).
	 *
	 * @param msg message text; encoded as UTF-8 for the message body
	 */
	public void sendMsg(String msg){
		// NOTE(review): Date.toString() contains spaces, and RocketMQ appears to treat
		// whitespace as the separator between message keys — confirm whether a
		// whitespace-free key (e.g. a timestamp in millis) is wanted here.
		Message message = new Message(topic, "push", "key_" + new Date(),
				msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
		try {
			producer.send(message);
		} catch (InterruptedException e) {
			// Restore the interrupt flag so callers can still observe the interruption.
			Thread.currentThread().interrupt();
			e.printStackTrace();
		} catch (MQClientException | RemotingException | MQBrokerException e) {
			e.printStackTrace();
		}
	}

消费者代码

/**
	 * Consumer group name passed to the DefaultMQPushConsumer constructor.
	 */
	private String consumerGroup = "hmigroup";

	/**
	 * NameServer address, injected from the space.config.rocketmq.namesrvAddr property.
	 */
	@Value("${space.config.rocketmq.namesrvAddr}")
	private String namesrvAddr;

	/** Topic to subscribe to, injected from the space.config.rocketmq.topic property. */
	@Value("${space.config.rocketmq.topic}")
	private String topic;

	/** Tag filter expression used when subscribing to the topic. */
	private String tagExpression = "push";

	/** Consumer instance; assigned in defaultMQPushConsumer() after a successful start. */
	private DefaultMQPushConsumer consumer;

	// Injected collaborator; not referenced anywhere in the visible snippet.
	@Autowired
	private SessionManager sessionManager;


	/**
	 * Creates, configures and starts the push consumer after bean construction.
	 *
	 * <p>The consumer subscribes to the configured topic filtered by
	 * {@code tagExpression}, uses broadcasting message mode, and prints every
	 * received message. The {@code consumer} field is assigned only after
	 * {@code start()} succeeds; any setup failure is printed and not rethrown.
	 */
	@PostConstruct
	public void defaultMQPushConsumer() {
		// Consumer group name.
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
		consumer.setInstanceName("RocketConsumer11");

		// NameServer address; separate multiple addresses with ';'.
		consumer.setNamesrvAddr(namesrvAddr);
		try {
			// Subscribe to the topic, keeping only messages that match the tag expression.
			consumer.subscribe(topic, tagExpression);
			// On the very first start, consume from the beginning of the queue;
			// on later starts, consumption resumes from the last consumed position.
			consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
			consumer.setMessageModel(MessageModel.BROADCASTING);
			consumer.registerMessageListener(new MessageListenerConcurrently() {
				@Override
				public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
					try {
						// The listener receives a batch: handle every message in it.
						for (MessageExt messageExt : list) {
							System.out.println("messageExt: " + messageExt);
							String messageBody = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
							System.out.println("一号:消费响应:Msg: " + messageExt.getMsgId() + ",msgBody: " + messageBody);
						}
					} catch (Exception e) {
						e.printStackTrace();
						// NOTE(review): in BROADCASTING mode failed messages are reportedly
						// not redelivered, so this status may have no effect — confirm.
						return ConsumeConcurrentlyStatus.RECONSUME_LATER;
					}
					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
				}
			});
			consumer.start();
			// Publish to the field only once the consumer is actually running.
			this.consumer = consumer;
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

 

 
上一篇:kafka配置参数详解


下一篇:Kafka_kafka的配置文件说明