0%

rocketmq-半入门级架构及核心流程概览

一直在用rocketmq,对他的功能和大概流程略知一些,但是比较浮,经不起稍微的推敲。是时候进一步了解下这个NB的中间件了。

这里不再赘述它的那些特性,网上一大堆,这里主要按照自己想了解的一些方面作整理,贴出部分核心代码,意图通过表现各个角色间的交互,勾画大致架构,方便以后对每个要点各个深入。如果想要引导来阅读源代码,推荐rmq源码系列,写的很有诚意,本笔记中也部分参考引用其文章。

部署结构(逻辑结构/物理结构)

消息中间件的整体看起来像我们相互邮寄明信片。李雷(Producer)通过邮局公告(Namesrv)找到邮局的地址,然后去邮局(Broker)把明信片(Message)发送给韩梅梅(Consumer)。

物理结构

  • Producer 消息生产者,负责产生消息,一般由业务系统负责产生消息。

  • Consumer 消息消费者,负责消费消息,一般是后台系统负责异步消费。

  • Broker 消息中转角色,负责存储消息,转发消息,一般也称为 Server。

  • Namesrv 注册中心,管理协调角色,维护其他角色信息。

rmq物理部署.png

逻辑结构

逻辑结构里面把所有角色都集群化了,集群化后就要牵扯到集群消费。于是,基于以上的角色,又衍生出其他的概念。

  • Producer Group 一类 Producer 的集合名称,这类 Producer 通常发送一类消息,且发送逻辑一致。

  • Consumer Group 一类 Consumer 的集合名称,这类 Consumer 通常消费一类消息,且消费逻辑一致。

rmq逻辑部署.png

角色实现及其交互

按照逻辑/物理部署结构,各个角色间怎么通信?怎么保活?数据怎么备份?

rmq自己对Netty进行了包装(Facade),方便各个角色使用,在工程的remoting子工程中。

namesrv(Name Server)与broker

namesrv与broker是实现后续一切的基础,producer和consumer的信息传递需要broker来中转,那么找到或者说确定一个broker便是第一步了。

namesrv(Name Server)

namesrv是很轻量的服务,简单、可集群横向扩展、无状态。他是整个mq的管家(像是Dubbo的服务发现),管理其他角色,如果没有它,整个mq将难以开展工作。

简单看下namesrv启动时的行为。它的核心代码在工程的namesrv包下,代码量很少。

namesrv的故事开始于NamesrvStartup的main方法,它主要来解析配置参数,根据配置装填并初始化和启动NamesrvControllerNamesrvController贯连整个Namesrv的业务功能,通过Netty向外暴露服务,主要负责:

  • 管理Topic和Broker的注册信息

  • 管理KV的配置(持久化)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// NamesrvController
public boolean initialize() {
// 管理KV的配置(并持久化)
this.kvConfigManager.load();

// 暴露服务
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

// netty执行线程池
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

// 注册服务处理器
this.registerProcessor();

// kv信息打印及broker存活检测任务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);

// 省略TSL配置部分
}

RouteInfoManager相当于是管家的账本,里面保存着关于topic、broker的具体信息,这些信息全部保存在内存中。其维护的信息如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//RouteInfoManager

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;

/**
* BrokerData {
* private String cluster;
* private String brokerName;
* private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
* }
**/
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;

private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;

/**
* BrokerLiveInfo {
* private long lastUpdateTimestamp;
* private DataVersion dataVersion;
* private Channel channel;
* private String haServerAddr;
}
**/
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;

private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

那么,这些信息从哪里来呢?且慢,我们先看下broker。

broker

broker就是例子中邮局,邮局要具备什么能力呢?

  1. 接收信件
  2. 保存信件,保证除不可抗因素外,信件不丢失
  3. 投递信件
  4. 高效工作,信件快速接受及送达
  5. 信件投递失败的处理
  6. 信件投递可回溯追踪(当然,邮局不能看内容了)
  7. 向管家(namesrv)汇报自己的运营状态

