0%

rocketmq5.x-定时/延迟消息实现.md

延迟或者定时消息在使用场景上有两种:消息消费失败重试;业务直接发送定时消息。
这两种场景broker上的实现可能不同,如果开启了时间轮的配置(timerWheelEnable,默认开启),业务上的延迟消息发送时只要设置了DeliveryTimestamp字段,消息的定时服务就会使用时间轮;消费失败的重试则依然使用老的实现方式,不同的延迟等级对应不同的内部延迟队列。
下面我们分着两种场景来看下具体的实现逻辑。

消费失败重试

消息发送

delayLevel发送消息时默认是0,发送端没有地方修改这个值,broker端根据消息的ReconsumeTimes确定消息当前的DelayTimeLevel = 3 + msgExt.getReconsumeTimes(),即初始值为3。
消息被通过命令RequestCode.CONSUMER_SEND_MSG_BACK发回,ReconsumeTimes会在broker处理时加1。

发送入口ConsumeMessageConcurrentlyService#sendMessageBack,重试发送的topic以%RETRY%打头,后面跟上GID。消息发回的请求没有携带完整的原消息,只携带了messageId。
PS:consumer启动的时候,除了订阅topic本身的队列外,还会订阅topic对应的重试队列。

消息处理

broker处理逻辑入口AbstractSendMessageProcessor#consumerSendMsgBack,保存消息时,会先查出原消息,提交一个到%RETRY%GID的消息,存储时,在DefaultMessageStore#asyncPutMessage的前置处理器中,对DelayTimeLevel字段大于0的(对应消息属性中DELAY),会改变消息topic为SCHEDULE_TOPIC_XXXX,这个是系统内部topic,对消费者不可见。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void transformDelayLevelMessage(BrokerController brokerController, MessageExtBrokerInner msg) {

if (msg.getDelayTimeLevel() > brokerController.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(brokerController.getScheduleMessageService().getMaxDelayLevel());
}

// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

msg.setTopic(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC);
// 根据延迟等级确定消息的队列
msg.setQueueId(ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()));
}

不同级别的延迟队列消费者
默认延迟队列时间:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
每个延迟级别都对应一个consumerQueue,会根据consumerQueue的个数(配置的延迟时间的个数)启动相同数量的消费者。

消费者实际是ScheduledExecutorService控制的定时任务,每隔100ms从队列拉取一次,检查是否有过期的消息,如果没有则进入下一个定时,如果有则还原消息体,并提交对原topic中。
注意:这里的consumerQueue是天然有序的,因为消息的延迟时间以broker存储消息时间为基数,加上对应的延迟时间获得,所以,后到的消息一定在队列的后面。

延迟队列的内部消费入口是ScheduleMessageService#start,这里不再贴代码。

延迟/定时消息

消息发送

延迟消息和普通消息一样,都通过命令RequestCode.SEND_MESSAGE发出。
发送的详细见官方文档-定时/延时消息,需要关注的有两处:1.定时消息创建topic需要增加特定的参数 2.发送消息时设置对应的交付时间setDeliveryTimestamp(deliverTimeStamp)

PS:也可以看官方的时间轮基准测试怎么发的消息 org.apache.rocketmq.example.benchmark.timer.TimerProducer

消息处理

