订单初版—7.支付和履约实现的重构文档

大纲

1.预支付到完成支付的业务流程

2.支付回调到推送履约的代码流程

3.支付回调到推送履约的双异步设计

4.发送订单履约的RocketMQ事务消息的原理

5.RocketMQ事务消息的实现原理

6.支付回调到发送履约的RocketMQ事务消息代码

7.履约场景引入Saga模式 + 状态机

8.履约流程的Seata Saga状态节点定义 + 节点流转

9.履约流程的Seata Saga失败补偿流程

10.Seata Saga状态机流转原理

11.履约系统的Seata Saga代码实现

12.履约系统的Seata Saga空回滚和悬挂问题

1.预支付到完成支付的业务流程

订单正向链路有三个核心环节:生成订单 + 支付 + 履约。

2.支付回调到推送履约的代码流程

(1)支付回调成功,更新订单状态 + 发送订单已完成支付的事务消息

(2)订单系统消费已完成支付的消息,更新订单履约状态 + 发送订单履约的消息

(3)履约系统消费触发订单履约的消息,进行具体的订单履约处理

(1)支付回调成功,更新订单状态 + 发送订单已完成支付的事务消息

@DubboService(version = "1.0.0", interfaceClass = OrderApi.class, retries = 0)
public class OrderApiImpl implements OrderApi {
    @Autowired
    private OrderService orderService;
    ...

    //支付回调接口
    @Override
    public JsonResult<Boolean> payCallback(PayCallbackRequest payCallbackRequest) {
        try {
            orderService.payCallback(payCallbackRequest);
            return JsonResult.buildSuccess(true);
        } catch (OrderBizException e) {
            log.error("biz error", e);
            return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());
        } catch (Exception e) {
            log.error("error", e);
            return JsonResult.buildError(e.getMessage());
        }
    }
    ...
}

@Service
public class OrderServiceImpl implements OrderService {
    @Autowired
    private OrderInfoDAO orderInfoDAO;

    @Autowired
    private OrderNoManager orderNoManager;

    @Autowired
    private DefaultProducer defaultProducer;

    @Autowired
    private RedisLock redisLock;
    ...

    //支付回调
    //支付回调有2把分布式锁的原因说明:同一笔订单在同一时间只能支付or取消
    //不可以同时对一笔订单,既发起支付,又发起取消
    @Override
    public void payCallback(PayCallbackRequest payCallbackRequest) {
        //1.入参检查
        checkPayCallbackRequestParam(payCallbackRequest);
        String orderId = payCallbackRequest.getOrderId();
        Integer payAmount = payCallbackRequest.getPayAmount();
        Integer payType = payCallbackRequest.getPayType();
        List<String> redisKeyList = Lists.newArrayList();

        //2.加支付分布式锁避免支付系统并发回调
        String orderPayKey = RedisLockKeyConstants.ORDER_PAY_KEY + orderId;
        //加取消订单分布式锁避免支付和取消订单同时操作同一笔订单
        String cancelOrderKey = RedisLockKeyConstants.CANCEL_KEY + orderId;
        redisKeyList.add(orderPayKey);
        redisKeyList.add(cancelOrderKey);
        boolean lock = redisLock.multiLock(redisKeyList);
        if (!lock) {
            throw new OrderBizException(OrderErrorCodeEnum.ORDER_PAY_CALLBACK_ERROR);
        }

        try {
            //从数据库中查询出当前订单信息
            OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId);
            OrderPaymentDetailDO orderPaymentDetailDO = orderPaymentDetailDAO.getPaymentDetailByOrderId(orderId);

            //3.校验参数
            if (orderInfoDO == null || orderPaymentDetailDO == null) {
                throw new OrderBizException(OrderErrorCodeEnum.ORDER_INFO_IS_NULL);
            }
            if (!payAmount.equals(orderInfoDO.getPayAmount())) {
                throw new OrderBizException(OrderErrorCodeEnum.ORDER_CALLBACK_PAY_AMOUNT_ERROR);
            }

            //4.异常场景判断
            Integer orderStatus = orderInfoDO.getOrderStatus();
            if (OrderStatusEnum.CREATED.getCode().equals(orderStatus)) {
                //如果订单状态是"已创建",直接更新订单状态为已支付,并发送事务消息
                TransactionMQProducer transactionMQProducer = defaultProducer.getProducer();
                transactionMQProducer.setTransactionListener(new TransactionListener() {
                    @Override
                    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                        try {
                            orderManager.updateOrderStatusPaid(payCallbackRequest, orderInfoDO, orderPaymentDetailDO);
                            return LocalTransactionState.COMMIT_MESSAGE;
                        } catch (BaseBizException e) {
                            throw e;
                        } catch (Exception e) {
                            log.error("system error", e);
                            return LocalTransactionState.ROLLBACK_MESSAGE;
                        }
                    }

                    //回查接口
                    @Override
                    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                        //检查订单是否是已支付
                        OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId);
                        if (orderInfoDO != null && OrderStatusEnum.PAID.getCode().equals(orderInfoDO.getOrderStatus())) {
                            return LocalTransactionState.COMMIT_MESSAGE;
                        }
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    }
                });
                //发送 "订单已完成支付" 消息
                sendPaidOrderSuccessMessage(transactionMQProducer, orderInfoDO);
            } else {
                //如果订单状态不是 "已创建"
                if (OrderStatusEnum.CANCELED.getCode().equals(orderStatus)) {
                    //如果订单那状态是取消状态
                    Integer payStatus = orderPaymentDetailDO.getPayStatus();
                    if (PayStatusEnum.UNPAID.getCode().equals(payStatus)) {
                        //调用退款
                        executeOrderRefund(orderInfoDO, orderPaymentDetailDO);
                        throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_ERROR);
                    } else if (PayStatusEnum.PAID.getCode().equals(payStatus)) {
                        if (payType.equals(orderPaymentDetailDO.getPayType())) {
                            throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_PAY_TYPE_SAME_ERROR);
                        } else {
                            throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_PAY_TYPE_NO_SAME_ERROR);
                        }
                    }
                } else {
                    //如果订单状态不是取消状态
                    if (PayStatusEnum.PAID.getCode().equals(orderPaymentDetailDO.getPayStatus())) {
                        if (payType.equals(orderPaymentDetailDO.getPayType())) {
                            return;
                        }
                        //调用退款
                        executeOrderRefund(orderInfoDO, orderPaymentDetailDO);
                        throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_REPEAT_ERROR);
                    }
                }
            }
        } catch (Exception e) {
            throw new OrderBizException(e.getMessage());
        } finally {
            //释放分布式锁
            redisLock.unMultiLock(redisKeyList);
        }
    }

    //发送订单已完成支付消息,触发订单进行履约
    private void sendPaidOrderSuccessMessage(TransactionMQProducer transactionMQProducer, OrderInfoDO orderInfoDO) throws MQClientException {
        String orderId = orderInfoDO.getOrderId();
        PaidOrderSuccessMessage message = new PaidOrderSuccessMessage();
        message.setOrderId(orderId);
        String topic = RocketMqConstant.PAID_ORDER_SUCCESS_TOPIC;
        byte[] body = JSON.toJSONString(message).getBytes(StandardCharsets.UTF_8);
        Message mq = new Message(topic, body);
        TransactionSendResult result = transactionMQProducer.sendMessageInTransaction(mq, orderInfoDO);
        if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {
            throw new OrderBizException(OrderErrorCodeEnum.ORDER_PAY_CALLBACK_SEND_MQ_ERROR);
        }
    }
    ...
}

