前言
之前有转载过一篇关于分布式事务最终一致的MQ实现的文章,当时也是碰到了分布式事务的情形,最后按照文章的思路利用rmq实现了数据的最终一致。不太清楚分布式事务的,可以先看下这边文章了解下。
PS:本篇默认你已经了解rmq的一些基础并看过部分源代码,建议在看该篇时,先看下官方的文档RocketMQ事务消息。
牵涉到的分布式的话题一般都会提到CAP,从知乎上打捞来一份比较好解释。
1 2 3 4 5 6 7 8 9
| P 意指分区容忍性。 所谓分区指的是网络分区的意思。详细一点解释,比如你有A B两台服务器,它们之间是有通信的,突然,不知道为什么,它们之间的网络链接断掉了。 好了,那么现在本来AB在同一个网络现在发生了网络分区,变成了A所在的A网络和B所在的B网络。所谓的分区容忍性,就是说一个数据服务的多台服务器在发生了上述情况的时候, 依然能继续提供服务。所以显而易见的,P是大前提,如果P发生了,咱们的数据服务直接不服务了,还谈个毛的可用性和一致性呢。因此CAP要解释成,当P发生的时候,A和C只能而选一。 举个简单的例子,A服务器B服务器同步数据,现在A B之间网络断掉了,那么现在发来A一个写入请求,但是B却没有相关的请求,显然,如果A不写,保持一致性,那么我们就失去了A的服务, 但是如果A写了,跟B的数据就不一致了,我们自然就丧失了一致性。这里设计就涉及到架构师的选择了。注意这里的一致性是强一致性,意思是AB的数据时刻都是同步的, 如果我们放弃了强一致性,不代表我们的数据就是一定是不一致的了,我们可以让A先写入本地,等到通信恢复了再同步给B,这就是所谓的最终一致性,长远的看我们的数据还是一致的, 我们只是在某一个时间窗口里数据不一致罢了。如果这个时间窗口小过了用户逻辑处理的时间。那么其实对于用户来说根本毛都感觉不到。 最终一致性有个很有意思的协议叫gossip就跟传八卦一个意思,我就把我收到里信息里我本地没有的部分加到我本地,再把这个信息发出去,那么长远的看,网络时好时坏, 但是最终所有人都会有所有的信息。因此我们还是能够保证数据的最终一致性的。综上,CAP应该描述成,当发生网络分区的时候,如果我们要继续服务,那么强一致性和可用性只能2选1。
|
事务消息之前的实现
rmq不提供事务消息之前,通过“本地事务存根+rmq消息语义”来实现事务的最终一致性。
如上图,其实就是每个涉及到分布式事务的应用自己要有一张表来存储需要发送的消息,保证消息一定可以在本地事务完成后可以被发送到broker端。然后依赖“至少消费一次”的rmq语义来确保消息的投递,当然,消费端需要做幂等设计。
这么做除了各个业务段冗余一张表和一个兜底任务外,也无不妥。
rmq实现
事务消息的主要目标:确保本地事务执行完成后,一定会有通知到broker端。要达成这个目标,broker就需要知道每个在执行的事务,并且能在事务超时未发送结束消息时主动去问询事务执行的情况,核对完事务执行情况后再决定是否将事务完成消息推送给consumer。那rmq怎么收集执行的本地事务呢?这个当然需要事务本身在开启前进行上报。简单来讲事务消息就是要实现预约-履约-回查
的场景。
rmq事务消息围绕着两个topic展开RMQ_SYS_TRANS_HALF_TOPIC
(half_topic)和RMQ_SYS_TRANS_OP_HALF_TOPIC
(op_half_topic)。
1 2 3
| `RMQ_SYS_TRANS_HALF_TOPIC`用来记录一个事务; `RMQ_SYS_TRANS_OP_HALF_TOPIC`用来记录事务的执行状态。 这两个topic均为rmq的系统topic。
|
结合上图,描述下事务消息的一般流程:
发送开启事务消息(half消息)。
服务端响应消息响应写入结果。该步骤中会默认将目标topic和queueId进行替换,源topic和queueId当作属性值记到消息体中,half消息对生产和消费方均不可见。
根据发送结果执行本地事务,并发送(异步发送)本地事务执行的状态。
根据本地事务状态执行Commit或者Rollback。如果是Commit,就取出half消息,新建消息,并拷贝half消息的内容,同时把topic和queueId设置为half消息属性值中的topic和queueId,即:还原源消息,然后将新建的消息进行存储并在op_half_topic
创建操作记录,此时,消费方可以看到并消费事务消息。
PS:改变消息topic是rmq的常用方法,延时消息也是靠这种方式实现。
Producer
rmq在生产Client中使用模版消息封装了事务开启消息及实务完结消息的发送,开发者只需要实现自己本地事务和定义事务消息数据结构即可(DefaultMQProducerImpl#sendMessageInTransaction
)。
事务消息存在三种状态:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public enum LocalTransactionState {
COMMIT_MESSAGE,
ROLLBACK_MESSAGE,
UNKNOW, }
|
至于状态对应的场景,自己撸下代码吧,比较简单。
Broker
Broker端对Producer事务响应的处理代码在EndTransactionProcessor
中,逻辑也比较简单。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| OperationResult result = new OperationResult(); if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader); if (result.getResponseCode() == ResponseCode.SUCCESS) { RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); if (res.getCode() == ResponseCode.SUCCESS) { MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage()); RemotingCommand sendResult = sendFinalMessage(msgInner); if (sendResult.getCode() == ResponseCode.SUCCESS) { this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); } return sendResult; } return res; } } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader); if (result.getResponseCode() == ResponseCode.SUCCESS) { RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); if (res.getCode() == ResponseCode.SUCCESS) { this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); } return res; } }
|
不管事务结束消息成功与否,总会记录一条操作消息(RMQ_SYS_TRANS_OP_HALF_TOPIC
(op_half_topic)),这个topic究竟有什么用呢?
我们知道rmq对文件是连续写随机读的,这样就意味着我们不可能想操作数据库那样可以update/delete一条记录,所以rmq在处理事务消息的消息补偿-回查
逻辑时就利用RMQ_SYS_TRANS_HALF_TOPIC
(half_topic)和RMQ_SYS_TRANS_OP_HALF_TOPIC
(op_half_topic)这两个topic来判断哪些消息处理完成了,哪些消息需要发起回查。
在broker启动时,会创建一个任务(消费者)来定时消费这两个topic,具体消费代码在TransactionalMessageServiceImpl#check()
中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
| String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic); for (MessageQueue messageQueue : msgQueues) { long startTime = System.currentTimeMillis(); MessageQueue opQueue = getOpQueue(messageQueue); long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue); long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue); List<Long> doneOpOffset = new ArrayList<>(); HashMap<Long, Long> removeMap = new HashMap<>(); PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
int getMessageNullCount = 1; long newOffset = halfOffset; long i = halfOffset; while (true) { if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) { break; } if (removeMap.containsKey(i)) { removeMap.remove(i); } else { GetResult getResult = getHalfMsg(messageQueue, i); MessageExt msgExt = getResult.getMsg(); if (msgExt == null) { if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) { break; } if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) { break; } else { i = getResult.getPullResult().getNextBeginOffset(); newOffset = i; continue; } } if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) { listener.resolveDiscardMsg(msgExt); newOffset = i + 1; i++; continue; } if (msgExt.getStoreTimestamp() >= startTime) { log.debug("Fresh stored. the miss offset={}, check it later, store={}", i, new Date(msgExt.getStoreTimestamp())); break; }
long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp(); long checkImmunityTime = transactionTimeout; String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS); if (null != checkImmunityTimeStr) { checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout); if (valueOfCurrentMinusBorn < checkImmunityTime) { if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) { newOffset = i + 1; i++; continue; } } } else { if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) { break; } } List<MessageExt> opMsg = pullResult.getMsgFoundList(); boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime) || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout)) || (valueOfCurrentMinusBorn <= -1);
if (isNeedCheck) { if (!putBackHalfMsgQueue(msgExt, i)) { continue; } listener.resolveHalfMsg(msgExt); } else { pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset); continue; } } newOffset = i + 1; i++; } if (newOffset != halfOffset) { transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset); } long newOpOffset = calculateOpOffset(doneOpOffset, opOffset); if (newOpOffset != opOffset) { transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset); } }
|
上面这段代码便是补偿逻辑的核心了,看懂这块代码,整个事务消息的处理也就基本明白的差不多了。
总结
在看这段代码时我有一个疑问:
- 为什么消息回查时,要重新提交一条消息到half队列呢?
自己思考的答案:
- 因为rmq顺序写,同时消费进度需要向前推进,不能够因为某个消息有问题影响消息的处理(单线程处理),基于rmq的底层实现和特性,提交一条新的消息除了占用一些存储空间外,处理问题的复杂度和时间消耗均能得到保证。
小生不才,以上如有描述有误的地方还望各位不吝赐教 !^_^!
源代码版本:release-4.5.0 贴出的源代码会有所删减
参考
CAP
seata
消息事务样例