0%

rocketmq-深入消费源码

对比看两种消费方式的实现:顺序消费与并发消费。这里对顺序消费只关注消费端,不关心producer与broker怎么处理顺序消息,假设架构及策略已保证消息的全局或者局部顺序性。通过构建假定前提,我们可以忽略本次讨论的非重点内容。以下仍以Push方式为例。

消费方式

宏观上看rmq自身是一个生产-消费模式,在他各个角色的具体实现中也不乏生产-消费模式的使用。DefaultMQPushConsumerImpl中消息的拉取及拉取成功后的消费均采用生产-消费的方式进行组织(PullRequestConsumeRequest),消费的流程在之前rocketmq-消息重复分析一文中记录的很清楚,不再展开。

不论是顺序还是并发消费,都使用ProcessQueue做为本地消息的存储介质,每个MessageQueue对应一个ProcessQueue,该关系保存在RebalanceImpl中,每次复杂均衡时可能发生改变。ProcessQueue内部用TreeMap<Long, MessageExt>来保存消息,key是消息的offset。TreeMap内部使用的红黑树,根据树的特性可知消息的本地存储是offset有序的,并发消费时DefaultMQPushConsumerImpl去拉取消息会根据offset(RebalanceImpl#getMaxSpan)的跨度判断是否限流。

并发

关于并发消费的源码在rocketmq-消息重复分析中已有分析。

这里在强调下其ack机制的实现,ack是发送响应的过程,来确保消息的送达,在rmq实现中,为了确保消息“至少消费一次”语义,采用 offset + sendback 的方式来实现。

在并发消费下,不论消费成功还是失败offset都会记录为本次消费的最大一个offset,对于消费失败的消息,rmq的consumer会再次发回broker,如果此步骤也失败,降级为本地延迟消费,然后重复消费步骤。

顺序

顺序分全局顺序和局部顺序。全局有序的话就一个队列一个消费者;局部有序情况下,按照某个业务为度将统一纬度的消息发送到指定队列(可以通过自定义发送策略实现),消费者顺序消费分配到的队列消息。PS:广播与集群略有差异,以下默认集群消费。

rocketmq-消息重复分析中有谈到负载均衡的流程,在负载均衡完成的最后有这么一个操作:

1
2
3
4
5
6
7
// RebalanceImpl
// omit

// Rebalance后,更新本地的queue信息,消费者提交PullRequest,从新队列拉取消息
this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);

//omit

这方法的三个参数意义是:当前消费者分配到哪个topic下的哪些队列,当前消费者的消费方式是否为顺序。

下面的代码需要注意对isOrder的使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// RebalanceImpl

// 1. 删除已经不再订阅的messageQueue
// omit

// 2. 订阅新的messageQueue,并封装新的PullRequest
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
// 如果为新增messageQueue,需要添加到本地记录
if (!this.processQueueTable.containsKey(mq)) {

// 如果为顺序消费,并且锁messageQueue失败,则忽略该messageQueue
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}

// 删除之前的消费进度
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();

// 计算消费进度,参见 ConsumeFromWhere,该值在启动消费者时设置
long nextOffset = this.computePullFromWhere(mq);

// 进行本地存根,若为新存根则发起PullRequest
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}

// 3. 提交PullRequest进行消息拉取
// omit

我们看到在为顺序消费时,多了一步对messageQueue的加锁操作,看他做了些什么事情。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// RebalanceImpl

public boolean lock(final MessageQueue mq) {
// 找到该messageQueue所在的broker,该broker必须是master并且必须是该messageQueue所在的broker
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
requestBody.getMqSet().add(mq);

try {
// 请求broker锁定该messageQueue
// broker在RebalanceLockManager中尝试锁定messageQueue,若锁定成功则保存MessageQueue与clientId的映射关系
// 一个messageQueue只能被一个client锁定,来确保一个messageQueue在顺序情况下只能被一个消费者订阅,保证消费的顺序性
Set<MessageQueue> lockedMq =
this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
for (MessageQueue mmqq : lockedMq) {
// 更新本地存根
ProcessQueue processQueue = this.processQueueTable.get(mmqq);
if (processQueue != null) {
processQueue.setLocked(true);
// 记录锁定时间
processQueue.setLastLockTimestamp(System.currentTimeMillis());
}
}

// 返回锁定结果
return lockedMq.contains(mq);
} catch (Exception e) {
log.error("lockBatchMQ exception, " + mq, e);
}
}

return false;
}

