RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚) 原 荐

栏目: 后端 · 发布时间: 5年前

RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚)

  阿里云云栖社区 发布于 23分钟前

字数 1041

阅读 2

收藏 0

程序员们,在北上广你还能买房吗? >>> RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚) 原 荐

摘要: 事务消息提交或回滚的实现原理就是根据commitlogOffset找到消息,如果是提交动作,就恢复原消息的主题与队列,再次存入commitlog文件进而转到消息消费队列,供消费者消费,然后将原预处理消息存入一个新的主题RMQ_SYS_TRANS_OP_HALF_TOPIC,代表该消息已被处理;回滚消息与提交事务消息不同的是,提交事务消息会将消息恢复原主题与队列,再次存储在commitlog文件中。

本文将重点分析RocketMQ Broker如何处理事务消息提交、回滚命令,根据前面的介绍,其入口EndTransactionProcessor#processRequest:

OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {        // @1
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);    // @2
      if (result.getResponseCode() == ResponseCode.SUCCESS) {  // @3
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);    // @4
          if (res.getCode() == ResponseCode.SUCCESS) {
                MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());     // @5
                msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());    // @6
                RemotingCommand sendResult = sendFinalMessage(msgInner);                              // @7
                if (sendResult.getCode() == ResponseCode.SUCCESS) {             
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());    // @8
                }
                return sendResult;
           }
          return res;
     }
}

代码@1:如果请求为提交事务,进入事务消息提交处理流程。

代码@2:提交消息,别被这名字误导了,该方法主要是根据commitLogOffset从commitlog文件中查找消息返回OperationResult实例:

RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚) 原 荐
  • private MessageExt prepareMessage :消息对象。
  • private int responseCode:查找结果。
  • private String responseRemark :错误提示。

代码@3:如果成功查找到消息,则继续处理,否则返回给客户端,消息未找到错误信息。

代码@4:验证消息必要字段。

验证消息的生产组与请求信息中的生产者组是否一致。

验证消息的队列偏移量(queueOffset)与请求信息中的偏移量是否一致。

验证消息的commitLogOffset与请求信息中的CommitLogOffset是否一致。

代码@5:调用endMessageTransaction方法,该方法主要的目的就是恢复事务消息的真实的主题、队列,并设置事务ID。

代码@6:设置消息的相关属性,这一步应该直接在endMessageTransaction中实现就好,统一恢复原消息的数量,特别关注的是取消了事务相关的系统标记。

代码@7:发送最终消息,其实现原理非常简单,调用MessageStore将消息存储在commitlog文件中,此时的消息,会被转发到原消息主题对应的消费队列,被消费者消费。

代码@8:删除预处理消息(prepare),其实是将消息存储在主题为:RMQ_SYS_TRANS_OP_HALF_TOPIC的主题中,代表这些消息已经被处理(提交或回滚)。

上述就是事务消息提交的流程,事务回滚类似,接下来大概分析一下事务消息回滚的流程。

EndTransactionProcessor#processRequest
 else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
       result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);    // @1
       if (result.getResponseCode() == ResponseCode.SUCCESS) {
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());   // @2
            }
           return res;
       }
}

代码@1:回滚消息,其实内部就是根据commitlogOffset查找消息。

代码@2:将消息存储在RMQ_SYS_TRANS_OP_HALF_TOPIC中,代表该消息已被处理,与提交事务消息不同的是,提交事务消息会将消息恢复原主题与队列,再次存储在commitlog文件中。

事务消息在Broker服务端的提交回滚流程就介绍到这了。其核心实现就是根据commitlogOffset找到消息,如果是提交动作,就恢复原消息的主题与队列,再次存入commitlog文件进而转到消息消费队列,供消费者消费,然后将原预处理消息存入一个新的主题RMQ_SYS_TRANS_OP_HALF_TOPIC,代表该消息已被处理;回滚消息与提交事务消息不同的是,提交事务消息会将消息恢复原主题与队列,再次存储在commitlog文件中。

