18.8.5 分布式事务消息实现方案
1. 分布式事务消息原理
1.1 事务消息基本概念
public class TransactionalMessagePrinciple { /* * 分布式事务消息原理: * * 1. 两阶段提交 * - 第一阶段:发送半消息(Half Message) * - 第二阶段:根据本地事务结果提交或回滚 * * 2. 消息状态 * - Prepared: 半消息状态,消费者不可见 * - Committed: 已提交,消费者可见 * - Rollback: 已回滚,消息删除 * * 3. 回查机制 * - 消息队列定期回查本地事务状态 * - 根据回查结果决定消息最终状态 * * 4. 应用场景 * - 订单支付 * - 库存扣减 * - 积分增加 * - 账户转账 */ public void demonstrateTransactionalMessage() { System.out.println("=== 分布式事务消息演示 ==="); TransactionalMessageService messageService = new TransactionalMessageService(); demonstrateSuccessScenario(messageService); demonstrateFailureScenario(messageService); demonstrateCheckBackScenario(messageService); } private void demonstrateSuccessScenario(TransactionalMessageService messageService) { System.out.println("--- 成功场景演示 ---"); System.out.println("1. 开始分布式事务:"); String transactionId = "tx-success-001"; OrderMessage orderMsg = new OrderMessage("order-001", "user-123", "product-456", 2, 199.99); // 第一阶段:发送半消息 boolean halfMsgSent = messageService.sendHalfMessage(transactionId, orderMsg); System.out.println("半消息发送结果: " + halfMsgSent); // 执行本地事务 boolean localTxResult = executeLocalTransaction(orderMsg); System.out.println("本地事务执行结果: " + localTxResult); // 第二阶段:根据本地事务结果提交或回滚 if (localTxResult) { boolean committed = messageService.commitMessage(transactionId); System.out.println("消息提交结果: " + committed); } else { boolean rollback = messageService.rollbackMessage(transactionId); System.out.println("消息回滚结果: " + rollback); } System.out.println("成功场景完成\n"); } private void demonstrateFailureScenario(TransactionalMessageService messageService) { System.out.println("--- 失败场景演示 ---"); System.out.println("1. 开始分布式事务:"); String transactionId = "tx-failure-001"; OrderMessage orderMsg = new OrderMessage("order-002", "user-124", "product-789", 1, 299.99); // 第一阶段:发送半消息 boolean halfMsgSent = messageService.sendHalfMessage(transactionId, orderMsg); System.out.println("半消息发送结果: " + halfMsgSent); // 执行本地事务(模拟失败) boolean localTxResult = executeLocalTransactionWithFailure(orderMsg); System.out.println("本地事务执行结果: " + localTxResult); // 第二阶段:回滚消息 boolean rollback = messageService.rollbackMessage(transactionId); System.out.println("消息回滚结果: " + rollback); System.out.println("失败场景完成\n"); } private void demonstrateCheckBackScenario(TransactionalMessageService messageService) { System.out.println("--- 回查场景演示 ---"); System.out.println("1. 模拟网络异常,第二阶段未执行:"); String transactionId = "tx-checkback-001"; OrderMessage orderMsg = new OrderMessage("order-003", "user-125", "product-101", 3, 99.99); // 第一阶段:发送半消息 messageService.sendHalfMessage(transactionId, orderMsg); // 模拟网络异常,第二阶段未执行 System.out.println("模拟网络异常,第二阶段提交失败"); // 消息队列发起回查 System.out.println("\n2. 消息队列发起事务状态回查:"); messageService.checkTransactionStatus(transactionId); System.out.println("回查场景完成\n"); } private boolean executeLocalTransaction(OrderMessage orderMsg) { System.out.println(" 执行本地事务:"); System.out.println(" - 创建订单: " + orderMsg.getOrderId()); System.out.println(" - 扣减库存: " + orderMsg.getQuantity()); System.out.println(" - 计算金额: " + orderMsg.getTotalAmount()); // 模拟本地事务处理 try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return true; // 模拟成功 } private boolean executeLocalTransactionWithFailure(OrderMessage orderMsg) { System.out.println(" 执行本地事务:"); System.out.println(" - 创建订单: " + orderMsg.getOrderId()); System.out.println(" - 库存检查失败: 库存不足"); return false; // 模拟失败 } } // 订单消息 class OrderMessage { private String orderId; private String userId; private String productId; private int quantity; private double price; public OrderMessage(String orderId, String userId, String productId, int quantity, double price) { this.orderId = orderId; this.userId = userId; this.productId = productId; this.quantity = quantity; this.price = price; } public String getOrderId() { return orderId; } public String getUserId() { return userId; } public String getProductId() { return productId; } public int getQuantity() { return quantity; } public double getPrice() { return price; } public double getTotalAmount() { return price * quantity; } } // 事务消息服务 class TransactionalMessageService { private java.util.Map<String, TransactionMessage> halfMessages = new java.util.concurrent.ConcurrentHashMap<>(); private java.util.Map<String, String> transactionStatus = new java.util.concurrent.ConcurrentHashMap<>(); public boolean sendHalfMessage(String transactionId, OrderMessage orderMsg) { System.out.println(" 发送半消息: " + transactionId); TransactionMessage txMsg = new TransactionMessage(transactionId, orderMsg, MessageStatus.PREPARED); halfMessages.put(transactionId, txMsg); System.out.println(" 消息状态: PREPARED"); System.out.println(" 消息内容: " + orderMsg.getOrderId()); return true; } public boolean commitMessage(String transactionId) { System.out.println(" 提交消息: " + transactionId); TransactionMessage txMsg = halfMessages.get(transactionId); if (txMsg != null) { txMsg.setStatus(MessageStatus.COMMITTED); transactionStatus.put(transactionId, "COMMITTED"); System.out.println(" 消息状态: COMMITTED"); System.out.println(" 消息对消费者可见"); // 模拟消息投递给消费者 deliverToConsumer(txMsg); return true; } return false; } public boolean rollbackMessage(String transactionId) { System.out.println(" 回滚消息: " + transactionId); TransactionMessage txMsg = halfMessages.get(transactionId); if (txMsg != null) { txMsg.setStatus(MessageStatus.ROLLBACK); transactionStatus.put(transactionId, "ROLLBACK"); System.out.println(" 消息状态: ROLLBACK"); System.out.println(" 消息已删除"); halfMessages.remove(transactionId); return true; } return false; } public void checkTransactionStatus(String transactionId) { System.out.println(" 回查事务状态: " + transactionId); TransactionMessage txMsg = halfMessages.get(transactionId); if (txMsg != null && txMsg.getStatus() == MessageStatus.P
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
Java面试圣经 文章被收录于专栏
Java面试圣经,带你练透java圣经