@Service
public class OrderManagerImpl implements OrderManager {
    ...
    //支付回调更新订单状态
    @Transactional(rollbackFor = Exception.class)
    @Override
    public void updateOrderStatusPaid(PayCallbackRequest payCallbackRequest, OrderInfoDO orderInfoDO, OrderPaymentDetailDO orderPaymentDetailDO) {
        //主单信息
        String orderId = payCallbackRequest.getOrderId();
        Integer preOrderStatus = orderInfoDO.getOrderStatus();
        orderInfoDO.setOrderStatus(OrderStatusEnum.PAID.getCode());
        orderInfoDAO.updateById(orderInfoDO);

        //主单支付信息
        orderPaymentDetailDO.setPayStatus(PayStatusEnum.PAID.getCode());
        orderPaymentDetailDAO.updateById(orderPaymentDetailDO);

        //新增订单状态变更日志
        OrderOperateLogDO orderOperateLogDO = new OrderOperateLogDO();
        orderOperateLogDO.setOrderId(orderId);
        orderOperateLogDO.setOperateType(OrderOperateTypeEnum.PAID_ORDER.getCode());
        orderOperateLogDO.setPreStatus(preOrderStatus);
        orderOperateLogDO.setCurrentStatus(orderInfoDO.getOrderStatus());
        orderOperateLogDO.setRemark("订单支付回调操作" + orderOperateLogDO.getPreStatus() + "-" + orderOperateLogDO.getCurrentStatus());
        orderOperateLogDAO.save(orderOperateLogDO);

        //判断是否存在子订单
        List<OrderInfoDO> subOrderInfoDOList = orderInfoDAO.listByParentOrderId(orderId);
        if (subOrderInfoDOList != null && !subOrderInfoDOList.isEmpty()) {
            //先将主订单状态设置为无效订单
            Integer newPreOrderStatus = orderInfoDO.getOrderStatus();
            orderInfoDO.setOrderStatus(OrderStatusEnum.INVALID.getCode());
            orderInfoDAO.updateById(orderInfoDO);

            //新增订单状态变更日志
            OrderOperateLogDO newOrderOperateLogDO = new OrderOperateLogDO();
            newOrderOperateLogDO.setOrderId(orderId);
            newOrderOperateLogDO.setOperateType(OrderOperateTypeEnum.PAID_ORDER.getCode());
            newOrderOperateLogDO.setPreStatus(newPreOrderStatus);
            newOrderOperateLogDO.setCurrentStatus(OrderStatusEnum.INVALID.getCode());
            orderOperateLogDO.setRemark("订单支付回调操作,主订单状态变更" + newOrderOperateLogDO.getPreStatus() + "-" + newOrderOperateLogDO.getCurrentStatus());
            orderOperateLogDAO.save(newOrderOperateLogDO);

            //再更新子订单的状态
            for (OrderInfoDO subOrderInfo : subOrderInfoDOList) {
                Integer subPreOrderStatus = subOrderInfo.getOrderStatus();
                subOrderInfo.setOrderStatus(OrderStatusEnum.PAID.getCode());
                orderInfoDAO.updateById(subOrderInfo);
                //更新子订单的支付明细状态
                String subOrderId = subOrderInfo.getOrderId();
                OrderPaymentDetailDO subOrderPaymentDetailDO = orderPaymentDetailDAO.getPaymentDetailByOrderId(subOrderId);
                if (subOrderPaymentDetailDO != null) {
                    subOrderPaymentDetailDO.setPayStatus(PayStatusEnum.PAID.getCode());
                    orderPaymentDetailDAO.updateById(subOrderPaymentDetailDO);
                }

                //新增订单状态变更日志
                OrderOperateLogDO subOrderOperateLogDO = new OrderOperateLogDO();
                subOrderOperateLogDO.setOrderId(subOrderId);
                subOrderOperateLogDO.setOperateType(OrderOperateTypeEnum.PAID_ORDER.getCode());
                subOrderOperateLogDO.setPreStatus(subPreOrderStatus);
                subOrderOperateLogDO.setCurrentStatus(OrderStatusEnum.PAID.getCode());
                orderOperateLogDO.setRemark("订单支付回调操作,子订单状态变更" + subOrderOperateLogDO.getPreStatus() + "-" + subOrderOperateLogDO.getCurrentStatus());
                orderOperateLogDAO.save(subOrderOperateLogDO);
            }
        }
    }
    ...
}

(2)订单系统消费已完成支付的消息,更新订单履约状态 + 发送订单履约的消息

//订单系统进行消费
@Configuration
public class ConsumerConfig {
    @Autowired
    private RocketMQProperties rocketMQProperties;

    //订单完成支付消息消费者
    @Bean("paidOrderSuccessConsumer")
    public DefaultMQPushConsumer paidOrderSuccessConsumer(PaidOrderSuccessListener paidOrderSuccessListener) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PAID_ORDER_SUCCESS_CONSUMER_GROUP);
        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
        consumer.subscribe(PAID_ORDER_SUCCESS_TOPIC, "*");
        consumer.registerMessageListener(paidOrderSuccessListener);
        consumer.start();
        return consumer;
    }
    ...
}

//订单系统监听订单支付完成的消息
@Component
public class PaidOrderSuccessListener implements MessageListenerConcurrently {
    @Autowired
    private OrderInfoDAO orderInfoDAO;

    @Autowired
    private OrderFulFillService orderFulFillService;

    @Autowired
    private RedisLock redisLock;

    @Autowired
    private DefaultProducer defaultProducer;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        try {
            for (MessageExt messageExt : list) {
                String message = new String(messageExt.getBody());
                PaidOrderSuccessMessage paidOrderSuccessMessage = JSON.parseObject(message, PaidOrderSuccessMessage.class);
                String orderId = paidOrderSuccessMessage.getOrderId();
                log.info("触发订单履约,orderId:{}", orderId);
                OrderInfoDO order = orderInfoDAO.getByOrderId(orderId);
                if (Objects.isNull(order)) {
                    throw new OrderBizException(OrderErrorCodeEnum.ORDER_INFO_IS_NULL);
                }

                //1.加分布式锁 + 里面的履约前置状态校验防止消息重复消费
                String key = RedisLockKeyConstants.ORDER_FULFILL_KEY + orderId;
                if (!redisLock.lock(key)) {
                    log.error("order has not acquired lock,cannot fulfill, orderId={}", orderId);
                    throw new BaseBizException(OrderErrorCodeEnum.ORDER_FULFILL_ERROR);
                }

                try {
                    //2.进行订单履约逻辑
                    TransactionMQProducer producer = defaultProducer.getProducer();
                    producer.setTransactionListener(new TransactionListener() {
                        @Override
                        public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                            try {
                                orderFulFillService.triggerOrderFulFill(orderId);
                                return LocalTransactionState.COMMIT_MESSAGE;
                            } catch (BaseBizException e) {
                                throw e;
                            } catch (Exception e) {
                                log.error("system error", e);
                                return LocalTransactionState.ROLLBACK_MESSAGE;
                            }
                        }

                        //回查接口
                        @Override
                        public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                            //检查订单是否"已履约"状态
                            OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId);
                            if (orderInfoDO != null && OrderStatusEnum.FULFILL.getCode().equals(orderInfoDO.getOrderStatus())) {
                                return LocalTransactionState.COMMIT_MESSAGE;
                            }
                            return LocalTransactionState.ROLLBACK_MESSAGE;
                        }
                    });
                    ReceiveFulfillRequest receiveFulfillRequest = orderFulFillService.buildReceiveFulFillRequest(order);
                    String topic = TRIGGER_ORDER_FULFILL_TOPIC;
                    byte[] body = JSON.toJSONString(receiveFulfillRequest).getBytes(StandardCharsets.UTF_8);
                    Message mq = new Message(topic, body);
                    producer.sendMessageInTransaction(mq, order);
                } finally {
                    redisLock.unlock(key);
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            log.error("consumer error", e);
            //本地业务逻辑执行失败,触发消息重新消费
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
}