普通消息和消费失败重新投递的处理入口在SendMessageProcessor#processRequest中,对消息进行简单的处理后最后走到消息存储服务DefaultMessageStore#asyncPutMessage,4.x版本这里mq的分层做的不是很好,在存储服务里面杂糅了延时和事务消息的逻辑,使得这层不是很纯粹,5.x版本做了一些优化,把这些逻辑当做前置的hook,所以,这里处理的核心逻辑在HookUtils#handleScheduleMessage中。源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public static PutMessageResult handleScheduleMessage(BrokerController brokerController,
final MessageExtBrokerInner msg) {
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
// 判断是否事务消息相关的类型
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// 判断是否系统topic:“rmq_sys_wheel_timer”,这个topic就是走时间轮时的系统内部topic
if (!isRolledTimerMessage(msg)) {
// 判断是否时间轮的延迟消息
if (checkIfTimerMessage(msg)) {
// 判断是否开启时间轮能力
if (!brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
//wheel timer is not enabled, reject the message
return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_NOT_ENABLE, null);
}
// 方法内计算了延迟的时间、置换了消息的topic="rmq_sys_wheel_timer"和queueId=0,mq惯用的手法
PutMessageResult transformRes = transformTimerMessage(brokerController, msg);
if (null != transformRes) {
return transformRes;
}
}
}
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
transformDelayLevelMessage(brokerController, msg);
}
}
return null;
}

public static boolean checkIfTimerMessage(MessageExtBrokerInner msg) {
if (msg.getDelayTimeLevel() > 0) {
// 如果是走延迟等级实现的的消息,清空走时间轮的相关属性,算是一个校验
if (null != msg.getProperty(MessageConst.PROPERTY_TIMER_DELIVER_MS)) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_TIMER_DELIVER_MS);
}
if (null != msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC)) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_TIMER_DELAY_SEC);
}
if (null != msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_MS)) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_TIMER_DELAY_MS);
}
return false;
//return this.defaultMessageStore.getMessageStoreConfig().isTimerInterceptDelayLevel();
}
//double check
if (TimerMessageStore.TIMER_TOPIC.equals(msg.getTopic()) || null != msg.getProperty(MessageConst.PROPERTY_TIMER_OUT_MS)) {
return false;
}
// 只要有特定属性即认为是延迟消息,消息发送时传的是DeliveryTimestamp,对应MessageConst.PROPERTY_TIMER_DELIVER_MS
return null != msg.getProperty(MessageConst.PROPERTY_TIMER_DELIVER_MS) || null != msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_MS) || null != msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC);
}

Hook处理后,后面topic(rmq_sys_wheel_timer)构建queue流程都一样了,我们直接跳到对应queue的消费者。

broker处理流程见下图。
rocketmq-timer-wheel

整理这张图的时候主要关注了几点:

  1. 每个任务的分工是比较明确的,任务之间两两成对构成生产-消费模型;
  2. mq惯用的替换topic手法,以此对业务消费者透明;
  3. 对超过时间轮表达范围或者超过log保存时间的定时消息,采用滚动的手段(即:先设置一个当前支持的最大超时窗口,等消息到期后,再走一遍入轮流程,通过这种滚动的方式把时间消耗掉,直到消息到时被投递回原topic)来支持,这样,延迟消息就可以支持任意长的时间了。

跟着上面的图去撸代码流程,应该没什么问题,这里贴出有关数据结构部分的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class TimerWheel {
// 忽略非核心代码
public TimerWheel(String fileName, int slotsTotal, int precisionMs) throws IOException {
// 默认 TIMER_WHEEL_TTL_DAY * DAY_SECS,即 7 * 每天的秒数
this.slotsTotal = slotsTotal;
// 默认 1000ms,即 1s
this.precisionMs = precisionMs;
this.fileName = fileName;
// timerlog的文件长度,默认 slotsTotal的2倍
this.wheelLength = this.slotsTotal * 2 * Slot.SIZE;
}

// 获取指定时间的槽
public Slot getRawSlot(long timeMs) {
// 因为每个槽的大小固定,所以定位到槽位后乘以槽的大小即可知道文件中当前槽的记录位置
localBuffer.get().position(getSlotIndex(timeMs) * Slot.SIZE);
return new Slot(localBuffer.get().getLong() * precisionMs,
localBuffer.get().getLong(), localBuffer.get().getLong(), localBuffer.get().getInt(), localBuffer.get().getInt());
}

public int getSlotIndex(long timeMs) {
// 需要将时间的精度调整到与精度一致,因为timerlog的文件长度是slotsTotal的两倍,所以取slot位置时,用时间对(slotsTotal * 2)取余
return (int) (timeMs / precisionMs % (slotsTotal * 2));
}
}