以上这些其实也就是broker要做的事情,只不过它不存在隐私一说罢了。当然他还需要有其他的特性。

  1. 支持消息的不同投递模型:Publish/Subscribe,集群分组消费等
  2. 高可用,无单点
  3. 消息有序
  4. 消息过滤
  5. 分布式事务

broker的主要源码放在broker目录下,broker(其他模块)的启动模型与namesrv一致,BrokerStartup解析配置,并根据配置装备BrokerController,然后初始化BrokerController并启动(start)。进行初始化时的主要启动一堆定时任务、存储(MessageStore)和Netty配置(注册Processor)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// BrokerController
public boolean initialize() throws CloneNotSupportedException {
boolean result = this.topicConfigManager.load();

// 消费者消费信息
result = result && this.consumerOffsetManager.load();
// 订阅组
result = result && this.subscriptionGroupManager.load();
// 过滤器
result = result && this.consumerFilterManager.load();

// 省略messageStore相关处理
result = result && this.messageStore.load();

if (result) {
// 暴露服务,启动两个Server,小端口和VIPChannel有关
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);

// 省略创建各Processor的Executor
// 注册Processor
this.registerProcessor();

// 持久化消费信息
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
BrokerController.this.consumerOffsetManager.persist();
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

// 持久化过滤器信息
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
BrokerController.this.consumerFilterManager.persist();
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);

// 设置namesrv地址,如果没有设置,则从一个web服务fetch
if (this.brokerConfig.getNamesrvAddr() != null) {
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}

if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
// 同步topic、consumerOffset、订阅组等信息到Salve
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
BrokerController.this.slaveSynchronize.syncAll();
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
} else {
// 如果是Master则定时打印与Salve的差异
}

// 省略TSL设置及事物初始化
}
}

public void start() throws Exception {
// 省略 启动初始化的那些服务

// 向所有namesrv注册自己
this.registerBrokerAll(true, false, true);

// 定时向所有namesrv注册自己
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
}

public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
// 封装broker上的topic信息
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

// 调用API向namesrv注册
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}

此时,我们可以看到的交互,每个broker启动时会先对namesrv寻址,然后把自己的信息注册到所有的namesrv上,并定时维护(心跳),namesrv维护的关于broker的信息来源于此,为了应对不同的场景需要,namesrv把数据封装成不同的结构,方面维护。namesrv关于请求的处理在DefaultRequestProcessor中,透过其processRequest方法我们可以明确其要承担的全部责任,比如:broker注册/注销,增删改查topic路由,KV信息维护等。broker注册的信息包括但是不限于以下内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
{
"topicConfigSerializeWrapper": {
"topicConfigTable":{
"topic_?":{
"defaultReadQueueNums":"16",
"defaultWriteQueueNums":"16",
"topicName":"",
"readQueueNums":"",
"writeQueueNums":"",
"perm":"",
"topicFilterType":"",
"topicSysFlag":"",
"order":""
},
},
"dataVersion":{
"timestamp":"xxxx",
"counter":"xxxx"
}
},
"filterServerList":[
"",//filterServerAddr
],
"clusterName":"",
"brokerAddr":"",
"brokerName":"",
"brokerId":""
}

namesrv与producer

公告(namesrv)维护好了,李雷(producer)就可以通过它来获取邮局(broker)的信息了。

先来看下producer的启动时主要做了些什么事情,producer的主要代码放在工程的client子工程下的producer包中。抛开事务不谈,我们的入口在DefaultMQProducer的start方法上。
下面列出启动时我们关心的业务,启动流程时序图,网上也不少了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// DefaultMQProducerImpl
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// groupName合法性校验
this.checkConfig();

// MixAll.CLIENT_INNER_PRODUCER_GROUP 用来发送消费者消费失败的消息到重试队列
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
// 如果不是内部构建的
this.defaultMQProducer.changeInstanceNameToPID();
}

