9.2 分布式事务解决方案
面试重要程度:⭐⭐⭐⭐⭐
常见提问方式: "如何保证分布式系统的数据一致性?"
技术深度: 分布式理论、事务模式、框架实现
预计阅读时间:35分钟
🎯 分布式事务基础
什么是分布式事务
分布式事务是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。
典型场景:
@Service public class OrderService { // 场景:用户下单需要操作多个服务 public void createOrder(CreateOrderRequest request) { // 1. 订单服务:创建订单 Order order = orderRepository.save(new Order(request)); // 2. 库存服务:扣减库存 inventoryService.deductStock(request.getProductId(), request.getQuantity()); // 3. 账户服务:扣减余额 accountService.deductBalance(request.getUserId(), request.getAmount()); // 4. 积分服务:增加积分 pointService.addPoints(request.getUserId(), request.getPoints()); // 问题:如何保证这4个操作要么全部成功,要么全部失败? } }
ACID特性的挑战
public class DistributedTransactionChallenges { /** * 原子性挑战:多服务调用的一致性 */ public void atomicityChallenge() { try { serviceA.operation(); // 成功 serviceB.operation(); // 失败 - 如何回滚serviceA? serviceC.operation(); // 未执行 } catch (Exception e) { // 分布式环境下的回滚复杂性 } } /** * 一致性挑战:状态同步问题 */ public void consistencyChallenge() { // 订单状态:已创建 // 库存状态:已扣减 // 支付状态:处理中 // 如何保证最终一致? } }
🔄 分布式事务解决方案
1. 两阶段提交(2PC)
原理:
- 准备阶段:协调者询问所有参与者是否可以提交
- 提交阶段:根据参与者响应决定提交或回滚
@Component public class TwoPhaseCommitCoordinator { private List<TransactionParticipant> participants; public boolean executeTransaction(DistributedTransaction transaction) { String transactionId = UUID.randomUUID().toString(); try { // 阶段1:准备阶段 boolean canCommit = preparePhase(transactionId, transaction); if (canCommit) { // 阶段2:提交阶段 return commitPhase(transactionId); } else { rollbackPhase(transactionId); return false; } } catch (Exception e) { rollbackPhase(transactionId); return false; } } private boolean preparePhase(String transactionId, DistributedTransaction transaction) { for (TransactionParticipant participant : participants) { try { PrepareResult result = participant.prepare(transactionId, transaction); if (result != PrepareResult.YES) { return false; } } catch (Exception e) { return false; } } return true; } private boolean commitPhase(String transactionId) { boolean allCommitted = true; for (TransactionParticipant participant : participants) { try { participant.commit(transactionId); } catch (Exception e) { allCommitted = false; // 2PC问题:无法回滚已提交的参与者 } } return allCommitted; } } public interface TransactionParticipant { PrepareResult prepare(String transactionId, DistributedTransaction transaction); void commit(String transactionId); void rollback(String transactionId); }
2. TCC模式(Try-Confirm-Cancel)
原理:
- Try:尝试执行,预留资源
- Confirm:确认执行,使用预留资源
- Cancel:取消执行,释放预留资源
@Component public class TccTransactionManager { @Autowired private List<TccParticipant> participants; @Transactional public boolean executeTransaction(TccTransaction transaction) { String transactionId = UUID.randomUUID().toString(); List<TccParticipant> succeededParticipants = new ArrayList<>(); // Try阶段 for (TccParticipant participant : participants) { try { boolean tryResult = participant.tryExecute(transactionId, transaction); if (tryResult) { succeededParticipants.add(participant); } else { cancelParticipants(transactionId, succeededParticipants); return false; } } catch (Exception e) { cancelParticipants(transactionId, succeededParticipants); return false; } } // Confirm阶段 try { confirmParticipants(transactionId, succeededParticipants); return true; } catch (Exception e) { // Confirm失败需要重试,不能Cancel scheduleConfirmRetry(transactionId, succeededParticipants); return false; } } } public interface TccParticipant { boolean tryExecute(String transactionId, TccTransaction transaction); void confirmExecute(String transactionId); void cancelExecute(String transactionId); } @Component public class AccountTccParticipant implements TccParticipant { @Override public boolean tryExecute(String transactionId, TccTransaction transaction) { PaymentRequest request = (PaymentRequest) transaction.getData(); // 检查余额 Account account = accountRepository.findById(request.getUserId()); if (account.getBalance().compareTo(request.getAmount()) < 0) { return false; } // 冻结金额 FrozenBalance frozenBalance = new FrozenBalance(); frozenBalance.setTransactionId(transactionId); frozenBalance.setUserId(request.getUserId()); frozenBalance.setAmount(request.getAmount()); frozenBalanceRepository.save(frozenBalance); return true; } @Override public void confirmExecute(String transactionId) { FrozenBalance frozenBalance = frozenBalanceRepository.findByTransactionId(transactionId); // 扣减账户余额 Account account = accountRepository.findById(frozenBalance.getUserId()); account.setBalance(account.getBalance().subtract(frozenBalance.getAmount())); accountRepository.save(account); // 删除冻结记录 frozenBalanceRepository.delete(frozenBalance); } @Override public void cancelExecute(String transactionId) { // 删除冻结记录,释放资源 FrozenBalance frozenBalance = frozenBalanceRepository.findByTransactionId(transactionId); if (frozenBalance != null) { frozenBalanceRepository.delete(frozenBalance); } } }
3. Saga模式
原理:
- 将长事务分解为多个短事务
- 每个短事务都有对应的补偿操作
@Component public class SagaTransactionManager { public boolean executeSaga(SagaDefinition sagaDefinition) { String sagaId = UUID.randomUUID().toString(); List<SagaStep> executedSteps = new ArrayList<>(); try { // 顺序执行所有步骤 for (SagaStep step : sagaDefinition.getSteps()) { boolean success = step.execute(sagaId); if (success) { executedSteps.add(step); } else { compensate(sagaId, executedSteps); return false; } } return true; } catch (Exception e) { compensate(sagaId, executedSteps); return false; } } private void compensate(String sagaId, List<SagaStep> executedSteps) { // 逆序执行补偿操作 Collections.reverse(executedSteps); for (SagaStep step : executedSteps) { try { step.compensate(sagaId); } catch (Exception e) { log.error("Compensation failed for step: {}", step.getName(), e); } } } } public interface SagaStep { String getName(); boolean execute(String sagaId); void compensate(String sagaId); } @Component public class CreateOrderStep implements SagaStep { @Override public boolean execute(String sagaId) { OrderRequest request = SagaContext.getOrderRequest(sagaId); Order order = new Order(); order.setUserId(request.getUserId()); order.setStatus(OrderStatus.CREATED); order.setSagaId(sagaId); orderRepository.save(order); SagaContext.setOrder(sagaId, order); return true; } @Override public void compensate(String sagaId) { Order order = SagaContext.getOrder(sagaId); if (order != null) { order.setStatus(OrderStatus.CANCELLED); orderRepository.save(order); } } }
🚀 Seata分布式事务框架
Seata配置与使用
@Configuration public class SeataConfig { @Bean @Primary public DataSource dataSource() { DruidDataSource druidDataSource = new DruidDataSource(); druidDataSource.setUrl("jdbc:mysql://localhost:3306/order_db"); druidDataSource.setUsername("root"); druidDataSource.setPassword("password"); // 使用Seata数据源代理 return new DataSourceProxy(druidDataSource); } @Bean public GlobalTransactionScanner globalTransactionScanner() { return new GlobalTransactionScanner("order-service", "default"); } } @Service public class OrderServiceWithSeata { @GlobalTransactional(rollbackFor = Exception.class) public void createOrder(CreateOrderRequest request) { // 1. 创建订单 Order order = new Order(request); orderRepository.save(order); // 2. 调用库存服务 inventoryService.deductStock(request.getProductId(), request.getQuantity()); // 3. 调用账户服务 accountService.deductBalance(request.getUserId(), request.getAmount()); // 任何一步失败都会自动回滚全局事务 } } @Service public class InventoryService { @Transactional public void deductStock(Long productId, Integer quantity) { Product product = productRepository.findById(productId); if (product.getStock() < quantity) { throw new InsufficientStockException("库存不足"); } product.setStock(product.getStock() - quantity); productRepository.save(product); } }
Seata四种模式对比
/** * Seata事务模式选择 */ public class SeataTransactionModes { /** * AT模式:自动补偿 * - 优点:业务无侵入,自动生成反向SQL * - 缺点:依赖数据库,性能相对较低 */ @GlobalTransactional public void atModeExample() { // 正常业务代码,Seata自动处理补偿 orderService.createOrder(); inventoryService.deductStock(); } /** * TCC模式:手动补偿 * - 优点:性能高,适用于非关系型数据库 * - 缺点:业务侵入性强,需要实现三个方法 */ @TwoPhaseBusinessAction(name = "createOrder", commitMethod = "commit", rollbackMethod = "rollback") public boolean createOrderTcc(@BusinessActionContextParameter("orderRequest") OrderRequest request) { // Try逻辑 return true; } public boolean commit(BusinessActionContext context) { // Confirm逻辑 return true; } public boolean rollback(BusinessActionContext context) { // Cancel逻辑 return true; } /** * Saga模式:长事务 * - 优点:适用于长流程业务 * - 缺点:不保证隔离性 */ public void sagaModeExample() { // 通过状态机定义事务流程 } /** * XA模式:强一致性 * - 优点:强一致性保证 * - 缺点:性能差,资源锁定时间长 */ @GlobalTransactional public void xaModeExample() { // 使用XA数据源 } }
📊 方案对比与选择
性能对比分析
/** * 分布式事务方案对比 */ public class TransactionSolutionComparison { /* * 性能对比(TPS): * 2PC: 500-1000 * TCC: 2000-5000 * Saga: 3000-8000 * 本地消息表: 5000-10000 * * 一致性保证: * 2PC: 强一致性 * TCC: 最终一致性 * Saga: 最终一致性 * 本地消息表: 最终一致性 * * 复杂度: * 2PC: 中等 * TCC: 高 * Saga: 中等 * 本地消息表: 低 */ } /** * 选型建议 */ public TransactionSolution selectSolution(BusinessScenario scenario) { // 强一致性要求 if (scenario.requiresStrongConsistency()) { return TransactionSolution.TWO_PHASE_COMMIT; } // 高性能要求 if (scenario.requiresHighPerformance()) { return TransactionSolution.LOCAL_MESSAGE_TABLE; } // 复杂业务流程 if (scenario.hasComplexWorkflow()) { return TransactionSolution.SAGA; } // 资源预留场景 if (scenario.requiresResourceReservation()) { return TransactionSolution.TCC; } return TransactionSolution.SEATA_AT; // 默认推荐 }
本地消息表模式
@Service public class LocalMessageTableService { @Transactional public void createOrderWithLocalMessage(CreateOrderRequest request) { // 1. 创建订单 Order order = new Order(request); orderRepository.save(order); // 2. 保存本地消息 LocalMessage message = new LocalMessage(); message.setId(UUID.randomUUID().toString()); message.setTopic("order.created"); message.setContent(JSON.toJSONString(order)); message.setStatus(MessageStatus.PENDING); messageRepository.save(message); // 3. 发送消息(异步) CompletableFuture.runAsync(() -> { try { messageProducer.send("order.created", order); // 更新消息状态 message.setStatus(MessageStatus.SENT); messageRepository.save(message); } catch (Exception e) { log.error("Failed to send message", e); } }); } // 定时任务重发失败消息 @Scheduled(fixedRate = 30000) public void retryFailedMessages() { List<LocalMessage> failedMessages = messageRepository .findByStatusAndCreateTimeBefore( MessageStatus.PENDING, LocalDateTime.now().minusMinutes(5) ); for (LocalMessage message : failedMessages) { try { messageProducer.send(message.getTopic(), message.getContent()); message.setStatus(MessageStatus.SENT); messageRepository.save(message); } catch (Exception e) { if (message.getRetryCount() >= 3) { message.setStatus(MessageStatus.FAILED); messageRepository.save(message); } } } } }
💡 面试回答要点
标准回答模板
第一部分:问题分析
"分布式事务要解决的核心问题是在分布式环境下保证数据一致性。 传统的ACID特性在分布式环境下面临挑战: - 原子性:多服务调用的一致性问题 - 一致性:网络分区导致的状态不同步 - 隔离性:分布式锁和并发控制复杂 - 持久性:节点故障和网络问题"
第二部分:解决方案
"主要有以下几种解决方案: 1. 2PC/3PC:强一致性,但性能差,有阻塞问题 2. TCC:性能好,但业务侵入性强,需要实现三个方法 3. Saga:适合长流程,但不保证隔离性 4. 本地消息表:简单可靠,但只保证最终一致性 5. Seata:阿里开源框架,支持多种模式"
第三部分:实际选择
"在实际项目中,我们选择了Seata的AT模式: - 业务代码无侵入,只需加@GlobalTransactional注解 - 自动生成反向SQL实现补偿 - 支持多种数据库和消息队列 - 有完善的监控和管理工具 - 阿里生产环境验证,稳定可靠"
核心要点总结:
- ✅ 理解分布式事务的本质和挑战
- ✅ 掌握主流解决方案的原理和适用场景
- ✅ 熟悉Seata框架的使用和配置
- ✅ 能够根据业务场景选择合适方案
Java面试圣经 文章被收录于专栏
Java面试圣经,带你练透java圣经