@Service
public class OrderFulFillServiceImpl implements OrderFulFillService {
    @Autowired
    private OrderInfoDAO orderInfoDAO;

    @Autowired
    private OrderOperateLogDAO orderOperateLogDAO;
    ...

    @Transactional(rollbackFor = Exception.class)
    @Override
    public void triggerOrderFulFill(String orderId) throws OrderBizException {
        //1.查询订单
        OrderInfoDO order = orderInfoDAO.getByOrderId(orderId);
        if (Objects.isNull(order)) {
            return;
        }

        //2.校验订单是否已支付
        OrderStatusEnum orderStatus = OrderStatusEnum.getByCode(order.getOrderStatus());
        if (!OrderStatusEnum.PAID.equals(orderStatus)) {
            log.info("order has not been paid,cannot fulfill, orderId={}", order.getOrderId());
            return;
        }

        //3.更新订单状态为:已履约
        orderInfoDAO.updateOrderStatus(orderId, OrderStatusEnum.PAID.getCode(), OrderStatusEnum.FULFILL.getCode());

        //4.并插入一条订单变更记录
        orderOperateLogDAO.save(orderOperateLogFactory.get(order, OrderStatusChangeEnum.ORDER_FULFILLED));
    }
    ...
}

(3)履约系统消费触发订单履约的消息,进行具体的订单履约处理

//履约系统消费
@Configuration
public class ConsumerConfig {
    @Autowired
    private RocketMQProperties rocketMQProperties;

    //触发订单履约消息消费者
    @Bean("triggerOrderFulfillConsumer")
    public DefaultMQPushConsumer triggerOrderFulfillConsumer(TriggerOrderFulfillTopicListener triggerOrderFulfillTopicListener) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(TRIGGER_ORDER_FULFILL_CONSUMER_GROUP);
        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
        consumer.subscribe(TRIGGER_ORDER_FULFILL_TOPIC, "*");
        consumer.registerMessageListener(triggerOrderFulfillTopicListener);
        consumer.start();
        return consumer;
    }
    ...
}