使用的限制

经过以上分析,我们知道在底层设计上rocketmq是怎样支持无限期的延迟消息的,不过在当前开源release版本和商业版本中,延迟消息最长时间限制为24小时,并且不可配置。

1
2
3
4
5
6
7
8
9
10
11
// org.apache.rocketmq.proxy.grpc.v2.producer#SendMessageActivity
protected void validateDelayTime(long deliveryTimestampMs) {
// maxDelay默认为24小时
long maxDelay = ConfigurationManager.getProxyConfig().getMaxDelayTimeMills();
if (maxDelay <= 0) {
return;
}
if (deliveryTimestampMs - System.currentTimeMillis() > maxDelay) {
throw new GrpcProxyException(Code.ILLEGAL_DELIVERY_TIME, "the max delay time of message is too large, max is " + maxDelay);
}
}

遗留的疑问

时间轮一个滴答时,会通过消息中prevPos找出当前slot上的所有的消息,这些消息全部放在内存中,如果消息不需要滚动,那么还需要查出原消息,TimerRequest内存占用是可以估算的,但是原消息的大小是不定的,假设某个滴答消息量很大,broker的内存就会很吃紧,所以,想要使用RocketMQ做超时系统时,要考虑下自己的任务量以及同一时刻出现大量到期的情况。

摘自官方公众号的Q&A

  • 问题一:在我的理解中,timerLog中存储的是CommitLog中的消息的索引,但是CommitLog中的消息是有存储时间上限的,如果要收发长时间定时消息(半个月),该定时消息方案是如何避免消息丢失的问题的?

    针对长时间定时消息,该方案采用滚动的方式避免消息的丢失。举例来说,若该时间轮只能存储3天的消息(同样的,CommitLog中也只能存储3天消息),那在消息尚未到达三天时,便先将其按到时处理取出,再发回CommitLog。这样一来,消息重新进入CommitLog,销毁时间又可以重新开始计算,消息丢失的问题便解决了。

  • 问题二:如果在定时消息发送完毕后机器宕机,再次重启时该方案的时间轮和TimerLog的恢复流程是怎样的?

    由于在定时消息的接收过程中,TimerLog和TimerWheel都是有定时的持久化操作的,因此宕机对已经持久化进入磁盘的文件影响不大。在此过程中可能受到影响并需要恢复的仅有尚未进行刷盘的部分消息。对此,我们设置了Checkpoint文件,以记录TimerLog中已经被TimerWheel记录的消息offset。在重新启动时,将从该checkpoint记录的位置重新开始向后遍历TimerLog文件,并开始订正TimerWheel每一格中的头尾消息索引。

  • 问题三:在该方案中,取回定时消息时是否有可能存在大量随机读导致污染pagecache的情况?

    定时消息的写入在timerLog中是顺序的,因此有可能出现定时久的消息写在前面,而即将到时的消息出现在timerLog的尾部的情况。确实,出现这种情况时,随机读是不可避免的:当TimerWheel中的某一格到时,将前往TimerWheel中检索消息的位置,再进一步到CommitLog中取消息。若要避免这个情况,势必要对消息的写入作进一步优化:排序,或者按时间轮的定位情况写入多个文件。但是这样可能带来另一个问题:大量的随机写。正如俗话说的,“读写难两全”。由于定时消息对于写入更加敏感,所以可以牺牲一定的读性能来保障写入的速度——当然,在性能测试中,该方案的读性能同样令人满意。

小生不才,以上如有描述有误的地方还望各位不吝赐教 !^_^!

源代码版本:release-5.1.4 贴出的源代码会有所删减

参考

RIP-43 Support timing messages with arbitrary time delay
官方文档-定时/延时消息
社区在讨论什么?《Support Timing Messages with Arbitrary Time Delay》