作者: 丁威

原文链接

本文为云栖社区原创内容,未经允许不得转载。

© 著作权归作者所有

共有人打赏支持

上一篇: 利用MaxCompute建立数据仓库的超强实战手册

下一篇: RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查

RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚) 原 荐

阿里云云栖社区

粉丝 75

博文 840

码字总数 1818934

作品 0

朝阳

提问

相关文章 最新文章

RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚)

摘要: 事务消息提交或回滚的实现原理就是根据commitlogOffset找到消息,如果是提交动作,就恢复原消息的主题与队列,再次存入commitlog文件进而转到消息消费队列,供消费者消费,然后将原预...

zhaowei121

今天

0

0

RocketMQ源码分析之从官方示例窥探:RocketMQ事务消息实现基本思想

摘要: RocketMQ源码分析之从官方示例窥探RocketMQ事务消息实现基本思想。 RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理。首先从官方给出的Demo实例入手,以...

阿里云云栖社区

今天

0

0

RocketMQ源码分析之RocketMQ事务消息实现原理上篇(二阶段提交)

摘要: 初步展示了事务消息的发送流程,总的说来,RocketMQ的事务消息发送使用二阶段提交思路,首先,在消息发送时,先发送消息类型为Prepread类型的消息,然后在将该消息成功存入到消息服务...

zhaowei121

今天

0

0

rocketmq事务消息入门介绍

说明 周五的时候发了篇:Rocketmq4.3支持事务啦!!!,趁着周末的时候把相关内容看了下,下面的主要内容就是关于RocketMQ事务相关内容介绍了。 说明: 今天这篇仅仅是入门介绍,并没有涉及到...

匠心零度

2018/10/29

0

0

RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚) 原 荐
RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查

摘要: 本文详细分析了RocketMQ事务消息实现原理中的事务状态回查实现,RocketMQ会默认一分钟的频率处理消息状态为Prepare的消息,通过调用消息生产者的事务状态查询接口得知消息的事务状态,...

zhaowei121

今天

0

0

没有更多内容

加载失败,请刷新页面

加载更多
RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查

上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地...

阿里云官方博客

21分钟前

3

0

RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚) 原 荐
RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚)

摘要: 事务消息提交或回滚的实现原理就是根据commitlogOffset找到消息,如果是提交动作,就恢复原消息的主题与队列,再次存入commitlog文件进而转到消息消费队列,供消费者消费,然后将原预...

阿里云云栖社区

23分钟前

2

0

RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚) 原 荐
前后端分离架构中接口测试最佳实践

背景 你是否遇到过这种情况:前后端分离体系中,后端开发人员通知你接口有改动,前端代码需要做相应调整。but,改了啥没有明确说明,这时候需要一个个页面去点击,一个个按钮去人肉测试,好不...

dkvirus

27分钟前

4

0

RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚) 原 荐
如何实现对hashmap的 排序 操作?

我们都知道,JAVA的HashMap数据结构是一个散列存储模型,采用拉链法存储<K,V>数据。其内存模型如下: 采用散列存储将每一个Entry存入后,由于散列算出的hash码不定,所以无法得出存入元素的先...

阮少爷

32分钟前

1

0

RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚) 原 荐
webuploader 图片上传样式美化

工具类: webuploader 效果 上传前: 上传后悬浮: html <div class="ip-group float"> <label>上传封面:</label> <div class="upload-cover" id="fileList" th:attr="data-bucket=$......

lemos

34分钟前

2

0

RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚) 原 荐

没有更多内容

加载失败,请刷新页面

加载更多

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Programming in Haskell

Programming in Haskell

Graham Hutton / Cambridge University Press / 2007-1-18 / GBP 34.99

Haskell is one of the leading languages for teaching functional programming, enabling students to write simpler and cleaner code, and to learn how to structure and reason about programs. This introduc......一起来看看 《Programming in Haskell》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

html转js在线工具
html转js在线工具

html转js在线工具