一直在用rocketmq,对他的功能和大概流程略知一些,但是比较浮,经不起稍微的推敲。是时候进一步了解下这个NB的中间件了。
这里不再赘述它的那些特性,网上一大堆,这里主要按照自己想了解的一些方面作整理,贴出部分核心代码,意图通过表现各个角色间的交互,勾画大致架构,方便以后对每个要点各个深入。如果想要引导来阅读源代码,推荐rmq源码系列 ,写的很有诚意,本笔记中也部分参考引用其文章。
部署结构(逻辑结构/物理结构) 消息中间件的整体看起来像我们相互邮寄明信片。李雷(Producer)通过邮局公告(Namesrv)找到邮局的地址,然后去邮局(Broker)把明信片(Message)发送给韩梅梅(Consumer)。
物理结构
Producer 消息生产者,负责产生消息,一般由业务系统负责产生消息。
Consumer 消息消费者,负责消费消息,一般是后台系统负责异步消费。
Broker 消息中转角色,负责存储消息,转发消息,一般也称为 Server。
Namesrv 注册中心,管理协调角色,维护其他角色信息。
逻辑结构 逻辑结构里面把所有角色都集群化了,集群化后就要牵扯到集群消费。于是,基于以上的角色,又衍生出其他的概念。
角色实现及其交互 按照逻辑/物理部署结构,各个角色间怎么通信?怎么保活?数据怎么备份?
rmq自己对Netty进行了包装(Facade ),方便各个角色使用,在工程的remoting 子工程中。
namesrv(Name Server)与broker namesrv与broker是实现后续一切的基础,producer和consumer的信息传递需要broker来中转,那么找到或者说确定一个broker便是第一步了。
namesrv(Name Server) namesrv是很轻量的服务,简单、可集群横向扩展、无状态。他是整个mq的管家(像是Dubbo的服务发现),管理其他角色,如果没有它,整个mq将难以开展工作。
简单看下namesrv启动时的行为。它的核心代码在工程的namesrv 包下,代码量很少。
namesrv的故事开始于NamesrvStartup
的main方法,它主要来解析配置参数,根据配置装填并初始化和启动NamesrvController
。NamesrvController
贯连整个Namesrv的业务功能,通过Netty向外暴露服务,主要负责:
管理Topic和Broker的注册信息
管理KV的配置(持久化)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public boolean initialize () { this .kvConfigManager.load(); this .remotingServer = new NettyRemotingServer (this .nettyServerConfig, this .brokerHousekeepingService); this .remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl ("RemotingExecutorThread_" )); this .registerProcessor(); this .scheduledExecutorService.scheduleAtFixedRate(new Runnable () { @Override public void run () { NamesrvController.this .routeInfoManager.scanNotActiveBroker(); } }, 5 , 10 , TimeUnit.SECONDS); this .scheduledExecutorService.scheduleAtFixedRate(new Runnable () { @Override public void run () { NamesrvController.this .kvConfigManager.printAllPeriodically(); } }, 1 , 10 , TimeUnit.MINUTES); }
RouteInfoManager
相当于是管家的账本,里面保存着关于topic、broker的具体信息,这些信息全部保存在内存中。其维护的信息如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private final HashMap<String, List<QueueData>> topicQueueTable;, String> brokerAddrs; * } **/ private final HashMap<String, BrokerData> brokerAddrTable;private final HashMap<String, Set<String>> clusterAddrTable;private final HashMap<String, BrokerLiveInfo> brokerLiveTable;private final HashMap<String, List<String>> filterServerTable;
那么,这些信息从哪里来呢?且慢,我们先看下broker。
broker broker就是例子中邮局,邮局要具备什么能力呢?
接收信件
保存信件,保证除不可抗因素外,信件不丢失
投递信件
高效工作,信件快速接受及送达
信件投递失败的处理
信件投递可回溯追踪(当然,邮局不能看内容了)
向管家(namesrv)汇报自己的运营状态
以上这些其实也就是broker要做的事情,只不过它不存在隐私一说罢了。当然他还需要有其他的特性。
支持消息的不同投递模型:Publish/Subscribe,集群分组消费等
高可用,无单点
消息有序
消息过滤
分布式事务
broker的主要源码放在broker目录下,broker(其他模块)的启动模型与namesrv一致,BrokerStartup
解析配置,并根据配置装备BrokerController
,然后初始化BrokerController
并启动(start)。进行初始化时的主要启动一堆定时任务、存储(MessageStore)和Netty配置(注册Processor)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 public boolean initialize () throws CloneNotSupportedException { boolean result = this .topicConfigManager.load(); result = result && this .consumerOffsetManager.load(); result = result && this .subscriptionGroupManager.load(); result = result && this .consumerFilterManager.load(); result = result && this .messageStore.load(); if (result) { this .remotingServer = new NettyRemotingServer (this .nettyServerConfig, this .clientHousekeepingService); NettyServerConfig fastConfig = (NettyServerConfig) this .nettyServerConfig.clone(); fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2 ); this .fastRemotingServer = new NettyRemotingServer (fastConfig, this .clientHousekeepingService); this .registerProcessor(); this .scheduledExecutorService.scheduleAtFixedRate(() -> { BrokerController.this .consumerOffsetManager.persist(); }, 1000 * 10 , this .brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS); this .scheduledExecutorService.scheduleAtFixedRate(() -> { BrokerController.this .consumerFilterManager.persist(); }, 1000 * 10 , 1000 * 10 , TimeUnit.MILLISECONDS); if (this .brokerConfig.getNamesrvAddr() != null ) { this .brokerOuterAPI.updateNameServerAddressList(this .brokerConfig.getNamesrvAddr()); } else if (this .brokerConfig.isFetchNamesrvAddrByAddressServer()) { this .scheduledExecutorService.scheduleAtFixedRate(() -> { BrokerController.this .brokerOuterAPI.fetchNameServerAddr(); }, 1000 * 10 , 1000 * 60 * 2 , TimeUnit.MILLISECONDS); } if (BrokerRole.SLAVE == this .messageStoreConfig.getBrokerRole()) { this .scheduledExecutorService.scheduleAtFixedRate(() -> { BrokerController.this .slaveSynchronize.syncAll(); }, 1000 * 10 , 1000 * 60 , TimeUnit.MILLISECONDS); } else { } } } public void start () throws Exception { this .registerBrokerAll(true , false , true ); this .scheduledExecutorService.scheduleAtFixedRate(() -> { BrokerController.this .registerBrokerAll(true , false , brokerConfig.isForceRegister()); }, 1000 * 10 , Math.max(10000 , Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000 )), TimeUnit.MILLISECONDS); } public synchronized void registerBrokerAll (final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { TopicConfigSerializeWrapper topicConfigWrapper = this .getTopicConfigManager().buildTopicConfigSerializeWrapper(); doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper); }
此时,我们可以看到的交互,每个broker启动时会先对namesrv寻址,然后把自己的信息注册到所有的namesrv上,并定时维护(心跳),namesrv维护的关于broker的信息来源于此,为了应对不同的场景需要,namesrv把数据封装成不同的结构,方面维护。namesrv关于请求的处理在DefaultRequestProcessor
中,透过其processRequest方法我们可以明确其要承担的全部责任,比如:broker注册/注销,增删改查topic路由,KV信息维护等。broker注册的信息包括但是不限于以下内容。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 { "topicConfigSerializeWrapper" : { "topicConfigTable" : { "topic_?" : { "defaultReadQueueNums" : "16" , "defaultWriteQueueNums" : "16" , "topicName" : "" , "readQueueNums" : "" , "writeQueueNums" : "" , "perm" : "" , "topicFilterType" : "" , "topicSysFlag" : "" , "order" : "" } , } , "dataVersion" : { "timestamp" : "xxxx" , "counter" : "xxxx" } } , "filterServerList" : [ "" , ] , "clusterName" : "" , "brokerAddr" : "" , "brokerName" : "" , "brokerId" : "" }
namesrv与producer 公告(namesrv)维护好了,李雷(producer)就可以通过它来获取邮局(broker)的信息了。
先来看下producer的启动时主要做了些什么事情,producer的主要代码放在工程的client子工程下的producer包中。抛开事务不谈,我们的入口在DefaultMQProducer
的start方法上。 下面列出启动时我们关心的业务,启动流程时序图,网上也不少了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public void start (final boolean startFactory) throws MQClientException { switch (this .serviceState) { case CREATE_JUST: this .serviceState = ServiceState.START_FAILED; this .checkConfig(); if (!this .defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) { this .defaultMQProducer.changeInstanceNameToPID(); } this .mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this .defaultMQProducer, rpcHook); boolean registerOK = mQClientFactory.registerProducer(this .defaultMQProducer.getProducerGroup(), this ); if (!registerOK) { this .serviceState = ServiceState.CREATE_JUST; throw new MQClientException ("..." ); } this .topicPublishInfoTable.put(this .defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo ()); if (startFactory) { mQClientFactory.start(); } this .serviceState = ServiceState.RUNNING; break ; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException ("..." ); default : break ; } this .mQClientFactory.sendHeartbeatToAllBrokerWithLock(); }
这里我们需要说下rmq中client(Producer、Consumer和Admin)的基本结构,不同角色的client通过Composite 一个MQClientInstance
的方式封装各自不同的逻辑,所以他们的启动流程大概一致,Consumer的复杂一些,但主流程无差别:
通过clientId获取MQClientInstance
实例
自己角色特有的逻辑
向MQClientInstance
注册自己
启动MQClientInstance
,如果MQClientInstance
已启动则返回
通过MQClientInstance
方法同步更新自己角色需要的信息
按照以上总结我们着重看下MQClientInstance
的start做了些什么。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 public void start () throws MQClientException { switch (this .serviceState) { case CREATE_JUST: this .serviceState = ServiceState.START_FAILED; if (null == this .clientConfig.getNamesrvAddr()) { this .mQClientAPIImpl.fetchNameServerAddr(); } this .mQClientAPIImpl.start(); this .startScheduledTask(); this .pullMessageService.start(); this .rebalanceService.start(); this .defaultMQProducer.getDefaultMQProducerImpl().start(false ); this .serviceState = ServiceState.RUNNING; break ; case START_FAILED: throw new MQClientException ("The Factory object[" + this .getClientId() + "] has been created before, and failed." , null ); case RUNNING: case SHUTDOWN_ALREADY: default : break ; } } private void startScheduledTask () { if (null == this .clientConfig.getNamesrvAddr()) { this .scheduledExecutorService.scheduleAtFixedRate(() -> { MQClientInstance.this .mQClientAPIImpl.fetchNameServerAddr(); }, 1000 * 10 , 1000 * 60 * 2 , TimeUnit.MILLISECONDS); } this .scheduledExecutorService.scheduleAtFixedRate(() -> { MQClientInstance.this .updateTopicRouteInfoFromNameServer(); }, 10 , this .clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS); this .scheduledExecutorService.scheduleAtFixedRate(() -> { MQClientInstance.this .cleanOfflineBroker(); MQClientInstance.this .sendHeartbeatToAllBrokerWithLock(); }, 1000 , this .clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS); this .scheduledExecutorService.scheduleAtFixedRate(() -> { MQClientInstance.this .persistAllConsumerOffset(); }, 1000 * 10 , this .clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS); this .scheduledExecutorService.scheduleAtFixedRate(() -> { MQClientInstance.this .adjustThreadPool(); }, 1 , 1 , TimeUnit.MINUTES); }
MQClientInstance
启动时会启动一个从namesrv定时搜集本地所有订阅(consumer)的和发送(producer)的topic,然后依次通过mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3)
来进行更新topic的路由信息。
问题:负载均衡怎么做的?
namesrv与consumer 正如上文说的那样,Producer和Consumer都组合了MQClientInstance
,而MQClientInstance
来完成其和namesrv的交互,所以它的过程和producer是一致的,他们需要维护的信息也是一致的。
producer与broker producer与broker的交互有以下几个关注点:producer支持发送消息的方式?发送失败producer怎么处理?broker怎么处理producer的消息?
消息发送 producer发送方式有SYNC、ASYNC、ONEWAY,这些方式定义在CommunicationMode
中,但是这块从DefaultMQProducer
定义的send方法命名中没有明确区分同步和异步,在我当前的版本中ASYNC的实现存在问题(It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout.)。
我们以最简单的入口(DefaultMQProducer#send(Message)
)大致看下producer的发送流程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 private SendResult sendDefaultImpl ( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) { final long invokeID = random.nextLong(); long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev = beginTimestampFirst; TopicPublishInfo topicPublishInfo = this .tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { boolean callTimeout = false ; MessageQueue mq = null ; Exception exception = null ; SendResult sendResult = null ; int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this .defaultMQProducer.getRetryTimesWhenSendFailed() : 1 ; int times = 0 ; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); MessageQueue mqSelected = this .selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null ) { mq = mqSelected; try { beginTimestampPrev = System.currentTimeMillis(); long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true ; break ; } sendResult = this .sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); endTimestamp = System.currentTimeMillis(); this .updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false ); } catch (Exception e) { this .updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true ); } } else { break ; } } } }
那么发送消息的怎么确认发送队列呢?sendLatencyFaultEnable是什么?
先通过topic找到对应的topicPublishInfo,顾名思义topicPublishInfo里面维护了该topic相关的broker和队列信息,在没有开启sendLatencyFaultEnable情况下(默认),按照递增取模的方式选择即将要发送的队列,如果开启了sendLatencyFaultEnable,再取模后还需要判断当前队列的broker是否可用。上面代码中我们可以看到发送消息后,不管成功还是失败都会更新一个叫做Item的状态,这个Item指的就是broker,rmq根据此次消耗的时间来更新该broker不可用的时间,以此达到对不稳定broker的规避。具体代码可看·MQFaultStrategy·。
消息接收 上面说到BrokerController
时,我们未贴出关于broker的NettyServer关于Processor的注册部分。在这部分中,注册了不同的Processor:
SendMessageProcessor 处理client发送的消息
PullMessageProcessor 处理client来拉取消息
QueryMessageProcessor 处理对消息的查询
EndTransactionProcessor 处理事务消息
…
这些Processor分别处理不同的RequestCode
。我们这次关心的是SendMessageProcessor
。
处理的流程比较简单,这里只列出基本流程,可从SendMessageProcessor#processRequest
看起。
检查broker是否可以对外提供服务(是否到了配置的对外服务时间)
消息合法性校验、broker是否可写、消息写入队列是否存在等检查
如果是消费失败重试消息则处理其是否要进入死信队列
转换请求信息封装为MessageExtBrokerInner
将MessageExtBrokerInner
发给MessageStore
进行持久化
处理MessageStore
返回的结果,并根据结果用BrokerStatsManager
做统计更新
封装相应信息并返回。
韩梅梅终于要收到信了。和现实一样,要么邮递员送给你,要么你自己上邮局去取信。这两种方式对应了rmq中的两种去消息方式:push(DefaultMQPushConsumer
)和pull(DefaultMQPullConsumer
)。这两种方式在定义上是有区别的,但实现上实际都是pull,只是对pull的处理不同。说到这里就不得不提长轮询了,pull方式我们客户端需要自己起一个线程定时去broker拉取信息,当broker没有消息可消费时立刻返回;push方式与pull流程一致,但是broker对其请求处理不同,当broker此刻无消息可消费时,broker会hold住当前请求,直到有消息返回或者到了超时时间返回。戳这里(–>长轮询–< 和–>长轮询进阶–< )更深入了解长轮询,感谢伟大的互联网让信息容易共享。
此处以push模式下的MessageListenerConcurrently
消费策略来梳理下获取消息的流程。DefaultMQPushConsumer
相比DefaultMQProducerImpl
多启动了几个服务:负载均衡服务(consumer平均分配队列)、维护offset服务、消息消费服务(consumeMessageService)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public synchronized void start () throws MQClientException { this .updateTopicSubscribeInfoWhenSubscriptionChanged(); this .mQClientFactory.checkClientInBroker(); this .mQClientFactory.sendHeartbeatToAllBrokerWithLock(); this .mQClientFactory.rebalanceImmediately(); }
consumer定时拉取服务是在MQClientInstance
中启动的一个叫做pullMessageService(PullMessageService
)的定时服务,这个服务监听在一个pullRequestQueue队列上,当有拉取请求时,根据PullRequest
选取对应分组的DefaultMQPushConsumerImpl
进行拉取。
1 2 3 4 5 6 7 8 9 public class PullRequest { private String consumerGroup; private MessageQueue messageQueue; private ProcessQueue processQueue; private long nextOffset; private boolean lockedFirst = false ; }
PullRequest
中有两个Queue,messageQueue是指要从broker哪个队列拉数据,ProcessQueue是拉数据成功后存放数据的队列。有了processQueue便可以做一些简单的流控,目前可以根据processQueue的消息数量或者消息的大小来决定是否停止本次拉取,并设置下次拉取的延迟而不是立即开始下次拉取。如果流控通过,consumer开始通过pullAPIWrapper向broker拉去信息,拉取成功后提交一个消费任务(默认非CONSUME_MESSAGE_DIRECTLY,如果此时线程池等待队列已满,任务延迟提交),消费任务回掉我们注册的listener,消息至此投递完成。
投递完成后,消费的结果怎么处理?我们刚提到ProcessQueue可以进行流控,那么合适出队以消费的消息?消费失败又需要做什么处理?这里不在展开,可见ConsumeMessageConcurrentlyService
类中。
除了怎么去消费消息外,对consumer而言还有另外一个需要解决的问题:怎么给采用CLUSTERING方式的消费群组成员平均分配队列?答案在AllocateMessageQueueAveragely
中,该策略保证消息队列被平均分配给同消费组内的消费者(图片来源 )。
消息存储 数据的落盘、存储、数据结构、目录结构这些请戳–>broker的数据持久化<–
源代码入口 CommitLog#putMessage(final MessageExtBrokerInner msg)
重要的一些基础 MappedFile-Java 、Java NIO 、Btree
此处贴上一张关于broker数据结构及流转的图,更直观来看他的数据结构流向(图片来源 )。
broker有这几种角色ASYNC_MASTER、SYNC_MASTER、SLAVE,消息肯定都是写到MASTER上的,然后同步给SLAVE。
同步的方式分为SYNC和ASYNC两种:
ASYNC,消息写完后就直接返回,后台线程去做同步的操作;
SYNC,等到同步完成后返回。
这两种方式也称作异步复制和同步双写。HA的整个过程只在store模块做的,是基于JDK的nio来写的,没有依赖remoting模块。
除此之外,broker启动时,如果当前broker是SLAVE,则会启动一个定时任务来同步topic等信息。
1 2 3 4 5 6 7 public void syncAll () { this .syncTopicConfig(); this .syncConsumerOffset(); this .syncDelayOffset(); this .syncSubscriptionGroupConfig(); }
结语 以上只是对整体架构和主流成的简单认知,关于其中的细节问题,就需要我们带着疑问去看源码了。
问题: 怎么处理定时消息? 消费失败的消息怎么处理? 链接是否像dubbo那样复用? broker的数据结构是怎样的? broker怎么保证存储高吞吐? broker怎么过滤消息?
“古人学问无遗力,少壮工夫老始成。纸上得来终觉浅,绝知此事要躬行。” – 陆游《冬夜读书示子聿》
以上贴出源代码基于incubator-rocketmq release-4.3.2版本,为了方便均有删减改,但不影响实际流程。
小生不才,以上如有描述有误的地方还望各位不吝赐教 !^_^!
参考资料 RocketMQ 原理简介
rocketmq 用户指南
RocketMQ 最佳实践
译-apache-rocketmq用户指南
誓嘉(rmq作者)文档
RocketMQ 运维指令
rmq源码系列
RocketMQ源码学习(三)-Broker(与Producer交互部分)
Java NIO-阅读笔记及总结
深入浅出MappedByteBuffer
rmq数据结构转换图片来源