18.8.5 分布式事务消息实现方案

1. 分布式事务消息原理

1.1 事务消息基本概念

public class TransactionalMessagePrinciple {
    
    /*
     * 分布式事务消息原理:
     * 
     * 1. 两阶段提交
     *    - 第一阶段:发送半消息(Half Message)
     *    - 第二阶段:根据本地事务结果提交或回滚
     * 
     * 2. 消息状态
     *    - Prepared: 半消息状态,消费者不可见
     *    - Committed: 已提交,消费者可见
     *    - Rollback: 已回滚,消息删除
     * 
     * 3. 回查机制
     *    - 消息队列定期回查本地事务状态
     *    - 根据回查结果决定消息最终状态
     * 
     * 4. 应用场景
     *    - 订单支付
     *    - 库存扣减
     *    - 积分增加
     *    - 账户转账
     */
    
    public void demonstrateTransactionalMessage() {
        System.out.println("=== 分布式事务消息演示 ===");
        
        TransactionalMessageService messageService = new TransactionalMessageService();
        
        demonstrateSuccessScenario(messageService);
        demonstrateFailureScenario(messageService);
        demonstrateCheckBackScenario(messageService);
    }
    
    private void demonstrateSuccessScenario(TransactionalMessageService messageService) {
        System.out.println("--- 成功场景演示 ---");
        
        System.out.println("1. 开始分布式事务:");
        String transactionId = "tx-success-001";
        OrderMessage orderMsg = new OrderMessage("order-001", "user-123", "product-456", 2, 199.99);
        
        // 第一阶段:发送半消息
        boolean halfMsgSent = messageService.sendHalfMessage(transactionId, orderMsg);
        System.out.println("半消息发送结果: " + halfMsgSent);
        
        // 执行本地事务
        boolean localTxResult = executeLocalTransaction(orderMsg);
        System.out.println("本地事务执行结果: " + localTxResult);
        
        // 第二阶段:根据本地事务结果提交或回滚
        if (localTxResult) {
            boolean committed = messageService.commitMessage(transactionId);
            System.out.println("消息提交结果: " + committed);
        } else {
            boolean rollback = messageService.rollbackMessage(transactionId);
            System.out.println("消息回滚结果: " + rollback);
        }
        
        System.out.println("成功场景完成\n");
    }
    
    private void demonstrateFailureScenario(TransactionalMessageService messageService) {
        System.out.println("--- 失败场景演示 ---");
        
        System.out.println("1. 开始分布式事务:");
        String transactionId = "tx-failure-001";
        OrderMessage orderMsg = new OrderMessage("order-002", "user-124", "product-789", 1, 299.99);
        
        // 第一阶段:发送半消息
        boolean halfMsgSent = messageService.sendHalfMessage(transactionId, orderMsg);
        System.out.println("半消息发送结果: " + halfMsgSent);
        
        // 执行本地事务(模拟失败)
        boolean localTxResult = executeLocalTransactionWithFailure(orderMsg);
        System.out.println("本地事务执行结果: " + localTxResult);
        
        // 第二阶段:回滚消息
        boolean rollback = messageService.rollbackMessage(transactionId);
        System.out.println("消息回滚结果: " + rollback);
        
        System.out.println("失败场景完成\n");
    }
    
    private void demonstrateCheckBackScenario(TransactionalMessageService messageService) {
        System.out.println("--- 回查场景演示 ---");
        
        System.out.println("1. 模拟网络异常,第二阶段未执行:");
        String transactionId = "tx-checkback-001";
        OrderMessage orderMsg = new OrderMessage("order-003", "user-125", "product-101", 3, 99.99);
        
        // 第一阶段:发送半消息
        messageService.sendHalfMessage(transactionId, orderMsg);
        
        // 模拟网络异常,第二阶段未执行
        System.out.println("模拟网络异常,第二阶段提交失败");
        
        // 消息队列发起回查
        System.out.println("\n2. 消息队列发起事务状态回查:");
        messageService.checkTransactionStatus(transactionId);
        
        System.out.println("回查场景完成\n");
    }
    