//监听并消费订单履约消息
@Component
public class TriggerOrderFulfillTopicListener implements MessageListenerConcurrently {
    @Autowired
    private FulfillService fulfillService;//履约服务

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        try {
            for (MessageExt messageExt : list) {
                String message = new String(messageExt.getBody());
                ReceiveFulfillRequest request = JSON.parseObject(message, ReceiveFulfillRequest.class);
                fulfillService.receiveOrderFulFill(request);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            log.error("consumer error", e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
}

@Service
public class FulfillServiceImpl implements FulfillService {
    ...
    @Override
    public Boolean receiveOrderFulFill(ReceiveFulfillRequest request) {
        log.info("接受订单履约成功,request={}", JSONObject.toJSONString(request));
        String orderId = request.getOrderId();

        //加分布式锁(防止重复触发履约)
        String key = RedisLockKeyConstants.ORDER_FULFILL_KEY + orderId;
        boolean lock = redisLock.lock(key);
        if (!lock) {
            throw new FulfillBizException(FulfillErrorCodeEnum.ORDER_FULFILL_ERROR);
        }

        try {
            //1.幂等:校验orderId是否已经履约过
            if (orderFulfilled(request.getOrderId())) {
                log.info("该订单已履约!!!,orderId={}", request.getOrderId());
                return true;
            }

            //2.saga状态机,触发wms捡货和tms发货
            StateMachineEngine stateMachineEngine = (StateMachineEngine) springApplicationContext.getBean("stateMachineEngine");
            Map<String, Object> startParams = new HashMap<>(3);
            startParams.put("receiveFulfillRequest", request);

            //配置的saga状态机 json的name
            //位于/resources/statelang/order_fulfull.json
            String stateMachineName = "order_fulfill";
            log.info("开始触发saga流程,stateMachineName={}", stateMachineName);
            StateMachineInstance inst = stateMachineEngine.startWithBusinessKey(stateMachineName, null, null, startParams);
            if (ExecutionStatus.SU.equals(inst.getStatus())) {
                log.info("订单履约流程执行完毕. xid={}", inst.getId());
            } else {
                log.error("订单履约流程执行异常. xid={}", inst.getId());
                throw new FulfillBizException(FulfillErrorCodeEnum.ORDER_FULFILL_IS_ERROR);
            }
            return true;
        } finally {
            redisLock.unlock(key);
        }
    }
    ...
}

3.支付回调到推送履约的双异步设计

(1)双异步设计的原因

(2)履约系统通过MQ对订单进行异步履约

(3)双异步设计的目的总结

(1)双异步设计的原因

第一个异步:订单支付回调后,订单系统先发一个支付完成的消息到MQ。

第二个异步:订单系统消费到支付完成消息后进行更新,再发消息到MQ。

为什么需要双异步:为什么订单系统收到支付回调后要先发一个消息到MQ,然后再由自己监听消费,并发送真正触发履约的消息到MQ。

由于更新订单状态和更新订单履约状态都是由订单系统执行的,为什么不按如下处理:订单系统收到支付回调后,先把订单状态更新为已完成支付,然后紧接着把订单履约状态更新为已履约,最后再发送订单履约消息到MQ让履约系统消费该消息进行发货。

原因一:从业务语义来说,支付回调后的主要操作其实是更新订单状态为支付完成,而对订单进行履约并不属于支付回调的主要工作。支付回调最多是触发履约系统进行订单履约,所以支付回调成功后,在支付回调中更新订单履约状态并不合适。因此从业务语义来说,支付回调后发送一个订单支付完成的消息到MQ即可,不应有更多其他操作。

原因二:从扩展性角度来说,由于订单支付完成是一个非常关键的消息,所以可能以后会在订单支付完成后,需要进行更多的业务处理。也就是说,可能以后会有更多系统需要监听和消费订单支付完成消息,如会员系统、营销系统、数据分析系统等都需要消费订单支付完成的消息。

(2)履约系统通过MQ对订单进行异步履约

因为对订单进行履约,涉及到仓库、仓储、库存的调度,需要通知物流公司进行配送等,这个过程非常耗时及复杂,所以触发履约系统对订单进行履约不能使用同步来实现。

(3)双异步设计的目的总结

第一个异步是为了可扩展

第二个异步是为了提升性能

4.发送订单履约的RocketMQ事务消息的原理

(1)如何让更新数据库 + 发送消息到MQ是一致的

(2)发送订单履约的RocketMQ事务消息的原理

(1)如何让更新数据库 + 发送消息到MQ是一致的

问题一:如何保证更新订单状态为已完成 + 发送订单支付完成消息是一致的。也就是更新数据库 + 发送消息到MQ,要么同时成功,要么同时失败。

问题二:如何保证更新订单履约状态为已履约 + 发送触发订单履约消息是一致的。同样是更新数据库 + 发送消息到MQ,要么同时成功,要么同时失败。

为了解决更新数据库 + 发送消息到MQ是一致的,可以使用RocketMQ的事务机制。

(2)发送订单履约的RocketMQ事务消息的原理

5.RocketMQ事务消息的实现原理

6.支付回调到发送履约的RocketMQ事务消息代码

(1)支付回调成功后发送支付完成的事务消息

(2)消费订单支付完成消息时发送触发订单履约的事务消息

(1)支付回调成功后发送支付完成的事务消息

@Service
public class OrderServiceImpl implements OrderService {
    ...
    //支付回调
    //支付回调有2把分布式锁的原因说明:同一笔订单在同一时间只能支付or取消
    //不可以同时对一笔订单,既发起支付,又发起取消
    @Override
    public void payCallback(PayCallbackRequest payCallbackRequest) {
        ...
        try {
            //从数据库中查询出当前订单信息
            OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId);
            OrderPaymentDetailDO orderPaymentDetailDO = orderPaymentDetailDAO.getPaymentDetailByOrderId(orderId);

            //3.校验参数
            if (orderInfoDO == null || orderPaymentDetailDO == null) {
                throw new OrderBizException(OrderErrorCodeEnum.ORDER_INFO_IS_NULL);
            }
            if (!payAmount.equals(orderInfoDO.getPayAmount())) {
                throw new OrderBizException(OrderErrorCodeEnum.ORDER_CALLBACK_PAY_AMOUNT_ERROR);
            }

            //4.异常场景判断
            Integer orderStatus = orderInfoDO.getOrderStatus();
            if (OrderStatusEnum.CREATED.getCode().equals(orderStatus)) {
                //如果订单状态是"已创建",直接更新订单状态为已支付,并发送事务消息
                TransactionMQProducer transactionMQProducer = defaultProducer.getProducer();
                transactionMQProducer.setTransactionListener(new TransactionListener() {
                    @Override
                    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                        try {
                            orderManager.updateOrderStatusPaid(payCallbackRequest, orderInfoDO, orderPaymentDetailDO);
                            return LocalTransactionState.COMMIT_MESSAGE;
                        } catch (BaseBizException e) {
                            throw e;
                        } catch (Exception e) {
                            log.error("system error", e);
                            return LocalTransactionState.ROLLBACK_MESSAGE;
                        }
                    }

                    //回查
                    @Override
                    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                        //检查订单是否是已支付
                        OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId);
                        if (orderInfoDO != null && OrderStatusEnum.PAID.getCode().equals(orderInfoDO.getOrderStatus())) {
                            return LocalTransactionState.COMMIT_MESSAGE;
                        }
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    }
                });
                //发送 "订单已完成支付" 消息
                sendPaidOrderSuccessMessage(transactionMQProducer, orderInfoDO);
            } else {
                ...
            }
        } catch (Exception e) {
            throw new OrderBizException(e.getMessage());
        } finally {
            //释放分布式锁
            redisLock.unMultiLock(redisKeyList);
        }
    }

    //发送订单已完成支付消息,触发订单进行履约
    private void sendPaidOrderSuccessMessage(TransactionMQProducer transactionMQProducer, OrderInfoDO orderInfoDO) throws MQClientException {
        String orderId = orderInfoDO.getOrderId();
        PaidOrderSuccessMessage message = new PaidOrderSuccessMessage();
        message.setOrderId(orderId);
        String topic = RocketMqConstant.PAID_ORDER_SUCCESS_TOPIC;
        byte[] body = JSON.toJSONString(message).getBytes(StandardCharsets.UTF_8);
        Message mq = new Message(topic, body);
        TransactionSendResult result = transactionMQProducer.sendMessageInTransaction(mq, orderInfoDO);
        if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {
            throw new OrderBizException(OrderErrorCodeEnum.ORDER_PAY_CALLBACK_SEND_MQ_ERROR);
        }
    }
    ...
}

(2)消费订单支付完成消息时发送触发订单履约的事务消息

//订单系统监听订单支付完成后的消息
@Component
public class PaidOrderSuccessListener implements MessageListenerConcurrently {
    @Autowired
    private OrderInfoDAO orderInfoDAO;

    @Autowired
    private OrderFulFillService orderFulFillService;

    @Autowired
    private RedisLock redisLock;

    @Autowired
    private DefaultProducer defaultProducer;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        try {
            for (MessageExt messageExt : list) {
                String message = new String(messageExt.getBody());
                PaidOrderSuccessMessage paidOrderSuccessMessage = JSON.parseObject(message, PaidOrderSuccessMessage.class);
                String orderId = paidOrderSuccessMessage.getOrderId();
                log.info("触发订单履约,orderId:{}", orderId);

                OrderInfoDO order = orderInfoDAO.getByOrderId(orderId);
                if (Objects.isNull(order)) {
                    throw new OrderBizException(OrderErrorCodeEnum.ORDER_INFO_IS_NULL);
                }

                //1.加分布式锁 + 里面的履约前置状态校验防止消息重复消费
                String key = RedisLockKeyConstants.ORDER_FULFILL_KEY + orderId;
                if (!redisLock.lock(key)) {
                    log.error("order has not acquired lock,cannot fulfill, orderId={}", orderId);
                    throw new BaseBizException(OrderErrorCodeEnum.ORDER_FULFILL_ERROR);
                }
              
                try {
                    //2.进行订单履约逻辑
                    TransactionMQProducer producer = defaultProducer.getProducer();
                    producer.setTransactionListener(new TransactionListener() {
                        @Override
                        public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                            try {
                                orderFulFillService.triggerOrderFulFill(orderId);
                                return LocalTransactionState.COMMIT_MESSAGE;
                            } catch (BaseBizException e) {
                                throw e;
                            } catch (Exception e) {
                                log.error("system error", e);
                                return LocalTransactionState.ROLLBACK_MESSAGE;
                            }
                        }

                        @Override
                        public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                            //检查订单是否"已履约"状态
                            OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId);
                            if (orderInfoDO != null && OrderStatusEnum.FULFILL.getCode().equals(orderInfoDO.getOrderStatus())) {
                                return LocalTransactionState.COMMIT_MESSAGE;
                            }
                            return LocalTransactionState.ROLLBACK_MESSAGE;
                        }
                    });

                    ReceiveFulfillRequest receiveFulfillRequest = orderFulFillService.buildReceiveFulFillRequest(order);
                    String topic = TRIGGER_ORDER_FULFILL_TOPIC;
                    byte[] body = JSON.toJSONString(receiveFulfillRequest).getBytes(StandardCharsets.UTF_8);
                    Message mq = new Message(topic, body);
                    producer.sendMessageInTransaction(mq, order);
                } finally {
                    redisLock.unlock(key);
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            log.error("consumer error", e);
            //本地业务逻辑执行失败,触发消息重新消费
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
}

