生产者发送消息时需要把同一类有序消息都发送到相同的消息队列中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
消费者需要使用有序监听器来消费消息
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(false);
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
} else if ((this.consumeTimes.get() % 3) == 0) {
return ConsumeOrderlyStatus.ROLLBACK;
} else if ((this.consumeTimes.get() % 4) == 0) {
return ConsumeOrderlyStatus.COMMIT;
} else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
消费者启动后开始平衡分配消费消息队列,当给消费者实例分配了新的消息队列时,首先得先检查一下该消息队列有没有被其他消费者实例锁住,组装锁定消息队列请求实体LockBatchRequestBody
public boolean lock(final MessageQueue mq) {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
requestBody.getMqSet().add(mq);
try {
Set<MessageQueue> lockedMq =
this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
for (MessageQueue mmqq : lockedMq) {
ProcessQueue processQueue = this.processQueueTable.get(mmqq);
if (processQueue != null) {
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
}
}
boolean lockOK = lockedMq.contains(mq);
log.info("the message queue lock {}, {} {}",
lockOK ? "OK" : "Failed",
this.consumerGroup,
mq);
return lockOK;
} catch (Exception e) {
log.error("lockBatchMQ exception, " + mq, e);
}
}
return false;
}
获取主broker的地址信息
public FindBrokerResult findBrokerAddressInSubscribe(
final String brokerName,
final long brokerId,
final boolean onlyThisBroker
) {
String brokerAddr = null;
boolean slave = false;
boolean found = false;
HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
if (map != null && !map.isEmpty()) {
brokerAddr = map.get(brokerId);
slave = brokerId != MixAll.MASTER_ID;
found = brokerAddr != null;
if (!found && !onlyThisBroker) {
Entry<Long, String> entry = map.entrySet().iterator().next();
brokerAddr = entry.getValue();
slave = entry.getKey() != MixAll.MASTER_ID;
found = true;
}
}
if (found) {
return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
}
return null;
}
批量锁定消息队列,请求码为LOCK_BATCH_MQ = 41
public Set<MessageQueue> lockBatchMQ(
final String addr,
final LockBatchRequestBody requestBody,
final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
request.setBody(requestBody.encode());
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
LockBatchResponseBody responseBody = LockBatchResponseBody.decode(response.getBody(), LockBatchResponseBody.class);
Set<MessageQueue> messageQueues = responseBody.getLockOKMQSet();
return messageQueues;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
broker处理请求
private RemotingCommand lockBatchMQ(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class);
Set<MessageQueue> lockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch(
requestBody.getConsumerGroup(),
requestBody.getMqSet(),
requestBody.getClientId());
LockBatchResponseBody responseBody = new LockBatchResponseBody();
responseBody.setLockOKMQSet(lockOKMQSet);
response.setBody(responseBody.encode());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
尝试锁定客户端请求锁定的那些消息队列
public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
final String clientId) {
Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());
Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());
for (MessageQueue mq : mqs) {
if (this.isLocked(group, mq, clientId)) {
lockedMqs.add(mq);
} else {
notLockedMqs.add(mq);
}
}
if (!notLockedMqs.isEmpty()) {
try {
this.lock.lockInterruptibly();
try {
ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
if (null == groupValue) {
groupValue = new ConcurrentHashMap<>(32);
this.mqLockTable.put(group, groupValue);
}
for (MessageQueue mq : notLockedMqs) {
LockEntry lockEntry = groupValue.get(mq);
if (null == lockEntry) {
lockEntry = new LockEntry();
lockEntry.setClientId(clientId);
groupValue.put(mq, lockEntry);
log.info(
"tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}",
group,
clientId,
mq);
}
if (lockEntry.isLocked(clientId)) {
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
lockedMqs.add(mq);
continue;
}
String oldClientId = lockEntry.getClientId();
if (lockEntry.isExpired()) {
lockEntry.setClientId(clientId);
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
log.warn(
"tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}",
group,
oldClientId,
clientId,
mq);
lockedMqs.add(mq);
continue;
}
log.warn(
"tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}",
group,
oldClientId,
clientId,
mq);
}
} finally {
this.lock.unlock();
}
} catch (InterruptedException e) {
log.error("putMessage exception", e);
}
}
return lockedMqs;
}
先判断消息队列是否被该客户端锁定,一个就是客户端id是否相等,另一个就是锁定时间未过期。锁定成功则更新最新的更新时间。最后返回成功锁定的消息队列信息。判断当前是否有消息队列对应的处理队列,有的话就设置锁定状态,更新锁定时间,最后返回该消息队列是否被锁定。如果锁定成功就继续往下走创建处理队列,否则的话就忽略该消息队列。
REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty(
"rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {
ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
if (groupValue != null) {
LockEntry lockEntry = groupValue.get(mq);
if (lockEntry != null) {
boolean locked = lockEntry.isLocked(clientId);
if (locked) {
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
}
return locked;
}
}
return false;
}
public boolean isLocked(final String clientId) {
boolean eq = this.clientId.equals(clientId);
return eq && !this.isExpired();
}
public boolean isExpired() {
boolean expired =
(System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
return expired;
}
当有些消息队列没有分配给本实例的话,就需要去掉本实例对该消息队列的锁定
public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
&& MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
try {
if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
try {
return this.unlockDelay(mq, pq);
} finally {
pq.getLockConsume().unlock();
}
} else {
log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",
mq,
pq.getTryUnlockTimes());
pq.incTryUnlockTimes();
}
} catch (Exception e) {
log.error("removeUnnecessaryMessageQueue Exception", e);
}
return false;
}
return true;
}
当执行队列中还有消息时,需要等待20s再进行解锁
private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {
if (pq.hasTempMessage()) {
log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq);
this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() {
@Override
public void run() {
log.info("[{}]unlockDelay, execute at once {}", mq.hashCode(), mq);
RebalancePushImpl.this.unlock(mq, true);
}
}, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS);
} else {
this.unlock(mq, true);
}
return true;
}
对消息队列进行解锁,解锁请求体为UnlockBatchRequestBody
public void unlock(final MessageQueue mq, final boolean oneway) {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
requestBody.getMqSet().add(mq);
try {
this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000, oneway);
log.warn("unlock messageQueue. group:{}, clientId:{}, mq:{}",
this.consumerGroup,
this.mQClientFactory.getClientId(),
mq);
} catch (Exception e) {
log.error("unlockBatchMQ exception, " + mq, e);
}
}
}
进行解锁请求,请求码为UNLOCK_BATCH_MQ = 42
public void unlockBatchMQ(
final String addr,
final UnlockBatchRequestBody requestBody,
final long timeoutMillis,
final boolean oneway
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
request.setBody(requestBody.encode());
if (oneway) {
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
} else {
RemotingCommand response = this.remotingClient
.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
}