如何解决事务与非事务的数据一致性问题

栏目: 数据库 · 发布时间: 6年前

内容简介:【51CTO.com原创稿件】作为拥有线上线下大数据的智慧零售平台,苏宁的系统对于并发和高效要求非常高。针对各种苛刻的场景,苏宁都有相应的解决方案。苏宁的售后订单系统每天要处理大量订单的创建,修改以及数据分发的操作。

【51CTO.com原创稿件】

1、业务场景

作为拥有线上线下大数据的智慧零售平台,苏宁的系统对于并发和高效要求非常高。针对各种苛刻的场景,苏宁都有相应的解决方案。

苏宁的售后订单系统每天要处理大量订单的创建,修改以及数据分发的操作。

为了保证高效,我们的数据经过分库分表存储于数据库集群中,同时根据一定的算法将部分活跃订单缓存在Redis,保证订单处理的效率。分发数据时,我们通过苏宁自研的MQ消息平台 (WindQ)向下游系统分发消息,处理效率上可以做到准实时,即消息能够及时被下游接收,并立即通过反馈接口反馈回来,避免实时接口调用时可能发生因网络,下游处理效率问题带来的阻塞。

基于以上的背景,我们遇到了这样的场景:

1. 在创建订单的时候,我们要保存订单到数据库和缓存。

2. 同时要将下发下游的消息保存到数据库,并通过WindQ发往下游系统。

3. 下游系统返回接收数据的结果后,需要根据返回的结果,对保存到数据库的数据进行删除操作。

如何解决事务与非事务的数据一致性问题

图1:业务模型

虽然逻辑简单,但由于数据库操作是在一个事务中,而 Redis 和发消息队列的操作则并非能靠事务控制。

  • 如果缓存成功,但是事务失败,则可能导致我们在系统有了一份异常的订单缓存。而实际上这个订单并不存在。
  • 另外当消息发送到下游了以后,如果下游处理速度非常快,处理结果立刻返回,处理返回结果的程序要去删除一条已经发送成功数据。此时,有可能本地的事务尚未提交,那么,删除操作就做了无用功。最后当事务提交以后,那条应该删掉的数据,就会一直在待处理表中,变成异常数据。

如何解决事务与非事务的数据一致性问题

图2:异常场景

说明:数据通过消息队列发送下游系统,同时保存一份到数据库是为了保证发送队列异常或者下游在接收、处理时发生异常的情况,可以通过数据库保存的数据进行补偿处理的一种机制,当下游系统反馈数据接收正常后,将该数据删除。最终保证上下游数据一致。

业务场景模拟

系统由Business对外提供服务,在此过程中通过OrdeSaver和MessageSender执行具体数据处理功能。

  • Business为业务的入口,所有的业务逻辑由此开始。
  • OrderSaver执行的业务是保存订单以及缓存订单到Redis。
  • MessageSender执行的业务是保存下发数据到下发表中,并将数据发送到WindQ消息队列中。

Business

/**  
 * 完整的业务,有多个数据库操作,以及数据库以外的需要延迟执行的业务逻辑  
 */   
public class Business  {   
    public void saveInfo(Map<String, Object> map){   
        System.out.println("业务开始 事务开启  保存数据操作开始" );   
        new OrderSaver().saveOrderInfo(map);   
        new MessageSender().saveMessageForSend(map);   
        System.out.println("业务结束 事务提交 保存数据操作结束");   
    }   
}   

OrderSaver

/**  
  * 保存服务单的业务逻辑  
  */   
public class OrderSaver {   
           
public void saveOrderInfo(Map<String, Object> map,ExecutorHandler executorHandler){   
       System.out.println("Save order info to datebase");   
    System.out.println("Cache order info into redis");   
  }   
}    

MessageSender

/**  
 * 下发数据的业务逻辑  
 */   
public class MessageSender {   
 
    public void saveMessageForSend(Map<String, Object> map,ExecutorHandler executorHandler){   
     //保存数据到数据库   
        System.out.println("Save create message to datebase.");   
        System.out.println("Send message to windq");   
    }   
}  

业务接口调用

public class Sample {   
    //此处模拟业务接口被调用的情况   
    public static void main(String[] args) {   
    Business business = new Business();   
    Map<String,Object> param = new HashMap<>();   
    business.saveInfo(param);   
   }   
}  

输出结果