在顺序消费的模式下,负载均衡时需要锁定队列来避免多个消费者同时订阅一个messageQueue的情况,并发消费模式不需要考虑这些问题。在消息的拉取时,顺序消费也略有不同。我们以Push方式为例来看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

// DefaultMQPushConsumerImpl

// 检查messageQueue是否被锁定
if (processQueue.isLocked()) {
// 如果是第一次拉取该messageQueue
if (!pullRequest.isLockedFirst()) {
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy);
if (brokerBusy) {
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
pullRequest, offset);
}

pullRequest.setLockedFirst(true);
pullRequest.setNextOffset(offset);
}
} else {
// 如果messageQueue未被锁定,等待一段时间再次尝试拉取,ConsumeMessageOrderlyService启动时会启动任务定时去锁定所有消费的messageQueue
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}

MessageListenerOrderly接口中有这么一段注释A MessageListenerOrderly object is used to receive asynchronously delivered messages orderly. one queue,one thread。意思是顺序消费一个queue对应一个消费线程。因为要顺序消费,必然要保证前一个消费后才能消费后面一个,所以多线程在此处没有存在的必要性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// ConsumeMessageOrderlyService

// 如果发生负载均衡当前消费者不再处理该messageQueue时,processQueue会被标记为删除
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}

// 每个messageQueue对应一把锁,同一时刻只能有一个线程在消费一个messageQueue
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
// 检查消费模式及锁的有效性
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
break;
}

// 集群模式下检查是否上锁
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& !this.processQueue.isLocked()) {
log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}

// 集群模式下检查锁是否过期
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& this.processQueue.isLockExpired()) {
log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}

// 消费循环时长不能超过MAX_TIME_CONSUME_CONTINUOUSLY
long interval = System.currentTimeMillis() - beginTime;
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}

// 一次消费的消息数量,默认为1
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

// 从本地缓存获取消息
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
if (msgs.isEmpty()) {
break;
}

final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

ConsumeOrderlyStatus status = null;

ConsumeMessageContext consumeMessageContext = null;
// omit hook

long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
// 对processQueue加上消费锁,防止负载均衡时可能会发生争抢
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}

// 参考 ConsumeOrderlyStatus
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
hasException = true;
} finally {
this.processQueue.getLockConsume().unlock();
}

// omit hook and stats preprocess

// 抛出异常时
if (null == status) {
status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}

// omit hook and stats
// 处理消费结果
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
}
} else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}

// 挂起片刻再次尝试消费
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}

再来看下怎么处理消费结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// ConsumeMessageOrderlyService

public boolean processConsumeResult(
final List<MessageExt> msgs,
final ConsumeOrderlyStatus status,
final ConsumeOrderlyContext context,
final ConsumeRequest consumeRequest
) {
boolean continueConsume = true;
long commitOffset = -1L;
if (context.isAutoCommit()) {
switch (status) {
case COMMIT:
case ROLLBACK:
log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
consumeRequest.getMessageQueue());
case SUCCESS:
// 清空consumingMsgOrderlyTreeMap,并更新一些统计信息,返回下一个offset的起始
commitOffset = consumeRequest.getProcessQueue().commit();
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
// 检查消息是否已经超过最大消费次数
// 如果没有超过,本地继续尝试消费
// 如果超过,将消息发送到死信队列,不在处理
// 如果发送到死信队列失败,本地继续尝试消费
if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
} else {
commitOffset = consumeRequest.getProcessQueue().commit();
}
break;
default:
break;
}
} else {
// omit
}

// 更新消费偏移
if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}

return continueConsume;
}

offset保存

offset的持久化可以在本地也可以在远程(broker上,默认RemoteBrokerOffsetStore),offset并不是每次消费完成都会向broker发起持久化请求,有这么几个持久化入口:

  1. 定时任务,默认5S一次
  2. 每次去broker拉取消息时
  3. 消费者shutdown时

后记

ProcessQueue中已offset为key做本地排序怎么能保证消息的顺序呢?很简单,因为broker是顺序写commitLog,所以后来消息的offset一定比先到的消息offset大。

ProcessQueue中为顺序消费保留了一个consumingMsgOrderlyTreeMap字段,该字段保存某次消费的消息,为什么需要做这个字段呢?猜测只是为了应对批量消费的,即ConsumeMessageBatchMaxSize大于1时。

小生不才,以上如有描述有误的地方还望各位不吝赐教 !^_^!

贴出的源码均基于release-4.3.2,为了更好的表达描述的重点,贴出的源代码会有所删减。