0%

rocketmq-消息重复分析

场景描述

有两个namesrv(namesrv1与namesrv2),某个topic分别在两个master上均分队列1,2,3。

当broker-ba的主从与namesrv1网络出现问题,namesrv2正常,此时namesrv出现不一致,当两个从属相同consumerGroup的消费者分别从两个namesrv获取broker信息时,按照均分的策略就会出现c1订阅队列1和2,c2订阅队列2。此时,消息便会重复被消费,而且出现了一个队列被多个消费者订阅现象。

出现这种情况是不是正常的呢?出现这种情况是否可以自动恢复呢?尝试从以下几个角度来分析,部署结构见下图(图中未给出namesrv)。

broker生产环境部署图

以下介绍均假设已经初步了解rocketmq,如果还不是很清楚,可以看下我的这边博客rocketmq-半入门级架构及核心流程概览

部署结构

在聊重复消费前,先大概了解下Broker的部署结构及其HA。

  1. 一个master

    自己部署玩玩用,测试或者生产环境打算这么搞得,脑子一定上火长泡了。

  2. 一个master一个slave(同步或者异步复制)

    测试环境可以使用,这种部署结构下可以基本保证应用的高可用。当master挂了或者磁盘损坏等异常情况下,采用同步复制方式的,消息不会丢失;采用异步复制的,除了哪些没来的及同步的消息,其他消息都有备份。

  3. 一个master多个slave(同步或者异步复制)

    这种方式个人认为意义不是很大,因为这种情况下不可能开启同步方式来持久化消息,当master挂掉后rmq无法继续接受生产消息,只是多了一些消息的备份。

  4. 多个master多个slave(同步或者异步复制)

    每个master对应一个slave,即有多对master-slave。多个master增加了集群对异常的容忍性,单个master的挂机不会影响topic下其他master上queue的写入,较之情况2更好。

“至少投递一次”语义

rocketmq实现At least Once语义,即每个消息至少投递一次。consumer先pull消息到本地,消费完成后,才向服务器返回ack,如果没有消费一定不会ack 消息。实现上,采用不同的消费策略(ConsumeMessageConcurrentlyService/ConsumeMessageOrderlyService)时,对失败的处理也不一样。

从语义上看,消息被重复消费是允许出现的,不管到底是什么原因导致的。

语义实现

DefaultMQPushConsumerImpl为例,先来看下consumer拉取数据的大致流程。在consumer启动的最后一步,会调用this.mQClientFactory.rebalanceImmediately();来唤醒RebalanceService来进行首次Rebalance。在Rebalance之前consumer已经从namesrv那里获取了订阅topic的配置信息,Rebalance使用既定的分配策略计算出当前消费者可以订阅到哪些queue,分配完成后为所有订阅队列提交一个PullRequestPullMessageService#pullRequestQueue中,消费线程拿到请求开始拉取数据。

1
2
3
4
5
6
7
8
9
// PullMessageService
private void pullMessage(final PullRequest pullRequest) {
// 根据分组信息获取消费实例
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
}
}

最终还是将消费请求转回DefaultMQPushConsumerImpl进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
public void pullMessage(final PullRequest pullRequest) {
final ProcessQueue processQueue = pullRequest.getProcessQueue();
// 本地消费队列是否已经删除
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}

// 最后一次拉取时间
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

try {
// 服务状态是否正常
this.makeSureStateOK();
} catch (MQClientException e) {
log.warn("pullMessage exception, consumer state not ok", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
return;
}

// 是否已经暂停从broker继续拉取消息,哪些情况下会暂停?
if (this.isPause()) {
log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
// 延迟执行本次拉取请求
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
}

long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
// 本地尚未消费的消息数量已经大于预定的阈值
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}

// 本溪尚未消费的消息总大小是否已经超过预定的阈值
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}

if (!this.consumeOrderly) {
// 如果不是顺序消费,此时要判断最老消息和最新消息的跨度,如果跨度超过预定值,延缓拉取
// processQueue内部采用TreeMap来存消息,消息的offset为key,所以本地消费存储天然有序
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
} else {
// 忽略顺序消费处理
}

final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
// 订阅的信息,具体信息看SubscriptionData类
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}

final long beginTimestamp = System.currentTimeMillis();

// 拉到消息后的回掉处理
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);

switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);

long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
// 如果拉取的消息为空,立马开始下次拉取
// 为什么消息为空,但是状态是FOUND?因为存在tag过滤,broker根据tag的hash值做初步过滤,本地根据tag的字符串
// 做过滤,当出现hash冲突时,可能出现该种情况。
// 为什么broker要用hash值呢?可以看下broker怎么为一条消息做索引(即consumeQueue是如何设计的)
// consumeQueue的每个元素是定长的,如果使用真实的tag值,无法做到定长,但hash值长度固定。
// consumeQueue可以当作是一个大数组,当知道数据的偏移(index)时,访问的时间复杂度是O(1)
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

// 统计信息
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

// 将消息放到本地消费队列,内部为TreeMap结构存储
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
// 提交消费请求
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);

if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}

break;
case NO_NEW_MSG:
case NO_MATCHED_MSG:
// 没有拉取到新消息或者没有符合匹配信息的消息
pullRequest.setNextOffset(pullResult.getNextBeginOffset());

DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:
// 偏移量非法
break;
default:
break;
}
}
}

@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}

DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
};

boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
// 如果是集群消费方式,从RemoteBrokerOffsetStore获取消费本地消费的偏移量
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}

// ...省略...

try {
// 通过api向broker拉取消息
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
}