7.履约场景引入Saga模式 + 状态机

(1)进行订单履约时使用Saga模式的流程简介

(2)Saga模式的流程定义文件和状态机

(3)Saga模式需要自定义补偿逻辑

(4)Saga模式的优点和缺点及适用场景

(1)进行订单履约时使用Saga模式的流程简介

Saga模式运行时,会按照一个顺序流程去运行长事务。比如第一步更新订单履约数据,第二步调度发货,第三步物流配送。如果运行到某一步时出现异常,则会从这一步开始往回顺序执行补偿逻辑。比如先补偿第三步,再补偿第二步,最后补偿第一步。

(2)Saga模式的流程定义文件和状态机

使用Saga模式的分布式事务在启动时,需要有一个Saga模式的流程定义文件。比如履约系统需要有一个本地文件:Saga模式的流程定义文件。

在该文件中,需定义好整个长事务的流程:第一步执行的逻辑是A,第二步执行的逻辑是B,第三步执行的逻辑是C。第一步回滚的逻辑是a,第二步回滚的逻辑是b,第三步回滚的逻辑是c。

Saga模式启动时,也需要向Seata Server注册全局事务XID。Saga模式启动后,便会根据状态机来判断流程定义文件中的每一步,应该执行正向还是执行逆向,也就是会通过状态机来控制正向和逆向。

(3)Saga模式需要自定义补偿逻辑

一.AT模式下的补偿逻辑

分支事务写入本地事务的数据前,会先生成undo log。当某分支事务出现异常,已提交分支事务需要进行回滚补偿时,会根据生成的undo log来进行回滚补偿。

二.TCC模式下的补偿逻辑

先执行try逻辑,如果try逻辑执行成功则执行commit逻辑来进行提交,如果try逻辑执行失败则执行cancel逻辑进行回滚补偿。

三.Saga模式的补偿逻辑

每个分支事务都有正向的逻辑,每一个正向逻辑都会搭配一个逆向补偿逻辑,Saga模式需要我们自定义实现这些逆向补偿逻辑。

(4)Saga模式的优点和缺点及适用场景

优点是:执行正向链路的分支事务时,不再需要获取全局锁。缺点是:由于没有全局锁,当多个分布式事务并发执行时,不能写隔离。Saga模式的应用场景是:一个业务链路里,需要很多系统层层调用。

一.长事务使用AT模式可能导致持有全局锁时间长

这种长事务如果使用AT模式,由于需要获取全局锁,而长事务链路又过长,从而会导致持有全局锁的时间也过长。即便对某数据的全局锁竞争并不激烈,但也需要过长时间才能释放,所以其并发能力特别低。

二.长事务使用TCC模式需要改造工作量较大

这种长事务如果使用TCC模式,则会对代码产生很大的侵入性。因为需要对长事务接口改造成三个接口:try、commit、cancel。而且长事务的业务链路比较长,改造业务链路的各接口工作量过大了。所以常见于异构存储的场景,不太常见于长事务。

三.Saga模式特别适用于老系统下的长事务

因此Saga模式适用于长事务的场景,特别是老系统下的长事务。如果不希望对老系统有较大侵入性的代码改造,而且希望稍微修改一点代码就能让老系统支持分布式事务,那么就可以使用Saga模式。因为Saga模式可以独立地对老系统定义补偿逻辑,不需要像TCC模式那样去改造接口,也不用像AT模式那样侵入性添加undo log表。

8.履约流程的Seata Saga状态节点定义 + 节点流转

(1)流程定义文件分析

(2)流程定义文件中对应的方法

(1)流程定义文件分析

在履约系统的resources/statelang目录下,有一个JSON文件,该JSON文件order_fullfill.json便是Saga模式流程定义文件。

{
    "Name": "order_fulfill",
    "Comment": "订单履约流程",
    "StartState": "FulfillService",
    "Version": "1.0.0",
    "States": {
        //第一个节点
        "FulfillService": {
            "Type": "ServiceTask",
            "ServiceName": "fulfillSagaService",//所在服务
            "ServiceMethod": "createFulfillOrder",//正向执行方法
            "CompensateState": "CreateFulfillOrderCompensate",//逆向补偿方法
            "Next": "ChoiceWmsState",//下一个节点
            "Input": [//请求参数
                "$.[receiveFulfillRequest]"
            ],
            "Output": {//返回结果
                "CreateFulfillOrderResult": "$.#root"
            },
            "Status": {
                "#root == true": "SU",//正向执行方法的返回结果是true,那么状态就是SU
                "#root == false": "FA",//正向执行方法的返回结果是false,那么状态就是FA
                "$Exception{java.lang.Throwable}": "UN"//抛异常,那么状态就是UN
            },
            "Catch": [
                {
                    "Exceptions": [
                        "java.lang.Throwable"
                    ],
                    "Next": "CompensationTrigger"//抛异常时执行的下一个节点
                }
            ]
        },
        //状态机,判断节点
        "ChoiceWmsState": {
            "Type": "Choice",
            "Choices": [
                {
                    "Expression": "[CreateFulfillOrderResult] == true",
                    "Next": "WmsService"//上一个节点返回true,就执行下一个节点
                }
            ],
            "Default": "Fail"
        },
        //第二个节点
        "WmsService": {
            "Type": "ServiceTask",
            "ServiceName": "wmsSageService",//所在服务
            "ServiceMethod": "pickGoods",//正向执行方法
            "CompensateState": "WmsPickGoodsCompensate",//逆向补偿方法
            "Next": "ChoiceTmsState",//下一个节点
            "Input": [//请求参数
                "$.[receiveFulfillRequest]"
            ],
            "Output": {//返回结果
                "WmsPickGoodsResult": "$.#root"
            },
            "Status": {
                "#root == true": "SU",//正向执行方法的返回结果是true,那么状态就是SU
                "#root == false": "FA",//正向执行方法的返回结果是false,那么状态就是FA
                "$Exception{java.lang.Throwable}": "UN"//抛异常,那么状态就是UN
            },
            "Catch": [
                {
                    "Exceptions": [
                      "java.lang.Throwable"
                    ],
                    "Next": "CompensationTrigger"//抛异常时执行的下一个节点
                }
            ]
        },
        //状态机,判断节点
        "ChoiceTmsState": {
            "Type": "Choice",
            "Choices": [
                {
                    "Expression": "[WmsPickGoodsResult] == true",
                    "Next": "TmsService"//上一个节点返回true,就执行下一个节点
                }
            ],
            "Default": "Fail"
        },
        //第三个节点
        "TmsService": {
            "Type": "ServiceTask",
            "ServiceName": "tmsSagaService",//所在服务
            "ServiceMethod": "sendOut",//正向执行方法
            "CompensateState": "TmsSendOutCompensate",//逆向补偿方法
            "Input": [//请求参数
                "$.[receiveFulfillRequest]"
            ],
            "Output": {//返回结果
                "TmsSendOutResult": "$.#root"
            },
            "Status": {
                "#root == true": "SU",//正向执行方法的返回结果是true,那么状态就是SU
                "#root == false": "FA",//正向执行方法的返回结果是false,那么状态就是FA
                "$Exception{java.lang.Throwable}": "UN"//抛异常,那么状态就是UN
            },
            "Catch": [
                {
                    "Exceptions": [
                        "java.lang.Throwable"
                    ],
                    "Next": "CompensationTrigger"//抛异常时执行的下一个节点
                }
            ],
            "Next": "Succeed"//下一个节点是Succeed节点
        },
        //第一个节点的补偿逻辑
        "CreateFulfillOrderCompensate": {
            "Type": "ServiceTask",
            "ServiceName": "fulfillSagaService",
            "ServiceMethod": "createFulfillOrderCompensate",
            "Input": [
                "$.[receiveFulfillRequest]"
            ]
        },
        //第二个节点的补偿逻辑
        "WmsPickGoodsCompensate": {
            "Type": "ServiceTask",
            "ServiceName": "wmsSageService",
            "ServiceMethod": "pickGoodsCompensate",
            "Input": [
                "$.[receiveFulfillRequest]"
            ]
        },
        //第三个节点的补偿逻辑
        "TmsSendOutCompensate": {
            "Type": "ServiceTask",
            "ServiceName": "tmsSagaService",
            "ServiceMethod": "sendOutCompensate",
            "Input": [
                "$.[receiveFulfillRequest]"
            ]
        },
        //补偿触发后的下一个节点
        "CompensationTrigger": {
            "Type": "CompensationTrigger",
            "Next": "Fail"//下一个节点是Fail节点
        },
        //成功节点
        "Succeed": {
            "Type": "Succeed"
        },
        //失败节点
        "Fail": {
            "Type": "Fail",
            "ErrorCode": "500",
            "Message": "订单履约异常!!"
        }
    }
}

