内容简介:REST微服务的分布式事务实现-基于消息中间件
上一篇文章 REST微服务的分布式事务实现-分布式事务以及JTA介绍 中,试着带大家理解事务,然后介绍了分布式事务、它的原则和实现方式。这一部分,我们就来详细看看如何使用消息中间件来实现分布式事务。
我们还是使用之前的实例,一个订票系统的购票逻辑:
这篇教程的源代码可以从 github 上获取。
使用消息中间件实现分布式事务,也就是使用事件驱动实现。在这种方式下,Order服务不会直接调用User服务,而是往MQ上发一个消息,说明有新订单需要扣费;User服务会响应这个消息,并处理,处理完成后再发一个消息,说明有新订单需要转移票;然后就会有Ticket服务来处理。而每个服务都是在一个事务里面处理读消息、处理业务、写消息的事情。大致流程如下:
在这种方式下,订单的处理是异步的,用户发起一个订单的时候,只是生成一个正在处理的订单,然后通过消息中间件一步步的进行扣费、交票、完成订单等逻辑。而每一个服务中相应的操作,基本都是:
- 从一个队列中读取消息
- 操作相应数据库操作
- 往下一个队列中发送消息
也就是说,需要在这个方法中需要操作数据库和MQ两个资源,这正好是上一篇文章中介绍Spring内部事务和外部事务时使用的实例中的场景。下面就是大致的代码:
@JmsListener(destination = "order:new", containerFactory = "orderFactory") @Transactional public void create(OrderDTO orderDTO){ Order order = new Order(orderDTO); order.setStatus("PENDING"); orderRepository.save(order); jmsTemplate.convertAndSend("order:need_to_pay", order); }
它监听MQ的”order:new”队列,处理订单,往”order:need_to_pay”发送一个消息。然后用户服务就会接收这个消息,触发扣费流程。
在这个地方,我们可以使用JTA事务,来使用两阶段提交来实现两个资源的共同提交,但是这会影响系统的性能。而且,还需要使用的消息中间件实现了XA的规范,提供两阶段提交的功能。
这里也可以使用本地事务,这时,每个事物都会有一个JMS的Session,并使用事务。如此一来,就存在一个数据库的事物和一个JMS的事务,两个事务是相互独立并依次提交的。这样,就有可能在极少数情况下出错,但是也能采取一些错误来尽量解决。我们对上面的事务处理展开(伪代码,只是为了说明处理过程),来看看出错的情况以及该如何处理:
jmsTransaction.begin(); // get transactions from jms session dbTransaction.begin(); // get transactions from JDBC connection try { orderRepository.save(order); jmsTemplate.convertAndSend("order:need_to_pay", order); dbTransaction.commit(); jmsTransaction.commit(); } catch(Exception e) { dbTransaction.rollback(); jmsTransaction.rollback(); }
在上面的方法中,只要发生了错误,MQ消息的消费就算失败,MQ的监听器就会重新触发一次这个方法。
这时,如果错误发生在:
- 数据提交时或之前。这时,整个数据库的操作都会被重置(也可能就根本还没更新),重试的时候不需要考虑重复提交的问题,因为之前的提交都已经被回滚。
- 数据库提交成功,但是JMS提交失败。这时就需要防止重复提交来避免数据库的重复操作。
我们可以采用之前说过的token方式,在调用这个方法前,生成一个唯一的token。这里使用 Java 的UUID生成一个ID作为token。(如果这里的重复调用只是在这个服务内部重新触发,就不需要考虑分布式系统的全局一致性ID的问题。这需要根据实际情况来判断用什么样的UUID生成方式)所以,Controller里面接受购票请求如下:
@PostMapping(value = "/") @Transactional public void create(@RequestBody OrderDTO orderDTO){ String uid = UUID.randomUUID().toString(); orderDTO.setToken(uid); jmsTemplate.convertAndSend("order:new", orderDTO); }
然后在Service里面监听这个队列,处理购票:
@JmsListener(destination = "order:new", containerFactory = "orderFactory") @Transactional public void create(OrderDTO orderDTO){ if (!this.processedUIDs.contains(orderDTO.getToken())) { Order order = new Order(orderDTO); order.setStatus("PENDING"); orderRepository.save(order); orderDTO.setStatus(order.getStatus()); orderDTO.setId(order.getId()); } else { LOG.info("Duplicate jms message:{}", orderDTO); } jmsTemplate.convertAndSend("order:need_to_pay", orderDTO); processedUIDs.add(orderDTO.getToken()); }
简单来说,解决办法就是,如果是重复触发的,就略过数据库相关的处理,直接往MQ的目标队列发送需要的数据。使得整个流程能够往下走。
刚才说的是在一个服务内出错的情况,还有一种错误情况是,订单服务和用户服务已经处理完订单创建和扣费的操作,然后到了Ticket服务的时候,却发现没有票了。虽然我们可以通过合理的设计业务逻辑来避免这种问题,例如,在操作之前先检查用户余额,检查并锁票,然后进行操作数据的事情。但是,在有些情况下,很难通过业务流程的设计来完全避免这种问题。如果出现了这种的问题,我们也可以通过撤销的流程来实现,业务流程如下:
在上面的解决方案中,使用JDK的UUID类生成一个ID,实际上这个ID只是在当前的JVM内,才能够保证是唯一的。其次,在JMS的标准中,没有规定一个消息的Listener在读取一个消息失败后,重新读取的问题。在微服务环境中,如果一个应用部署了多个实例,那个这个消息有可能会被另一个实例读到。所以在上面的方案中,使用JVM内的唯一ID放在消息的内容中,它有可能被任意一个实例处理,处理失败后,又有可能被另一个实例处理。这就会出问题。所以我们需要一个分布式环境下的生成唯一ID的解决办法。例如,先获得JVM的唯一ID以后,再加上IP+端口等信息。而且,对已经处理过的ID的缓存,也需要存在分布式环境中。
所以,我们完全可以不使用两阶段提交,就实现微服务架构下的分布式事务。使用这种方式,它的优点是:
- 实现简单。结合Spring的事务,几乎不用写额外的事务相关的代码,就能够实现。我们只需要更好的服务的拆分和设计业务流程。
- 系统吞吐量高。因为数据库或MQ不会被长期的锁住,可以并发的处理更多的事务。
- 容错性好。各个服务之间通过MQ来触发协调,即使在处理一个任务的时候有一个服务停了,消息还会一直保持,直到服务起来开始监听,然后继续触发这个任务。
当然这种方式也有一些缺点,最大的问题就是异步处理的问题。用户发出一个请求后,处理该业务的服务只是简单处理,往MQ发送消息开始处理流程,然后就返回了。这时候这个任务还在处理。虽然有时候我们可以通过等的方式,等待最终处理完成的消息,然后在返回给用户。但是这样又得考虑响应时间、超时、各种错误等情况。
有些人会觉得这种方式使得开发和调试都变得复杂,在我看来,恰恰相反,这使得开发和调试都简单了。首先,根据微服务架构的设计原则,就是每个服务只负责一个功能模块;再者,根据面向对象的设计原则,一个方法只做一件事情。如果我们能够合理的拆分服务,和每一步的处理方法,这正是一个好的设计。在维护的时候,每个方法、每个步骤做什么事情,都很清楚。
说到调试,我的原则是,你应该通过单元测试来发现和解决问题,而不是调试。以上面的购票流程为例,每一个服务,通过MQ触发一个方法的时候,它的参数应该是什么、状态是什么都应该是明确的,这个方法执行完成后,会产生什么新的数据,状态会更新成什么,都应该是明确的。而这些都可以通过单元测试来很好的测试。如果你的复杂流程中的每一个都能通过单元测试进行完善的测试,那么这些方法串联到一起,不但能够很好的工作,也能应付各种异常的情况。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 分布式消息中间件-Rocketmq
- 基于消息中间件的分布式事务
- 滴滴开源分布式消息中间件产品 DDMQ
- LAF-DTX 分布式事务中间件
- 开源分布式中间件 DBLE 快速入门指南
- 原生分布式数据库与中间件的区别
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。