// MQClientManager 以 clientId(ip@pid) 维度保证 MQClientInstance 的单例
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

// 向 MQClientInstance 注册自己
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("...");
}

// Just for testing or demo program
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

if (startFactory) {
// 启动MQClientInstance
mQClientFactory.start();
}

this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("...");
default:
break;
}

// 向broker发送心跳&&上传filter信息
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}

这里我们需要说下rmq中client(Producer、Consumer和Admin)的基本结构,不同角色的client通过Composite一个MQClientInstance的方式封装各自不同的逻辑,所以他们的启动流程大概一致,Consumer的复杂一些,但主流程无差别:

  1. 通过clientId获取MQClientInstance实例
  2. 自己角色特有的逻辑
  3. MQClientInstance注册自己
  4. 启动MQClientInstance,如果MQClientInstance已启动则返回
  5. 通过MQClientInstance方法同步更新自己角色需要的信息

按照以上总结我们着重看下MQClientInstance的start做了些什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// MQClientInstance
public void start() throws MQClientException {

switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;

// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}

// Start request-response channel
// 启动remotingClient,开启与外部交互的通道
this.mQClientAPIImpl.start();

// Start various schedule tasks
this.startScheduledTask();

// Start pull service
// push模式下拉消息服务
this.pullMessageService.start();

// Start rebalance service
// 开启负载均衡消费队列服务
this.rebalanceService.start();

// Start push service
// 上个代码片段提到的 CLIENT_INNER_PRODUCER_GROUP 对应的Producer
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
case RUNNING:
case SHUTDOWN_ALREADY:
default:
break;
}
}

private void startScheduledTask() {

// 如果没有配置namesrv的地址,默认从web服务定时抓取
if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}

// 从namesrv(NamesrvController)获取topic的路由信息
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

// 和broker的交互
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
// 清理已离线的Broker
MQClientInstance.this.cleanOfflineBroker();
// 向所有Broker发送心跳,心跳中带有consumer相关信息:clientId,Subscription等
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

// 持久化消费偏移量(持久化到远程或者本地)
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
MQClientInstance.this.persistAllConsumerOffset();
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

// 调整push方式下的拉取线程数
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
MQClientInstance.this.adjustThreadPool();
}, 1, 1, TimeUnit.MINUTES);
}

MQClientInstance启动时会启动一个从namesrv定时搜集本地所有订阅(consumer)的和发送(producer)的topic,然后依次通过mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3)来进行更新topic的路由信息。

问题:负载均衡怎么做的?

namesrv与consumer

正如上文说的那样,Producer和Consumer都组合了MQClientInstance,而MQClientInstance来完成其和namesrv的交互,所以它的过程和producer是一致的,他们需要维护的信息也是一致的。

producer与broker

producer与broker的交互有以下几个关注点:producer支持发送消息的方式?发送失败producer怎么处理?broker怎么处理producer的消息?

消息发送

producer发送方式有SYNC、ASYNC、ONEWAY,这些方式定义在CommunicationMode中,但是这块从DefaultMQProducer定义的send方法命名中没有明确区分同步和异步,在我当前的版本中ASYNC的实现存在问题(It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout.)。

我们以最简单的入口(DefaultMQProducer#send(Message))大致看下producer的发送流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// DefaultMQProducerImpl
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) {

// 省略合法性检查

final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
// 选择发送topic信息,此步骤数据由·MQClientInstance·中启动的定时任务维护
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
// 同步模式下,总发送次数确定,默认失败重试2次,也就是共尝试发送3次
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// 选择本次发送的队列
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
try {
beginTimestampPrev = System.currentTimeMillis();
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
// 超时中断
break;
}
// 封装信息,并调用ClientAPI发送出去
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
// 更新本次borker可用性,如果sendLatencyFaultEnable开启
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
// 如果SYNC模式下发送失败进行重试,ASYNC和ONEWAY模式下直接返回null
} catch (Exception e) {
// 根据不同的异常类型确认处理方式:重试、中断、返回
// 更新本次borker可用性,如果sendLatencyFaultEnable开启
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
}
} else {
// 没有可用队列不进行发送
break;
}
}
// 省略异常处理
}
// 省略异常处理
}