(2)流程定义文件中对应的方法

需要注意的是:触发调用各节点补偿方法时,都是由履约系统通过本地或者RPC来调用的。

@Service("fulfillSagaService")
public class FulfillSagaServiceImpl implements FulfillSagaService {
    @Autowired
    private FulfillService fulfillService;

    @Override
    public Boolean createFulfillOrder(ReceiveFulfillRequest request) {
        log.info("创建履约单,request={}", JSONObject.toJSONString(request));
        String fulfillException = request.getFulfillException();
        if (StringUtils.isNotBlank(fulfillException) && fulfillException.equals("true")) {
            throw new FulfillBizException("创建履约单异常!");
        }
        //创建履约单
        fulfillService.createFulfillOrder(request);
        return true;
    }

    @Override
    public Boolean createFulfillOrderCompensate(ReceiveFulfillRequest request) {
        log.info("补偿创建履约单,request={}", JSONObject.toJSONString(request));
        //取消履约单
        fulfillService.cancelFulfillOrder(request.getOrderId());
        log.info("补偿创建履约单结束,request={}", JSONObject.toJSONString(request));
        return true;
    }
}

@Service("wmsSageService")
public class WmsSageServiceImpl implements WmsSagaService {
    @DubboReference(version = "1.0.0", retries = 0)
    private WmsApi wmsApi;

    @Override
    public Boolean pickGoods(ReceiveFulfillRequest request) {
        log.info("捡货,request={}", JSONObject.toJSONString(request));
        //调用wms系统进行捡货
        JsonResult<PickDTO> jsonResult = wmsApi.pickGoods(buildPickGoodsRequest(request));
        log.info("捡货结果,jsonResult={}", JSONObject.toJSONString(jsonResult));
        if (!jsonResult.getSuccess()) {
            throw new FulfillBizException(FulfillErrorCodeEnum.WMS_IS_ERROR);
        }
        return true;
    }

    @Override
    public Boolean pickGoodsCompensate(ReceiveFulfillRequest request) {
        log.info("补偿捡货,request={}", JSONObject.toJSONString(request));
        //调用wms系统进行捡货
        JsonResult<Boolean> jsonResult = wmsApi.cancelPickGoods(request.getOrderId());
        log.info("补偿捡货结果,jsonResult={}", JSONObject.toJSONString(jsonResult));
        if (!jsonResult.getSuccess()) {
            throw new FulfillBizException(FulfillErrorCodeEnum.WMS_IS_ERROR);
        }
        return true;
    }
    ...
}

@Service("tmsSagaService")
public class TmsSagaServiceImpl implements TmsSagaService {
    @DubboReference(version = "1.0.0", retries = 0)
    private TmsApi tmsApi;

    @Autowired
    private OrderFulfillDAO orderFulfillDAO;

    @Override
    public Boolean sendOut(ReceiveFulfillRequest request) {
        log.info("发货,request={}", JSONObject.toJSONString(request));

        //1.调用tms进行发货
        JsonResult<SendOutDTO> jsonResult = tmsApi.sendOut(buildSendOutRequest(request));
        if (!jsonResult.getSuccess()) {
            throw new FulfillBizException(FulfillErrorCodeEnum.TMS_IS_ERROR);
        }
        log.info("发货结果,jsonResult={}", JSONObject.toJSONString(jsonResult));
        if (!jsonResult.getSuccess()) {
            throw new FulfillBizException(FulfillErrorCodeEnum.WMS_IS_ERROR);
        }

        //2.查询履约单
        OrderFulfillDO orderFulfill = orderFulfillDAO.getOne(request.getOrderId());

        //3.存储物流单号
        String logisticsCode = jsonResult.getData().getLogisticsCode();
        orderFulfillDAO.saveLogisticsCode(orderFulfill.getFulfillId(), logisticsCode);
        return true;
    }

    @Override
    public Boolean sendOutCompensate(ReceiveFulfillRequest request) {
        log.info("补偿发货,request={}", JSONObject.toJSONString(request));

        //调用tms进行补偿发货
        JsonResult<Boolean> jsonResult = tmsApi.cancelSendOut(request.getOrderId());
        if (!jsonResult.getSuccess()) {
            throw new FulfillBizException(FulfillErrorCodeEnum.TMS_IS_ERROR);
        }
        log.info("补偿发货结果,jsonResult={}", JSONObject.toJSONString(jsonResult));

        if (!jsonResult.getSuccess()) {
            throw new FulfillBizException(FulfillErrorCodeEnum.TMS_IS_ERROR);
        }
        return true;
    }
    ...
}

9.履约流程的Seata Saga失败补偿流程

10.Seata Saga状态机流转原理

11.履约系统的Seata Saga代码实现

(1)进行流程定义

(2)配置状态机

(3)实现正向逆向方法

(4)驱动Saga状态机开始执行长事务

(1)进行流程定义