    private boolean executeLocalTransaction(OrderMessage orderMsg) {
        System.out.println("  执行本地事务:");
        System.out.println("    - 创建订单: " + orderMsg.getOrderId());
        System.out.println("    - 扣减库存: " + orderMsg.getQuantity());
        System.out.println("    - 计算金额: " + orderMsg.getTotalAmount());
        
        // 模拟本地事务处理
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        return true; // 模拟成功
    }
    
    private boolean executeLocalTransactionWithFailure(OrderMessage orderMsg) {
        System.out.println("  执行本地事务:");
        System.out.println("    - 创建订单: " + orderMsg.getOrderId());
        System.out.println("    - 库存检查失败: 库存不足");
        
        return false; // 模拟失败
    }
}

// 订单消息
class OrderMessage {
    private String orderId;
    private String userId;
    private String productId;
    private int quantity;
    private double price;
    
    public OrderMessage(String orderId, String userId, String productId, int quantity, double price) {
        this.orderId = orderId;
        this.userId = userId;
        this.productId = productId;
        this.quantity = quantity;
        this.price = price;
    }
    
    public String getOrderId() { return orderId; }
    public String getUserId() { return userId; }
    public String getProductId() { return productId; }
    public int getQuantity() { return quantity; }
    public double getPrice() { return price; }
    public double getTotalAmount() { return price * quantity; }
}

// 事务消息服务
class TransactionalMessageService {
    private java.util.Map<String, TransactionMessage> halfMessages = new java.util.concurrent.ConcurrentHashMap<>();
    private java.util.Map<String, String> transactionStatus = new java.util.concurrent.ConcurrentHashMap<>();
    
    public boolean sendHalfMessage(String transactionId, OrderMessage orderMsg) {
        System.out.println("  发送半消息: " + transactionId);
        
        TransactionMessage txMsg = new TransactionMessage(transactionId, orderMsg, MessageStatus.PREPARED);
        halfMessages.put(transactionId, txMsg);
        
        System.out.println("    消息状态: PREPARED");
        System.out.println("    消息内容: " + orderMsg.getOrderId());
        
        return true;
    }
    
    public boolean commitMessage(String transactionId) {
        System.out.println("  提交消息: " + transactionId);
        
        TransactionMessage txMsg = halfMessages.get(transactionId);
        if (txMsg != null) {
            txMsg.setStatus(MessageStatus.COMMITTED);
            transactionStatus.put(transactionId, "COMMITTED");
            
            System.out.println("    消息状态: COMMITTED");
            System.out.println("    消息对消费者可见");
            
            // 模拟消息投递给消费者
            deliverToConsumer(txMsg);
            
            return true;
        }
        
        return false;
    }
    
    public boolean rollbackMessage(String transactionId) {
        System.out.println("  回滚消息: " + transactionId);
        
        TransactionMessage txMsg = halfMessages.get(transactionId);
        if (txMsg != null) {
            txMsg.setStatus(MessageStatus.ROLLBACK);
            transactionStatus.put(transactionId, "ROLLBACK");
            
            System.out.println("    消息状态: ROLLBACK");
            System.out.println("    消息已删除");
            
            halfMessages.remove(transactionId);
            
            return true;
        }
        
        return false;
    }
    
    public void checkTransactionStatus(String transactionId) {
        System.out.println("  回查事务状态: " + transactionId);
        
        TransactionMessage txMsg = halfMessages.get(transactionId);
        if (txMsg != null && txMsg.getStatus() == MessageStatus.P

剩余60%内容,订阅专栏后可继续查看/也可单篇购买

Java面试圣经 文章被收录于专栏

Java面试圣经,带你练透java圣经

全部评论

相关推荐

09-18 20:41
百度_Java
点赞 评论 收藏
分享
评论
点赞
1
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务