订单初版—7.支付和履约实现的重构文档
大纲
1.预支付到完成支付的业务流程
2.支付回调到推送履约的代码流程
3.支付回调到推送履约的双异步设计
4.发送订单履约的RocketMQ事务消息的原理
5.RocketMQ事务消息的实现原理
6.支付回调到发送履约的RocketMQ事务消息代码
7.履约场景引入Saga模式 + 状态机
8.履约流程的Seata Saga状态节点定义 + 节点流转
9.履约流程的Seata Saga失败补偿流程
10.Seata Saga状态机流转原理
11.履约系统的Seata Saga代码实现
12.履约系统的Seata Saga空回滚和悬挂问题
1.预支付到完成支付的业务流程
订单正向链路有三个核心环节:生成订单 + 支付 + 履约。
2.支付回调到推送履约的代码流程
(1)支付回调成功,更新订单状态 + 发送订单已完成支付的事务消息
(2)订单系统消费已完成支付的消息,更新订单履约状态 + 发送订单履约的消息
(3)履约系统消费触发订单履约的消息,进行具体的订单履约处理
(1)支付回调成功,更新订单状态 + 发送订单已完成支付的事务消息
@DubboService(version = "1.0.0", interfaceClass = OrderApi.class, retries = 0) public class OrderApiImpl implements OrderApi { @Autowired private OrderService orderService; ... //支付回调接口 @Override public JsonResult<Boolean> payCallback(PayCallbackRequest payCallbackRequest) { try { orderService.payCallback(payCallbackRequest); return JsonResult.buildSuccess(true); } catch (OrderBizException e) { log.error("biz error", e); return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg()); } catch (Exception e) { log.error("error", e); return JsonResult.buildError(e.getMessage()); } } ... } @Service public class OrderServiceImpl implements OrderService { @Autowired private OrderInfoDAO orderInfoDAO; @Autowired private OrderNoManager orderNoManager; @Autowired private DefaultProducer defaultProducer; @Autowired private RedisLock redisLock; ... //支付回调 //支付回调有2把分布式锁的原因说明:同一笔订单在同一时间只能支付or取消 //不可以同时对一笔订单,既发起支付,又发起取消 @Override public void payCallback(PayCallbackRequest payCallbackRequest) { //1.入参检查 checkPayCallbackRequestParam(payCallbackRequest); String orderId = payCallbackRequest.getOrderId(); Integer payAmount = payCallbackRequest.getPayAmount(); Integer payType = payCallbackRequest.getPayType(); List<String> redisKeyList = Lists.newArrayList(); //2.加支付分布式锁避免支付系统并发回调 String orderPayKey = RedisLockKeyConstants.ORDER_PAY_KEY + orderId; //加取消订单分布式锁避免支付和取消订单同时操作同一笔订单 String cancelOrderKey = RedisLockKeyConstants.CANCEL_KEY + orderId; redisKeyList.add(orderPayKey); redisKeyList.add(cancelOrderKey); boolean lock = redisLock.multiLock(redisKeyList); if (!lock) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_PAY_CALLBACK_ERROR); } try { //从数据库中查询出当前订单信息 OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId); OrderPaymentDetailDO orderPaymentDetailDO = orderPaymentDetailDAO.getPaymentDetailByOrderId(orderId); //3.校验参数 if (orderInfoDO == null || orderPaymentDetailDO == null) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_INFO_IS_NULL); } if (!payAmount.equals(orderInfoDO.getPayAmount())) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_CALLBACK_PAY_AMOUNT_ERROR); } //4.异常场景判断 Integer orderStatus = orderInfoDO.getOrderStatus(); if (OrderStatusEnum.CREATED.getCode().equals(orderStatus)) { //如果订单状态是"已创建",直接更新订单状态为已支付,并发送事务消息 TransactionMQProducer transactionMQProducer = defaultProducer.getProducer(); transactionMQProducer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { try { orderManager.updateOrderStatusPaid(payCallbackRequest, orderInfoDO, orderPaymentDetailDO); return LocalTransactionState.COMMIT_MESSAGE; } catch (BaseBizException e) { throw e; } catch (Exception e) { log.error("system error", e); return LocalTransactionState.ROLLBACK_MESSAGE; } } //回查接口 @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { //检查订单是否是已支付 OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId); if (orderInfoDO != null && OrderStatusEnum.PAID.getCode().equals(orderInfoDO.getOrderStatus())) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; } }); //发送 "订单已完成支付" 消息 sendPaidOrderSuccessMessage(transactionMQProducer, orderInfoDO); } else { //如果订单状态不是 "已创建" if (OrderStatusEnum.CANCELED.getCode().equals(orderStatus)) { //如果订单那状态是取消状态 Integer payStatus = orderPaymentDetailDO.getPayStatus(); if (PayStatusEnum.UNPAID.getCode().equals(payStatus)) { //调用退款 executeOrderRefund(orderInfoDO, orderPaymentDetailDO); throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_ERROR); } else if (PayStatusEnum.PAID.getCode().equals(payStatus)) { if (payType.equals(orderPaymentDetailDO.getPayType())) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_PAY_TYPE_SAME_ERROR); } else { throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_PAY_TYPE_NO_SAME_ERROR); } } } else { //如果订单状态不是取消状态 if (PayStatusEnum.PAID.getCode().equals(orderPaymentDetailDO.getPayStatus())) { if (payType.equals(orderPaymentDetailDO.getPayType())) { return; } //调用退款 executeOrderRefund(orderInfoDO, orderPaymentDetailDO); throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_REPEAT_ERROR); } } } } catch (Exception e) { throw new OrderBizException(e.getMessage()); } finally { //释放分布式锁 redisLock.unMultiLock(redisKeyList); } } //发送订单已完成支付消息,触发订单进行履约 private void sendPaidOrderSuccessMessage(TransactionMQProducer transactionMQProducer, OrderInfoDO orderInfoDO) throws MQClientException { String orderId = orderInfoDO.getOrderId(); PaidOrderSuccessMessage message = new PaidOrderSuccessMessage(); message.setOrderId(orderId); String topic = RocketMqConstant.PAID_ORDER_SUCCESS_TOPIC; byte[] body = JSON.toJSONString(message).getBytes(StandardCharsets.UTF_8); Message mq = new Message(topic, body); TransactionSendResult result = transactionMQProducer.sendMessageInTransaction(mq, orderInfoDO); if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_PAY_CALLBACK_SEND_MQ_ERROR); } } ... } @Service public class OrderManagerImpl implements OrderManager { ... //支付回调更新订单状态 @Transactional(rollbackFor = Exception.class) @Override public void updateOrderStatusPaid(PayCallbackRequest payCallbackRequest, OrderInfoDO orderInfoDO, OrderPaymentDetailDO orderPaymentDetailDO) { //主单信息 String orderId = payCallbackRequest.getOrderId(); Integer preOrderStatus = orderInfoDO.getOrderStatus(); orderInfoDO.setOrderStatus(OrderStatusEnum.PAID.getCode()); orderInfoDAO.updateById(orderInfoDO); //主单支付信息 orderPaymentDetailDO.setPayStatus(PayStatusEnum.PAID.getCode()); orderPaymentDetailDAO.updateById(orderPaymentDetailDO); //新增订单状态变更日志 OrderOperateLogDO orderOperateLogDO = new OrderOperateLogDO(); orderOperateLogDO.setOrderId(orderId); orderOperateLogDO.setOperateType(OrderOperateTypeEnum.PAID_ORDER.getCode()); orderOperateLogDO.setPreStatus(preOrderStatus); orderOperateLogDO.setCurrentStatus(orderInfoDO.getOrderStatus()); orderOperateLogDO.setRemark("订单支付回调操作" + orderOperateLogDO.getPreStatus() + "-" + orderOperateLogDO.getCurrentStatus()); orderOperateLogDAO.save(orderOperateLogDO); //判断是否存在子订单 List<OrderInfoDO> subOrderInfoDOList = orderInfoDAO.listByParentOrderId(orderId); if (subOrderInfoDOList != null && !subOrderInfoDOList.isEmpty()) { //先将主订单状态设置为无效订单 Integer newPreOrderStatus = orderInfoDO.getOrderStatus(); orderInfoDO.setOrderStatus(OrderStatusEnum.INVALID.getCode()); orderInfoDAO.updateById(orderInfoDO); //新增订单状态变更日志 OrderOperateLogDO newOrderOperateLogDO = new OrderOperateLogDO(); newOrderOperateLogDO.setOrderId(orderId); newOrderOperateLogDO.setOperateType(OrderOperateTypeEnum.PAID_ORDER.getCode()); newOrderOperateLogDO.setPreStatus(newPreOrderStatus); newOrderOperateLogDO.setCurrentStatus(OrderStatusEnum.INVALID.getCode()); orderOperateLogDO.setRemark("订单支付回调操作,主订单状态变更" + newOrderOperateLogDO.getPreStatus() + "-" + newOrderOperateLogDO.getCurrentStatus()); orderOperateLogDAO.save(newOrderOperateLogDO); //再更新子订单的状态 for (OrderInfoDO subOrderInfo : subOrderInfoDOList) { Integer subPreOrderStatus = subOrderInfo.getOrderStatus(); subOrderInfo.setOrderStatus(OrderStatusEnum.PAID.getCode()); orderInfoDAO.updateById(subOrderInfo); //更新子订单的支付明细状态 String subOrderId = subOrderInfo.getOrderId(); OrderPaymentDetailDO subOrderPaymentDetailDO = orderPaymentDetailDAO.getPaymentDetailByOrderId(subOrderId); if (subOrderPaymentDetailDO != null) { subOrderPaymentDetailDO.setPayStatus(PayStatusEnum.PAID.getCode()); orderPaymentDetailDAO.updateById(subOrderPaymentDetailDO); } //新增订单状态变更日志 OrderOperateLogDO subOrderOperateLogDO = new OrderOperateLogDO(); subOrderOperateLogDO.setOrderId(subOrderId); subOrderOperateLogDO.setOperateType(OrderOperateTypeEnum.PAID_ORDER.getCode()); subOrderOperateLogDO.setPreStatus(subPreOrderStatus); subOrderOperateLogDO.setCurrentStatus(OrderStatusEnum.PAID.getCode()); orderOperateLogDO.setRemark("订单支付回调操作,子订单状态变更" + subOrderOperateLogDO.getPreStatus() + "-" + subOrderOperateLogDO.getCurrentStatus()); orderOperateLogDAO.save(subOrderOperateLogDO); } } } ... }
(2)订单系统消费已完成支付的消息,更新订单履约状态 + 发送订单履约的消息
//订单系统进行消费 @Configuration public class ConsumerConfig { @Autowired private RocketMQProperties rocketMQProperties; //订单完成支付消息消费者 @Bean("paidOrderSuccessConsumer") public DefaultMQPushConsumer paidOrderSuccessConsumer(PaidOrderSuccessListener paidOrderSuccessListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PAID_ORDER_SUCCESS_CONSUMER_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(PAID_ORDER_SUCCESS_TOPIC, "*"); consumer.registerMessageListener(paidOrderSuccessListener); consumer.start(); return consumer; } ... } //订单系统监听订单支付完成的消息 @Component public class PaidOrderSuccessListener implements MessageListenerConcurrently { @Autowired private OrderInfoDAO orderInfoDAO; @Autowired private OrderFulFillService orderFulFillService; @Autowired private RedisLock redisLock; @Autowired private DefaultProducer defaultProducer; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { String message = new String(messageExt.getBody()); PaidOrderSuccessMessage paidOrderSuccessMessage = JSON.parseObject(message, PaidOrderSuccessMessage.class); String orderId = paidOrderSuccessMessage.getOrderId(); log.info("触发订单履约,orderId:{}", orderId); OrderInfoDO order = orderInfoDAO.getByOrderId(orderId); if (Objects.isNull(order)) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_INFO_IS_NULL); } //1.加分布式锁 + 里面的履约前置状态校验防止消息重复消费 String key = RedisLockKeyConstants.ORDER_FULFILL_KEY + orderId; if (!redisLock.lock(key)) { log.error("order has not acquired lock,cannot fulfill, orderId={}", orderId); throw new BaseBizException(OrderErrorCodeEnum.ORDER_FULFILL_ERROR); } try { //2.进行订单履约逻辑 TransactionMQProducer producer = defaultProducer.getProducer(); producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { try { orderFulFillService.triggerOrderFulFill(orderId); return LocalTransactionState.COMMIT_MESSAGE; } catch (BaseBizException e) { throw e; } catch (Exception e) { log.error("system error", e); return LocalTransactionState.ROLLBACK_MESSAGE; } } //回查接口 @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { //检查订单是否"已履约"状态 OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId); if (orderInfoDO != null && OrderStatusEnum.FULFILL.getCode().equals(orderInfoDO.getOrderStatus())) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; } }); ReceiveFulfillRequest receiveFulfillRequest = orderFulFillService.buildReceiveFulFillRequest(order); String topic = TRIGGER_ORDER_FULFILL_TOPIC; byte[] body = JSON.toJSONString(receiveFulfillRequest).getBytes(StandardCharsets.UTF_8); Message mq = new Message(topic, body); producer.sendMessageInTransaction(mq, order); } finally { redisLock.unlock(key); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); //本地业务逻辑执行失败,触发消息重新消费 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } } @Service public class OrderFulFillServiceImpl implements OrderFulFillService { @Autowired private OrderInfoDAO orderInfoDAO; @Autowired private OrderOperateLogDAO orderOperateLogDAO; ... @Transactional(rollbackFor = Exception.class) @Override public void triggerOrderFulFill(String orderId) throws OrderBizException { //1.查询订单 OrderInfoDO order = orderInfoDAO.getByOrderId(orderId); if (Objects.isNull(order)) { return; } //2.校验订单是否已支付 OrderStatusEnum orderStatus = OrderStatusEnum.getByCode(order.getOrderStatus()); if (!OrderStatusEnum.PAID.equals(orderStatus)) { log.info("order has not been paid,cannot fulfill, orderId={}", order.getOrderId()); return; } //3.更新订单状态为:已履约 orderInfoDAO.updateOrderStatus(orderId, OrderStatusEnum.PAID.getCode(), OrderStatusEnum.FULFILL.getCode()); //4.并插入一条订单变更记录 orderOperateLogDAO.save(orderOperateLogFactory.get(order, OrderStatusChangeEnum.ORDER_FULFILLED)); } ... }
(3)履约系统消费触发订单履约的消息,进行具体的订单履约处理
//履约系统消费 @Configuration public class ConsumerConfig { @Autowired private RocketMQProperties rocketMQProperties; //触发订单履约消息消费者 @Bean("triggerOrderFulfillConsumer") public DefaultMQPushConsumer triggerOrderFulfillConsumer(TriggerOrderFulfillTopicListener triggerOrderFulfillTopicListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(TRIGGER_ORDER_FULFILL_CONSUMER_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(TRIGGER_ORDER_FULFILL_TOPIC, "*"); consumer.registerMessageListener(triggerOrderFulfillTopicListener); consumer.start(); return consumer; } ... } //监听并消费订单履约消息 @Component public class TriggerOrderFulfillTopicListener implements MessageListenerConcurrently { @Autowired private FulfillService fulfillService;//履约服务 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { String message = new String(messageExt.getBody()); ReceiveFulfillRequest request = JSON.parseObject(message, ReceiveFulfillRequest.class); fulfillService.receiveOrderFulFill(request); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } } @Service public class FulfillServiceImpl implements FulfillService { ... @Override public Boolean receiveOrderFulFill(ReceiveFulfillRequest request) { log.info("接受订单履约成功,request={}", JSONObject.toJSONString(request)); String orderId = request.getOrderId(); //加分布式锁(防止重复触发履约) String key = RedisLockKeyConstants.ORDER_FULFILL_KEY + orderId; boolean lock = redisLock.lock(key); if (!lock) { throw new FulfillBizException(FulfillErrorCodeEnum.ORDER_FULFILL_ERROR); } try { //1.幂等:校验orderId是否已经履约过 if (orderFulfilled(request.getOrderId())) { log.info("该订单已履约!!!,orderId={}", request.getOrderId()); return true; } //2.saga状态机,触发wms捡货和tms发货 StateMachineEngine stateMachineEngine = (StateMachineEngine) springApplicationContext.getBean("stateMachineEngine"); Map<String, Object> startParams = new HashMap<>(3); startParams.put("receiveFulfillRequest", request); //配置的saga状态机 json的name //位于/resources/statelang/order_fulfull.json String stateMachineName = "order_fulfill"; log.info("开始触发saga流程,stateMachineName={}", stateMachineName); StateMachineInstance inst = stateMachineEngine.startWithBusinessKey(stateMachineName, null, null, startParams); if (ExecutionStatus.SU.equals(inst.getStatus())) { log.info("订单履约流程执行完毕. xid={}", inst.getId()); } else { log.error("订单履约流程执行异常. xid={}", inst.getId()); throw new FulfillBizException(FulfillErrorCodeEnum.ORDER_FULFILL_IS_ERROR); } return true; } finally { redisLock.unlock(key); } } ... }
3.支付回调到推送履约的双异步设计
(1)双异步设计的原因
(2)履约系统通过MQ对订单进行异步履约
(3)双异步设计的目的总结
(1)双异步设计的原因
第一个异步:订单支付回调后,订单系统先发一个支付完成的消息到MQ。
第二个异步:订单系统消费到支付完成消息后进行更新,再发消息到MQ。
为什么需要双异步:为什么订单系统收到支付回调后要先发一个消息到MQ,然后再由自己监听消费,并发送真正触发履约的消息到MQ。
由于更新订单状态和更新订单履约状态都是由订单系统执行的,为什么不按如下处理:订单系统收到支付回调后,先把订单状态更新为已完成支付,然后紧接着把订单履约状态更新为已履约,最后再发送订单履约消息到MQ让履约系统消费该消息进行发货。
原因一:从业务语义来说,支付回调后的主要操作其实是更新订单状态为支付完成,而对订单进行履约并不属于支付回调的主要工作。支付回调最多是触发履约系统进行订单履约,所以支付回调成功后,在支付回调中更新订单履约状态并不合适。因此从业务语义来说,支付回调后发送一个订单支付完成的消息到MQ即可,不应有更多其他操作。
原因二:从扩展性角度来说,由于订单支付完成是一个非常关键的消息,所以可能以后会在订单支付完成后,需要进行更多的业务处理。也就是说,可能以后会有更多系统需要监听和消费订单支付完成消息,如会员系统、营销系统、数据分析系统等都需要消费订单支付完成的消息。
(2)履约系统通过MQ对订单进行异步履约
因为对订单进行履约,涉及到仓库、仓储、库存的调度,需要通知物流公司进行配送等,这个过程非常耗时及复杂,所以触发履约系统对订单进行履约不能使用同步来实现。
(3)双异步设计的目的总结
第一个异步是为了可扩展
第二个异步是为了提升性能
4.发送订单履约的RocketMQ事务消息的原理
(1)如何让更新数据库 + 发送消息到MQ是一致的
(2)发送订单履约的RocketMQ事务消息的原理
(1)如何让更新数据库 + 发送消息到MQ是一致的
问题一:如何保证更新订单状态为已完成 + 发送订单支付完成消息是一致的。也就是更新数据库 + 发送消息到MQ,要么同时成功,要么同时失败。
问题二:如何保证更新订单履约状态为已履约 + 发送触发订单履约消息是一致的。同样是更新数据库 + 发送消息到MQ,要么同时成功,要么同时失败。
为了解决更新数据库 + 发送消息到MQ是一致的,可以使用RocketMQ的事务机制。
(2)发送订单履约的RocketMQ事务消息的原理
5.RocketMQ事务消息的实现原理
6.支付回调到发送履约的RocketMQ事务消息代码
(1)支付回调成功后发送支付完成的事务消息
(2)消费订单支付完成消息时发送触发订单履约的事务消息
(1)支付回调成功后发送支付完成的事务消息
@Service public class OrderServiceImpl implements OrderService { ... //支付回调 //支付回调有2把分布式锁的原因说明:同一笔订单在同一时间只能支付or取消 //不可以同时对一笔订单,既发起支付,又发起取消 @Override public void payCallback(PayCallbackRequest payCallbackRequest) { ... try { //从数据库中查询出当前订单信息 OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId); OrderPaymentDetailDO orderPaymentDetailDO = orderPaymentDetailDAO.getPaymentDetailByOrderId(orderId); //3.校验参数 if (orderInfoDO == null || orderPaymentDetailDO == null) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_INFO_IS_NULL); } if (!payAmount.equals(orderInfoDO.getPayAmount())) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_CALLBACK_PAY_AMOUNT_ERROR); } //4.异常场景判断 Integer orderStatus = orderInfoDO.getOrderStatus(); if (OrderStatusEnum.CREATED.getCode().equals(orderStatus)) { //如果订单状态是"已创建",直接更新订单状态为已支付,并发送事务消息 TransactionMQProducer transactionMQProducer = defaultProducer.getProducer(); transactionMQProducer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { try { orderManager.updateOrderStatusPaid(payCallbackRequest, orderInfoDO, orderPaymentDetailDO); return LocalTransactionState.COMMIT_MESSAGE; } catch (BaseBizException e) { throw e; } catch (Exception e) { log.error("system error", e); return LocalTransactionState.ROLLBACK_MESSAGE; } } //回查 @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { //检查订单是否是已支付 OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId); if (orderInfoDO != null && OrderStatusEnum.PAID.getCode().equals(orderInfoDO.getOrderStatus())) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; } }); //发送 "订单已完成支付" 消息 sendPaidOrderSuccessMessage(transactionMQProducer, orderInfoDO); } else { ... } } catch (Exception e) { throw new OrderBizException(e.getMessage()); } finally { //释放分布式锁 redisLock.unMultiLock(redisKeyList); } } //发送订单已完成支付消息,触发订单进行履约 private void sendPaidOrderSuccessMessage(TransactionMQProducer transactionMQProducer, OrderInfoDO orderInfoDO) throws MQClientException { String orderId = orderInfoDO.getOrderId(); PaidOrderSuccessMessage message = new PaidOrderSuccessMessage(); message.setOrderId(orderId); String topic = RocketMqConstant.PAID_ORDER_SUCCESS_TOPIC; byte[] body = JSON.toJSONString(message).getBytes(StandardCharsets.UTF_8); Message mq = new Message(topic, body); TransactionSendResult result = transactionMQProducer.sendMessageInTransaction(mq, orderInfoDO); if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_PAY_CALLBACK_SEND_MQ_ERROR); } } ... }
(2)消费订单支付完成消息时发送触发订单履约的事务消息
//订单系统监听订单支付完成后的消息 @Component public class PaidOrderSuccessListener implements MessageListenerConcurrently { @Autowired private OrderInfoDAO orderInfoDAO; @Autowired private OrderFulFillService orderFulFillService; @Autowired private RedisLock redisLock; @Autowired private DefaultProducer defaultProducer; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { String message = new String(messageExt.getBody()); PaidOrderSuccessMessage paidOrderSuccessMessage = JSON.parseObject(message, PaidOrderSuccessMessage.class); String orderId = paidOrderSuccessMessage.getOrderId(); log.info("触发订单履约,orderId:{}", orderId); OrderInfoDO order = orderInfoDAO.getByOrderId(orderId); if (Objects.isNull(order)) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_INFO_IS_NULL); } //1.加分布式锁 + 里面的履约前置状态校验防止消息重复消费 String key = RedisLockKeyConstants.ORDER_FULFILL_KEY + orderId; if (!redisLock.lock(key)) { log.error("order has not acquired lock,cannot fulfill, orderId={}", orderId); throw new BaseBizException(OrderErrorCodeEnum.ORDER_FULFILL_ERROR); } try { //2.进行订单履约逻辑 TransactionMQProducer producer = defaultProducer.getProducer(); producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { try { orderFulFillService.triggerOrderFulFill(orderId); return LocalTransactionState.COMMIT_MESSAGE; } catch (BaseBizException e) { throw e; } catch (Exception e) { log.error("system error", e); return LocalTransactionState.ROLLBACK_MESSAGE; } } @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { //检查订单是否"已履约"状态 OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId); if (orderInfoDO != null && OrderStatusEnum.FULFILL.getCode().equals(orderInfoDO.getOrderStatus())) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; } }); ReceiveFulfillRequest receiveFulfillRequest = orderFulFillService.buildReceiveFulFillRequest(order); String topic = TRIGGER_ORDER_FULFILL_TOPIC; byte[] body = JSON.toJSONString(receiveFulfillRequest).getBytes(StandardCharsets.UTF_8); Message mq = new Message(topic, body); producer.sendMessageInTransaction(mq, order); } finally { redisLock.unlock(key); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); //本地业务逻辑执行失败,触发消息重新消费 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }
7.履约场景引入Saga模式 + 状态机
(1)进行订单履约时使用Saga模式的流程简介
(2)Saga模式的流程定义文件和状态机
(3)Saga模式需要自定义补偿逻辑
(4)Saga模式的优点和缺点及适用场景
(1)进行订单履约时使用Saga模式的流程简介
Saga模式运行时,会按照一个顺序流程去运行长事务。比如第一步更新订单履约数据,第二步调度发货,第三步物流配送。如果运行到某一步时出现异常,则会从这一步开始往回顺序执行补偿逻辑。比如先补偿第三步,再补偿第二步,最后补偿第一步。
(2)Saga模式的流程定义文件和状态机
使用Saga模式的分布式事务在启动时,需要有一个Saga模式的流程定义文件。比如履约系统需要有一个本地文件:Saga模式的流程定义文件。
在该文件中,需定义好整个长事务的流程:第一步执行的逻辑是A,第二步执行的逻辑是B,第三步执行的逻辑是C。第一步回滚的逻辑是a,第二步回滚的逻辑是b,第三步回滚的逻辑是c。
Saga模式启动时,也需要向Seata Server注册全局事务XID。Saga模式启动后,便会根据状态机来判断流程定义文件中的每一步,应该执行正向还是执行逆向,也就是会通过状态机来控制正向和逆向。
(3)Saga模式需要自定义补偿逻辑
一.AT模式下的补偿逻辑
分支事务写入本地事务的数据前,会先生成undo log。当某分支事务出现异常,已提交分支事务需要进行回滚补偿时,会根据生成的undo log来进行回滚补偿。
二.TCC模式下的补偿逻辑
先执行try逻辑,如果try逻辑执行成功则执行commit逻辑来进行提交,如果try逻辑执行失败则执行cancel逻辑进行回滚补偿。
三.Saga模式的补偿逻辑
每个分支事务都有正向的逻辑,每一个正向逻辑都会搭配一个逆向补偿逻辑,Saga模式需要我们自定义实现这些逆向补偿逻辑。
(4)Saga模式的优点和缺点及适用场景
优点是:执行正向链路的分支事务时,不再需要获取全局锁。缺点是:由于没有全局锁,当多个分布式事务并发执行时,不能写隔离。Saga模式的应用场景是:一个业务链路里,需要很多系统层层调用。
一.长事务使用AT模式可能导致持有全局锁时间长
这种长事务如果使用AT模式,由于需要获取全局锁,而长事务链路又过长,从而会导致持有全局锁的时间也过长。即便对某数据的全局锁竞争并不激烈,但也需要过长时间才能释放,所以其并发能力特别低。
二.长事务使用TCC模式需要改造工作量较大
这种长事务如果使用TCC模式,则会对代码产生很大的侵入性。因为需要对长事务接口改造成三个接口:try、commit、cancel。而且长事务的业务链路比较长,改造业务链路的各接口工作量过大了。所以常见于异构存储的场景,不太常见于长事务。
三.Saga模式特别适用于老系统下的长事务
因此Saga模式适用于长事务的场景,特别是老系统下的长事务。如果不希望对老系统有较大侵入性的代码改造,而且希望稍微修改一点代码就能让老系统支持分布式事务,那么就可以使用Saga模式。因为Saga模式可以独立地对老系统定义补偿逻辑,不需要像TCC模式那样去改造接口,也不用像AT模式那样侵入性添加undo log表。
8.履约流程的Seata Saga状态节点定义 + 节点流转
(1)流程定义文件分析
(2)流程定义文件中对应的方法
(1)流程定义文件分析
在履约系统的resources/statelang目录下,有一个JSON文件,该JSON文件order_fullfill.json便是Saga模式流程定义文件。
{ "Name": "order_fulfill", "Comment": "订单履约流程", "StartState": "FulfillService", "Version": "1.0.0", "States": { //第一个节点 "FulfillService": { "Type": "ServiceTask", "ServiceName": "fulfillSagaService",//所在服务 "ServiceMethod": "createFulfillOrder",//正向执行方法 "CompensateState": "CreateFulfillOrderCompensate",//逆向补偿方法 "Next": "ChoiceWmsState",//下一个节点 "Input": [//请求参数 "$.[receiveFulfillRequest]" ], "Output": {//返回结果 "CreateFulfillOrderResult": "$.#root" }, "Status": { "#root == true": "SU",//正向执行方法的返回结果是true,那么状态就是SU "#root == false": "FA",//正向执行方法的返回结果是false,那么状态就是FA "$Exception{java.lang.Throwable}": "UN"//抛异常,那么状态就是UN }, "Catch": [ { "Exceptions": [ "java.lang.Throwable" ], "Next": "CompensationTrigger"//抛异常时执行的下一个节点 } ] }, //状态机,判断节点 "ChoiceWmsState": { "Type": "Choice", "Choices": [ { "Expression": "[CreateFulfillOrderResult] == true", "Next": "WmsService"//上一个节点返回true,就执行下一个节点 } ], "Default": "Fail" }, //第二个节点 "WmsService": { "Type": "ServiceTask", "ServiceName": "wmsSageService",//所在服务 "ServiceMethod": "pickGoods",//正向执行方法 "CompensateState": "WmsPickGoodsCompensate",//逆向补偿方法 "Next": "ChoiceTmsState",//下一个节点 "Input": [//请求参数 "$.[receiveFulfillRequest]" ], "Output": {//返回结果 "WmsPickGoodsResult": "$.#root" }, "Status": { "#root == true": "SU",//正向执行方法的返回结果是true,那么状态就是SU "#root == false": "FA",//正向执行方法的返回结果是false,那么状态就是FA "$Exception{java.lang.Throwable}": "UN"//抛异常,那么状态就是UN }, "Catch": [ { "Exceptions": [ "java.lang.Throwable" ], "Next": "CompensationTrigger"//抛异常时执行的下一个节点 } ] }, //状态机,判断节点 "ChoiceTmsState": { "Type": "Choice", "Choices": [ { "Expression": "[WmsPickGoodsResult] == true", "Next": "TmsService"//上一个节点返回true,就执行下一个节点 } ], "Default": "Fail" }, //第三个节点 "TmsService": { "Type": "ServiceTask", "ServiceName": "tmsSagaService",//所在服务 "ServiceMethod": "sendOut",//正向执行方法 "CompensateState": "TmsSendOutCompensate",//逆向补偿方法 "Input": [//请求参数 "$.[receiveFulfillRequest]" ], "Output": {//返回结果 "TmsSendOutResult": "$.#root" }, "Status": { "#root == true": "SU",//正向执行方法的返回结果是true,那么状态就是SU "#root == false": "FA",//正向执行方法的返回结果是false,那么状态就是FA "$Exception{java.lang.Throwable}": "UN"//抛异常,那么状态就是UN }, "Catch": [ { "Exceptions": [ "java.lang.Throwable" ], "Next": "CompensationTrigger"//抛异常时执行的下一个节点 } ], "Next": "Succeed"//下一个节点是Succeed节点 }, //第一个节点的补偿逻辑 "CreateFulfillOrderCompensate": { "Type": "ServiceTask", "ServiceName": "fulfillSagaService", "ServiceMethod": "createFulfillOrderCompensate", "Input": [ "$.[receiveFulfillRequest]" ] }, //第二个节点的补偿逻辑 "WmsPickGoodsCompensate": { "Type": "ServiceTask", "ServiceName": "wmsSageService", "ServiceMethod": "pickGoodsCompensate", "Input": [ "$.[receiveFulfillRequest]" ] }, //第三个节点的补偿逻辑 "TmsSendOutCompensate": { "Type": "ServiceTask", "ServiceName": "tmsSagaService", "ServiceMethod": "sendOutCompensate", "Input": [ "$.[receiveFulfillRequest]" ] }, //补偿触发后的下一个节点 "CompensationTrigger": { "Type": "CompensationTrigger", "Next": "Fail"//下一个节点是Fail节点 }, //成功节点 "Succeed": { "Type": "Succeed" }, //失败节点 "Fail": { "Type": "Fail", "ErrorCode": "500", "Message": "订单履约异常!!" } } }
(2)流程定义文件中对应的方法
需要注意的是:触发调用各节点补偿方法时,都是由履约系统通过本地或者RPC来调用的。
@Service("fulfillSagaService") public class FulfillSagaServiceImpl implements FulfillSagaService { @Autowired private FulfillService fulfillService; @Override public Boolean createFulfillOrder(ReceiveFulfillRequest request) { log.info("创建履约单,request={}", JSONObject.toJSONString(request)); String fulfillException = request.getFulfillException(); if (StringUtils.isNotBlank(fulfillException) && fulfillException.equals("true")) { throw new FulfillBizException("创建履约单异常!"); } //创建履约单 fulfillService.createFulfillOrder(request); return true; } @Override public Boolean createFulfillOrderCompensate(ReceiveFulfillRequest request) { log.info("补偿创建履约单,request={}", JSONObject.toJSONString(request)); //取消履约单 fulfillService.cancelFulfillOrder(request.getOrderId()); log.info("补偿创建履约单结束,request={}", JSONObject.toJSONString(request)); return true; } } @Service("wmsSageService") public class WmsSageServiceImpl implements WmsSagaService { @DubboReference(version = "1.0.0", retries = 0) private WmsApi wmsApi; @Override public Boolean pickGoods(ReceiveFulfillRequest request) { log.info("捡货,request={}", JSONObject.toJSONString(request)); //调用wms系统进行捡货 JsonResult<PickDTO> jsonResult = wmsApi.pickGoods(buildPickGoodsRequest(request)); log.info("捡货结果,jsonResult={}", JSONObject.toJSONString(jsonResult)); if (!jsonResult.getSuccess()) { throw new FulfillBizException(FulfillErrorCodeEnum.WMS_IS_ERROR); } return true; } @Override public Boolean pickGoodsCompensate(ReceiveFulfillRequest request) { log.info("补偿捡货,request={}", JSONObject.toJSONString(request)); //调用wms系统进行捡货 JsonResult<Boolean> jsonResult = wmsApi.cancelPickGoods(request.getOrderId()); log.info("补偿捡货结果,jsonResult={}", JSONObject.toJSONString(jsonResult)); if (!jsonResult.getSuccess()) { throw new FulfillBizException(FulfillErrorCodeEnum.WMS_IS_ERROR); } return true; } ... } @Service("tmsSagaService") public class TmsSagaServiceImpl implements TmsSagaService { @DubboReference(version = "1.0.0", retries = 0) private TmsApi tmsApi; @Autowired private OrderFulfillDAO orderFulfillDAO; @Override public Boolean sendOut(ReceiveFulfillRequest request) { log.info("发货,request={}", JSONObject.toJSONString(request)); //1.调用tms进行发货 JsonResult<SendOutDTO> jsonResult = tmsApi.sendOut(buildSendOutRequest(request)); if (!jsonResult.getSuccess()) { throw new FulfillBizException(FulfillErrorCodeEnum.TMS_IS_ERROR); } log.info("发货结果,jsonResult={}", JSONObject.toJSONString(jsonResult)); if (!jsonResult.getSuccess()) { throw new FulfillBizException(FulfillErrorCodeEnum.WMS_IS_ERROR); } //2.查询履约单 OrderFulfillDO orderFulfill = orderFulfillDAO.getOne(request.getOrderId()); //3.存储物流单号 String logisticsCode = jsonResult.getData().getLogisticsCode(); orderFulfillDAO.saveLogisticsCode(orderFulfill.getFulfillId(), logisticsCode); return true; } @Override public Boolean sendOutCompensate(ReceiveFulfillRequest request) { log.info("补偿发货,request={}", JSONObject.toJSONString(request)); //调用tms进行补偿发货 JsonResult<Boolean> jsonResult = tmsApi.cancelSendOut(request.getOrderId()); if (!jsonResult.getSuccess()) { throw new FulfillBizException(FulfillErrorCodeEnum.TMS_IS_ERROR); } log.info("补偿发货结果,jsonResult={}", JSONObject.toJSONString(jsonResult)); if (!jsonResult.getSuccess()) { throw new FulfillBizException(FulfillErrorCodeEnum.TMS_IS_ERROR); } return true; } ... }
9.履约流程的Seata Saga失败补偿流程
10.Seata Saga状态机流转原理
11.履约系统的Seata Saga代码实现
(1)进行流程定义
(2)配置状态机
(3)实现正向逆向方法
(4)驱动Saga状态机开始执行长事务
(1)进行流程定义
{ "Name": "order_fulfill", "Comment": "订单履约流程", "StartState": "FulfillService", "Version": "1.0.0", "States": { //第一个节点 "FulfillService": { "Type": "ServiceTask", "ServiceName": "fulfillSagaService",//所在服务 "ServiceMethod": "createFulfillOrder",//正向执行方法 "CompensateState": "CreateFulfillOrderCompensate",//逆向补偿方法 "Next": "ChoiceWmsState",//下一个节点 "Input": [//请求参数 "$.[receiveFulfillRequest]" ], "Output": {//返回结果 "CreateFulfillOrderResult": "$.#root" }, "Status": { "#root == true": "SU",//正向执行方法的返回结果是true,那么状态就是SU "#root == false": "FA",//正向执行方法的返回结果是false,那么状态就是FA "$Exception{java.lang.Throwable}": "UN"//抛异常,那么状态就是UN }, "Catch": [ { "Exceptions": [ "java.lang.Throwable" ], "Next": "CompensationTrigger"//抛异常时执行的下一个节点 } ] }, //状态机,判断节点 "ChoiceWmsState": { "Type": "Choice", "Choices": [ { "Expression": "[CreateFulfillOrderResult] == true", "Next": "WmsService"//上一个节点返回true,就执行下一个节点 } ], "Default": "Fail" }, //第二个节点 "WmsService": { "Type": "ServiceTask", "ServiceName": "wmsSageService",//所在服务 "ServiceMethod": "pickGoods",//正向执行方法 "CompensateState": "WmsPickGoodsCompensate",//逆向补偿方法 "Next": "ChoiceTmsState",//下一个节点 "Input": [//请求参数 "$.[receiveFulfillRequest]" ], "Output": {//返回结果 "WmsPickGoodsResult": "$.#root" }, "Status": { "#root == true": "SU",//正向执行方法的返回结果是true,那么状态就是SU "#root == false": "FA",//正向执行方法的返回结果是false,那么状态就是FA "$Exception{java.lang.Throwable}": "UN"//抛异常,那么状态就是UN }, "Catch": [ { "Exceptions": [ "java.lang.Throwable" ], "Next": "CompensationTrigger"//抛异常时执行的下一个节点 } ] }, //状态机,判断节点 "ChoiceTmsState": { "Type": "Choice", "Choices": [ { "Expression": "[WmsPickGoodsResult] == true", "Next": "TmsService"//上一个节点返回true,就执行下一个节点 } ], "Default": "Fail" }, //第三个节点 "TmsService": { "Type": "ServiceTask", "ServiceName": "tmsSagaService",//所在服务 "ServiceMethod": "sendOut",//正向执行方法 "CompensateState": "TmsSendOutCompensate",//逆向补偿方法 "Input": [//请求参数 "$.[receiveFulfillRequest]" ], "Output": {//返回结果 "TmsSendOutResult": "$.#root" }, "Status": { "#root == true": "SU",//正向执行方法的返回结果是true,那么状态就是SU "#root == false": "FA",//正向执行方法的返回结果是false,那么状态就是FA "$Exception{java.lang.Throwable}": "UN"//抛异常,那么状态就是UN }, "Catch": [ { "Exceptions": [ "java.lang.Throwable" ], "Next": "CompensationTrigger"//抛异常时执行的下一个节点 } ], "Next": "Succeed"//下一个节点是Succeed节点 }, //第一个节点的补偿逻辑 "CreateFulfillOrderCompensate": { "Type": "ServiceTask", "ServiceName": "fulfillSagaService", "ServiceMethod": "createFulfillOrderCompensate", "Input": [ "$.[receiveFulfillRequest]" ] }, //第二个节点的补偿逻辑 "WmsPickGoodsCompensate": { "Type": "ServiceTask", "ServiceName": "wmsSageService", "ServiceMethod": "pickGoodsCompensate", "Input": [ "$.[receiveFulfillRequest]" ] }, //第三个节点的补偿逻辑 "TmsSendOutCompensate": { "Type": "ServiceTask", "ServiceName": "tmsSagaService", "ServiceMethod": "sendOutCompensate", "Input": [ "$.[receiveFulfillRequest]" ] }, //补偿触发后的下一个节点 "CompensationTrigger": { "Type": "CompensationTrigger", "Next": "Fail"//下一个节点是Fail节点 }, //成功节点 "Succeed": { "Type": "Succeed" }, //失败节点 "Fail": { "Type": "Fail", "ErrorCode": "500", "Message": "订单履约异常!!" } } }
(2)配置状态机
//配置数据源 @Configuration public class DataSourceConfiguration { @ConfigurationProperties(prefix = "spring.datasource") @Bean public DruidDataSource druidDataSource() { return new DruidDataSource(); } @Bean(name = "transactionManager") @Primary public DataSourceTransactionManager transactionManager(@Qualifier("druidDataSource") DruidDataSource druidDataSource) { return new DataSourceTransactionManager(druidDataSource); } } //配置Saga状态机 @Configuration public class StateMachineConfiguration { @Bean public ThreadPoolExecutorFactoryBean threadExecutor() { ThreadPoolExecutorFactoryBean threadExecutor = new ThreadPoolExecutorFactoryBean(); threadExecutor.setThreadNamePrefix("SAGA_ASYNC_EXE_"); threadExecutor.setCorePoolSize(1); threadExecutor.setMaxPoolSize(20); return threadExecutor; } @Bean public DbStateMachineConfig dbStateMachineConfig(ThreadPoolExecutorFactoryBean threadExecutor, DruidDataSource druidDataSource) throws IOException { DbStateMachineConfig dbStateMachineConfig = new DbStateMachineConfig(); //设置数据源 dbStateMachineConfig.setDataSource(druidDataSource); //设置状态机的线程池 dbStateMachineConfig.setThreadPoolExecutor((ThreadPoolExecutor) threadExecutor.getObject()); //设置状态机的配置文件 dbStateMachineConfig.setResources(new PathMatchingResourcePatternResolver().getResources("classpath*:statelang/*.json")); //设置开启异步化 dbStateMachineConfig.setEnableAsync(true); //设置当前Saga长事务所属的分组 dbStateMachineConfig.setTxServiceGroup("demo-eshop-fulfill-group"); return dbStateMachineConfig; } //Saga状态机实例 @Bean public ProcessCtrlStateMachineEngine stateMachineEngine(DbStateMachineConfig dbStateMachineConfig) { ProcessCtrlStateMachineEngine stateMachineEngine = new ProcessCtrlStateMachineEngine(); stateMachineEngine.setStateMachineConfig(dbStateMachineConfig); return stateMachineEngine; } @Bean public StateMachineEngineHolder stateMachineEngineHolder(ProcessCtrlStateMachineEngine stateMachineEngine) { StateMachineEngineHolder stateMachineEngineHolder = new StateMachineEngineHolder(); stateMachineEngineHolder.setStateMachineEngine(stateMachineEngine); return stateMachineEngineHolder; } }
(3)实现正向逆向方法
@Service("fulfillSagaService") public class FulfillSagaServiceImpl implements FulfillSagaService { @Autowired private FulfillService fulfillService; @Override public Boolean createFulfillOrder(ReceiveFulfillRequest request) { log.info("创建履约单,request={}", JSONObject.toJSONString(request)); String fulfillException = request.getFulfillException(); if (StringUtils.isNotBlank(fulfillException) && fulfillException.equals("true")) { throw new FulfillBizException("创建履约单异常!"); } //创建履约单 fulfillService.createFulfillOrder(request); return true; } @Override public Boolean createFulfillOrderCompensate(ReceiveFulfillRequest request) { log.info("补偿创建履约单,request={}", JSONObject.toJSONString(request)); //取消履约单 fulfillService.cancelFulfillOrder(request.getOrderId()); log.info("补偿创建履约单结束,request={}", JSONObject.toJSONString(request)); return true; } } @Service("wmsSageService") public class WmsSageServiceImpl implements WmsSagaService { @DubboReference(version = "1.0.0", retries = 0) private WmsApi wmsApi; @Override public Boolean pickGoods(ReceiveFulfillRequest request) { log.info("捡货,request={}", JSONObject.toJSONString(request)); //调用wms系统进行捡货 JsonResult<PickDTO> jsonResult = wmsApi.pickGoods(buildPickGoodsRequest(request)); log.info("捡货结果,jsonResult={}", JSONObject.toJSONString(jsonResult)); if (!jsonResult.getSuccess()) { throw new FulfillBizException(FulfillErrorCodeEnum.WMS_IS_ERROR); } return true; } @Override public Boolean pickGoodsCompensate(ReceiveFulfillRequest request) { log.info("补偿捡货,request={}", JSONObject.toJSONString(request)); //调用wms系统进行捡货 JsonResult<Boolean> jsonResult = wmsApi.cancelPickGoods(request.getOrderId()); log.info("补偿捡货结果,jsonResult={}", JSONObject.toJSONString(jsonResult)); if (!jsonResult.getSuccess()) { throw new FulfillBizException(FulfillErrorCodeEnum.WMS_IS_ERROR); } return true; } ... } @Service("tmsSagaService") public class TmsSagaServiceImpl implements TmsSagaService { @DubboReference(version = "1.0.0", retries = 0) private TmsApi tmsApi; @Autowired private OrderFulfillDAO orderFulfillDAO; @Override public Boolean sendOut(ReceiveFulfillRequest request) { log.info("发货,request={}", JSONObject.toJSONString(request)); //1.调用tms进行发货 JsonResult<SendOutDTO> jsonResult = tmsApi.sendOut(buildSendOutRequest(request)); if (!jsonResult.getSuccess()) { throw new FulfillBizException(FulfillErrorCodeEnum.TMS_IS_ERROR); } log.info("发货结果,jsonResult={}", JSONObject.toJSONString(jsonResult)); if (!jsonResult.getSuccess()) { throw new FulfillBizException(FulfillErrorCodeEnum.WMS_IS_ERROR); } //2.查询履约单 OrderFulfillDO orderFulfill = orderFulfillDAO.getOne(request.getOrderId()); //3.存储物流单号 String logisticsCode = jsonResult.getData().getLogisticsCode(); orderFulfillDAO.saveLogisticsCode(orderFulfill.getFulfillId(), logisticsCode); return true; } @Override public Boolean sendOutCompensate(ReceiveFulfillRequest request) { log.info("补偿发货,request={}", JSONObject.toJSONString(request)); //调用tms进行补偿发货 JsonResult<Boolean> jsonResult = tmsApi.cancelSendOut(request.getOrderId()); if (!jsonResult.getSuccess()) { throw new FulfillBizException(FulfillErrorCodeEnum.TMS_IS_ERROR); } log.info("补偿发货结果,jsonResult={}", JSONObject.toJSONString(jsonResult)); if (!jsonResult.getSuccess()) { throw new FulfillBizException(FulfillErrorCodeEnum.TMS_IS_ERROR); } return true; } ... }
(4)驱动Saga状态机开始执行长事务
//监听并消费订单履约消息 @Component public class TriggerOrderFulfillTopicListener implements MessageListenerConcurrently { @Autowired private FulfillService fulfillService;//履约服务 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { String message = new String(messageExt.getBody()); ReceiveFulfillRequest request = JSON.parseObject(message, ReceiveFulfillRequest.class); fulfillService.receiveOrderFulFill(request); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } } @Service public class FulfillServiceImpl implements FulfillService { ... @Override public Boolean receiveOrderFulFill(ReceiveFulfillRequest request) { log.info("接受订单履约成功,request={}", JSONObject.toJSONString(request)); String orderId = request.getOrderId(); //加分布式锁(防止重复触发履约) String key = RedisLockKeyConstants.ORDER_FULFILL_KEY + orderId; boolean lock = redisLock.lock(key); if (!lock) { throw new FulfillBizException(FulfillErrorCodeEnum.ORDER_FULFILL_ERROR); } try { //1.幂等:校验orderId是否已经履约过 if (orderFulfilled(request.getOrderId())) { log.info("该订单已履约!!!,orderId={}", request.getOrderId()); return true; } //2.获取Saga状态机,触发wms捡货和tms发货 StateMachineEngine stateMachineEngine = (StateMachineEngine) springApplicationContext.getBean("stateMachineEngine"); Map<String, Object> startParams = new HashMap<>(3); startParams.put("receiveFulfillRequest", request); //配置的Saga状态机json的name,位于/resources/statelang/order_fulfull.json String stateMachineName = "order_fulfill"; log.info("开始触发saga流程,stateMachineName={}", stateMachineName); //通过状态机启动长事务流程 StateMachineInstance inst = stateMachineEngine.startWithBusinessKey(stateMachineName, null, null, startParams); if (ExecutionStatus.SU.equals(inst.getStatus())) { log.info("订单履约流程执行完毕. xid={}", inst.getId()); } else { log.error("订单履约流程执行异常. xid={}", inst.getId()); throw new FulfillBizException(FulfillErrorCodeEnum.ORDER_FULFILL_IS_ERROR); } return true; } finally { redisLock.unlock(key); } } ... }
12.履约系统的Seata Saga空回滚和悬挂问题
(1)空回滚问题
(2)空悬挂问题
(1)空回滚问题
正向操作没有执行成功,但也进行了逆向操作。解决方案是:通过自定义Holder把正向操作记录下来,如果发现空回滚,则进行记录,且不能进行回滚。
(2)空悬挂问题
逆向操作先执行,之后才执行正向操作。解决方案是:通过自定义Holder把正向操作记录下来,确保执行逆向操作时,已经执行过正向操作。
Saga的空回滚和空悬挂与TCC的解决思路一样。
详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等