{
    "Name": "order_fulfill",
    "Comment": "订单履约流程",
    "StartState": "FulfillService",
    "Version": "1.0.0",
    "States": {
        //第一个节点
        "FulfillService": {
            "Type": "ServiceTask",
            "ServiceName": "fulfillSagaService",//所在服务
            "ServiceMethod": "createFulfillOrder",//正向执行方法
            "CompensateState": "CreateFulfillOrderCompensate",//逆向补偿方法
            "Next": "ChoiceWmsState",//下一个节点
            "Input": [//请求参数
                "$.[receiveFulfillRequest]"
            ],
            "Output": {//返回结果
                "CreateFulfillOrderResult": "$.#root"
            },
            "Status": {
                "#root == true": "SU",//正向执行方法的返回结果是true,那么状态就是SU
                "#root == false": "FA",//正向执行方法的返回结果是false,那么状态就是FA
                "$Exception{java.lang.Throwable}": "UN"//抛异常,那么状态就是UN
            },
            "Catch": [
                {
                    "Exceptions": [
                        "java.lang.Throwable"
                    ],
                    "Next": "CompensationTrigger"//抛异常时执行的下一个节点
                }
            ]
        },
        //状态机,判断节点
        "ChoiceWmsState": {
            "Type": "Choice",
            "Choices": [
                {
                    "Expression": "[CreateFulfillOrderResult] == true",
                    "Next": "WmsService"//上一个节点返回true,就执行下一个节点
                }
            ],
            "Default": "Fail"
        },
        //第二个节点
        "WmsService": {
            "Type": "ServiceTask",
            "ServiceName": "wmsSageService",//所在服务
            "ServiceMethod": "pickGoods",//正向执行方法
            "CompensateState": "WmsPickGoodsCompensate",//逆向补偿方法
            "Next": "ChoiceTmsState",//下一个节点
            "Input": [//请求参数
                "$.[receiveFulfillRequest]"
            ],
            "Output": {//返回结果
                "WmsPickGoodsResult": "$.#root"
            },
            "Status": {
                "#root == true": "SU",//正向执行方法的返回结果是true,那么状态就是SU
                "#root == false": "FA",//正向执行方法的返回结果是false,那么状态就是FA
                "$Exception{java.lang.Throwable}": "UN"//抛异常,那么状态就是UN
            },
            "Catch": [
                {
                    "Exceptions": [
                        "java.lang.Throwable"
                    ],
                    "Next": "CompensationTrigger"//抛异常时执行的下一个节点
                }
            ]
        },
        //状态机,判断节点
        "ChoiceTmsState": {
            "Type": "Choice",
            "Choices": [
                {
                    "Expression": "[WmsPickGoodsResult] == true",
                    "Next": "TmsService"//上一个节点返回true,就执行下一个节点
                }
            ],
            "Default": "Fail"
        },
        //第三个节点
        "TmsService": {
            "Type": "ServiceTask",
            "ServiceName": "tmsSagaService",//所在服务
            "ServiceMethod": "sendOut",//正向执行方法
            "CompensateState": "TmsSendOutCompensate",//逆向补偿方法
            "Input": [//请求参数
                "$.[receiveFulfillRequest]"
            ],
            "Output": {//返回结果
                "TmsSendOutResult": "$.#root"
            },
            "Status": {
                "#root == true": "SU",//正向执行方法的返回结果是true,那么状态就是SU
                "#root == false": "FA",//正向执行方法的返回结果是false,那么状态就是FA
                "$Exception{java.lang.Throwable}": "UN"//抛异常,那么状态就是UN
            },
            "Catch": [
                {
                    "Exceptions": [
                        "java.lang.Throwable"
                    ],
                    "Next": "CompensationTrigger"//抛异常时执行的下一个节点
                }
            ],
            "Next": "Succeed"//下一个节点是Succeed节点
        },
        //第一个节点的补偿逻辑
        "CreateFulfillOrderCompensate": {
            "Type": "ServiceTask",
            "ServiceName": "fulfillSagaService",
            "ServiceMethod": "createFulfillOrderCompensate",
            "Input": [
                "$.[receiveFulfillRequest]"
            ]
        },
        //第二个节点的补偿逻辑
        "WmsPickGoodsCompensate": {
            "Type": "ServiceTask",
            "ServiceName": "wmsSageService",
            "ServiceMethod": "pickGoodsCompensate",
            "Input": [
                "$.[receiveFulfillRequest]"
            ]
        },
        //第三个节点的补偿逻辑
        "TmsSendOutCompensate": {
            "Type": "ServiceTask",
            "ServiceName": "tmsSagaService",
            "ServiceMethod": "sendOutCompensate",
            "Input": [
                "$.[receiveFulfillRequest]"
            ]
        },
        //补偿触发后的下一个节点
        "CompensationTrigger": {
            "Type": "CompensationTrigger",
            "Next": "Fail"//下一个节点是Fail节点
        },
        //成功节点
        "Succeed": {
            "Type": "Succeed"
        },
        //失败节点
        "Fail": {
            "Type": "Fail",
            "ErrorCode": "500",
            "Message": "订单履约异常!!"
        }
    }
}

(2)配置状态机

//配置数据源
@Configuration
public class DataSourceConfiguration {
    @ConfigurationProperties(prefix = "spring.datasource")
    @Bean
    public DruidDataSource druidDataSource() {
        return new DruidDataSource();
    }

    @Bean(name = "transactionManager")
    @Primary
    public DataSourceTransactionManager transactionManager(@Qualifier("druidDataSource") DruidDataSource druidDataSource) {
        return new DataSourceTransactionManager(druidDataSource);
    }
}

//配置Saga状态机
@Configuration
public class StateMachineConfiguration {
    @Bean
    public ThreadPoolExecutorFactoryBean threadExecutor() {
        ThreadPoolExecutorFactoryBean threadExecutor = new ThreadPoolExecutorFactoryBean();
        threadExecutor.setThreadNamePrefix("SAGA_ASYNC_EXE_");
        threadExecutor.setCorePoolSize(1);
        threadExecutor.setMaxPoolSize(20);
        return threadExecutor;
    }

    @Bean
    public DbStateMachineConfig dbStateMachineConfig(ThreadPoolExecutorFactoryBean threadExecutor, DruidDataSource druidDataSource) throws IOException {
        DbStateMachineConfig dbStateMachineConfig = new DbStateMachineConfig();
        //设置数据源
        dbStateMachineConfig.setDataSource(druidDataSource);
        //设置状态机的线程池
        dbStateMachineConfig.setThreadPoolExecutor((ThreadPoolExecutor) threadExecutor.getObject());
        //设置状态机的配置文件
        dbStateMachineConfig.setResources(new PathMatchingResourcePatternResolver().getResources("classpath*:statelang/*.json"));
        //设置开启异步化
        dbStateMachineConfig.setEnableAsync(true);
        //设置当前Saga长事务所属的分组
        dbStateMachineConfig.setTxServiceGroup("demo-eshop-fulfill-group");
        return dbStateMachineConfig;
    }

    //Saga状态机实例
    @Bean
    public ProcessCtrlStateMachineEngine stateMachineEngine(DbStateMachineConfig dbStateMachineConfig) {
        ProcessCtrlStateMachineEngine stateMachineEngine = new ProcessCtrlStateMachineEngine();
        stateMachineEngine.setStateMachineConfig(dbStateMachineConfig);
        return stateMachineEngine;
    }

    @Bean
    public StateMachineEngineHolder stateMachineEngineHolder(ProcessCtrlStateMachineEngine stateMachineEngine) {
        StateMachineEngineHolder stateMachineEngineHolder = new StateMachineEngineHolder();
        stateMachineEngineHolder.setStateMachineEngine(stateMachineEngine);
        return stateMachineEngineHolder;
    }
}

(3)实现正向逆向方法

@Service("fulfillSagaService")
public class FulfillSagaServiceImpl implements FulfillSagaService {
    @Autowired
    private FulfillService fulfillService;