业务开始 事务开启 保存数据操作开始

Save order info to datebase

Cache order info into redis

Save create message to datebase.

Send message to windq

业务结束 事务提交 保存数据操作结束

如何解决事务与非事务的数据一致性问题

图3:常规输出

以上场景模拟了常规情况下,我们的业务处理流程,此时也就会相应地出现我们上面所描述地异常。而我们所期望的处理结果应该是:

业务开始 事务开启 保存数据操作开始

Save order info to datebase

Save create message to datebase.

业务结束 事务提交 保存数据操作结束

Cache order info into redis

Send message to windq

如何解决事务与非事务的数据一致性问题

图4:期望的结果

2、解决方案

该问题如何处理呢,这个时候我们就该将缓存到Redis以及下发WindQ的操作延迟到事务提交后处理。这样在事务没有提交前Resdis中不会有数据,WindQ也不会将数据下发。假如事务失败,后续也可以根据异常进行后续处理。即便事务成功缓存Redis或者发送WindQ出错,也可以根据存入数据库的数据进行后续的补偿处理。

2.1 处理方案一:代码转移

我们首先会想到通过代码转移的方式,将逻辑代码移动到事务外面。但这个时候问题来了......

2.1.1 问题一:逻辑割裂

我们为了描述业务模型,将现实场景尽量简化,从模型上看,是可以将两段非事务方法挪到事务外面来操作。但是,由于保存订单和缓存Redis是一套操作,其使用的数据是一致的,保存下发的消息和发送WindQ也是一对呼应的操作。代码是写在一起的,符合逻辑和业务要求有改动时也能够一并处理,拆开的话,对统一数据的处理上给人的感觉就不是一气呵成了。

某些业务中并非只有一两个成对的操作,将多个成对操作的事务-非事务关联型逻辑强行拆开,显得规模浩大,这种方式变得不可取。

另外原先的处理方案中,保存数据库、缓存以及发送WindQ处理的数据是一致的,而拆开写的话,就会导致数据要从前传到后。要保证数据能从里面传到外面也将成为一个问题。

2.2 处理方案二:延迟执行的模型

为了解决第一方案的问题,我们做了以下的设计。

数据要往数据库中存的时候,我们可以先把要处理的数据和要做的动作先定义好,放到一个容器中去,在事务提交后,我们再拿到这个容器,统一将之前定义好的操作和数据取出来,按要求执行。

具体怎么做呢?

经过一番思考,我们构建出了一个模型

ExecutorHandler - Executor

  • Executor 可执行对象,用于定义一个需要执行的逻辑。比如将数据通过WindQ发送,或者将订单刷入缓存。
  • ExecutorHandler容器类,内部保存了一个Executor的列表。

代码逻辑

  1. 在业务代码中,我们将需要执行的业务操作封装到Executor中。
  2. 定义好以后,通过ExecutorHandler的add方法,添加到容器中去。
  3. 在业务逻辑执行的过程中,先进行数据库操作,而非数据库操作只是在对应的位置进行定义,在整个事务完成以后,通过ExecutorHandler的handle方法,遍历所有的Executor对象,执行需要延迟的非事务操作。

如何解决事务与非事务的数据一致性问题

图5:业务模型

Executor

public  interface Executor {   
    void execute();   
}    

ExecutorHandler

public class ExecutorHandler {   
   //需要执行的业务处理对象列表   
   private List<Executor> executors;   
       
   public void handle(){   
     if(!(null == executors || executors.isEmpty())){   
           for(Executor  executor : executors){   
              executor.execute();   
            }   
      }   
   }   
   public void add(Executor executor){   
      if(null == executors){   
           executors = new ArrayList<>();   
      }   
        executors.add(executor);   
    }   
}  

业务接口调用

public class Sample {   
    //此处模拟业务接口被调用的情况   
     public static void main(String[] args) {   
        Business business = new Business();   
         ExecutorHandler handler = new ExecutorHandler();   
         Map<String, Object> param  =  new HashMap<String, Object>();   
         //执行业务方法,开启事务,保存数据   
         business.saveInfo(param, handler);   
         //执行延迟执行的方法   
         handler.handle();   
    }      
 }   

输出结果

业务开始 事务开启 保存数据操作开始

Save order info to datebase

Save create message to datebase.

业务结束 事务提交 保存数据操作结束

Cache order info into redis

Send message to windq

Business