以上我们看到了consumer端如发起拉取及处理消息的流程,最终在callback中通过调用具体的消费服务的submitConsumeRequest方法来提交ConsumeRequest。此处以ConsumeMessageConcurrentlyService为例来看,方法的实现比较简单:将本次拉取的消费按照设置的批量消费批次大小(默认为1)进行划分,每个批次都为一个ConsumeRequest implements Runnable,然后提交至消费线程池,等待被消费。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// ConsumeRequest in ConsumeMessageConcurrentlyService
class ConsumeRequest implements Runnable {

public void run() {
// ...omit check...

MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;

ConsumeMessageContext consumeMessageContext = null;

// ...omit hook...

long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
// 不清楚重新设置有何意义?[在这个视频中有提到](https://pan.baidu.com/s/19VDHbOnENzoGxUkhiYgj3Q)
ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
// 设置开始消费时间
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
// 业务消费消息
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
// omit log
hasException = true;
}
// 判断消费状态
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}

// omit hook

if (null == status) {
// omit log
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}

// omit hook and stats

if (!processQueue.isDropped()) {
// 处理消费结果进行ACK
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}
}

public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();

if (consumeRequest.getMsgs().isEmpty())
return;
// stats data
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}

switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
// omit
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
// 根绝ackIndex将失败消息发回到broker的retry队列
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}

// 发送到retry队列失败的,本地进行延时消费
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);

this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}

// 更新消费offset
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}

我们看到在消费失败时,消息会被重新投递到retry队列或者本地再次消费,在这种设计下就保证了消息至少被消费一次。

消费分配策略

以集群消费方式采用默认分配策略为例,看下如何做队列的分配。

目前看到的能够触发rebalance的入口有这么几个:

  1. consumer刚启动时,自己进行rebalance,其实这和情况2一致,自己启动意味着consumer数量变更;
  2. broker感知到consumer数量变化时,即有consumer的新增或者减少时,会推消息给相同group内的consumer;
  3. cosumer定时rebalance,默认20000毫秒。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// RebalanceImpl
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING:
// omit
break;
case CLUSTERING: {
// topic下所有queue的集合, 注意!!!此处是从某个namesrv获取
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// topic下某个consumerGroup中所有consumer集合,注意!!!此处是从某个broker获取
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
// omit check

if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);

// 排序
Collections.sort(mqAll);
Collections.sort(cidAll);

// 默认 AllocateMessageQueueAveragely
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(this.consumerGroup,
this.mQClientFactory.getClientId(), mqAll, cidAll);
} catch (Throwable e) {
// omit log
return;
}

Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}

// Rebalance后,更新本地的queue信息,消费者提交PullRequest,从新队列拉取消息
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
// omit log
// When rebalance result changed, should update subscription's version to notify broker.
// 将订阅消息立马上报给所有broker,即立马发送心跳
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// AllocateMessageQueueAveragely
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
// omit check

// 当前consumer在排序后consumer中的下标
int index = cidAll.indexOf(currentCID);

// 一般情况下,messageQueue的数量不小于consumer的数量,如果messageQueue数量比consumer数量少,则会出现consumer饿死的情况。
int mod = mqAll.size() % cidAll.size();

// 1. 如果queue数量不大于cosumer数量 -> 平均数量为1
// 2. 如果queue数量大于cosumer数量:
// 能整除时 -> 平均值
// 不能整除时 -> 根据consumer在所有consumer中的排序位置确定是取平均值加1 还是 取平均值
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());

// 确定开始位置
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}

我们注意到rebalance时的cosumer列表和queue列表从不同的地方获取,这可能导致两者数据不一致,即使都从namesrv获取,因为namesrv之间没有强一致性,所以相互间数据也可能短暂不一致,这时consumer自己来做rebalance时的基础数据不一致,可能就会出现同一队列被多个consumer同时订阅的情况。可以把AllocateMessageQueueAveragely的分配代码自己捞出来做下测试。

CAP理论

CAP理论不再赘述。从CAP理论中我们可知,一致性和可用性是矛盾的。根据rocketmq的设计理念及具体实现来看,比如:namesrv之间无通信;需要通信的节点间以oneway发送通知等,可以看出rocketmq是优先保证高可用特性的,所以,出现一致性问题在所难免。我们也知道cosumer有任务定时做rebalance,所以它可以保证只要在网络或者通信恢复正常后,整个集群的数据状态最终是一致的。

总结

以上描述的场景只是rmq导致重复消费的某个特定场景,当然重复消费不至这些,比如:当某个consumer的消费consumeOffset在没有上报给远程服务(或者持久化)之前,宿主机的电源线被熊孩子给拔掉了,当其他consumer重新消费这个队列是,获取的consumeOffset便是落后的,于是也产生了消息的重复;又比如,ConsumeMessageConcurrentlyService模式下,部分为成功消费的消息会重新投递回broker,如果投递失败,本地尝试重新消费,这时消息也重复了。

我们以“又比如”描述的为基础来回顾下“至少消费一次”语义,在本地重新消费上次失败的消息时再次失败,并且此时jvm crash或者restart,那么这批消费失败的消息就不会再被消费,从这里也可以看出,rcoketmq不保证消息一定被正常消费,只保证“至少消费一次”。

书上得来终觉浅,绝知此事要躬行。

贴出的源码均基于release-4.3.2,为了更好的表达描述的重点,贴出的源代码会有所删减。

小生不才,以上如有描述有误的地方还望各位不吝赐教 !^_^!

参考

ZooKeeper和CAP理论及一致性原则

CAP 定理的含义