18.8.5 分布式事务消息实现方案
1. 分布式事务消息原理
1.1 事务消息基本概念
public class TransactionalMessagePrinciple {
/*
* 分布式事务消息原理:
*
* 1. 两阶段提交
* - 第一阶段:发送半消息(Half Message)
* - 第二阶段:根据本地事务结果提交或回滚
*
* 2. 消息状态
* - Prepared: 半消息状态,消费者不可见
* - Committed: 已提交,消费者可见
* - Rollback: 已回滚,消息删除
*
* 3. 回查机制
* - 消息队列定期回查本地事务状态
* - 根据回查结果决定消息最终状态
*
* 4. 应用场景
* - 订单支付
* - 库存扣减
* - 积分增加
* - 账户转账
*/
public void demonstrateTransactionalMessage() {
System.out.println("=== 分布式事务消息演示 ===");
TransactionalMessageService messageService = new TransactionalMessageService();
demonstrateSuccessScenario(messageService);
demonstrateFailureScenario(messageService);
demonstrateCheckBackScenario(messageService);
}
private void demonstrateSuccessScenario(TransactionalMessageService messageService) {
System.out.println("--- 成功场景演示 ---");
System.out.println("1. 开始分布式事务:");
String transactionId = "tx-success-001";
OrderMessage orderMsg = new OrderMessage("order-001", "user-123", "product-456", 2, 199.99);
// 第一阶段:发送半消息
boolean halfMsgSent = messageService.sendHalfMessage(transactionId, orderMsg);
System.out.println("半消息发送结果: " + halfMsgSent);
// 执行本地事务
boolean localTxResult = executeLocalTransaction(orderMsg);
System.out.println("本地事务执行结果: " + localTxResult);
// 第二阶段:根据本地事务结果提交或回滚
if (localTxResult) {
boolean committed = messageService.commitMessage(transactionId);
System.out.println("消息提交结果: " + committed);
} else {
boolean rollback = messageService.rollbackMessage(transactionId);
System.out.println("消息回滚结果: " + rollback);
}
System.out.println("成功场景完成\n");
}
private void demonstrateFailureScenario(TransactionalMessageService messageService) {
System.out.println("--- 失败场景演示 ---");
System.out.println("1. 开始分布式事务:");
String transactionId = "tx-failure-001";
OrderMessage orderMsg = new OrderMessage("order-002", "user-124", "product-789", 1, 299.99);
// 第一阶段:发送半消息
boolean halfMsgSent = messageService.sendHalfMessage(transactionId, orderMsg);
System.out.println("半消息发送结果: " + halfMsgSent);
// 执行本地事务(模拟失败)
boolean localTxResult = executeLocalTransactionWithFailure(orderMsg);
System.out.println("本地事务执行结果: " + localTxResult);
// 第二阶段:回滚消息
boolean rollback = messageService.rollbackMessage(transactionId);
System.out.println("消息回滚结果: " + rollback);
System.out.println("失败场景完成\n");
}
private void demonstrateCheckBackScenario(TransactionalMessageService messageService) {
System.out.println("--- 回查场景演示 ---");
System.out.println("1. 模拟网络异常,第二阶段未执行:");
String transactionId = "tx-checkback-001";
OrderMessage orderMsg = new OrderMessage("order-003", "user-125", "product-101", 3, 99.99);
// 第一阶段:发送半消息
messageService.sendHalfMessage(transactionId, orderMsg);
// 模拟网络异常,第二阶段未执行
System.out.println("模拟网络异常,第二阶段提交失败");
// 消息队列发起回查
System.out.println("\n2. 消息队列发起事务状态回查:");
messageService.checkTransactionStatus(transactionId);
System.out.println("回查场景完成\n");
}
private boolean executeLocalTransaction(OrderMessage orderMsg) {
System.out.println(" 执行本地事务:");
System.out.println(" - 创建订单: " + orderMsg.getOrderId());
System.out.println(" - 扣减库存: " + orderMsg.getQuantity());
System.out.println(" - 计算金额: " + orderMsg.getTotalAmount());
// 模拟本地事务处理
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return true; // 模拟成功
}
private boolean executeLocalTransactionWithFailure(OrderMessage orderMsg) {
System.out.println(" 执行本地事务:");
System.out.println(" - 创建订单: " + orderMsg.getOrderId());
System.out.println(" - 库存检查失败: 库存不足");
return false; // 模拟失败
}
}
// 订单消息
class OrderMessage {
private String orderId;
private String userId;
private String productId;
private int quantity;
private double price;
public OrderMessage(String orderId, String userId, String productId, int quantity, double price) {
this.orderId = orderId;
this.userId = userId;
this.productId = productId;
this.quantity = quantity;
this.price = price;
}
public String getOrderId() { return orderId; }
public String getUserId() { return userId; }
public String getProductId() { return productId; }
public int getQuantity() { return quantity; }
public double getPrice() { return price; }
public double getTotalAmount() { return price * quantity; }
}
// 事务消息服务
class TransactionalMessageService {
private java.util.Map<String, TransactionMessage> halfMessages = new java.util.concurrent.ConcurrentHashMap<>();
private java.util.Map<String, String> transactionStatus = new java.util.concurrent.ConcurrentHashMap<>();
public boolean sendHalfMessage(String transactionId, OrderMessage orderMsg) {
System.out.println(" 发送半消息: " + transactionId);
TransactionMessage txMsg = new TransactionMessage(transactionId, orderMsg, MessageStatus.PREPARED);
halfMessages.put(transactionId, txMsg);
System.out.println(" 消息状态: PREPARED");
System.out.println(" 消息内容: " + orderMsg.getOrderId());
return true;
}
public boolean commitMessage(String transactionId) {
System.out.println(" 提交消息: " + transactionId);
TransactionMessage txMsg = halfMessages.get(transactionId);
if (txMsg != null) {
txMsg.setStatus(MessageStatus.COMMITTED);
transactionStatus.put(transactionId, "COMMITTED");
System.out.println(" 消息状态: COMMITTED");
System.out.println(" 消息对消费者可见");
// 模拟消息投递给消费者
deliverToConsumer(txMsg);
return true;
}
return false;
}
public boolean rollbackMessage(String transactionId) {
System.out.println(" 回滚消息: " + transactionId);
TransactionMessage txMsg = halfMessages.get(transactionId);
if (txMsg != null) {
txMsg.setStatus(MessageStatus.ROLLBACK);
transactionStatus.put(transactionId, "ROLLBACK");
System.out.println(" 消息状态: ROLLBACK");
System.out.println(" 消息已删除");
halfMessages.remove(transactionId);
return true;
}
return false;
}
public void checkTransactionStatus(String transactionId) {
System.out.println(" 回查事务状态: " + transactionId);
TransactionMessage txMsg = halfMessages.get(transactionId);
if (txMsg != null && txMsg.getStatus() == MessageStatus.P
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
Java面试圣经 文章被收录于专栏
Java面试圣经,带你练透java圣经