/**  
 * 完整的业务,有多个数据库操作,以及数据库以外的需要延迟执行的业务逻辑  
 */   
public class Business  {   
    public void saveInfo(Map<String, Object> map,ExecutorHandler executorHandler){   
        System.out.println("业务开始 事务开启  保存数据操作开始" );   
        new OrderSaver().saveOrderInfo(map,executorHandler);   
        new MessageSender().saveMessageForSend(map,executorHandler);   
        System.out.println("业务结束 事务提交 保存数据操作结束");   
    }   
}   

MessageSender

/**  
 * 下发数据的业务逻辑  
 */   
public class MessageSender {   
       
    public void saveMessageForSend(Map<String, Object> map,ExecutorHandler executorHandler){   
        //保存数据到数据库   
        System.out.println("Save create message to datebase.");   
        //将要延迟执行的业务逻辑定义好,注册到容器中去   
        executorHandler.add(new Executor() {   
            @Override   
            public void execute() {   
                System.out.println("Send message to windq");   
            }   
        });   
    }   
}    

OrderSaver

 /**  
  * 保存服务单的业务逻辑  
   */   
 public class OrderSaver {   
         
     public void saveOrderInfo(Map<String, Object> map,ExecutorHandler executorHandler){   
         System.out.println("Save order info to datebase");   
          //这就是所谓的回调函数   
         executorHandler.add(new Executor() {   
            @Override   
            public void execute() {   
                System.out.println("Cache order info into redis");   
            }   
        });   
    }   
 }  

2.2.1 问题二:参数传递

以上的方案,解决了延迟执行的问题,但是,此刻我们发现,由于要使用ExecutorHandler,这个时候就需要随时随地将该对象传递下去,要考虑如何降低该对象的侵略性。

  • 静态变量:在使用中需要考虑同步和清理的问题,很容易在多线程的情况下使得逻辑变得混乱,不采用。
  • 成员变量:同样也存在着数据清理的问题,不推荐使用,不采用。

2.2.2 问题二处理方案:使用ThreadLocal参数传递

有没有生命周期是整个线程内的呢?这时我们就需要用到ThreadLocal了。

通过ThreadLocal来获取ExecutorHandler 可以作为有效的解决方案。

由于ThreadLocal对象最终在使用完的时候需要remove掉,因此,该方法需要集中统一调用。

实现时,我们定义了HandlerThreadLocal类。

HandlerThreadLocal对象负责通过 ThreadLocal的get方法来获取线程本地的ExecutorHandler对象,并执行其 handle方法(具体实现可以参照后面的代码)。

执行完业务操作以后,通过调用remove方法将其销毁。

2.2.3 异常的捕捉和处理DelayedCallHandler。

由于ThreadLocal的remove方法是一定需要被执行,因此该方法应该放在一个try- catch - finally 块的finally段中,保证其不被遗漏。

  1. DelayedCallHandler通过handle()方法调用业务逻辑。
  2. 在调用完业务逻辑后,调用ExecutorHandler的handle()方法,执行已经注册到延迟调用容器中的业务方法。
  3. 最后在finally中将ThreadLocal 对象remove掉。

整个DelayedCallHandler的handle方法就是一个完整的try- catch - finally 块。

2.2.4 标准化定义:DelayablelService需要延迟调用的业务类

由于DelayedCallHandler已经模块化,业务方法最好也定义成一个具体的方法名(doBusiness),所有的业务处理类,实现DelayedCallHandler接口,在doBusiness方法中调用有事务的业务逻辑。

3、最终实现方案

基于处理方案二的分析,最后我们使用ThreadLocal来传递业务数据。

我们通过ThreadLocal executorHandler来传递数据。

在业务逻辑MessageSender 、OrderSaver中通过executorHandler将需要延迟执行的业务定义好。

在HandlerThreadLocal中,使用 executorHandler处理之前定义好的逻辑。

这样做将事务和非事务分开,不再以方法参数的方式向下游传递数据,使得数据传递得以结构,处理起来更加优雅。

示例代码如下。

业务接口调用

 public static void main(String[] args) {   
      Business b = new Business();   
      Map<String, Object> map  =  new HashMap<String, Object>();   
      DelayedCallHandler<Map<String, Object>> bu = new DelayedCallHandler<Map<String, Object>>();   
      bu.handle(b,map);   
 }    

输出结果:

业务开始 事务开启 保存数据操作开始

Save order info to datebase

Save create message to datebase.

业务结束 事务提交 保存数据操作结束

Cache order info into redis

Send message to windq

HandlerThreadLocal

  public class HandlerThreadLocal  {   
         
     public static final ThreadLocal<ExecutorHandler> executorHandler = new ThreadLocal<ExecutorHandler>(){   
          protected ExecutorHandler initialValue(){   
             return new ExecutorHandler();   
          }   
      };   
     public static final void handle(){   
             
         executorHandler.get().handle();   
    }   
       
     public static final void remove(){   
         executorHandler.remove();   
     }   
}   

DelayedCallHandler

 public  class DelayedCallHandler<T> {   
    public void handle(DelayablelService<T> buisnes,T param){             
         try {   
            //先执行业务操作   
             buisnes.doBusiness(param);   
            //执行延迟执行的业务   
            HandlerThreadLocal.handle();   
       } catch (Exception e) {   
            //处理异常   
        }finally {   
           HandlerThreadLocal.remove();   
       }   
    }         
 }  

DelayablelService1.

public interface DelayablelService { 
 
 public void doBusiness(T param); 
 
}  

Business

  /**  
   * 完整的业务,有多个数据库操作,以及数据库以外的需要延迟执行的业务逻辑  
   */   
  public class Business implements DelayablelService< Map<String, Object> > {   
         
      @Override   
      public void doBusiness(Map<String, Object> map){   
          saveInfo(map);   
     }   
        
     public void saveInfo(Map<String, Object> map){   
         System.out.println("业务开始 事务开启  保存数据操作开始" );   
         new OrderSaver().saveOrderInfo();   
        new MessageSender().saveMessageForSend();   
         System.out.println("业务结束 事务提交 保存数据操作结束");   
     }   
 }    

MessageSender

  /**  
   * 下发数据的业务逻辑  
  */   
  public class MessageSender {   
      public void saveMessageForSend(){   
         ExecutorHandler executorHandler = HandlerThreadLocal.executorHandler.get();   
         System.out.println("Save create message to datebase.");   
         executorHandler.add(new Executor() {   
             @Override   
            public void execute() {   
                System.out.println("Send message to windq");   
             }   
        });   
     }   
}    

OrderSaver

  /**  
  * 保存服务单的业务逻辑  
   */   
  public class OrderSaver {   
         
      public void saveOrderInfo(){   
         System.out.println("Save order info to datebase");   
            
         ExecutorHandler executorHandler = HandlerThreadLocal.executorHandler.get();   
        //这就是所谓的回调函数   
         executorHandler.add(new Executor() {   
                
             @Override   
            public void execute() {   
                System.out.println("Cache order info into redis");   
             }   
         });   
     }   
 }    

4、总结

使用延迟执行的模型,解决了在一个业务逻辑中既有数据库事务的操作,又有相关的非事务操作时,事务失败或未提交而非事务操作成功导致的数据不一致问题。

文中提到的逻辑割裂和参数传递的问题,都是在比较复杂的场景下才有的。苏宁售后订单业务中此类逻辑常有出现,因此我们就这些问题进行了分析、讨论,得出这样的解决方案。并非所有的系统和业务都需要这样。任何解决方案都要因情况而定,避免画蛇添足。

在使用该模型时,有使用到匿名内部类和线程局部变量(ThreadLocal),在使用时,有一定的注意事项,ThreadLocal在使用结束后要通过其remove()方法移除,使用时需要留意。

作者:

王海勇,苏宁科技集团苏宁云软件公司售后研发中心技术经理。从事 Java 开发多年,擅长业务抽象及业务架构设计,2016年9月加入苏宁,参与售后服务域订单平台、时效平台等系统平台的研发工作。在苏宁巨大业务量的场景下,保证系统稳定、安全、高效地提供服务。

【51CTO原创稿件,合作站点转载请注明原文作者和出处为51CTO.com】

【责任编辑:庞桂玉 TEL:(010)68476606】


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Head First Web Design

Head First Web Design

Ethan Watrall、Jeff Siarto / O’Reilly Media, Inc. / 2009-01-02 / USD 49.99

Want to know how to make your pages look beautiful, communicate your message effectively, guide visitors through your website with ease, and get everything approved by the accessibility and usability ......一起来看看 《Head First Web Design》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试