那么发送消息的怎么确认发送队列呢?sendLatencyFaultEnable是什么?

先通过topic找到对应的topicPublishInfo,顾名思义topicPublishInfo里面维护了该topic相关的broker和队列信息,在没有开启sendLatencyFaultEnable情况下(默认),按照递增取模的方式选择即将要发送的队列,如果开启了sendLatencyFaultEnable,再取模后还需要判断当前队列的broker是否可用。上面代码中我们可以看到发送消息后,不管成功还是失败都会更新一个叫做Item的状态,这个Item指的就是broker,rmq根据此次消耗的时间来更新该broker不可用的时间,以此达到对不稳定broker的规避。具体代码可看·MQFaultStrategy·。

消息接收

上面说到BrokerController时,我们未贴出关于broker的NettyServer关于Processor的注册部分。在这部分中,注册了不同的Processor:

  1. SendMessageProcessor 处理client发送的消息
  2. PullMessageProcessor 处理client来拉取消息
  3. QueryMessageProcessor 处理对消息的查询
  4. EndTransactionProcessor 处理事务消息

这些Processor分别处理不同的RequestCode。我们这次关心的是SendMessageProcessor

处理的流程比较简单,这里只列出基本流程,可从SendMessageProcessor#processRequest看起。

  1. 检查broker是否可以对外提供服务(是否到了配置的对外服务时间)
  2. 消息合法性校验、broker是否可写、消息写入队列是否存在等检查
  3. 如果是消费失败重试消息则处理其是否要进入死信队列
  4. 转换请求信息封装为MessageExtBrokerInner
  5. MessageExtBrokerInner发给MessageStore进行持久化
  6. 处理MessageStore返回的结果,并根据结果用BrokerStatsManager做统计更新
  7. 封装相应信息并返回。

consumer与broker

韩梅梅终于要收到信了。和现实一样,要么邮递员送给你,要么你自己上邮局去取信。这两种方式对应了rmq中的两种去消息方式:push(DefaultMQPushConsumer)和pull(DefaultMQPullConsumer)。这两种方式在定义上是有区别的,但实现上实际都是pull,只是对pull的处理不同。说到这里就不得不提长轮询了,pull方式我们客户端需要自己起一个线程定时去broker拉取信息,当broker没有消息可消费时立刻返回;push方式与pull流程一致,但是broker对其请求处理不同,当broker此刻无消息可消费时,broker会hold住当前请求,直到有消息返回或者到了超时时间返回。戳这里(–>长轮询–<–>长轮询进阶–<)更深入了解长轮询,感谢伟大的互联网让信息容易共享。

此处以push模式下的MessageListenerConcurrently消费策略来梳理下获取消息的流程。
DefaultMQPushConsumer相比DefaultMQProducerImpl多启动了几个服务:负载均衡服务(consumer平均分配队列)、维护offset服务、消息消费服务(consumeMessageService)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// DefaultMQPushConsumer
public synchronized void start() throws MQClientException {
// 省略配置检查及服务的启动

// 更新订阅topic信息
this.updateTopicSubscribeInfoWhenSubscriptionChanged();

// 每个topic随机选取一个broker检查subscription语法是否正确,请求code:RequestCode.CHECK_CLIENT_CONFIG
this.mQClientFactory.checkClientInBroker();

// 向broker发送心跳,心跳携带client相关信息
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

// 进行负载均衡,默认策略为`AllocateMessageQueueAveragely`,调整完后发起第一个pullRequest
this.mQClientFactory.rebalanceImmediately();
}