    @Override
    public Boolean createFulfillOrder(ReceiveFulfillRequest request) {
        log.info("创建履约单,request={}", JSONObject.toJSONString(request));
        String fulfillException = request.getFulfillException();
        if (StringUtils.isNotBlank(fulfillException) && fulfillException.equals("true")) {
            throw new FulfillBizException("创建履约单异常!");
        }
        //创建履约单
        fulfillService.createFulfillOrder(request);
        return true;
    }

    @Override
    public Boolean createFulfillOrderCompensate(ReceiveFulfillRequest request) {
        log.info("补偿创建履约单,request={}", JSONObject.toJSONString(request));
        //取消履约单
        fulfillService.cancelFulfillOrder(request.getOrderId());
        log.info("补偿创建履约单结束,request={}", JSONObject.toJSONString(request));
        return true;
    }
}

@Service("wmsSageService")
public class WmsSageServiceImpl implements WmsSagaService {
    @DubboReference(version = "1.0.0", retries = 0)
    private WmsApi wmsApi;

    @Override
    public Boolean pickGoods(ReceiveFulfillRequest request) {
        log.info("捡货,request={}", JSONObject.toJSONString(request));
        //调用wms系统进行捡货
        JsonResult<PickDTO> jsonResult = wmsApi.pickGoods(buildPickGoodsRequest(request));
        log.info("捡货结果,jsonResult={}", JSONObject.toJSONString(jsonResult));
        if (!jsonResult.getSuccess()) {
            throw new FulfillBizException(FulfillErrorCodeEnum.WMS_IS_ERROR);
        }
        return true;
    }

    @Override
    public Boolean pickGoodsCompensate(ReceiveFulfillRequest request) {
        log.info("补偿捡货,request={}", JSONObject.toJSONString(request));
        //调用wms系统进行捡货
        JsonResult<Boolean> jsonResult = wmsApi.cancelPickGoods(request.getOrderId());
        log.info("补偿捡货结果,jsonResult={}", JSONObject.toJSONString(jsonResult));
        if (!jsonResult.getSuccess()) {
            throw new FulfillBizException(FulfillErrorCodeEnum.WMS_IS_ERROR);
        }
        return true;
    }
    ...
}

@Service("tmsSagaService")
public class TmsSagaServiceImpl implements TmsSagaService {
    @DubboReference(version = "1.0.0", retries = 0)
    private TmsApi tmsApi;

    @Autowired
    private OrderFulfillDAO orderFulfillDAO;

    @Override
    public Boolean sendOut(ReceiveFulfillRequest request) {
        log.info("发货,request={}", JSONObject.toJSONString(request));
        //1.调用tms进行发货
        JsonResult<SendOutDTO> jsonResult = tmsApi.sendOut(buildSendOutRequest(request));
        if (!jsonResult.getSuccess()) {
            throw new FulfillBizException(FulfillErrorCodeEnum.TMS_IS_ERROR);
        }
        log.info("发货结果,jsonResult={}", JSONObject.toJSONString(jsonResult));
        if (!jsonResult.getSuccess()) {
            throw new FulfillBizException(FulfillErrorCodeEnum.WMS_IS_ERROR);
        }
        //2.查询履约单
        OrderFulfillDO orderFulfill = orderFulfillDAO.getOne(request.getOrderId());
        //3.存储物流单号
        String logisticsCode = jsonResult.getData().getLogisticsCode();
        orderFulfillDAO.saveLogisticsCode(orderFulfill.getFulfillId(), logisticsCode);
        return true;
    }

    @Override
    public Boolean sendOutCompensate(ReceiveFulfillRequest request) {
        log.info("补偿发货,request={}", JSONObject.toJSONString(request));
        //调用tms进行补偿发货
        JsonResult<Boolean> jsonResult = tmsApi.cancelSendOut(request.getOrderId());
        if (!jsonResult.getSuccess()) {
            throw new FulfillBizException(FulfillErrorCodeEnum.TMS_IS_ERROR);
        }
        log.info("补偿发货结果,jsonResult={}", JSONObject.toJSONString(jsonResult));
        if (!jsonResult.getSuccess()) {
            throw new FulfillBizException(FulfillErrorCodeEnum.TMS_IS_ERROR);
        }
        return true;
    }
    ...
}

(4)驱动Saga状态机开始执行长事务

//监听并消费订单履约消息
@Component
public class TriggerOrderFulfillTopicListener implements MessageListenerConcurrently {
    @Autowired
    private FulfillService fulfillService;//履约服务

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        try {
            for (MessageExt messageExt : list) {
                String message = new String(messageExt.getBody());
                ReceiveFulfillRequest request = JSON.parseObject(message, ReceiveFulfillRequest.class);
                fulfillService.receiveOrderFulFill(request);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            log.error("consumer error", e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
}

@Service
public class FulfillServiceImpl implements FulfillService {
    ...
    @Override
    public Boolean receiveOrderFulFill(ReceiveFulfillRequest request) {
        log.info("接受订单履约成功,request={}", JSONObject.toJSONString(request));
        String orderId = request.getOrderId();

        //加分布式锁(防止重复触发履约)
        String key = RedisLockKeyConstants.ORDER_FULFILL_KEY + orderId;
        boolean lock = redisLock.lock(key);
        if (!lock) {
            throw new FulfillBizException(FulfillErrorCodeEnum.ORDER_FULFILL_ERROR);
        }

        try {
            //1.幂等:校验orderId是否已经履约过
            if (orderFulfilled(request.getOrderId())) {
                log.info("该订单已履约!!!,orderId={}", request.getOrderId());
                return true;
            }

            //2.获取Saga状态机,触发wms捡货和tms发货
            StateMachineEngine stateMachineEngine = (StateMachineEngine) springApplicationContext.getBean("stateMachineEngine");
            Map<String, Object> startParams = new HashMap<>(3);
            startParams.put("receiveFulfillRequest", request);

            //配置的Saga状态机json的name,位于/resources/statelang/order_fulfull.json
            String stateMachineName = "order_fulfill";
            log.info("开始触发saga流程,stateMachineName={}", stateMachineName);

            //通过状态机启动长事务流程
            StateMachineInstance inst = stateMachineEngine.startWithBusinessKey(stateMachineName, null, null, startParams);
            if (ExecutionStatus.SU.equals(inst.getStatus())) {
                log.info("订单履约流程执行完毕. xid={}", inst.getId());
            } else {
                log.error("订单履约流程执行异常. xid={}", inst.getId());
                throw new FulfillBizException(FulfillErrorCodeEnum.ORDER_FULFILL_IS_ERROR);
            }
            return true;
        } finally {
            redisLock.unlock(key);
        }
    }
    ...
}

12.履约系统的Seata Saga空回滚和悬挂问题

(1)空回滚问题

(2)空悬挂问题

(1)空回滚问题

正向操作没有执行成功,但也进行了逆向操作。解决方案是:通过自定义Holder把正向操作记录下来,如果发现空回滚,则进行记录,且不能进行回滚。

(2)空悬挂问题

逆向操作先执行,之后才执行正向操作。解决方案是:通过自定义Holder把正向操作记录下来,确保执行逆向操作时,已经执行过正向操作。

Saga的空回滚和空悬挂与TCC的解决思路一样。

后端技术栈的基础修养 文章被收录于专栏

详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等

全部评论

相关推荐

点赞 评论 收藏
分享
吴offer选手:下午mt一来就告警说项目来不及,估计明天拿了权限就要参与开发了 已老实
实习生的蛐蛐区
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务