首页>>后端>>Spring->Spring事务同步机制扩展解决加锁和MQ发送数据同步问题

Spring事务同步机制扩展解决加锁和MQ发送数据同步问题

时间:2023-11-29 本站 点击:0

问题场景

在Spring事务中发送MQ或进行加锁操作,如果在事务还未结束时,MQ就已经发送出去或锁已经被释放掉,导致数据不一致性问题

场景一

@Transactional(propagation=Propagation.REQUIRES_NEW)publicOrdersave(Orderorder){orderRepository.save(order);//发送mqorderSender.send(order);//dosomethingreturnorder;}

上述代码中,在保存订单后立即发送MQ,但此时事务还没提交,如果do something的后续逻辑耗时较久,就会有几率,MQ消费者在接收到消息后,生产者的事务还未提交,假设消费者此时要到数据库反查order,就会查不到;

场景二

@Transactional(propagation=Propagation.REQUIRES_NEW)@Overridepublicvoidseckill(){lock.lock();try{Integerexists=orderRepository.countAllBy();log.info("当前库存:"+(limit-exists));if(exists>=limit){log.warn("下单失败了");FAIL.increment();return;}Orderorder=newOrder();order.setPrice(123L);order.setCreatedOn(LocalDateTime.now());orderRepository.save(order);log.info("下单成功了");SUCCESS.increment();}finally{lock.unlock();}}

上述代码中,虽然解锁操作lock.unlock()是放在在finally里面执行的,但实际上在使用事务时,经过AOP代理后,lock.unlock()会早于事务提交执行,因此还是存在并发问题。

解决方案

要解决上述的问题,本质上就是要让事务或解锁操作,在事务提交后再执行,可以使用手动提交事务方式,直接后置这些操作,但此种实现起来对业务代码侵入性较大,不太优雅。

这里我将介绍另一种方式——通过扩展Spring事务的同步机制来解决此问题:

Spring事务的同步机制

Spring事务的同步机制的具体原理这里就不做详细介绍了,具体可以参考https://blog.csdn.net/f641385712/article/details/91538445这篇文章

简单来说,我们可以通过自定义TransactionSynchronization,然后通过在事务方法中使用TransactionSynchronizationManager.registerSynchronization(synchronization)方法来注册事务同步器,在事务完成前后的进行相关逻辑操作;

接口TransactionSynchronization的定义如下:

publicinterfaceTransactionSynchronizationextendsOrdered,Flushable{/**Completionstatusincaseofpropercommit.*/intSTATUS_COMMITTED=0;/**Completionstatusincaseofproperrollback.*/intSTATUS_ROLLED_BACK=1;/**Completionstatusincaseofheuristicmixedcompletionorsystemerrors.*/intSTATUS_UNKNOWN=2;@OverridedefaultintgetOrder(){returnOrdered.LOWEST_PRECEDENCE;}defaultvoidsuspend(){}defaultvoidresume(){}@Overridedefaultvoidflush(){}defaultvoidbeforeCommit(booleanreadOnly){}defaultvoidbeforeCompletion(){}defaultvoidafterCommit(){}defaultvoidafterCompletion(intstatus){}}

例如,对之前场景一的代码进行改写:

@Transactional(propagation=Propagation.REQUIRES_NEW)publicOrdersave(Orderorder){orderRepository.save(order);//发送mqTransactionSynchronizationManager.registerSynchronization(newTransactionSynchronization(){@OverridepublicvoidafterCommit(){orderSender.send(order);}});//dosomethingreturnorder;}

我们注册了一个自定义的事务同步器,其中:

publicvoidafterCommit(){orderSender.send(order);}

即表示在事务完成提交后进行MQ消息发送

进一步封装

了解了Spring的事务同步机制,我们就可以对此进行一下封装

函数式接口:要执行方法

@FunctionalInterfacepublicinterfaceAction{/***执行方法*/voidexecute();}

枚举类:定义需要在事务处于何种状态时进行操作

@Getter@AllArgsConstructorpublicenumActionExecuteState{/***事务提交时执行Action*/WHEN_COMMITTED(TransactionSynchronization.STATUS_COMMITTED),/***事务回滚时执行Action*/WHEN_ROLL_BACK(TransactionSynchronization.STATUS_ROLLED_BACK),/***事务状态未知时执行Action*/WHEN_UNKNOWN(TransactionSynchronization.STATUS_UNKNOWN),/***不管事务状态为何值下都会执行Action*/WHEN_ALL(null);privateIntegerstate;publicstaticActionExecuteStategetByState(Integerstate){if(state==null){returnWHEN_ALL;}for(ActionExecuteStatee:ActionExecuteState.values()){if(Objects.equals(e.state,state)){returne;}}returnnull;}}

事务状态操作封装类

publicclassActionWrapper{/***Map:<事务状态,操作>*/privateMap<ActionExecuteState,List<Action>>actionMap;publicActionWrapper(@NotNullMap<ActionExecuteState,List<Action>>actionMap){assertactionMap!=null;this.actionMap=actionMap;}publicMap<ActionExecuteState,List<Action>>getActionMap(){returnactionMap;}}

后置的数据库事务同步处理器

@Slf4jpublicclassPostActionTransactionSynchronizationHandler{/***Supplier:用于判断当前是否出去活动事务中,默认为{@codeTransactionSynchronizationManager::isActualTransactionActive}*/privatefinalSupplier<Boolean>transactionActiveSupplier;/***通过ThreadLocal缓存写操作方法*/privatefinalThreadLocal<ActionWrapper>cache=ThreadLocal.withInitial(()->null);/***缓存当前线程中*/privatefinalThreadLocal<Boolean>initSign=ThreadLocal.withInitial(()->null);privatePostActionTransactionSynchronizationsynchronization;publicPostActionTransactionSynchronizationHandler(Supplier<Boolean>transactionActiveSupplier){this.transactionActiveSupplier=transactionActiveSupplier;}publicPostActionTransactionSynchronizationHandler(){this(TransactionSynchronizationManager::isActualTransactionActive);}publicvoidsetSynchronization(PostActionTransactionSynchronizationsynchronization){this.synchronization=synchronization;}/***判断当前是否出去活动事务中**@return*/publicbooleanisActualTransactionActive(){returntransactionActiveSupplier.get();}protectedList<Action>getActions(ActionExecuteStatestate){ActionWrapperwrapper=cache.get();if(wrapper==null||state==null){returnCollections.emptyList();}Map<ActionExecuteState,List<Action>>map=wrapper.getActionMap();if(map==null){returnCollections.emptyList();}returnmap.get(state);}/***添加一个事务后操作方法**@paramaction操作方法*@paramstateaction要运行在的事务状态*/publicsynchronizedvoidaddAction(Actionaction,ActionExecuteStatestate){registerSynchronizationIfNeed();ActionWrapperwrapper=cache.get();if(wrapper==null){wrapper=newActionWrapper(newHashMap<>(4));cache.set(wrapper);}Map<ActionExecuteState,List<Action>>map=wrapper.getActionMap();List<Action>actions=map.computeIfAbsent(state,s->newArrayList<>());actions.add(action);}privatevoidregisterSynchronizationIfNeed(){Booleanflag=initSign.get();if(flag==null||!flag){TransactionSynchronizationManager.registerSynchronization(synchronization);initSign.set(true);}}/***清理缓存*/publicvoidclear(){cache.remove();}publicvoiddoAfterAfterCompletion(intstateCode){try{ActionExecuteStatestate=ActionExecuteState.getByState(stateCode);if(state!=null){executeActionByState(state);}executeActionByState(ActionExecuteState.WHEN_ALL);}finally{this.clear();}}privatevoidexecuteActionByState(ActionExecuteStatestate){if(state==null){return;}try{List<Action>actions=getActions(state);if(CollectionUtils.isEmpty(actions)){return;}if(log.isInfoEnabled()){log.info("beginexecuteactionwithstate={}",state);}for(Actionaction:actions){if(log.isDebugEnabled()){log.debug("execute{}",action.toString());}action.execute();}}catch(Exceptione){log.error("executeactionsaftertransactioncompletionfail,state="+state,e);}}}

后置的数据库事务同步器

@Slf4jpublicclassPostActionTransactionSynchronizationimplementsTransactionSynchronization{privatePostActionTransactionSynchronizationHandlerhandler;publicPostActionTransactionSynchronization(PostActionTransactionSynchronizationHandlerhandler){this.handler=handler;}@OverridepublicintgetOrder(){returnOrdered.HIGHEST_PRECEDENCE;}@OverridepublicvoidafterCompletion(intstatus){handler.doAfterAfterCompletion(status);}}

使用方式:

定义bean

@Transactional(propagation=Propagation.REQUIRES_NEW)@Overridepublicvoidseckill(){lock.lock();try{Integerexists=orderRepository.countAllBy();log.info("当前库存:"+(limit-exists));if(exists>=limit){log.warn("下单失败了");FAIL.increment();return;}Orderorder=newOrder();order.setPrice(123L);order.setCreatedOn(LocalDateTime.now());orderRepository.save(order);log.info("下单成功了");SUCCESS.increment();}finally{lock.unlock();}}0

代码改写

场景一

@Transactional(propagation=Propagation.REQUIRES_NEW)@Overridepublicvoidseckill(){lock.lock();try{Integerexists=orderRepository.countAllBy();log.info("当前库存:"+(limit-exists));if(exists>=limit){log.warn("下单失败了");FAIL.increment();return;}Orderorder=newOrder();order.setPrice(123L);order.setCreatedOn(LocalDateTime.now());orderRepository.save(order);log.info("下单成功了");SUCCESS.increment();}finally{lock.unlock();}}1

场景二

@Transactional(propagation=Propagation.REQUIRES_NEW)@Overridepublicvoidseckill(){lock.lock();try{Integerexists=orderRepository.countAllBy();log.info("当前库存:"+(limit-exists));if(exists>=limit){log.warn("下单失败了");FAIL.increment();return;}Orderorder=newOrder();order.setPrice(123L);order.setCreatedOn(LocalDateTime.now());orderRepository.save(order);log.info("下单成功了");SUCCESS.increment();}finally{lock.unlock();}}2

示例项目地址

https://github.com/wf2311/spring-tx-post-synchronization

可以参考本项目的 src/test/java目录查看具体测试用例和用法。

参考

当 Transactional 碰到锁,有个大坑!

使用Spring的事务同步机制解决:数据库刚插入的记录却查询不到的问题


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/Spring/280.html