// 检查messageQueue是否被锁定 if (processQueue.isLocked()) { // 如果是第一次拉取该messageQueue if (!pullRequest.isLockedFirst()) { finallongoffset=this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue()); booleanbrokerBusy= offset < pullRequest.getNextOffset(); log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", pullRequest, offset, brokerBusy); if (brokerBusy) { log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}", pullRequest, offset); }
pullRequest.setLockedFirst(true); pullRequest.setNextOffset(offset); } } else { // 如果messageQueue未被锁定,等待一段时间再次尝试拉取,ConsumeMessageOrderlyService启动时会启动任务定时去锁定所有消费的messageQueue this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); log.info("pull message later because not locked in broker, {}", pullRequest); return; }
在MessageListenerOrderly接口中有这么一段注释A MessageListenerOrderly object is used to receive asynchronously delivered messages orderly. one queue,one thread。意思是顺序消费一个queue对应一个消费线程。因为要顺序消费,必然要保证前一个消费后才能消费后面一个,所以多线程在此处没有存在的必要性。
// 如果发生负载均衡当前消费者不再处理该messageQueue时,processQueue会被标记为删除 if (this.processQueue.isDropped()) { log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; }
// 每个messageQueue对应一把锁,同一时刻只能有一个线程在消费一个messageQueue finalObjectobjLock= messageQueueLock.fetchLockObject(this.messageQueue); synchronized (objLock) { // 检查消费模式及锁的有效性 if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) { finallongbeginTime= System.currentTimeMillis(); for (booleancontinueConsume=true; continueConsume; ) { if (this.processQueue.isDropped()) { log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue); break; }
// 集群模式下检查是否上锁 if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) && !this.processQueue.isLocked()) { log.warn("the message queue not locked, so consume later, {}", this.messageQueue); ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); break; }