consumer定时拉取服务是在MQClientInstance中启动的一个叫做pullMessageService(PullMessageService)的定时服务,这个服务监听在一个pullRequestQueue队列上,当有拉取请求时,根据PullRequest选取对应分组的DefaultMQPushConsumerImpl进行拉取。

1
2
3
4
5
6
7
8
9
public class PullRequest {
private String consumerGroup;
private MessageQueue messageQueue;
private ProcessQueue processQueue;
private long nextOffset;
private boolean lockedFirst = false;

// ...
}

PullRequest中有两个Queue,messageQueue是指要从broker哪个队列拉数据,ProcessQueue是拉数据成功后存放数据的队列。有了processQueue便可以做一些简单的流控,目前可以根据processQueue的消息数量或者消息的大小来决定是否停止本次拉取,并设置下次拉取的延迟而不是立即开始下次拉取。如果流控通过,consumer开始通过pullAPIWrapper向broker拉去信息,拉取成功后提交一个消费任务(默认非CONSUME_MESSAGE_DIRECTLY,如果此时线程池等待队列已满,任务延迟提交),消费任务回掉我们注册的listener,消息至此投递完成。

投递完成后,消费的结果怎么处理?我们刚提到ProcessQueue可以进行流控,那么合适出队以消费的消息?消费失败又需要做什么处理?这里不在展开,可见ConsumeMessageConcurrentlyService类中。

除了怎么去消费消息外,对consumer而言还有另外一个需要解决的问题:怎么给采用CLUSTERING方式的消费群组成员平均分配队列?答案在AllocateMessageQueueAveragely中,该策略保证消息队列被平均分配给同消费组内的消费者(图片来源)。

rmq消费者结构

消息存储

数据的落盘、存储、数据结构、目录结构这些请戳–>broker的数据持久化<–

源代码入口 CommitLog#putMessage(final MessageExtBrokerInner msg)
重要的一些基础 MappedFile-JavaJava NIOBtree

此处贴上一张关于broker数据结构及流转的图,更直观来看他的数据结构流向(图片来源)。
rmq数据结构转换.png

broker与broker(HA)

broker有这几种角色ASYNC_MASTER、SYNC_MASTER、SLAVE,消息肯定都是写到MASTER上的,然后同步给SLAVE。

同步的方式分为SYNC和ASYNC两种:

  • ASYNC,消息写完后就直接返回,后台线程去做同步的操作;
  • SYNC,等到同步完成后返回。

这两种方式也称作异步复制和同步双写。HA的整个过程只在store模块做的,是基于JDK的nio来写的,没有依赖remoting模块。

除此之外,broker启动时,如果当前broker是SLAVE,则会启动一个定时任务来同步topic等信息。

1
2
3
4
5
6
7
// SlaveSynchronize
public void syncAll() {
this.syncTopicConfig();
this.syncConsumerOffset();
this.syncDelayOffset();
this.syncSubscriptionGroupConfig();
}

结语

以上只是对整体架构和主流成的简单认知,关于其中的细节问题,就需要我们带着疑问去看源码了。

问题:
怎么处理定时消息?
消费失败的消息怎么处理?
链接是否像dubbo那样复用?
broker的数据结构是怎样的?
broker怎么保证存储高吞吐?
broker怎么过滤消息?

“古人学问无遗力,少壮工夫老始成。纸上得来终觉浅,绝知此事要躬行。” – 陆游《冬夜读书示子聿》

以上贴出源代码基于incubator-rocketmq release-4.3.2版本,为了方便均有删减改,但不影响实际流程。

小生不才,以上如有描述有误的地方还望各位不吝赐教 !^_^!

参考资料

RocketMQ 原理简介

rocketmq 用户指南

RocketMQ 最佳实践

译-apache-rocketmq用户指南

誓嘉(rmq作者)文档

RocketMQ 运维指令

rmq源码系列

RocketMQ源码学习(三)-Broker(与Producer交互部分)

Java NIO-阅读笔记及总结

深入浅出MappedByteBuffer

rmq数据结构转换图片来源