如何实现用MQ解决分布式事务原创
金蝶云社区-sharkv
sharkv
15人赞赏了该文章 1万次浏览 未经作者许可,禁止转载编辑于2022年04月15日 11:25:20

关键字:分布式事务、最终一致、MQ

一、需求

功能性需求:单据提交后,自动执行审核操作

非功能性需求:单据提交失败,审核也失败数据一致

单据提交成功,审核失败,无法回滚提交数据不一致×

二、思路与方案

2.1 分析思路

当应用/服务/数据库间产生分布式事务时,若对数据时效性要求不是太严格的情况下可以通过MQ来实现事务最终一致性。设计需要满足以下要求:

          分布式事务框架确保在微服务架构下多应用实例分布于不同的节点之上的事务最终一致性。

          使用可靠消息队列MQ,保证上下游应用数据操作的最终一致性。即将本地业务逻辑和消息发送记录放在一个事务中,保证本地操作和消息记录要么两者都成功或者都失败。下游应用向消息系统订阅该消息,收到消息后执行相应操作。

          从本质上讲是将分布式事务转换为两个本地事务,然后依靠下游业务的重试机制达到最终一致性。

 

2.2 实现方案

2.2.1工作原理图

重要参数说明:

1ready:消息已到达MQ,等待消费

2unacknowledged:消息正在被消费,未确认

1.png



图 1

2.2.2图解说明

下面对一些关键节点进行代码展示或说明:

消息发送者

       1、5:消息发送记录需要在外层业务逻辑的事务保护中发生

       3、4:将事务的xid写入消息及消息发送表t_dtx_trans中,若表、索引等不存在则会动态创建。(该表理论上每个库都会有)

2.png

图 2

线程捡漏:若发送消息到MQ的过程失败,但消息发送记录成功保存到t_dtx_trans,则通过线程捡漏可以扫描该表,并通过state字段过滤识别需要重新投递的消息。

3.png


图 3

消息接受者

Ⅰ:消费者线程获取消息进行消费,消息状态由ready变为unacknowledged

Ⅱ:根据消息中记录的xid查询t_dtx_trans表知道事务是否已提交。若已提交则正常消费,并改变unacknowledged状态;若事务还未提交,则会将该消息重新投递至队列头部,等待消费。当重试次数达到一定的max次数后则会从队列中discard丢弃消息。

4.png


图 4

Ⅲ:默认自动确认。建议手动确认acker.ack(messgeID);acker.deny(messgeID);acker.discard(messgeID);

2.2.3设计关键点

保证可靠事件投递和避免重复消费

      消息MQ发送事务性支持:每个服务原子性的业务操作和消息发布事件

      消息队列的可靠性投递,确保事件传递至少一次(消息确认机制ACK)

      下游业务执行的顺序及幂等性控制

      消息可靠持久化

2.2.4缺点

      若采用publishInDbTranscation,在消费者侧,注意一下Ⅱ这个步骤,当发送者事务提交失败时,unacknowledged的消息直接又压回队列的头部了,当unack的消息个数>=消费者个数时,消费者一致循环处理unacknowledged的消息,导致后面reday的消息都阻塞了,只能等到重试次数上限后,自动删除消息后,方可消费后面的ready消息。

      缺少可视化的事务维护界面,若重试达到次数需要手工干预的时候,业务人员不方便排查及处理

 

三、实现过程

1.打开单据设计器,编辑提交操作,添加插件kd.demo.sci.opplugign.AutoAuditOP

5.png


图 5

package kd.demo.sci.opplugign;
 
import kd.bos.dataentity.entity.DynamicObject;
import kd.bos.entity.plugin.AbstractOperationServicePlugIn;
import kd.bos.entity.plugin.args.EndOperationTransactionArgs;
import kd.bos.mq.MQFactory;
import kd.bos.mq.MessagePublisher;
 
public class AutoAuditOP extends AbstractOperationServicePlugIn {
   @Override
   public void endOperationTransaction(EndOperationTransactionArgs e) {
      MessagePublisher mp = MQFactory.get().createSimplePublisher("kded_tc", "erkai_queue");
      try {
          for(DynamicObject data:e.getDataEntities()) {
             //p1:该操作所在的分库
             //p2:自定义数据
             mp.publishInDbTranscation("secd",data.get("id") );
          }
      } finally {
          mp.close();
      }
   }
}
2.实现消费者
public class DemoConsumer implements MessageConsumer {
   Log log = LogFactory.getLog(getClass());
   @Override
   public void onMessage(Object id, String messageId, boolean resend, MessageAcker acker) {
      log.info("自定义DemoConsumer开始消费");
      try {
          
          OperationResult result = OperationServiceHelper.executeOperate("audit", "kded_testt", new Object[] {id}, OperateOption.create());
          List<OperateErrorInfo> allErrorInfo = result.getAllErrorInfo();
          if(allErrorInfo.size()<=0) {
             acker.ack(messageId);
             return;
          }
          
          QFilter q1 = new QFilter("id",QCP.equals,id);
          QFilter q2 = new QFilter("billstatus",QCP.equals,"C");
          //如果这条数据已经审核了则废弃消息不再进行重试
          boolean discard = QueryServiceHelper.exists("kded_testt", new QFilter[] {q1,q2});
          if (discard){
             acker.discard(messageId);//废弃
             log.info(id+"已经审核,此消息丢弃");
          } else{
             acker.deny(messageId);//告诉mq重发这条消息
             log.error(result.getMessage());
          }
      } catch (Exception e) {
          acker.deny(messageId);//告诉mq重发这条消息
          // 记录异常原因,并写业务日志
          log.error(e);
      }
   }
 
}

四、效果图

1. 点击提交

6.png


图 6

2.手动构造网络异常,捕获异常进行重试

7.png


图 7

8.png


图 8

9.png


图 9

3.去掉异常,模拟网络环境OK后,逻辑正常,消息确认

10.png


图 10

4.最终结果-已审核

11.png


图 11


五、开发环境版本

V4.0.0.14

六、参考资料

【开发平台】指导手册

学习成长中心

 

 

 

 


赞 15